崛起于Springboot2.0.X之整合RabbitMQ企业所有场景开发(46)


声明:本文转载自https://my.oschina.net/mdxlcj/blog/3096549,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

1、博客涵盖点

   1.1 入门级rabbitMQ,了解五种默认的五种开发方案

   1.2 使用ssm xml方式集成rabbitMq,五种模式+死信队列方案+jdk8

   1.3 本博客项目码云地址:==》springboot+RabbitMQ+所有场景

    1、fanout:发布/订阅模式
    2、rounting:路由模式
    3、topic:通配符模式
    4、延迟队列之使用CustomExchange方案:需要安装延迟插件 点击==》安装详情
    5、延迟队列之死信队列

2、场景

引言:(九天博客实时更新修改,即便你是复制到你的网站博客,也看不到每一篇博客的优化,不如关注我哈)
RabbitMQ 场景应用:
1、秒杀场景:高并发请求线程进入消息队列,根据先进先出原则,执行秒杀逻辑
2、延迟队列【两种方式 使用插件延迟 和 死信队列延迟】:
   2.1:用户下订单,但是不支付,超过30分钟订单自动取消
   2.2:用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
   2.3: 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试
3、异步操作【异步操作比同步快】:
    3.1:异步记录用户操作日志:用户的登陆app,发送到消息队列,监听记录用户的登陆时间、设备,来源ip等信息...
    3.2:异步发送邮件:注册或者忘记密码的时候,通常某某网站会提示发送你邮箱一个链接,请点击。
    3.3:异步发送短信验证码:用户忘记密码或者使用手机验证码登陆时,可以执行异步,没必要让程序串行完成所有操作最后才能接受到验证码

3、pom文件

     springboot 2.0.X的依赖大家自己加上去吧,应该也适用于 springboot2.1.X。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--工具类-->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.6.1</version>
</dependency>

4、application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual

5、java配置类

    4.1 rabbitmq配置

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@EnableRabbit
public class RabbbitConfig {

    @Value("${spring.rabbitmq.host}")
    public String host;

    @Value("${spring.rabbitmq.port}")
    public int port;

    @Value("${spring.rabbitmq.username}")
    public String username;

    @Value("${spring.rabbitmq.password}")
    public String password;

    @Value("${spring.rabbitmq.virtual-host}")
    public String virtual_host;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtual_host);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    //配置消费者监听的容器
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
        return factory;
    }

    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

    4.2 Exchange配置

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author:MuJiuTian
 * @Description:所有的exchange列表
 * @Date: Created in 下午11:04 2019/8/19
 */
@Component
@Configuration
public class ExchangeConfig {

    /**
     * 创建类型:fanout交换机
     */

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange",true,false,null);
    }

    /**
     * 创建类型:direct交换机
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_exchange",true,false,null);
    }

    /**
     * 创建类型:topic交换机
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("IExchange",true,false,null);
    }

    /**
     * 创建类型:custom交换机,该交换机需要安装delay_rabbitmq插件才能运行
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("custom_exchange","x-delayed-message",true,false,args);
    }

    /**
     * 创建类型:headers交换机
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers_exchange",true,false,null);
    }

    /**
     * 延迟:immediate交换机
     */
    @Bean
    public DirectExchange immediateExchange() {
        return new DirectExchange("immediate_exchange");
    }

    /**
     * 延迟:dlx_delay交换机
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx_delay_exchange");
    }

}

    4.3 Queue配置

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author:MuJiuTian
 * @Description: 所有的队列统一配置
 * @Date: Created in 下午11:36 2019/8/19
 */
@Configuration
@Component
public class QueueConfig {
    /**
     * 针对fanout交换机的队列
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout_queue_1");
    }

    /**
     * 针对fanout交换机的队列
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout_queue_2");
    }

    /**
     * 针对direct交换机的队列
     */
    @Bean
    public Queue directQueue1() {
        return new Queue("direct_queue_1");
    }

    /**
     * 针对direct交换机的队列
     */
    @Bean
    public Queue directQueue2() {
        return new Queue("direct_queue_2");
    }

    /**
     * 针对topic交换机的队列
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topic_queue_1");
    }

    /**
     * 针对topic交换机的队列
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic_queue_2");
    }
    /**
     * 延迟队列
     */
    @Bean
    public Queue delayQueue() {
        return new Queue("delay_queue");
    }

    /**
     * 死信队列方式中的立即消费队列
     */
    @Bean
    public Queue immediateQueue() {
        return new Queue("immediate");
    }

