初识ActiveMQ


声明:本文转载自https://my.oschina.net/u/2486137/blog/1539259,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

①.消息传递方式介绍:

Activemq支持两种方式的消息传递:

    广播模式:1-n的方式,是一种发布订阅模式,像腾讯新闻那样,只要我们微信关注了腾讯新闻,那么每个人都会收到推送的新闻

    队列模式:1-1的方式,只能有一个消费者端消费生产者生产的数据

②.消息类型介绍:

        Activemq提供了两种消息类型:持久化和非持久化:

        消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。这个过程通常称为同步发送。速度较慢,数据基本不会丢失.可以持久化到kahaDB(aMq默认采用kahaDB存储引擎来存储消息)或数据库中

         异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法,速度较快,不过可能会造成数据的丢失.

消息签收方式:

      AUTO_ACKNOWLEDGE    自动确认

      CLIENT_ACKNOWLEDGE    客户端手动确认   

      DUPS_OK_ACKNOWLEDGE   自动批量确认

      SESSION_TRANSACTED    事务提交并确认

③.下载可以到apache activeMQ官网下载

④.我这里下载的是windows 5.10版本的就以此为例做介绍

解压之后进入bin目录根据操作系统找到对应的,activemq.bat文件双击运行

activeMQ内置有一个控制台可以访问http://localhost:8161/查看,默认账户密码皆为admin

⑤.客户端代码(java为例):

 

