续谈ActiveMQ之java如何操作ActiveMQ(springBoot项目)


声明:本文转载自https://my.oschina.net/u/2486137/blog/1593739,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

 

引入maven依赖          <!-- activemq --> 		 <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-activemq</artifactId>         </dependency>

为了便于管理mq这里统一在xml中配置:

<mq-clients> 	<producer> 		<id>demo.test</id> 		<topic>MQ_TEST</topic> 		<mq.type>1</mq.type> 		<delivery.mode>1</delivery.mode> 		<acknowledge>1</acknowledge> 	</producer> 	<producer> 		<id>demo.test2</id> 		<topic>MQ_TEST2</topic> 		<mq.type>2</mq.type> 		<delivery.mode>1</delivery.mode> 		<acknowledge>1</acknowledge> 	</producer> 	<consumer> 		<id>demo.consumer.test1</id> 		<topic>MQ_TEST</topic> 		<mq.type>1</mq.type> 		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener</message.listener> 	</consumer> 	<consumer> 		<id>demo.consumer.test2</id> 		<topic>MQ_TEST2</topic> 		<mq.type>2</mq.type> 		<message.listener>com.ule.microtopup.mq.listener.ActiveMQMessageListener2</message.listener> 	</consumer> </mq-clients>

XMLUtil:用来读取xml

/** 	 * 获取所有节点 	 * @param root 根节点 	 * @param map  记录每个节点及值 	 */ 	@SuppressWarnings("unchecked") 	private static void getNode(Element root, LinkedHashMap<String, String> map) { 		List<Element> list = root.elements(); 		Iterator<Element> iterator = list.iterator(); 		while (iterator.hasNext()) { 			Element element = iterator.next(); 			if (element.elements() != null && element.elements().size() > 0) { 				System.out.println("element:"+element.getName()); 				getNode(element, map);  			} else { 				map.put(element.getParent().getName() + "." + element.getName(), 				        element.getTextTrim()); 			} 		}  	} 	/** 	 * 读XML文件指定节点内容 	 * @param xmlName  xml文件名 	 * @param nodeName 指定节点 	 * @return 	 * @throws Exception 	 */ 	public static Map<String, String> reader(String xmlName,String nodeName)throws Exception{ 		if(StringUtils.isEmpty(xmlName)){ 			throw new NullPointerException("xmlName cannot be null!"); 		} 		 		LinkedHashMap<String, String> returnValue = new LinkedHashMap<String, String>(); 		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); 		SAXReader reader = new SAXReader(); 		Document document = reader.read(in); 		Element root = document.getRootElement(); 		if(StringUtils.isNotEmpty(nodeName)){ 			root = document.getRootElement().element(nodeName); 		} 		//获取节点 		getNode(root, returnValue); 		 		if (returnValue.size()>0) { 			for (String key : returnValue.keySet()) { 				System.out.println("key:" + key + " ,value:" + returnValue.get(key)); 			}  		} 		return returnValue; 	} 	 	 	/** 	 * 读XML文件所有内容,并将文件转成对象 	 * @param xmlName 文件名 	 * @param cls 	 * @return 	 * @throws Exception 	 */ 	@SuppressWarnings("unchecked")     public static <T> T readerXmlToBean(String xmlName ,Class<?>...cls)throws Exception{ 		if(StringUtils.isEmpty(xmlName)){ 			throw new NullPointerException("xmlName cannot be null!"); 		} 		InputStream in = XMLUtil.class.getClassLoader().getResourceAsStream(xmlName); 		JAXBContext context = JAXBContext.newInstance(cls);// 获取上下文对象           Unmarshaller unmarshaller = context.createUnmarshaller();         T t =  (T)unmarshaller.unmarshal(in); 		return t; 	}

Producer:

