1、博客涵盖点
1.1 入门级rabbitMQ,了解默认的五种开发方案
1.2 使用ssm xml方式集成rabbitMq,五种模式+死信队列方案+jdk8
1.3 本博客项目码云地址:==》springboot+RabbitMQ+所有场景
1、fanout:发布/订阅模式
2、routing:路由模式
3、topic:通配符模式
4、延迟队列之使用CustomExchange方案:需要安装延迟插件 点击==》安装详情
5、延迟队列之死信队列
2、场景
引言:(九天博客实时更新修改,即便你是复制到你的网站博客,也看不到每一篇博客的优化,不如关注我哈)
RabbitMQ 场景应用:
1、秒杀场景:高并发请求线程进入消息队列,根据先进先出原则,执行秒杀逻辑
2、延迟队列【两种方式 使用插件延迟 和 死信队列延迟】:
2.1:用户下订单,但是不支付,超过30分钟订单自动取消
2.2:用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
2.3: 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试
3、异步操作【异步操作比同步快】:
3.1:异步记录用户操作日志:用户登录app时,将登录事件发送到消息队列,由监听器记录用户的登录时间、设备、来源ip等信息...
3.2:异步发送邮件:注册或者忘记密码的时候,通常某某网站会提示发送你邮箱一个链接,请点击。
3.3:异步发送短信验证码:用户忘记密码或者使用手机验证码登录时,可以异步发送验证码,没必要让程序串行完成所有操作之后用户才能接收到验证码
3、pom文件
springboot 2.0.X的依赖大家自己加上去吧,应该也适用于 springboot2.1.X。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.6.1</version>
</dependency>
4、application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual
5、java配置类
5.1 rabbitmq配置
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ infrastructure configuration: connection factory, admin, template,
 * listener container factory and message converter.
 *
 * <p>Connection settings are read from the {@code spring.rabbitmq.*} properties.
 * Because the template and the listener container factory are declared here by
 * hand, the JSON {@link MessageConverter} bean has to be wired into both of them
 * explicitly; otherwise it would be declared but never used.
 */
@Configuration
@EnableRabbit
public class RabbbitConfig {

    @Value("${spring.rabbitmq.host}")
    public String host;

    @Value("${spring.rabbitmq.port}")
    public int port;

    @Value("${spring.rabbitmq.username}")
    public String username;

    @Value("${spring.rabbitmq.password}")
    public String password;

    @Value("${spring.rabbitmq.virtual-host}")
    public String virtual_host;

    /**
     * Builds the caching connection factory from the injected connection properties.
     *
     * @return the connection factory shared by the admin, template and listeners
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtual_host);
        return connectionFactory;
    }

    /**
     * @return a {@link RabbitAdmin} bound to {@link #connectionFactory()}
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the template used by producers.
     *
     * @return a template that serializes payloads with {@link #messageConverter()}
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // Without this call the template keeps its default converter and the
        // Jackson converter bean below is never applied to outgoing messages.
        template.setMessageConverter(messageConverter());
        return template;
    }

    /**
     * Configures the container factory used by the consumer listeners.
     *
     * <p>Concurrency is fixed at 3 to 10 consumers and acknowledgement is manual,
     * so every listener method must acknowledge its own deliveries.
     *
     * @return the listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        // Keep the consumer side on the same converter as the template.
        factory.setMessageConverter(messageConverter());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement
        return factory;
    }

    /**
     * @return the JSON converter applied to both the template and the listeners
     */
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
5.2 Exchange配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares every exchange used by the demo.
 *
 * <p>All exchanges are durable and are not auto-deleted.
 *
 * @author MuJiuTian
 */
@Component
@Configuration
public class ExchangeConfig {

    /** Every exchange declared here survives a broker restart. */
    private static final boolean DURABLE = true;

    /** No exchange declared here is removed when its last binding goes away. */
    private static final boolean AUTO_DELETE = false;