public class ActiveMqBo { 	 	// 创建连接工厂 	private ConnectionFactory  factory = new ActiveMQConnectionFactory(ActiveMqConstant.URL); 	 	/** 	 * 获取一个连接 	 * @return 	 * @throws JMSException 	 */ 	public  Connection getConnection() throws JMSException { 		Connection conn; 		try { 			conn = factory.createConnection(); 		} catch (JMSException e) { 			e.printStackTrace(); 			throw e; 		} 		return conn; 	} 	 }
public static void main(String[] args) { 		ActiveMqBo mq = new ActiveMqBo(); 		Connection conn = null; 		Session session = null; 		MessageProducer producer = null; 		try { 			// 获取一个连接 			conn = mq.getConnection(); 			conn.start(); 			// 签收方式 			session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 			// 创建一个队列 			Destination destination = session.createQueue("queue-1"); 			// 获取一个生产者 			producer = session.createProducer(destination); 			/* 			 * 持久化,会通过kahadb把消息存入到db.log中,直到被消费后进行清除 			 * 速度较慢              * DeliveryMode.NON_PERSISTENT非持久化              *  DeliveryMode.PERSISTENT持久化 			 */ 			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 			// producer.setTimeToLive(5000);//5秒后过期,这个对点对点模式有效 			for (int i = 0; i < 1000; i++) { 				MessageDto mes = new MessageDto(); 				mes.setCode("" + (i + 1)); 				mes.setMessage("send mes:" + (i + 1)); 				producer.send(session.createObjectMessage(mes)); 				System.out.println("send mes : " + mes); 			}  		} catch (Exception e) { 			e.printStackTrace(); 		} finally { 			try { 				if (producer != null) { 					producer.close(); 				} 				if (session != null) { 					session.close(); 				} 				if (conn != null) { 					conn.close(); 				} 			} catch (JMSException e) { 				e.printStackTrace(); 			} 		} 	}

日志部分:

SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/repository/org/apache/activemq/activemq-all/5.10.0/activemq-all-5.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder] 11:27:42.251 [main] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.252 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.257 [main] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.260 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 11:27:42.261 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} send mes : MessageDto [code=1, message=send mes:1] send mes : MessageDto [code=2, message=send mes:2] send mes : MessageDto [code=3, message=send mes:3] send mes : MessageDto [code=4, message=send mes:4] send mes : MessageDto [code=5, message=send mes:5] send mes : MessageDto [code=6, message=send mes:6] send mes : MessageDto [code=7, message=send mes:7] send mes : MessageDto [code=8, message=send mes:8] ........省略 send mes : MessageDto [code=996, message=send mes:996] send mes : MessageDto [code=997, message=send mes:997] send mes : MessageDto [code=998, message=send mes:998] send mes : MessageDto [code=999, message=send mes:999] send mes : MessageDto [code=1000, message=send mes:1000] 11:27:42.490 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2e0f6c25[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 11:27:42.490 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@768508c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 11:27:42.491 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp:///127.0.0.1:61616@54559 11:27:42.492 [main] DEBUG o.a.a.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 11:27:42.493 [ActiveMQ Task-1] DEBUG o.a.a.transport.tcp.TcpTransport - Closed socket Socket[addr=/127.0.0.1,port=61616,localport=54559] 11:27:42.493 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] 

这里我们再看控制台,会发现queue1有数据信息,因为我们是非持久化方式发送消息,我们关掉activemq后在重启会发现数据丢失

,再以持久化的方式测试:

只需要将这里设置为PERSISTENT即可

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

我们再启动刚才的代码会发现发送数据的速度很明显的降低,但是我们关闭activemq后再重启,刷新控制台数据没有丢失.

⑥.消费端:

public class Consumer { 	 	public static void main(String[] args) { 		try { 			MessageUtil.getConsumer("queue-1", Boolean.TRUE);         } catch (JMSException e) { 	        e.printStackTrace();         }     } 	 }
public class MessageUtil { 	private static ActiveMqBo mq = new ActiveMqBo(); 	private static Connection conn = null; 	private static Session session = null; 	private static void init(){ 		try { 			// 获取一个连接 			if(conn == null){ 				conn = mq.getConnection(); 			} 			conn.start(); 			// 自动提交事务 			if(session == null){ 				 /*Session.AUTO_ACKNOWLEDGE  消息自动签收                   Session.CLIENT_ACKNOWLEDGE  客戶端调用acknowledge方法手动签收                   Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息                   	头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。*/ 				session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 			} 		} catch (Exception e) { 			e.printStackTrace(); 		} 	} 	/** 	 *  	 * @param obj 序列化对象 	 * @param topic 	 * @param isQueue 	 * @throws JMSException 	 */ 	public static void sendObjectMessage(Serializable obj,String topic,boolean isQueue) throws JMSException{ 		init(); 		MessageProducer producer = getProducer(getDestination(topic, isQueue)); 		producer.send(session.createObjectMessage(obj)); 		destroy(producer); 	} 	 	public static void sendTextMessage(String mes,String topic,boolean isQueue) throws JMSException{ 		init(); 		MessageProducer producer = getProducer(getDestination(topic, isQueue)); 		producer.send(session.createTextMessage(mes)); 		destroy(producer); 	} 	 	private static MessageProducer getProducer(Destination destination) throws JMSException{ 		MessageProducer producer = session.createProducer(destination); 		/** PERSISTENT(持久性消息): 		 *   这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。 		 *   可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时, 		 *   消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败, 		 *   它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。 		 *  NON_PERSISTENT(非持久性消息): 		 *   保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 		 *   此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 		 *    		 */ 		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 		return producer; 	}  	private static Destination getDestination(String topic, boolean isQueue) throws JMSException { 		Destination destination = null; 		if (isQueue) { 			destination = session.createQueue(topic); 		} else { 			destination = session.createTopic(topic); 		} 		return destination; 	} 	 	public static void getConsumer(String topic, boolean isQueue) throws JMSException{ 		init(); 		MessageConsumer consumer = session.createConsumer(getDestination(topic, isQueue)); 		if(Arrays.asList(ActiveMqConstant.QUEUES).contains(topic) || 				Arrays.asList(ActiveMqConstant.TOPICS).contains(topic) ){ 			MessageListener listener = ActiveMqConstant.listeners.get(topic); 			consumer.setMessageListener(listener); 		} 	} 	 	private static void destroy(MessageProducer producer) throws JMSException{ 		if(producer != null){ 			producer.close(); 		} 		if(session!=null){ 			session.close(); 			session = null; 		} 		if(conn!=null){ 			conn.close(); 			conn = null; 		} 	} 	public static void destroy(MessageConsumer consumer) throws JMSException{ 		if(consumer != null){ 			consumer.close(); 			consumer = null; 		} 		if(session!=null){ 			session.close(); 			session = null; 		} 		if(conn!=null){ 			conn.close(); 			conn = null; 		} 	}
public class ActiveMqConstant { 	public static final String USERNAME = "admin", PASSWORD = "admin"; 	public static final String URL = "tcp://127.0.0.1:61616"; 	public static final String[] QUEUES = { "queue-1", "queue2", "queue3" }; 	public static final String[] TOPICS = { "topic1", "topic2", "topic3" }; 	public static Map<String, MessageListener> listeners = new LinkedHashMap<String, MessageListener>(); 	static{ 		init(); 	} 	private static void init(){ 		//后期可以从xml中配置获取 		listeners.put("queue-1",new ActiveMQMessageListener()); 		listeners.put("queue2",new ActiveMQMessageListener2()); 		listeners.put("queue3",new ActiveMQMessageListener3()); 		 		listeners.put("topic1",new ActiveMQMessageListener()); 		listeners.put("topic2",new ActiveMQMessageListener2	()); 		listeners.put("topic3",new ActiveMQMessageListener3()); 		 	} 	private ActiveMqConstant() { 	}  }
....省略 receiver:MessageDto [code=402, message=send mes:402] receiver:MessageDto [code=403, message=send mes:403] receiver:MessageDto [code=404, message=send mes:404] ...省略 receiver:MessageDto [code=2085, message=send mes:2085] receiver:MessageDto [code=2086, message=send mes:2086] receiver:MessageDto [code=2087, message=send mes:2087] receiver:MessageDto [code=2088, message=send mes:2088] receiver:MessageDto [code=2089, message=send mes:2089] 13:53:21.896 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:21.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:31.897 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:31.897 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:41.897 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check. 13:53:41.898 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:41.898 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:51.899 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:51.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:01.900 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:01.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:11.899 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check. 13:54:11.901 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:11.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:21.902 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:21.902 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:31.903 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:31.903 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 

  再次刷新控制台看界面变化

⑦.了解activemq的目录结构 ,会发现如下几个文件:

db.data,db.redo,db-1.log

在消息未被消费之前会将数据保存在db-*.log中, 其中activemq默认每超过32m重新生成一个新的日志文件.

db.data:存储btree索引 ,BTree索引,保存消息的引用,并按照message ID排序。
db.redo:用来保证MQ broker未干净关闭情况下,用于BTree index的重建。

注意:对于非持久化的数据如果未及时消费,当activemq宕机时,保存的db-*.log等信息在下次启动时全部丢失.

广播模式不再介绍,跟队列方式相似,可以自己多开几个consumer看接收到的内容是否一致

 

 

 

本文发表于2017年09月18日 14:37
(c)注:本文转载自https://my.oschina.net/u/2486137/blog/1539259,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2193 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1