Springboot集成——阿里RocketMQ使用心得


声明:本文转载自https://my.oschina.net/u/3670641/blog/1560267,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

一、阿里云官网---帮助文档

https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh

按照官网步骤,创建Topic、申请发布(生产者)、申请订阅(消费者)

二、代码

1、配置:

public class MqConfig {     /**      * 启动测试之前请替换如下 XXX 为您的配置      */     public static final String PUBLIC_TOPIC = "test";//公网测试     public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER";     public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE";      public static final String ACCESS_KEY = "123";     public static final String SECRET_KEY = "123";     public static final String TAG = "";     public static final String THREAD_NUM = "25";//消费端线程数     /**      * ONSADDR 请根据不同Region进行配置      * 公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet      * 公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal      * 杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal      * 深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal      */     public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"; }

阿里云用 公有云生成,测试用公网

2、生产者

配置文件:producer.xml

<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans>     <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean"           init-method="start" destroy-method="shutdown">         <property name="properties">             <map>                 <entry key="ProducerId" value="" /> <!-- PID,请替换 -->                 <entry key="AccessKey" value="" /> <!-- ACCESS_KEY,请替换 -->                 <entry key="SecretKey" value="" /> <!-- SECRET_KEY,请替换 -->                 <!--PropertyKeyConst.ONSAddr 请根据不同Region进行配置                  公网测试: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet                  公有云生产: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal                  杭州金融云: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal                  深圳金融云: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal -->                 <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/>             </map>         </property>     </bean> </beans>

启动方式,在使用类的全局里设置:

//初始化生产者     private ApplicationContext ctx;     private ProducerBean producer;      @Value("${producerConfig.enabled}")//开关,spring配置项,true为开启,false关闭     private boolean producerConfigEnabled;      @PostConstruct     public void init(){         if (true == producerConfigEnabled) {             ctx = new ClassPathXmlApplicationContext("producer.xml");             producer = (ProducerBean) ctx.getBean("producer");         }     }

发送代码:

 try {       String jsonC = JsonUtils.toJson(elevenMessage);       Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes());       SendResult sendResult = producer.send(message);       if (sendResult != null) {           logger.info(".Send mq message success!”;        } else {           logger.warn(".sendResult is null.........");       }       } catch (Exception e) {            logger.warn("DoubleElevenAllPreService");            Thread.sleep(1000);//如果有异常,休眠1秒       }

发送消息的代码一点要捕获异常,不然会重复发送。

这里的TOPIC用自己创建的,elevenMessage是自己要发送的内容

3、消费者

配置启动类:

@Configuration @ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true) public class ConsumerConfig {      private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name());      @Bean     public Consumer consumerFactory(){         Properties consumerProperties = new Properties();         consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID);         consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY);         consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY);         //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM);         consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR);         Consumer consumer = ONSFactory.createConsumer(consumerProperties);         consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener());         consumer.start();         logger.info("ConsumerConfig start success.");                   return consumer;      } }

CID和ONSADDR一点要选对,用自己的,消费者线程数等可以在这里配置

创建监听器类,消费消息:

@Component public class MessageListener implements MessageListener {     private Logger logger = LoggerFactory.getLogger("remind");      protected static ElevenReposity elevenReposity;     @Resource     public void setElevenReposity(ElevenReposity elevenReposity){         MessageListener .elevenReposity=elevenReposity;     }       @Override     public Action consume(Message message, ConsumeContext consumeContext) {          if(message.getTopic().equals("自己的TOPIC")){//避免消费到其他消息  json转换报错             try {              byte[] body = message.getBody();             String res = new String(body);                          //res 是生产者传过来的消息内容                  //业务代码              }else{                 logger.warn("!");             }              } catch (Exception e) {                 logger.error("MessageListener.consume error:" + e.getMessage(), e);             }              logger.info("MessageListener.Receive message”);             //如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater             return Action.CommitMessage;         }else{             logger.warn();             return Action.ReconsumeLater;         }      }

注意,由于消费者是多线程的,所以对象要用static+set注入,并且无法调用父类的方法和变量

消费者状态可以查看消费者是否连接成功,消费是否延迟,消费速度等

重置消费位点可以清空所有消息

本文发表于2017年11月03日 16:34
(c)注:本文转载自https://my.oschina.net/u/3670641/blog/1560267,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 3884 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1