RabbitMQ 快速入门


声明:本文转载自https://my.oschina.net/u/3748347/blog/1619586,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

前面我们介绍了RabbitMQ的基本概念,RabbitMQ基础概念详细介绍。在这里我们做一个简单的列子进行快速入门。

新建Spring Boot项目 引入依赖包

<parent> 	<groupId>org.springframework.boot</groupId> 	<artifactId>spring-boot-starter-parent</artifactId> 	<version>1.5.3.RELEASE</version> 	<relativePath /> <!-- lookup parent from repository --> </parent>  <dependency> 	<groupId>org.springframework.boot</groupId> 	<artifactId>spring-boot-starter-amqp</artifactId> </dependency> 

在启动类上添加启动MQ的注解

@EnableRabbit 

添加配置

# Rabbitmq spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=test spring.rabbitmq.addresses=192.168.35.128:5672 #spring.rabbitmq.addresses=192.168.35.128:5672,192.168.35.129:5672,192.168.35.130:5672 spring.rabbitmq.connection-timeout=50000 #rabbitmq listetner # 消费者最小数量 spring.rabbitmq.listener.concurrency=10 # 消费者最大数量 spring.rabbitmq.listener.max-concurrency=20 # 消息的确认模式 spring.rabbitmq.listener.acknowledge-mode=MANUAL # 每一次发送到消费者的消息数量,它应该大于或等于事务大小(如果使用)。 spring.rabbitmq.listener.prefetch=10 # 消费者端的重试 spring.rabbitmq.listener.retry.enabled=true #rabbitmq publisher # 生产者端的重试 spring.rabbitmq.template.retry.enabled=true #开启发送消息到exchange确认机制 spring.rabbitmq.publisher-confirms=true #开启发送消息到exchange但是exchange没有和队列绑定的确认机制 spring.rabbitmq.publisher-returns=true 

RabbitMQ 所有配置参考:

# RABBIT (RabbitProperties) spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect. spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached. spring.rabbitmq.cache.channel.size= # Number of channels to retain in the cache. spring.rabbitmq.cache.connection.mode=CHANNEL # Connection factory cache mode. spring.rabbitmq.cache.connection.size= # Number of connections to cache. spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite. spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean. spring.rabbitmq.host=localhost # RabbitMQ host. spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container. spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup. spring.rabbitmq.listener.concurrency= # Minimum number of consumers. spring.rabbitmq.listener.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`. spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers. spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used). spring.rabbitmq.listener.retry.enabled=false # Whether or not publishing retries are enabled. spring.rabbitmq.listener.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message. spring.rabbitmq.listener.retry.max-attempts=3 # Maximum number of attempts to deliver a message. spring.rabbitmq.listener.retry.max-interval=10000 # Maximum interval between attempts. spring.rabbitmq.listener.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval. spring.rabbitmq.listener.retry.stateless=true # Whether or not retry is stateless or stateful. spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count. spring.rabbitmq.password= # Login to authenticate against the broker. spring.rabbitmq.port=5672 # RabbitMQ port. spring.rabbitmq.publisher-confirms=false # Enable publisher confirms. spring.rabbitmq.publisher-returns=false # Enable publisher returns. spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none. spring.rabbitmq.ssl.enabled=false # Enable SSL support. spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate. spring.rabbitmq.ssl.key-store-password= # Password used to access the key store. spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates. spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store. spring.rabbitmq.ssl.algorithm= # SSL algorithm to use. By default configure by the rabbit client library. spring.rabbitmq.template.mandatory=false # Enable mandatory messages. spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods. spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods. spring.rabbitmq.template.retry.enabled=false # Set to true to enable retries in the `RabbitTemplate`. spring.rabbitmq.template.retry.initial-interval=1000 # Interval between the first and second attempt to publish a message. spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to publish a message. spring.rabbitmq.template.retry.max-interval=10000 # Maximum number of attempts to publish a message. spring.rabbitmq.template.retry.multiplier=1.0 # A multiplier to apply to the previous publishing retry interval. spring.rabbitmq.username= # Login user to authenticate to the broker. spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker. 

声明队列

