一、基本介绍
RabbitMQ是一个实现了AMQP协议(是一个异步消息传递所使用的应用层协议规范)标准的开源消息代理和队列服务器,它是企业级消息系统,自带了集群、管理、插件等特性。学习RabbitMQ需要理解下面几个概念:
- 消息(Message):
-
生产者(Producer):创建消息并且设置标签。
-
消费者(Consumer): 消费者连接到代理服务器上,接收消息的有效载荷(消费者并不需要消息中的标签)
二、工作流程
AMQP工作流程图如下所示: 
交换机就像邮局,通过它做路由分发,交换机将收到的消息根据路由分发规则分发给绑定的队列。
队列将消息投递给了订阅此队列的消费者或者消费者主动获取
为了保证消息被正确取出并执行,消息投递失败后会重发,AMQP包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。
交换机拿到一个消息之后会将它路由给消息队列。它使用哪种路由算法是由交换机类型和被称作绑定(queue_bind)的规则所决定的,目前RabbitMQ提供了如下四种交换机。
- 直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为XXX的消息发送给直连交换机的时,交换机会把它路由给绑定值同样为XXX的队列。直连交换机用来处理消息的单播路由。
- 主题交换机(topic exchange): 通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过255字节。绑定键支持通配符:“*“用来表示一个单词,”#“用来表示任意数量单词。
- 扇形交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇形交换机用来处理消息的广播路由。扇形交换机允许对单条消息做不同的处理,另外可以把一个消息发给多个任务队列,执行不一样的工作。使用扇形交换机可以有效的解耦生产者和消费者
- 头交换机(headers exchange): 允许匹配AMQP的头而非路由键,使用起来和直连交换机差不多,但是性能却差很多,一般很少用到。
三、安装启动
- Ubuntu下安装
sudo apt-get install rabbitmq-server sudo service rabbitmq-server start
- Centos下安装
yum -y install rabbitmq-server systemctl start rabbitmq-server
四、代码演示
由于采用python来演示流程,所以需要先安装RabbitMQ的python客户端,最常用的客户端是pika,代码支持Python 2.X和3.X
pip install pika
生产者producer.py
#!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道 # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct', passive=False, durable=True, auto_delete=False) if len(sys.argv) != 1: msg = sys.argv[1] # 使用命令行参数为消息内容 else: msg = 'hello world' # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2) # basic_publish 表示发送路由键为xxx_routing_key,消息为hello world 的消息给web_develop 这个交换机 channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props) connection.close() # 关闭连接
消费者consumer.py
#!/usr/bin/env python # -*-coding:utf-8 -*- import pika # 处理接收到的消息的回调函数,method_frame 携带了投递标记,header_frame表示AMQP信息头的对象,body为消息实体 def on_message(channel, method_frame, header_frame, body): # 消息确认,确认之后才会删除消息并给消费者发送新的消息 channel.basic_ack(delivery_tag=method_frame.delivery_tag) print(body) parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道 # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct', passive=False, durable=True, auto_delete=False) # 声明队列,如果没有就创建 channel.queue_declare(queue='standard', auto_delete=True) # 通过路由键将队列和交换机绑定 channel.queue_bind(queue='standard', exchange='web_develop', routing_key='xxx_routing_key') # 订阅队列 channel.basic_consume(on_message, 'standard') try: # 开始消费 channel.start_consuming() except KeyboardInterrupt: # 停止消费 channel.stop_consuming() # 关闭连接 connection.close()
先启动消费者
python consumer.py

另开一个终端后,启动生产者
python producer.py

可以对生产者的例子进行改动,让它支持消息确认。支持的原理是确保basic_publish的返回值为True。
生产者producer_with_confirm.py
#!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道 # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct', passive=False, durable=True, auto_delete=False) if len(sys.argv) != 1: msg = sys.argv[1] # 使用命令行参数为消息内容 else: msg = 'hello world' # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2) if channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props): print('Message publish was confirmed!') else: print('Message could not be confirmed!') connection.close() # 关闭连接
重新执行新的生产者代码发现,当消费者收到消息后,生产者这边会打印出Message publish was confirmed!。