    /**
     * @return the fanout (publish/subscribe) exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange", DURABLE, AUTO_DELETE, null);
    }

    /**
     * @return the direct (routing) exchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_exchange", DURABLE, AUTO_DELETE, null);
    }

    /**
     * @return the topic (wildcard) exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("IExchange", DURABLE, AUTO_DELETE, null);
    }

    /**
     * Declares the delayed-message exchange, which routes like a direct exchange.
     * The delayed-message plugin must be installed on the broker for this
     * exchange type to work.
     *
     * @return the custom {@code x-delayed-message} exchange
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> delayedArgs = new HashMap<>();
        delayedArgs.put("x-delayed-type", "direct");
        return new CustomExchange(
                "custom_exchange", "x-delayed-message", DURABLE, AUTO_DELETE, delayedArgs);
    }

    /**
     * @return the headers exchange
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers_exchange", DURABLE, AUTO_DELETE, null);
    }

    /**
     * @return the exchange that receives expired (dead-lettered) messages for
     *         immediate consumption
     */
    @Bean
    public DirectExchange immediateExchange() {
        return new DirectExchange("immediate_exchange", DURABLE, AUTO_DELETE);
    }

    /**
     * @return the exchange that messages are published to for the
     *         dead-letter based delay
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx_delay_exchange", DURABLE, AUTO_DELETE);
    }
}
5.3 Queue配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares every queue used by the demo in one place.
 *
 * @author MuJiuTian
 */
@Configuration
@Component
public class QueueConfig {

    /** Header naming the exchange that dead letters are forwarded to. */
    private static final String DEAD_LETTER_EXCHANGE_ARG = "x-dead-letter-exchange";

    /** Header naming the routing key that dead letters are forwarded with. */
    private static final String DEAD_LETTER_ROUTING_KEY_ARG = "x-dead-letter-routing-key";

    /**
     * @return first queue bound to the fanout exchange
     */
    @Bean
    public Queue fanoutQueue1() {
        return durableQueue("fanout_queue_1");
    }

    /**
     * @return second queue bound to the fanout exchange
     */
    @Bean
    public Queue fanoutQueue2() {
        return durableQueue("fanout_queue_2");
    }

    /**
     * @return first queue bound to the direct exchange
     */
    @Bean
    public Queue directQueue1() {
        return durableQueue("direct_queue_1");
    }

    /**
     * @return second queue bound to the direct exchange
     */
    @Bean
    public Queue directQueue2() {
        return durableQueue("direct_queue_2");
    }

    /**
     * @return first queue bound to the topic exchange
     */
    @Bean
    public Queue topicQueue1() {
        return durableQueue("topic_queue_1");
    }

    /**
     * @return second queue bound to the topic exchange
     */
    @Bean
    public Queue topicQueue2() {
        return durableQueue("topic_queue_2");
    }

    /**
     * @return the queue fed by the delayed-message (plugin) exchange
     */
    @Bean
    public Queue delayQueue() {
        return durableQueue("delay_queue");
    }

    /**
     * @return the queue that is consumed immediately in the dead-letter approach
     */
    @Bean
    public Queue immediateQueue() {
        return durableQueue("immediate");
    }

    /**
     * Declares the holding queue of the dead-letter approach. Its dead letters
     * are forwarded to {@code immediate_exchange} with the routing key
     * {@code immediate_road}.
     *
     * <p>No queue-level {@code x-message-ttl} is set here; the expiration is
     * set per message by the producer instead.
     *
     * @return the delay queue with its dead-letter arguments
     */
    @Bean
    public Queue dlxDelay() {
        Map<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put(DEAD_LETTER_EXCHANGE_ARG, "immediate_exchange");
        deadLetterArgs.put(DEAD_LETTER_ROUTING_KEY_ARG, "immediate_road");
        return new Queue("dlx_delay_queue", true, false, false, deadLetterArgs);
    }

    /** Creates a durable, non-exclusive, non-auto-delete queue without arguments. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true, false, false);
    }
}
5.4 exchange与queue关系绑定配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares every binding (routing key) between the exchanges and the queues.
 *
 * <p>The exchange and queue instances are obtained by calling the {@code @Bean}
 * methods on the injected {@link ExchangeConfig} and {@link QueueConfig}.
 *
 * @author MuJiuTian
 */
@Configuration
public class BindingConfig {

    @Autowired
    ExchangeConfig exchange;

    @Autowired
    QueueConfig queue;

    /** Binds {@code fanout_queue_1} to the fanout exchange (no routing key). */
    @Bean
    public Binding bindFanout1() {
        return BindingBuilder.bind(queue.fanoutQueue1()).to(exchange.fanoutExchange());
    }

    /** Binds {@code fanout_queue_2} to the fanout exchange (no routing key). */
    @Bean
    public Binding bindFanout2() {
        return BindingBuilder.bind(queue.fanoutQueue2()).to(exchange.fanoutExchange());
    }

    /** Routes key {@code orange} on the direct exchange to {@code direct_queue_1}. */
    @Bean
    public Binding bindDirectOrange() {
        return BindingBuilder.bind(queue.directQueue1()).to(exchange.directExchange()).with("orange");
    }