@Configuration @ConditionalOnBean({RabbitTemplate.class}) public class RabbitConfig {      /**      * 方法rabbitAdmin的功能描述:动态声明queue、exchange、routing      *      * @param connectionFactory      * @return      * @author : yuhao.wang      */     @Bean     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);         // 发放奖励队列交换机         DirectExchange exchange = new DirectExchange(RabbitConstants.MQ_EXCHANGE_SEND_AWARD);          //声明发送优惠券的消息队列(Direct类型的exchange)         Queue couponQueue = queue(RabbitConstants.QUEUE_NAME_SEND_COUPON);         rabbitAdmin.declareQueue(couponQueue);         rabbitAdmin.declareExchange(exchange);         rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with(RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON));          return rabbitAdmin;     }      public Queue queue(String name) {         // 是否持久化         boolean durable = true;         // 仅创建者可以使用的私有队列,断开后自动删除         boolean exclusive = false;         // 当所有消费客户端连接断开后,是否自动删除队列         boolean autoDelete = false;         return new Queue(name, durable, exclusive, autoDelete, args);     } } 

在这里我们申明了一个RabbitConstants.QUEUE_NAME_SEND_COUPON队列,并声明了一个DirectExchange 类型的交换器,通过Bind将队列、交换机和路由RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON的关系进行绑定。

消息的生产者

/**  * Rabbit 发送消息  *  * @author yuhao.wang  */ @Service public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {     private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);      /**      * Rabbit MQ 客户端      */     @Autowired     private RabbitTemplate rabbitTemplate;      /**      * 系统配置      */     @Autowired     private SystemConfig systemConfig;      /**      * 发送MQ消息      *      * @param exchangeName 交换机名称      * @param routingKey   路由名称      * @param message      发送消息体      */     public void sendMessage(String exchangeName, String routingKey, Object message) {          // 获取CorrelationData对象         CorrelationData correlationData = this.correlationData(message);         rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);     }      /**      * 用于实现消息发送到RabbitMQ交换器后接收ack回调。      * 如果消息发送确认失败就进行重试。      *      * @param correlationData      * @param ack      * @param cause      */     @Override     public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {         // 消息回调确认失败处理         if (!ack) {             // 这里以做消息的从发等处理             logger.info("消息发送失败,消息ID:{}", correlationData.getId());         } else {             logger.info("消息发送成功,消息ID:{}", correlationData.getId());         }     }      /**      * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。      * 基本上来说线程不可能出现这种情况,除非手动将已经存在的队列删掉,否则在测试阶段肯定能测试出来。      */     @Override     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {         logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",                 replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));          // TODO 保存消息到数据库     }      /**      * 消息相关数据(消息ID)      *      * @param message      * @return      */     private CorrelationData correlationData(Object message) {          return new CorrelationData(UUID.randomUUID().toString(), message);     }      @Override     public void afterPropertiesSet() throws Exception {         rabbitTemplate.setConfirmCallback(this);         rabbitTemplate.setReturnCallback(this);     } } 

ConfirmCallback和ReturnCallback

在这个里我们主要实现了ConfirmCallback和ReturnCallback两个接口。这两个接口主要是用来发送消息后回调的。因为rabbit发送消息是只管发,至于发没发成功,发送方法不管。

  • ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。
  • ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。基本上来说线上不可能出现这种情况,除非手动将已经存在的队列删掉,否则在测试阶段肯定能测试出来。

消息的发送方式

  • rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message
  • rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送,这个是异步的。消息是否发送成功需要用到ConfirmCallback和ReturnCallback回调函数类确认。
  • rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。这是一个RPC方法,当发送消息过后,该方法会一直阻塞在哪里等待返回结果,直到请求超时。可以通过配置spring.rabbitmq.template.reply-timeout来配置超时时间。

消息的消费者

 /**  * 发放优惠券的MQ处理  *  * @author yuhao.wang  */ public class SendMessageListener {      private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);      @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)     public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {         try {             // 参数校验             Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");               logger.info("处理MQ消息");         } catch (Exception e) {             logger.error("MQ消息处理异常,消息ID:{},消息体:{}", message.getMessageProperties().getCorrelationIdString(),                     JSON.toJSONString(sendMessage), e);         } finally {             // 确认消息已经消费成功             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);         }     } } 

使用 @RabbitListener注解,并在注解上指定你要监听的队列名称,这样子消费者就声明好了。这里有两点要注意一下:

  • 监听消息的参数(如上面的sendMessage参数)一定要和发送的时候相匹配,也可以不使用sendMessage参数,直接在Message参数里面获取消息体。
  • 如果有返回信息,直接return就好(rabbitTemplate.convertSendAndReceive()方法就会有返回值)。

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

参考:

本文发表于2018年02月07日 14:31
(c)注:本文转载自https://my.oschina.net/u/3748347/blog/1619586,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1789 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1