RabbitMQ系列一:初识RabbitMQ


声明:本文转载自https://my.oschina.net/zhangyangyang/blog/1574086,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

一、基本介绍

RabbitMQ是一个实现了AMQP协议(是一个异步消息传递所使用的应用层协议规范)标准的开源消息代理和队列服务器,它是企业级消息系统,自带了集群、管理、插件等特性。学习RabbitMQ需要理解下面几个概念:

  1. 消息(Message):
  • 有效载荷(Payload),也就是要传输的数据,数据类型可以是纯文本也可以是json 
  • 标签(Label),它包含交换机的名字和可选的主题(topic)标记等,AMQP仅仅描述了标签,而RabbitMQ决定了把这个消息发给哪个消费者。 
  1. 生产者(Producer):创建消息并且设置标签。

  2. 消费者(Consumer): 消费者连接到代理服务器上,接收消息的有效载荷(消费者并不需要消息中的标签)

二、工作流程

AMQP工作流程图如下所示: 输入图片说明

交换机就像邮局,通过它做路由分发,交换机将收到的消息根据路由分发规则分发给绑定的队列。

队列将消息投递给了订阅此队列的消费者或者消费者主动获取

为了保证消息被正确取出并执行,消息投递失败后会重发,AMQP包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。

交换机拿到一个消息之后会将它路由给消息队列。它使用哪种路由算法是由交换机类型和被称作绑定(queue_bind)的规则所决定的,目前RabbitMQ提供了如下四种交换机。

  1. 直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为XXX的消息发送给直连交换机的时,交换机会把它路由给绑定值同样为XXX的队列。直连交换机用来处理消息的单播路由。
  2. 主题交换机(topic exchange): 通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过255字节。绑定键支持通配符:“*“用来表示一个单词,”#“用来表示任意数量单词。
  3. 扇形交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇形交换机用来处理消息的广播路由。扇形交换机允许对单条消息做不同的处理,另外可以把一个消息发给多个任务队列,执行不一样的工作。使用扇形交换机可以有效的解耦生产者和消费者
  4. 头交换机(headers exchange): 允许匹配AMQP的头而非路由键,使用起来和直连交换机差不多,但是性能却差很多,一般很少用到。

三、安装启动

  1. Ubuntu下安装
sudo apt-get install rabbitmq-server sudo service rabbitmq-server start 
  1. Centos下安装
yum -y install rabbitmq-server systemctl start rabbitmq-server 

四、代码演示

由于采用python来演示流程,所以需要先安装RabbitMQ的python客户端,最常用的客户端是pika,代码支持Python 2.X和3.X

pip install pika 

生产者producer.py

#!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  if len(sys.argv) != 1:     msg = sys.argv[1]  # 使用命令行参数为消息内容 else:     msg = 'hello world'  # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)  # basic_publish 表示发送路由键为xxx_routing_key,消息为hello world 的消息给web_develop 这个交换机 channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props)  connection.close()  # 关闭连接  

消费者consumer.py

#!/usr/bin/env python # -*-coding:utf-8 -*- import pika  # 处理接收到的消息的回调函数,method_frame 携带了投递标记,header_frame表示AMQP信息头的对象,body为消息实体  def on_message(channel, method_frame, header_frame, body):     # 消息确认,确认之后才会删除消息并给消费者发送新的消息     channel.basic_ack(delivery_tag=method_frame.delivery_tag)     print(body)  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters)  # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  # 声明队列,如果没有就创建 channel.queue_declare(queue='standard', auto_delete=True)  # 通过路由键将队列和交换机绑定 channel.queue_bind(queue='standard', exchange='web_develop', routing_key='xxx_routing_key')  # 订阅队列 channel.basic_consume(on_message, 'standard')  try:     # 开始消费     channel.start_consuming() except KeyboardInterrupt:     # 停止消费     channel.stop_consuming()  # 关闭连接 connection.close() 

先启动消费者

python consumer.py 

输入图片说明

另开一个终端后,启动生产者

python producer.py 

输入图片说明

可以对生产者的例子进行改动,让它支持消息确认。支持的原理是确保basic_publish的返回值为True。

生产者producer_with_confirm.py

#!/usr/bin/env python # -*-coding:utf-8 -*- import sys import pika  parameters = pika.URLParameters('amqp://guest:guest@localhost:5672') connection = pika.BlockingConnection(parameters) # connection 就是消息代理 channel = connection.channel() # 获得信道  # 声明交换机,指定交换类型为直接交换,最后两个参数表示想要持久化的交换机,其中durable=True表示RabbitMQ在崩溃重启之后会重建队列和交换机 channel.exchange_declare(exchange='web_develop', exchange_type='direct',                          passive=False, durable=True, auto_delete=False)  if len(sys.argv) != 1:     msg = sys.argv[1]  # 使用命令行参数为消息内容 else:     msg = 'hello world'  # 创建一个消息,delivery_mode为2表示让这个消息持久化,重启RabbitMQ也不会丢失 props = pika.BasicProperties(content_type='text/plain', delivery_mode=2)  if channel.basic_publish('web_develop', 'xxx_routing_key', msg, properties=props):     print('Message publish was confirmed!') else:     print('Message could not be confirmed!')  connection.close()  # 关闭连接  

重新执行新的生产者代码发现,当消费者收到消息后,生产者这边会打印出Message publish was confirmed!。

本文发表于2017年11月15日 18:33
(c)注:本文转载自https://my.oschina.net/zhangyangyang/blog/1574086,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1953 讨论 0 喜欢 1

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1