一、基本介绍
 RabbitMQ是一个实现了AMQP协议(是一个异步消息传递所使用的应用层协议规范)标准的开源消息代理和队列服务器,它是企业级消息系统,自带了集群、管理、插件等特性。学习RabbitMQ需要理解下面几个概念:
  - 消息(Message):
 -  生产者(Producer):创建消息并且设置标签。 
-  消费者(Consumer): 消费者连接到代理服务器上,接收消息的有效载荷(消费者并不需要消息中的标签) 
二、工作流程
 AMQP工作流程图如下所示: 
  交换机就像邮局,通过它做路由分发,交换机将收到的消息根据路由分发规则分发给绑定的队列。
 
  队列将消息投递给了订阅此队列的消费者或者消费者主动获取
 
 为了保证消息被正确取出并执行,消息投递失败后会重发,AMQP包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。
 交换机拿到一个消息之后会将它路由给消息队列。它使用哪种路由算法是由交换机类型和被称作绑定(queue_bind)的规则所决定的,目前RabbitMQ提供了如下四种交换机。
  - 直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为XXX的消息发送给直连交换机的时,交换机会把它路由给绑定值同样为XXX的队列。直连交换机用来处理消息的单播路由。
- 主题交换机(topic exchange): 通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过255字节。绑定键支持通配符:“*“用来表示一个单词,”#“用来表示任意数量单词。
- 扇形交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇形交换机用来处理消息的广播路由。扇形交换机允许对单条消息做不同的处理,另外可以把一个消息发给多个任务队列,执行不一样的工作。使用扇形交换机可以有效的解耦生产者和消费者
- 头交换机(headers exchange): 允许匹配AMQP的头而非路由键,使用起来和直连交换机差不多,但是性能却差很多,一般很少用到。
三、安装启动
  - Ubuntu下安装
sudo apt-get install rabbitmq-server sudo service rabbitmq-server start 
  - Centos下安装
yum -y install rabbitmq-server systemctl start rabbitmq-server 
 四、代码演示
 由于采用python来演示流程,所以需要先安装RabbitMQ的python客户端,最常用的客户端是pika,代码支持Python 2.X和3.X
 pip install pika 
 生产者producer.py
 #!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  if len(sys.argv) != 1:     msg = sys.argv[1]  # 使用命令行参数为消息内容 else:     msg = 'hello world'  # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)  # basic_publish 表示发送路由键为xxx_routing_key,消息为hello world 的消息给web_develop 这个交换机 channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props)  connection.close()  # 关闭连接  
 消费者consumer.py
 #!/usr/bin/env python # -*-coding:utf-8 -*- import pika  # 处理接收到的消息的回调函数,method_frame 携带了投递标记,header_frame表示AMQP信息头的对象,body为消息实体  def on_message(channel, method_frame, header_frame, body):     # 消息确认,确认之后才会删除消息并给消费者发送新的消息     channel.basic_ack(delivery_tag=method_frame.delivery_tag)     print(body)  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters)  # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  # 声明队列,如果没有就创建 channel.queue_declare(queue='standard', auto_delete=True)  # 通过路由键将队列和交换机绑定 channel.queue_bind(queue='standard', exchange='web_develop', routing_key='xxx_routing_key')  # 订阅队列 channel.basic_consume(on_message, 'standard')  try:     # 开始消费     channel.start_consuming() except KeyboardInterrupt:     # 停止消费     channel.stop_consuming()  # 关闭连接 connection.close() 
 先启动消费者
 python consumer.py 
 
 另开一个终端后,启动生产者
 python producer.py 
 
 可以对生产者的例子进行改动,让它支持消息确认。支持的原理是确保basic_publish的返回值为True。
 生产者producer_with_confirm.py
 #!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  if len(sys.argv) != 1:     msg = sys.argv[1]  # 使用命令行参数为消息内容 else:     msg = 'hello world'  # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)  if channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props):     print('Message publish was confirmed!') else:     print('Message could not be confirmed!')  connection.close()  # 关闭连接  
 重新执行新的生产者代码发现,当消费者收到消息后,生产者这边会打印出Message publish was confirmed!。