ActiveMQ (一)


声明:本文转载自https://my.oschina.net/u/4118479/blog/3045575,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。


 今天给大家分享的是ActiveMQ,如有不足,敬请指教。

 那么我们必须知道ActiveMQ是什么。

一、ActiveMQ简介

1.1 ActiveMQ是什么

  • ActiveMQ是一个消息队列应用服务器。支持JMS规范。

1.1.1 JMS概述

  • 全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)
  • 实现了JMS标准的系统,称之为JMS Provider。

1.1.2 消息队列

1.1.2.1 概念

  • 消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式
图示
  1. Producer:消息生产者,负责产生和发送消息到 Broker;
  2. Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  3. Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

1.2 ActiveMQ能做什么

  • 实现两个不同应用(程序)之间的消息通讯。
  • 实现同一个应用,不同模块之间的消息通讯。

1.3 ActiveMQ下载

1.4 ActiveMQ主要特点

  1. 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
  3. 支持高可用、高性能的集群模式

二、示例

2.1 需求

  • 使用ActiveMQ实现消息队列模型

2.2 配置步骤说明

  1. 搭建ActiveMQ消息服务器(略)。
  2. 创建一个java项目。
  3. 创建消息生产者,发送消息。
  4. 创建消息消费者,接收消息

2.3 第一部分 创建java项目,导入jar包

图示

2.4 第二部分 创建消息生成者,发送消息

2.4.1 创建MyProducer类,定义sengMsg2MQ方法

package com.xkt.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MyProducer {

	// 定义链接工厂
	private ConnectionFactory factory;
	// 定义链接
	private Connection connection;
	// 定义会话
	private Session session;
	// 定义目的地
	private Destination destination;
	// 定义消息
	private Message message;
	// 定义消息生生产者
	private MessageProducer producer;

	public void sengMsg2MQ(String msg) {

		try {
			/*
			 * 1、创建链接工厂
			 * 
			 * ActiveMQConnectionFactory(userName, password, brokerURL)
			 * 
			 * userName:用户名 默认admin password:密码 默认admin brokerURL:消息服务中心地址
			 * tcp://0.0.0.0:61616 基于tcp协议
			 */
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 2.创建链接
			connection = factory.createConnection();
			// 开启链接
			connection.start();

			/*
			 * 3、创建会话
			 * 
			 * createSession(transacted, acknowledgeMode)
			 * 
			 * transacted:是否使用事物 true|false true 表示使用事物,每次对消息进行读写之后,要提交事物。如果使用了事物,则消息确认机制失效
			 * false 表示不使用事物
			 * 
			 * acknowledgeMode: 消息确认机制 Session.AUTO_ACKNOWLEDGE -
			 * 自动确认消息机制,一旦读取到消息,则消费成功,消息出队列,避免重复消费 Session.CLIENT_ACKNOWLEDGE -
			 * 客户端确认消息机制,手动确认,即消费了消息成功之后,再确认 Session.DUPS_OK_ACKNOWLEDGE -
			 * 有副本的客户端确认消息机制。集群模式下
			 * 
			 */
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 4.创建队列
			destination = session.createQueue("test-mq");

			// 5.创建消息对象
			message = session.createTextMessage(msg);

			// 6.创建消息生产者
			producer = session.createProducer(destination);

			// 7.发送消息
			producer.send(message);

			// session.commit();

			System.out.println("消息发送成功");
		} catch (JMSException e) {
			e.printStackTrace();
			System.out.println("消息发送失败");
		} finally {
			// 回收消息发送者资源
			if (null != producer) {
				try {
					producer.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}

			// 回收会话资源
			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}

			// 回收链接资源
			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

2.4.2 创建一个测试类

package com.xkt.test;

import org.junit.Test;

import com.xkt.consumer.Myconsumer;
import com.xkt.producer.MyProducer;

/**
 * @author lzx
 *
 */
public class MessageTest {

	@Test
	public void testSend() {

		try {
			MyProducer producer = new MyProducer();
			producer.sengMsg2MQ("测试发送数据");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

2.4.3 测试

图示
  • 查看ActiveMQ管理控制界面
图示

2.5 第三部分 创建消息消费者,消费消息

2.5.1 创建MyConsumer类

package com.xkt.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class Myconsumer {

	private ConnectionFactory factory;

	private Connection connection;

	private Session session;

	private Destination destination;

	private MessageConsumer consumer;

	private Message message;

	public void receiveFromMq() {

		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			connection = factory.createConnection();
			connection.start();

			session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

			// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
			destination = session.createQueue("queue");

			// 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
			consumer = session.createConsumer(destination);

			// 6、读取消息
			message = consumer.receive(5000);

			// 7.提取文本
			if (null != message) {
				if (message instanceof TextMessage) {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("从列表中读取的是" + content);
				}
			}
			// 在手动确认机制下,消费完消息之后,必须手动确认,让消费的消息出队列否则,会出现重复消费的问题。
			message.acknowledge();

		} catch (JMSException e) {
			e.printStackTrace();
			System.out.println("读取失败");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

}

2.5.2 修改测试类MessageTest,新增测试方法

	@Test
	public void testReceive() {

		try {
			Myconsumer consumer = new Myconsumer();
			consumer.receiveFromMq();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2.5.3 测试

图示
  • 查看ActiveMQ管理控制界面
图示

  在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?我们将在后面的文章中给出。

版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!

本文发表于2019年05月05日 22:00
(c)注:本文转载自https://my.oschina.net/u/4118479/blog/3045575,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2701 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

万稳万当,不如一默。任何一句话,你不说出来便是那句话的主人,你说了出来,便是那句话的奴隶。

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1