spring-boot-rabbitmq动态管理


声明:本文转载自https://my.oschina.net/dengfuwei/blog/1595044,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  • 配置属性类:RabbitProperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  • 自动配置类:RabbitAutoConfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  • rabbitmq监听的配置是RabbitAnnotationDrivenConfiguration,是通过RabbitAutoConfiguration引入的
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration {     ... } 
  • RabbitAnnotationDrivenConfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化
  • 通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂SimpleRabbitListenerContainerFactory
@Bean @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( 		SimpleRabbitListenerContainerFactoryConfigurer configurer, 		ConnectionFactory connectionFactory) {     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();     configurer.configure(factory, connectionFactory);     return factory; } 
  • 既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializeContainer(SimpleMessageListenerContainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有创建监听容器和初始化的代码
/**  * Create and start a new {@link MessageListenerContainer} using the specified factory.  * @param endpoint the endpoint to create a {@link MessageListenerContainer}.  * @param factory the {@link RabbitListenerContainerFactory} to use.  * @return the {@link MessageListenerContainer}.  */ protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, 		RabbitListenerContainerFactory<?> factory) {      MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);          if (listenerContainer instanceof InitializingBean) {     	try {             ((InitializingBean) listenerContainer).afterPropertiesSet();     	}     	catch (Exception ex) {             throw new BeanInitializationException("Failed to initialize message listener container", ex);     	}     }          int containerPhase = listenerContainer.getPhase();     if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value     	if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {             throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +             		this.phase + " vs " + containerPhase);     	}     	this.phase = listenerContainer.getPhase();     }          return listenerContainer; } 
  • 继续找调用这个方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,发现调用的地方很多了

    调用RabbitListenerEndpointRegistrar.afterPropertiesSet()的地方

  • 看看afterPropertiesSet方法,是InitializingBean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里创建的实例。原来是在RabbitListenerAnnotationBeanPostProcessor中的私有属性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

  • 回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下
/**  * @return the managed {@link MessageListenerContainer} instance(s).  */ public Collection<MessageListenerContainer> getListenerContainers() {     return Collections.unmodifiableCollection(this.listenerContainers.values()); } 	 @Override public void start() {     for (MessageListenerContainer listenerContainer : getListenerContainers()) {     	startIfNecessary(listenerContainer);     } }  /**  * Start the specified {@link MessageListenerContainer} if it should be started  * on startup or when start is called explicitly after startup.  * @see MessageListenerContainer#isAutoStartup()  */ private void startIfNecessary(MessageListenerContainer listenerContainer) {     if (this.contextRefreshed || listenerContainer.isAutoStartup()) {     	listenerContainer.start();     } }  @Override public void stop() {     for (MessageListenerContainer listenerContainer : getListenerContainers()) {     	listenerContainer.stop();     } } 
  • 写个controller,注入RabbitListenerEndpointRegistry,使用start()和stop()对监听进行启用停用的操作,并且RabbitListenerEndpointRegistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:
import java.util.Set;  import javax.annotation.Resource;  import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;  import com.itopener.framework.ResultMap;  /**  * Created by fuwei.deng on 2017年7月24日.  */ @RestController @RequestMapping("rabbitmq/listener") public class RabbitMQController {      @Resource     private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;          @RequestMapping("stop")     public ResultMap stop(){     	rabbitListenerEndpointRegistry.stop();     	return ResultMap.buildSuccess();     }          @RequestMapping("start")     public ResultMap start(){     	rabbitListenerEndpointRegistry.start();     	return ResultMap.buildSuccess();     }          @RequestMapping("setup")     public ResultMap setup(int consumer, int maxConsumer){     	Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();     	SimpleMessageListenerContainer container = null;     	for(String id : containerIds){     		container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);     		if(container != null){     			container.setConcurrentConsumers(consumer);     			container.setMaxConcurrentConsumers(maxConsumer);     		}     	}     	return ResultMap.buildSuccess();     }      } 

本文发表于2017年12月26日 22:32
(c)注:本文转载自https://my.oschina.net/dengfuwei/blog/1595044,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2466 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1