    /**
     * 死信队列方式中的延迟队列
     */
    @Bean
    public Queue dlxDelay() {
        Map<String,Object> map = new HashMap<>();
        //map.put("x-message-ttl",6000);,延迟时间,不过我们不需要在这里配置,在service设置就好了
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称
        map.put("x-dead-letter-exchange","immediate_exchange");
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        map.put("x-dead-letter-routing-key","immediate_road");
        return new Queue("dlx_delay_queue",true,false,false,map);
    }
}

    4.4 exchange与queue关系绑定配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author:MuJiuTian
 * @Description: 所有的exchange与queue之间的routing key
 * @Date: Created in 下午11:39 2019/8/19
 */
@Configuration
public class BindingConfig {

    @Autowired
    ExchangeConfig exchange;

    @Autowired
    QueueConfig queue;

    @Bean
    public Binding bindFanout1() {
        return BindingBuilder.bind(queue.fanoutQueue1()).to(exchange.fanoutExchange());
    }

    @Bean
    public Binding bindFanout2() {
        return BindingBuilder.bind(queue.fanoutQueue2()).to(exchange.fanoutExchange());
    }

    @Bean
    public Binding bindDirectOrange() {
        return BindingBuilder.bind(queue.directQueue1()).to(exchange.directExchange()).with("orange");
    }

    @Bean
    public Binding bindDirectBlack() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("black");
    }

    @Bean
    public Binding bindDirectGreen() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("green");
    }

    @Bean
    public Binding bindTopic1(){
        Binding binding= BindingBuilder.bind(queue.topicQueue1()).to(exchange.topicExchange()).with("*.orange.*");
        return binding;
    }

    @Bean
    public Binding bindTopic2(){
        Binding binding= BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("*.*.rabbit");
        return binding;
    }

    @Bean
    public Binding bindTopic3(){
        Binding binding= BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("lazy.#");
        return binding;
    }

    @Bean
    public Binding bindCustom() {
        return BindingBuilder.bind(queue.delayQueue()).to(exchange.customExchange()).with("delay_queue_road").noargs();
    }

    @Bean
    public Binding immediate() {
        return BindingBuilder.bind(queue.immediateQueue()).to(exchange.immediateExchange()).with("immediate_road");
    }

    @Bean
    public Binding dlxDelay() {
        return BindingBuilder.bind(queue.dlxDelay()).to(exchange.dlxExchange()).with("dlx_delay_road");
    }
}

6、实体类

import java.io.Serializable;

/**
 * @Author:MuJiuTian
 * @Description:
 * @Date: Created in 下午6:01 2019/8/19
 */
public class Mail implements Serializable {
    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    private Double weight;


    public Mail() {
    }

    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }
}

7、service层

public interface Producer {
    void sendMessage(String exchange, String rountingKey, Object object);

    void delayMessage(String exchange, String rountingKey, long time, Object object);

    void dlxDelayMessage(String exchange, String rountingKey, long time, Object object);

    void sendAndReceive(String exchange, String rountingKey, Object object);
}
import com.example.rabbit.service.Producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @Author:MuJiuTian
 * @Description:
 * @Date: Created in 下午9:52 2019/8/19
 */