    /** Routes key {@code black} on the direct exchange to {@code direct_queue_2}. */
    @Bean
    public Binding bindDirectBlack() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("black");
    }

    /** Routes key {@code green} on the direct exchange to {@code direct_queue_2}. */
    @Bean
    public Binding bindDirectGreen() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("green");
    }

    /** Routes pattern {@code *.orange.*} on the topic exchange to {@code topic_queue_1}. */
    @Bean
    public Binding bindTopic1() {
        return BindingBuilder.bind(queue.topicQueue1()).to(exchange.topicExchange()).with("*.orange.*");
    }

    /** Routes pattern {@code *.*.rabbit} on the topic exchange to {@code topic_queue_2}. */
    @Bean
    public Binding bindTopic2() {
        return BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("*.*.rabbit");
    }

    /** Routes pattern {@code lazy.#} on the topic exchange to {@code topic_queue_2}. */
    @Bean
    public Binding bindTopic3() {
        return BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("lazy.#");
    }

    /** Routes key {@code delay_queue_road} on the delayed-message exchange to {@code delay_queue}. */
    @Bean
    public Binding bindCustom() {
        return BindingBuilder.bind(queue.delayQueue()).to(exchange.customExchange()).with("delay_queue_road").noargs();
    }

    /** Routes key {@code immediate_road} on {@code immediate_exchange} to the {@code immediate} queue. */
    @Bean
    public Binding immediate() {
        return BindingBuilder.bind(queue.immediateQueue()).to(exchange.immediateExchange()).with("immediate_road");
    }

    /**
     * Routes key {@code dlx_delay_road} on {@code dlx_delay_exchange} to
     * {@code dlx_delay_queue}.
     *
     * <p>The bean name is set explicitly. By default it would be the method name
     * {@code dlxDelay}, which is also the name of the queue bean declared by
     * {@link QueueConfig#dlxDelay()}. Two bean definitions with the same name
     * either override one another or abort startup, depending on whether bean
     * definition overriding is allowed.
     */
    @Bean(name = "dlxDelayBinding")
    public Binding dlxDelay() {
        return BindingBuilder.bind(queue.dlxDelay()).to(exchange.dlxExchange()).with("dlx_delay_road");
    }
}
6、实体类
import java.io.Serializable;
/**
 * Simple message payload used by the RabbitMQ demo endpoints.
 *
 * <p>The weight is stored as a primitive {@code double}, so {@link #getWeight()}
 * is safe to call on an instance created with the no-argument constructor
 * (it returns {@code 0.0} until a weight is set).
 *
 * @author MuJiuTian
 */
public class Mail implements Serializable {

    private static final long serialVersionUID = -8140693840257585779L;

    private String mailId;
    private String country;
    // Primitive on purpose: a boxed Double left at null made getWeight() throw
    // a NullPointerException when it was unboxed.
    private double weight;

    /** Creates an empty mail; all fields keep their default values. */
    public Mail() {
    }

    /**
     * Creates a fully populated mail.
     *
     * @param mailId  identifier of the mail
     * @param country country associated with the mail
     * @param weight  weight of the mail
     */
    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    /** @return the mail identifier, or {@code null} if not set */
    public String getMailId() {
        return mailId;
    }

    /** @param mailId the mail identifier */
    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    /** @return the country, or {@code null} if not set */
    public String getCountry() {
        return country;
    }

    /** @param country the country */
    public void setCountry(String country) {
        this.country = country;
    }

    /** @return the weight, {@code 0.0} if never set */
    public double getWeight() {
        return weight;
    }

    /** @param weight the weight */
    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }
}
7、service层
/**
 * Contract for publishing messages to RabbitMQ exchanges.
 * Every method takes the target exchange name, the routing key and the payload object.
 */
