ActiveMQ (二)


声明:本文转载自https://my.oschina.net/u/4118479/blog/3045926,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。


 今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。

 上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?

 这就需要使用ActiveMQ监听器来监听队列,持续消费消息。

一、ActiveMQ监听器

1.1 配置步骤说明

  • 创建一个监听器对象。
  • 修改消费者代码,加载监听器

1.2 配置步骤

1.2.1 创建监听器MyListener类

package com.xkt.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @author lzx
 *
 */
public class MyListener implements MessageListener {

	@Override
	public void onMessage(Message message) {

		if (null != message) {
			if (message instanceof TextMessage) {
				try {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("监听到的消息是 " + content);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

1.2.2 修改MyConsumer代码,加载监听器

  • 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.xkt.listener.MyListener;

/**
 * @author lzx
 *
 */
public class Myconsumer {

	private ConnectionFactory factory;

	private Connection connection;

	private Session session;

	private Destination destination;

	private MessageConsumer consumer;

	public void receiveFromMq() {

		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			connection = factory.createConnection();
			connection.start();

			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
			destination = session.createQueue("queue");

			// 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
			consumer = session.createConsumer(destination);

			// 7.加载监听器
			consumer.setMessageListener(new MyListener());
			// 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息
			System.in.read();

			// 在java项目中,可以通过IO阻塞程序,持续加载监听器
			// 在web项目中,可以通过配置文件,直接加载监听器。

		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("读取失败");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

}

1.2.3 测试

  • 多次运行生产者,发送多条消息到队列中
图示
  • 运行消费者。观察结果
图示
  • 查看ActiveMQ管理控制界面,所有消息都被消费了!
图示

 在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?

二、Topic模式实现

2.1 配置步骤说明

  1. 搭建ActiveMQ消息服务器。(略)
  2. 创建主题订阅者。
  3. 创建主题发布者。

2.2 配置步骤

2.2.1 创建主题订阅者MySubscriber

  • 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MySubscriber implements Runnable {

	/**
	 * 多线程的线程安全问题 解决方案:
	 * 
	 * (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐
	 * (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis
	 */
	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicSubscriber subscriber;

	private Message message;

	@Override
	public void run() {

		try {
			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();

			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");

			// 5、创建订阅者
			subscriber = session.createSubscriber(topic);

			// 6、订阅
			while (true) {

				message = subscriber.receive();

				if (null != message) {
					if (message instanceof TextMessage) {
						TextMessage tMsg = (TextMessage) message;

						String content = tMsg.getText();

						System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
					}
				}
			}

		} catch (JMSException e) {

			e.printStackTrace();
		}

	}

}

2.2.2 修改测试类

package com.xkt.test;

import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

}

2.2.3 查看测试结果

  • 查看AcitveMQ管理界面 |图示 | | :------------: | | |

2.2.4 创建主题发布者MyPublisher

package com.xkt.publish;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MyPublisher {

	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicPublisher publisher;

	private Message message;

	public void publish(String msg) {

		try {

			// 1、创建连接工厂
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2、创建连接
			connection = factory.createTopicConnection();
			connection.start();

			// 3、创建会话
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4、创建topic主题
			topic = session.createTopic("topic-gzsxt");

			// 5、创建发布者
			publisher = session.createPublisher(topic);

			// 6、创建消息对象
			message = session.createTextMessage(msg);

			// 7、发布消息
			publisher.publish(message);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (null != publisher) {
				try {
					publisher.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.stop();

					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}
}

2.2.5 修改测试类

package com.xkt.test;

import org.junit.Test;

import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

	@Test
	public void publish() {
		MyPublisher publisher = new MyPublisher();

		publisher.publish("hello,欢迎收听FM 89.9频道-交通频道");
	}
}

2.2.6 查看测试结果

2.3 Topic小结

  1. Topic模式能够实现多个订阅者同时消费消息。
  2. Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
  3. 通常可以用来解决公共消息推送的相关业务。

版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

本文发表于2019年05月06日 17:00
(c)注:本文转载自https://my.oschina.net/u/4118479/blog/3045926,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1693 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1