@Service
@Transactional
public class ProducerImpl implements Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 下午4:10
     * @Description:
     */
    @Override
    public void sendMessage(String exchange, String rountingKey, Object object) {
       rabbitTemplate.convertAndSend(exchange,rountingKey,object);
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 下午4:41
     * @Description:
     */
    @Override
    public void delayMessage(String exchange, String rountingKey, long time, Object object) {
        rabbitTemplate.convertAndSend(exchange,rountingKey,object,message -> {
            message.getMessageProperties().setHeader("x-delay",time);
            return message;
        });
    }

    @Override
    public void dlxDelayMessage(String exchange, String rountingKey, long time, Object object) {
        rabbitTemplate.convertAndSend(exchange, rountingKey, object, message -> {
            message.getMessageProperties().setExpiration(time + "");
            return message;
        });
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 下午4:46
     * @Description:发送与消费一步完成,前提是监听器业务逻辑处理没有任何异常
     */
    @Override
    public void sendAndReceive(String exchange, String rountingKey, Object object) {
        rabbitTemplate.convertSendAndReceive(exchange,rountingKey,object);
    }

}

8、controller层

import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.example.rabbit.service.impl.ProducerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Random;

/**
 * @Author:MuJiuTian
 * @Description:
 * @Date: Created in 下午10:23 2019/8/19
 */
@RestController
public class RabbitController {
    @Autowired
    ProducerImpl producer;

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 上午10:59
     * @Description:使用fanout交换机模式测试rabbit,该模式没有routingKey
     */
    @RequestMapping(value = "/fanout")
    public void fanout() {
        Mail mail = randomMail();
        producer.sendMessage("fanout_exchange",null,mail);
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 上午11:00
     * @Description:使用direct交换机模式测试rabbit,支持routingKey多路由模式
     */
    @RequestMapping(value = "/direct")
    public void direct() {
        Mail mail = randomMail();
        producer.sendMessage("direct_exchange","",mail);
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 上午11:00
     * @Description:使用topic交换机模式测试rabbit,支持routingKey通配符模式
     */
    @RequestMapping(value = "/topic")
    @ResponseBody
    public void topic() {
        Mail mail = randomMail();
        //producer.sendMessage("IExchange","lazy.mm",mail);
        producer.sendMessage("IExchange","love.orange.hate",mail);
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/20 下午4:34
     * @Description:延迟队列测试,毫秒为单位
     */
    @GetMapping(value = "/delay")
    @ResponseBody
    public void delay() {
        Mail mail  = randomMail();
        String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
        System.out.println("延迟发送时间:"+now+"数据:"+mail.toString());
        producer.delayMessage("custom_exchange","delay_queue_road",3000,mail);
    }

    /**
     * @Author:MuJiuTian
     * @Date:2019/8/21 上午10:17
     * @Description:延迟队列死信队列方式
     */
    @GetMapping(value = "/dlxDelay")
    public void dlxDelay() {
        Mail mail  = randomMail();
        String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
        System.out.println("延迟发送时间:"+now+"数据:"+mail.toString());
        producer.dlxDelayMessage("dlx_delay_exchange","dlx_delay_road",3000,mail);
    }

    /**
     * 随机创建一个Mail实体对象,供接口测试
     */
    public static Mail randomMail() {
        Mail mail = new Mail();
        mail.setMailId(new Random().nextInt(100)+"");
        mail.setCountry("China");
        mail.setWeight(new Random().nextDouble());
        return mail;
    }

}

9、监听器

import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;

/**
 * @Author:MuJiuTian
 * @Description: Message包含 @Payload Object obj和@Headers Map<String,Object> heads两者
 * @Payload @Headers @Header(name = "amqp_deliveryTag") @RabbitListener @RabbitHandler 总共5个注解的使用
 * @Date: Created in 下午10:06 2019/8/19
 */
@Component
public class MyListener {

    @Autowired
    RabbitTemplate rabbitTemplate;


    @RabbitListener(queues = "fanout_queue_1")
    public void fanoutQueue1(Mail mail) throws IOException {
        System.out.println("fanout_queue_1队列取出消息"+mail.toString());
    }

    @RabbitListener(queues = "fanout_queue_2")
    public void fanoutQueue2(Mail mail) throws IOException {
        System.out.println("fanout_queue_2队列取出消息"+mail.toString());
    }

    @RabbitListener(queues = "direct_queue_1")
    public void directQueue1(Mail mail) {
        System.out.println("direct_queue_1队列取出消息"+mail.toString());
    }

    @RabbitListener(queues = "direct_queue_2")
    public void directQueue2(Mail mail) {
        System.out.println("direct_queue_2队列取出消息"+mail.toString());
    }

    @RabbitListener(queues = "topic_queue_1")
    public void topicQueue1(Mail mail) {
        System.out.println("从topic_queue_1取出消息"+mail.toString());
    }

    @RabbitListener(queues = "topic_queue_2")
    public void topicQueue2(@Payload Mail mail, @Headers Map<String,Object> heads,Channel channel) throws IOException {
        System.out.println("到达监听器,准备处理RabbitMQ业务逻辑,从topic_queue_2取出消息=="+mail.toString());
        //第一步:业务逻辑处理,如活动秒杀

        //第二部:业务逻辑处理成功之后,消费掉消息
        channel.basicAck(Long.valueOf(heads.get("amqp_deliveryTag").toString()),true);
    }

    @RabbitListener(queues = "delay_queue")
    public void delay(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag,Channel channel) throws IOException {
        System.out.println("延迟队列接受时间:"+ DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));

        //第一步:业务逻辑处理,如下订单内30分钟不支付情况下,自动取消订单,这里就不写了,主要体现rabbitmq的延迟功能

        //第二部:业务逻辑处理成功之后,消费掉消息
        channel.basicAck(deliveryTag,false);
    }

    @RabbitListener(queues = "immediate")
    @RabbitHandler
    public void immediate(@Payload Mail mail) {
        System.out.println("此刻时间是:"+ DateUtil.format(new Date(), DateFormat.getDateTimeInstance())+"要处理的数据="+mail);
    }
}

10、项目启动

    项目启动后,打开localhost:15672,里面的exchange和queue会自动配置好,不过还是要检查一下exchange和queue有没有绑定关系好,都可以了进行测试,如下:

    10.1 topic测试:http://localhost:8080/topic

    10.2 延迟队列,使用CustomExchange测试:http://localhost:8080/delay

    10.3 延迟队列,方式二,使用死信队列方式测试:http://localhost:8080/dlxDelay

喜欢我就关注我吧....嘻嘻嘻。        

 

本文发表于2019年08月23日 12:00
(c)注:本文转载自https://my.oschina.net/mdxlcj/blog/3096549,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1610 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1