public interface Producer {
/**
 * Publishes a message.
 *
 * @param exchange name of the target exchange
 * @param rountingKey routing key to publish with
 * @param object payload to send
 */
void sendMessage(String exchange, String rountingKey, Object object);
/**
 * Publishes a message that should be delivered after a delay.
 * The ProducerImpl implementation in this project passes the delay as the "x-delay" message header.
 *
 * @param exchange name of the target exchange
 * @param rountingKey routing key to publish with
 * @param time delay; the controller in this project passes milliseconds
 * @param object payload to send
 */
void delayMessage(String exchange, String rountingKey, long time, Object object);
/**
 * Publishes a message for the dead-letter based delay.
 * The ProducerImpl implementation in this project sets the delay as the per-message expiration.
 *
 * @param exchange name of the target exchange
 * @param rountingKey routing key to publish with
 * @param time time to live before the message expires; the controller in this project passes milliseconds
 * @param object payload to send
 */
void dlxDelayMessage(String exchange, String rountingKey, long time, Object object);
/**
 * Publishes a message using the send-and-receive style; nothing is returned to the caller.
 *
 * @param exchange name of the target exchange
 * @param rountingKey routing key to publish with
 * @param object payload to send
 */
void sendAndReceive(String exchange, String rountingKey, Object object);
}
import com.example.rabbit.service.Producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * {@link Producer} implementation that delegates every call to the injected
 * {@link RabbitTemplate}.
 *
 * @author MuJiuTian
 */
@Service
@Transactional
public class ProducerImpl implements Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Converts the payload and publishes it to the given exchange.
     *
     * @param exchange    name of the target exchange
     * @param rountingKey routing key to publish with
     * @param object      payload to send
     */
    @Override
    public void sendMessage(String exchange, String rountingKey, Object object) {
        this.rabbitTemplate.convertAndSend(exchange, rountingKey, object);
    }

    /**
     * Publishes the payload with the {@code x-delay} header set to {@code time}.
     *
     * @param exchange    name of the target exchange
     * @param rountingKey routing key to publish with
     * @param time        value stored in the {@code x-delay} header
     * @param object      payload to send
     */
    @Override
    public void delayMessage(String exchange, String rountingKey, long time, Object object) {
        final Long delayHeader = Long.valueOf(time);
        this.rabbitTemplate.convertAndSend(exchange, rountingKey, object, outgoing -> {
            outgoing.getMessageProperties().setHeader("x-delay", delayHeader);
            return outgoing;
        });
    }

    /**
     * Publishes the payload with its per-message expiration set to {@code time}.
     *
     * @param exchange    name of the target exchange
     * @param rountingKey routing key to publish with
     * @param time        expiration value, written as a decimal string
     * @param object      payload to send
     */
    @Override
    public void dlxDelayMessage(String exchange, String rountingKey, long time, Object object) {
        final String expiration = String.valueOf(time);
        this.rabbitTemplate.convertAndSend(exchange, rountingKey, object, outgoing -> {
            outgoing.getMessageProperties().setExpiration(expiration);
            return outgoing;
        });
    }

    /**
     * Publishes the payload with the template's send-and-receive call; the
     * reply is discarded.
     *
     * @param exchange    name of the target exchange
     * @param rountingKey routing key to publish with
     * @param object      payload to send
     */
    @Override
    public void sendAndReceive(String exchange, String rountingKey, Object object) {
        this.rabbitTemplate.convertSendAndReceive(exchange, rountingKey, object);
    }
}
8、controller层
import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.example.rabbit.service.impl.ProducerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.Random;
/**
 * Demo endpoints that publish a random {@link Mail} through each exchange type.
 *
 * @author MuJiuTian
 */
@RestController
public class RabbitController {

    /** Timestamp pattern used when logging the send time. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    @Autowired
    ProducerImpl producer;

    /**
     * Publishes to the fanout exchange; no routing key is supplied.
     */
    @RequestMapping(value = "/fanout")
    public void fanout() {
        Mail mail = randomMail();
        producer.sendMessage("fanout_exchange", null, mail);
    }

    /**
     * Publishes to the direct exchange.
     *
     * <p>The routing key has to be one that the direct exchange is bound with
     * ({@code orange}, {@code black} or {@code green}); an empty key matches
     * none of those bindings. {@code orange} is delivered to
     * {@code direct_queue_1}.
     */
    @RequestMapping(value = "/direct")
    public void direct() {
        Mail mail = randomMail();
        producer.sendMessage("direct_exchange", "orange", mail);
    }

    /**
     * Publishes to the topic exchange with a key matching {@code *.orange.*}.
     * A key such as {@code lazy.mm} would match the {@code lazy.#} binding instead.
     */
    @RequestMapping(value = "/topic")
    public void topic() {
        Mail mail = randomMail();
        producer.sendMessage("IExchange", "love.orange.hate", mail);
    }

    /**
     * Publishes through the delayed-message exchange with a 3000 ms delay.
     */
    @GetMapping(value = "/delay")
    public void delay() {
        Mail mail = randomMail();
        logSend(mail);
        producer.delayMessage("custom_exchange", "delay_queue_road", 3000, mail);
    }