@XmlRootElement(name="producer")   public class Producer { 	private String id; 	// 主题 	private String topic; 	// 类型,1-queue,2-topic 	private Integer mqType; 	// 持久化方式 :1-非持久,2-持久化 	private Integer deliveryMode; 	// 签收方式:1-自动签收,2-客户端确认,3-自动批量确认,0-事务提交并确认 	private Integer acknowledge;  //省略get set }

Consumer:

@XmlRootElement(name = "consumer") public class Consumer { 	private String id; 	private String topic; 	private Integer mqType; 	private Class<? extends MessageListener> messageListener; ... }

MessageUtil:mq消息集中处理类,包括发送消息,启动消费监听等

private static MqConnectionFactory mqFactory = MqConnectionFactory.INSTANCE; 	private static Connection conn = null; 	private static Session session = null;  	public static void init() { 		try { 			// 获取一个连接 			if (conn == null) { 				conn = mqFactory.getConnection(); 			} 			conn.start(); 			// 自动提交事务 			if (session == null) { 				/* 				 * Session.AUTO_ACKNOWLEDGE 消息自动签收 				 * Session.CLIENT_ACKNOWLEDGE 客戶端调用acknowledge方法手动签收 				 * Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息 				 * 头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。 				 */ 				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 			} 		} catch (Exception e) { 			e.printStackTrace(); 		} 	}  	/** 	 *  	 * @param obj 序列化对象 	 * @param topic 	 * @param isQueue 	 * @throws Exception 	 */ 	public static void sendObjectMessage(Serializable obj, String id) 	        throws Exception { 		init(); 		Producer p = getProducerById(id); 		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); 		producer.send(session.createObjectMessage(obj)); 		destroy(producer); 	}  	private static Producer getProducerById(String id) { 		Producer p = MQUtil.getProducerById(id); 		if (p == null) { 			throw new NullPointerException("according to id:" + id + ", not found produer."); 		} 		return p; 	}  	public static void sendTextMessage(String mes, String id) 	        throws Exception { 		init(); 		Producer p = getProducerById(id); 		MessageProducer producer = getMessageProducer(getDestination(p), p.getDeliveryMode()); 		producer.send(session.createTextMessage(mes)); 		destroy(producer); 	}  	private static MessageProducer getMessageProducer(Destination destination, Integer deliveryMode) 	        throws Exception { 		MessageProducer producer = session.createProducer(destination); 		/** 		 * PERSISTENT(持久性消息): 		 * 这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。 		 * 可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时, 		 * 消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败, 		 * 它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。 		 * NON_PERSISTENT(非持久性消息): 		 * 保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 		 * 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 		 *  		 */ 		producer.setDeliveryMode(deliveryMode); 		return producer; 	}  	private static Destination getDestination(Producer p) throws Exception { 		return getDestination(p.getMqType(), p.getTopic()); 	}  	private static Destination getDestination(Consumer c) throws Exception { 		return getDestination(c.getMqType(), c.getTopic()); 	}  	private static Destination getDestination(Integer mqType, String topic) throws Exception { 		Destination destination = null; 		if (ActiveMqType.QUEUE == mqType) 			destination = session.createQueue(topic); 		else if (ActiveMqType.TOPIC == mqType) 			destination = session.createTopic(topic); 		else 			throw new IllegalArgumentException("mqType must be 1 or 2."); 		return destination; 	} 	/** 	 * 启动所有监听 	 * @param c 	 * @throws Exception 	 */ 	public static void startConsumer(Consumer c) throws Exception { 		init(); 		MessageConsumer consumer = session.createConsumer(MessageUtil.getDestination(c)); 		MessageListener listener = c.getMessageListener().newInstance(); 		consumer.setMessageListener(listener); 	}  	private static void destroy(MessageProducer producer) throws JMSException { 		if (producer != null) { 			producer.close(); 		} 		if (session != null) { 			session.close(); 			session = null; 		} 		if (conn != null) { 			conn.close(); 			conn = null; 		} 	}  	public static void destroy(MessageConsumer consumer) throws JMSException { 		if (consumer != null) { 			consumer.close(); 			consumer = null; 		} 		if (session != null) { 			session.close(); 			session = null; 		} 		if (conn != null) { 			conn.close(); 			conn = null; 		} 	}

细节不在赘述,具体代码已上传至码云:https://gitee.com/savage_xiao/boot.demo/tree/master

有兴趣可以下载下来看一下,其中有包含其他springboot的研究

测试代码:

public static void main(String[] args) { 		try { 			for(int i = 101; i<200;i++){ 				MessageUtil.sendTextMessage("hello world!"+","+(i+1), "demo.test"); 			}         } catch (Exception e) { 	        e.printStackTrace();         } 	}  -----------------------  public static void main(String[] args) { 		try { 			for(int i = 0; i<100;i++){ 				MessageUtil.sendTextMessage("hello world2!"+","+(i+1), "demo.test2"); 			}         } catch (Exception e) { 	        e.printStackTrace();         } 	}

 

测试结果:

listener2:hello world2!,1 listener2:hello world2!,2 listener2:hello world2!,3 listener2:hello world2!,4 listener2:hello world2!,5 .... ....略 listener2:hello world2!,98 listener2:hello world2!,99 listener2:hello world2!,100 listener:hello world!,102 listener:hello world!,103 ... ...略 listener:hello world!,197 listener:hello world!,198 listener:hello world!,199 listener:hello world!,200

ActiveMq如何搭建如何使用请看上一篇:https://my.oschina.net/u/2486137/blog/1539259

本文发表于2017年12月22日 20:32
(c)注:本文转载自https://my.oschina.net/u/2486137/blog/1593739,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1771 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1