
今天给大家分享的是ActiveMQ,如有不足,敬请指教。
那么我们必须知道ActiveMQ是什么。
一、ActiveMQ简介
1.1 ActiveMQ是什么
- ActiveMQ是一个消息队列应用服务器。支持JMS规范。
1.1.1 JMS概述
- 全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)
- 实现了JMS标准的系统,称之为JMS Provider。
1.1.2 消息队列
1.1.2.1 概念
- 消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式
图示 |
 |
- Producer:消息生产者,负责产生和发送消息到 Broker;
- Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
- Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
1.2 ActiveMQ能做什么
- 实现两个不同应用(程序)之间的消息通讯。
- 实现同一个应用,不同模块之间的消息通讯。
1.3 ActiveMQ下载
1.4 ActiveMQ主要特点
- 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
- 支持高可用、高性能的集群模式
二、示例
2.1 需求
2.2 配置步骤说明
- 搭建ActiveMQ消息服务器(略)。
- 创建一个java项目。
- 创建消息生产者,发送消息。
- 创建消息消费者,接收消息
2.3 第一部分 创建java项目,导入jar包
图示 |
 |
2.4 第二部分 创建消息生成者,发送消息
2.4.1 创建MyProducer类,定义sengMsg2MQ方法
package com.xkt.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MyProducer {
// 定义链接工厂
private ConnectionFactory factory;
// 定义链接
private Connection connection;
// 定义会话
private Session session;
// 定义目的地
private Destination destination;
// 定义消息
private Message message;
// 定义消息生生产者
private MessageProducer producer;
public void sengMsg2MQ(String msg) {
try {
/*
* 1、创建链接工厂
*
* ActiveMQConnectionFactory(userName, password, brokerURL)
*
* userName:用户名 默认admin password:密码 默认admin brokerURL:消息服务中心地址
* tcp://0.0.0.0:61616 基于tcp协议
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2.创建链接
connection = factory.createConnection();
// 开启链接
connection.start();
/*
* 3、创建会话
*
* createSession(transacted, acknowledgeMode)
*
* transacted:是否使用事物 true|false true 表示使用事物,每次对消息进行读写之后,要提交事物。如果使用了事物,则消息确认机制失效
* false 表示不使用事物
*
* acknowledgeMode: 消息确认机制 Session.AUTO_ACKNOWLEDGE -
* 自动确认消息机制,一旦读取到消息,则消费成功,消息出队列,避免重复消费 Session.CLIENT_ACKNOWLEDGE -
* 客户端确认消息机制,手动确认,即消费了消息成功之后,再确认 Session.DUPS_OK_ACKNOWLEDGE -
* 有副本的客户端确认消息机制。集群模式下
*
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建队列
destination = session.createQueue("test-mq");
// 5.创建消息对象
message = session.createTextMessage(msg);
// 6.创建消息生产者
producer = session.createProducer(destination);
// 7.发送消息
producer.send(message);
// session.commit();
System.out.println("消息发送成功");
} catch (JMSException e) {
e.printStackTrace();
System.out.println("消息发送失败");
} finally {
// 回收消息发送者资源
if (null != producer) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收会话资源
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收链接资源
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.4.2 创建一个测试类
package com.xkt.test;
import org.junit.Test;
import com.xkt.consumer.Myconsumer;
import com.xkt.producer.MyProducer;
/**
* @author lzx
*
*/
public class MessageTest {
@Test
public void testSend() {
try {
MyProducer producer = new MyProducer();
producer.sengMsg2MQ("测试发送数据");
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4.3 测试
图示 |
 |
图示 |
 |
2.5 第三部分 创建消息消费者,消费消息
2.5.1 创建MyConsumer类
package com.xkt.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class Myconsumer {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer consumer;
private Message message;
public void receiveFromMq() {
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("queue");
// 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
consumer = session.createConsumer(destination);
// 6、读取消息
message = consumer.receive(5000);
// 7.提取文本
if (null != message) {
if (message instanceof TextMessage) {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("从列表中读取的是" + content);
}
}
// 在手动确认机制下,消费完消息之后,必须手动确认,让消费的消息出队列否则,会出现重复消费的问题。
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
System.out.println("读取失败");
} finally {
if (null != consumer) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.5.2 修改测试类MessageTest,新增测试方法
@Test
public void testReceive() {
try {
Myconsumer consumer = new Myconsumer();
consumer.receiveFromMq();
} catch (Exception e) {
e.printStackTrace();
}
}
2.5.3 测试
图示 |
 |
图示 |
 |
在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?我们将在后面的文章中给出。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!