    /**
     * Publishes through the dead-letter based delay with a 3000 ms expiration.
     */
    @GetMapping(value = "/dlxDelay")
    public void dlxDelay() {
        Mail mail = randomMail();
        logSend(mail);
        producer.dlxDelayMessage("dlx_delay_exchange", "dlx_delay_road", 3000, mail);
    }

    /**
     * Creates a {@link Mail} with a random id in {@code [0, 100)}, country
     * {@code China} and a random weight in {@code [0.0, 1.0)}.
     *
     * @return a randomly populated mail for endpoint testing
     */
    public static Mail randomMail() {
        Random random = new Random();
        Mail mail = new Mail();
        mail.setMailId(random.nextInt(100) + "");
        mail.setCountry("China");
        mail.setWeight(random.nextDouble());
        return mail;
    }

    /** Prints the current time and the mail that is about to be sent with a delay. */
    private static void logSend(Mail mail) {
        String now = DateUtil.format(new Date(), TIMESTAMP_PATTERN);
        System.out.println("延迟发送时间:" + now + "数据:" + mail.toString());
    }
}
9、监听器
import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;
/**
 * Consumers for every queue of the demo.
 *
 * <p>A message is made up of a payload ({@code @Payload}) and headers
 * ({@code @Headers} / {@code @Header}). The listener container factory declared
 * in {@code RabbbitConfig} uses manual acknowledgement, so every listener here
 * acknowledges its delivery explicitly once it has handled the message; a
 * delivery that is never acknowledged stays unacked on the broker.
 *
 * @author MuJiuTian
 */
@Component
public class MyListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Consumes {@code fanout_queue_1}.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "fanout_queue_1")
    public void fanoutQueue1(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("fanout_queue_1队列取出消息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code fanout_queue_2}.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "fanout_queue_2")
    public void fanoutQueue2(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("fanout_queue_2队列取出消息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code direct_queue_1}.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "direct_queue_1")
    public void directQueue1(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("direct_queue_1队列取出消息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code direct_queue_2}.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "direct_queue_2")
    public void directQueue2(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("direct_queue_2队列取出消息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code topic_queue_1}.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "topic_queue_1")
    public void topicQueue1(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("从topic_queue_1取出消息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code topic_queue_2}, reading the delivery tag from the full
     * header map.
     *
     * @param mail    message payload
     * @param heads   all message headers
     * @param channel channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "topic_queue_2")
    public void topicQueue2(@Payload Mail mail, @Headers Map<String,Object> heads,Channel channel) throws IOException {
        System.out.println("到达监听器,准备处理RabbitMQ业务逻辑,从topic_queue_2取出消息=="+mail.toString());
        // Step 1: business logic goes here, e.g. a flash-sale purchase.
        // Step 2: once the business logic has succeeded, acknowledge the message.
        channel.basicAck(Long.valueOf(heads.get("amqp_deliveryTag").toString()),true);
    }

    /**
     * Consumes {@code delay_queue} and prints the time of receipt.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "delay_queue")
    public void delay(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag,Channel channel) throws IOException {
        System.out.println("延迟队列接受时间:"+ DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
        // Step 1: business logic goes here, e.g. cancelling an order that was not
        // paid within 30 minutes; omitted because this only demonstrates the delay.
        // Step 2: once the business logic has succeeded, acknowledge the message.
        channel.basicAck(deliveryTag,false);
    }

    /**
     * Consumes the {@code immediate} queue, which receives the messages that
     * expired in the dead-letter delay queue.
     *
     * @param mail        message payload
     * @param deliveryTag delivery tag of the message, used to acknowledge it
     * @param channel     channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "immediate")
    @RabbitHandler
    public void immediate(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag, Channel channel) throws IOException {
        System.out.println("此刻时间是:"+ DateUtil.format(new Date(), DateFormat.getDateTimeInstance())+"要处理的数据="+mail);
        channel.basicAck(deliveryTag, false);
    }
}
10、项目启动
项目启动后,打开localhost:15672,里面的exchange和queue会自动创建好,不过还是要检查一下exchange和queue之间的绑定关系是否已经建立,确认无误后即可进行测试,如下:
10.1 topic测试:http://localhost:8080/topic

10.2 延迟队列,使用CustomExchange测试:http://localhost:8080/delay

10.3 延迟队列,方式二,使用死信队列方式测试:http://localhost:8080/dlxDelay

喜欢我就关注我吧....嘻嘻嘻。