Implementing Redis Asynchronous Tasks in Code


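Notice: this article is reposted from https://my.oschina.net/qixiaobo025/blog/1537626 in order to share information, and is intended for study and discussion only. If it infringes any rights, please contact me and I will remove it promptly.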

The previous post, "Implementing a Priority Queue with Redis", described the design.

The code that implements it is fairly simple as well.

Interface Overview

First, define the message type:

package com.air.tqb.model;

import java.io.Serializable;

public class Message implements Serializable {
    public Message(Distributable messageBean, Long timeStamp, String docType, Integer times, String from, String to) {
        this.messageBean = messageBean;
        this.timeStamp = timeStamp;
        this.docType = docType;
        this.times = times;
        this.from = from;
        this.to = to;
    }

    public Message() {
    }

    public Message(String docType) {
        this.docType = docType;
    }

    public Message(Distributable messageBean, String docType) {
        this.messageBean = messageBean;
        this.docType = docType;
    }

    private static final long serialVersionUID = -3005565413488441986L;
    private Distributable messageBean;
    private Long timeStamp;
    private String docType;
    private Integer times;
    private String from;
    private String to;

    public Distributable getMessageBean() {
        return messageBean;
    }

    public void setMessageBean(Distributable messageBean) {
        this.messageBean = messageBean;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getDocType() {
        return docType;
    }

    public void setDocType(String docType) {
        this.docType = docType;
    }

    public Integer getTimes() {
        return times;
    }

    public void setTimes(Integer times) {
        this.times = times;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Message{");
        sb.append("messageBean=").append(messageBean);
        sb.append(", timeStamp=").append(timeStamp);
        sb.append(", docType='").append(docType).append('\'');
        sb.append(", times=").append(times);
        sb.append(", from='").append(from).append('\'');
        sb.append(", to='").append(to).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
  1. messageBean is the business bean; it must be of type Distributable.
  2. docType is the business type; it must not be null.

Next, define the following interfaces:

package com.air.tqb.message;

public interface MessageDispatcher {
    void consume();
}

This interface is responsible for forwarding tasks. Its implementation keeps listening on the corresponding queues.
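Because consume() never returns, it usually has to run on a thread of its own. Here is a minimal sketch of that idea using only the JDK. InMemoryDispatcher and DispatcherRunner are hypothetical names, and a BlockingQueue stands in for the Redis list, so this is an illustration rather than the article's actual startup code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Mirrors the article's interface so the sketch compiles on its own.
interface MessageDispatcher {
    void consume();
}

// Hypothetical in-memory dispatcher: a BlockingQueue stands in for the Redis list.
class InMemoryDispatcher implements MessageDispatcher {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch handled;

    InMemoryDispatcher(int expectedMessages) {
        this.handled = new CountDownLatch(expectedMessages);
    }

    @Override
    public void consume() {
        try {
            while (true) {
                queue.take();        // blocks until a message arrives
                handled.countDown(); // "process" the message
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop listening when interrupted
        }
    }

    // Waits until the expected number of messages has been handled.
    boolean awaitHandled(long timeoutMillis) {
        try {
            return handled.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class DispatcherRunner {
    // Starts consume() on a daemon thread so the caller is not blocked.
    static Thread start(MessageDispatcher dispatcher) {
        Thread t = new Thread(dispatcher::consume, "message-dispatcher");
        t.setDaemon(true);
        t.start();
        return t;
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        InMemoryDispatcher dispatcher = new InMemoryDispatcher(1);
        Thread worker = DispatcherRunner.start(dispatcher);
        dispatcher.queue.add("hello");
        System.out.println("handled: " + dispatcher.awaitHandled(2000));
        worker.interrupt();
    }
}
```

A daemon thread is used here so the listener does not keep the JVM alive at shutdown. Whether that fits depends on how the application is deployed.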

package com.air.tqb.message;

import com.air.tqb.model.Message;

public interface MessageProcessor {

    boolean process(Message message);

    boolean isInterestedIn(String docType);
}

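This interface represents a processor: process handles the business logic, and isInterestedIn declares which docType the processor cares about.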

package com.air.tqb.message;

import com.air.tqb.model.Message;

public interface MessageSender {
    void sendMessage(Message message);

    void sendMessage(Message message, Priority priority);

    void sendFailMessage(Message message);

    void sendDeadMessage(Message message);
}

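This interface sends messages. It covers the regular send methods, a method for sending to the fail queue (processing failed), and a method for sending to the dead-letter queue (no processor handles the message).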

package com.air.tqb.message;

public enum Priority {
    high, middle, low;
}

This is the priority enum.
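The declaration order of the enum matters, because the queue names are built by iterating over Priority.values(), which returns the constants in declaration order. The sketch below shows how the keys could be derived. The prefix value "message:queue:" is an assumption, since the article does not show what AppConstant.MESSAGE_QUEUE_PREFIX contains, and QueueNames is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Same constants, in the same order, as the article's enum.
enum Priority {
    high, middle, low
}

class QueueNames {
    // Assumed value: the real AppConstant.MESSAGE_QUEUE_PREFIX is not shown in the article.
    static final String MESSAGE_QUEUE_PREFIX = "message:queue:";

    // One queue key per priority, e.g. "message:queue:high".
    static String keyFor(Priority priority) {
        return MESSAGE_QUEUE_PREFIX + priority.name();
    }

    // Keys in declaration order, so the highest priority comes first.
    static List<String> allKeysInPriorityOrder() {
        List<String> keys = new ArrayList<>();
        for (Priority priority : Priority.values()) {
            keys.add(keyFor(priority));
        }
        return keys;
    }
}

class QueueNamesDemo {
    public static void main(String[] args) {
        for (String key : QueueNames.allKeysInPriorityOrder()) {
            System.out.println(key);
        }
    }
}
```

Reordering the enum constants would silently change which queue is drained first, so the order is worth treating as part of the contract.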

Implementation

package com.air.tqb.message.redis;

import com.air.tqb.common.AppConstant;
import com.air.tqb.helper.MessageHelper;
import com.air.tqb.message.AbstartactMessageDispatcher;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@Component
public class RedisMessageDispatcher extends AbstartactMessageDispatcher {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    @Autowired
    private MessageHelper messageHelper;

    // One serialized queue key per priority, in declaration order (high, middle, low).
    private byte[][] messageQueueNames = new byte[Priority.values().length][];
    private RedisSerializer redisSerializer = new JdkSerializationRedisSerializer();

    @PostConstruct
    public void init() {
        int i = 0;
        for (Priority priority : Priority.values()) {
            messageQueueNames[i++] = redisSerializer.serialize(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name());
        }
    }

    @Override
    public void consume() {
        RedisConnection redisConnection = redisConnectionFactory.getConnection();
        while (true) {
            // BLPOP checks the keys in the order given, so higher priorities are served first.
            // A timeout of 0 blocks until an element is available.
            List<byte[]> rlts = redisConnection.bLPop(0, messageQueueNames);
            if (rlts == null || rlts.isEmpty()) {
                //impossible
                logger.warn("blpop list null or empty");
            } else {
                try {
                    // Element 0 is the key that was popped from; element 1 is the value.
                    final Message message = (Message) redisSerializer.deserialize(rlts.get(1));
                    if (messageHelper.canBeImported(message)) {
                        submitNewTask((new Runnable() {
                            @Override
                            public void run() {
                                boolean rlt = messageHelper.doProcess(message);
                                if (!rlt) {
                                    // Processing failed: hand the message to the fail queue.
                                    messageHelper.sendFailMessage(message);
                                }
                            }
                        }));
                    } else {
                        // No processor is interested in this docType: dead-letter it.
                        messageHelper.sendDeadMessage(message);
                    }
                } catch (Exception ex) {
                    logger.warn(ex.getMessage(), ex);
                }
            }
        }
    }
}

Listening is done with Redis's blocking BLPOP command.
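When BLPOP is given several keys, it checks them in the order given and pops from the head of the first non-empty list, returning the key together with the value. That is what turns three plain lists into a priority queue. The following sketch imitates that selection rule in memory, without blocking and without Redis. InMemoryLists and the "q:" key names are hypothetical, and this is only a model of the ordering, not of Redis itself.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a handful of Redis lists.
class InMemoryLists {
    private final Map<String, Deque<String>> lists = new HashMap<>();

    // Appends to the tail of the list, like RPUSH.
    void rightPush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(value);
    }

    // Non-blocking model of BLPOP's key selection: scans the keys in order and
    // pops the head of the first non-empty list. Returns [key, value], or null
    // if every list is empty (real BLPOP would block instead).
    List<String> popFirstNonEmpty(String... keys) {
        for (String key : keys) {
            Deque<String> queue = lists.get(key);
            if (queue != null && !queue.isEmpty()) {
                return Arrays.asList(key, queue.pollFirst());
            }
        }
        return null;
    }
}

class BlpopOrderDemo {
    public static void main(String[] args) {
        InMemoryLists lists = new InMemoryLists();
        lists.rightPush("q:low", "l1");
        lists.rightPush("q:high", "h1");
        lists.rightPush("q:middle", "m1");

        List<String> popped;
        while ((popped = lists.popFirstNonEmpty("q:high", "q:middle", "q:low")) != null) {
            System.out.println(popped.get(0) + " -> " + popped.get(1));
        }
    }
}
```

Note that a steady stream of high-priority messages can starve the lower queues under this rule, which is inherent to the approach.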

When processing fails, the message is pushed straight to failLetter; when no processor handles it, it is pushed to deadLetter.
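The routing rule can be isolated into a few lines. The sketch below is a simplified model under stated assumptions: LetterRouter is a hypothetical class, plain strings replace Message, two in-memory lists replace the Redis lists, and processing runs inline instead of being submitted to a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the dispatcher's routing decision.
class LetterRouter {
    final List<String> failLetter = new ArrayList<>();
    final List<String> deadLetter = new ArrayList<>();

    // hasProcessor: is any processor interested in this docType?
    // process: runs the business logic and returns true on success.
    void route(String docType, String payload,
               Predicate<String> hasProcessor, Predicate<String> process) {
        if (!hasProcessor.test(docType)) {
            deadLetter.add(payload);   // nobody handles this docType
            return;
        }
        if (!process.test(payload)) {
            failLetter.add(payload);   // a processor exists but reported failure
        }
    }
}

class LetterRouterDemo {
    public static void main(String[] args) {
        LetterRouter router = new LetterRouter();
        Predicate<String> onlyVin = "VIN"::equals;

        router.route("VIN", "processed fine", onlyVin, payload -> true);
        router.route("VIN", "processing failed", onlyVin, payload -> false);
        router.route("UNKNOWN", "no processor", onlyVin, payload -> true);

        System.out.println("failLetter = " + router.failLetter);
        System.out.println("deadLetter = " + router.deadLetter);
    }
}
```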

package com.air.tqb.helper;

import com.air.tqb.message.MessageProcessor;
import com.air.tqb.message.MessageSender;
import com.air.tqb.message.NoOpMessageProcessor;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
public class MessageHelper {
    @Autowired
    private List<MessageProcessor> processorList = Lists.newArrayList();
    @Autowired
    private MessageSender messageSender;
    private static final MessageProcessor NO_OP_MESSAGE_PROCESSOR = new NoOpMessageProcessor();
    private LoadingCache<String, MessageProcessor> docTypeCache = null;

    @PostConstruct
    public void init() {
        docTypeCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .expireAfterAccess(100, TimeUnit.MINUTES)
                .build(
                        new CacheLoader<String, MessageProcessor>() {
                            @Override
                            public MessageProcessor load(String key) throws Exception {
                                if (Strings.isNullOrEmpty(key)) return NO_OP_MESSAGE_PROCESSOR;
                                for (MessageProcessor processor : processorList) {
                                    if (processor.isInterestedIn(key)) {
                                        return processor;
                                    }
                                }
                                return NO_OP_MESSAGE_PROCESSOR;
                            }
                        });
    }

    public boolean canBeImported(Message message) {
        Preconditions.checkArgument(message != null && message.getDocType() != null);
        MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());
        return !unchecked.equals(NO_OP_MESSAGE_PROCESSOR);
    }

    public boolean doProcess(Message message) {
        MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());
        return unchecked.process(message);
    }

    public void sendMessage(Message message, Priority priority) {
        messageSender.sendMessage(message, priority);
    }

    public void sendMessage(Message message) {
        this.sendMessage(message, Priority.middle);
    }

    public void sendFailMessage(Message message) {
        messageSender.sendFailMessage(message);
    }

    public void sendDeadMessage(Message message) {
        messageSender.sendDeadMessage(message);
    }
}

Message sending and processing both go through this helper.
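The helper caches the docType-to-processor lookup and uses a no-op processor as a sentinel for "nobody is interested", which is needed because a Guava CacheLoader must not return null. The same pattern can be sketched with only the JDK. This version swaps the Guava LoadingCache for a ConcurrentHashMap, so it has no size limit or expiry. DocTypeProcessor and ProcessorRegistry are hypothetical names, and the behavior of the sentinel's process method is an assumption, since the article does not show NoOpMessageProcessor.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the article's MessageProcessor (String instead of Message).
interface DocTypeProcessor {
    boolean isInterestedIn(String docType);

    boolean process(String payload);
}

class ProcessorRegistry {
    // Sentinel meaning "no processor found". Returning false from process is an
    // assumption; the article does not show NoOpMessageProcessor.
    static final DocTypeProcessor NO_OP = new DocTypeProcessor() {
        @Override
        public boolean isInterestedIn(String docType) {
            return false;
        }

        @Override
        public boolean process(String payload) {
            return false;
        }
    };

    private final List<DocTypeProcessor> processors;
    private final Map<String, DocTypeProcessor> cache = new ConcurrentHashMap<>();

    ProcessorRegistry(List<DocTypeProcessor> processors) {
        this.processors = processors;
    }

    // Finds the first interested processor once per docType and caches the result.
    DocTypeProcessor lookup(String docType) {
        if (docType == null || docType.isEmpty()) {
            return NO_OP; // ConcurrentHashMap rejects null keys, so guard first
        }
        return cache.computeIfAbsent(docType, key -> {
            for (DocTypeProcessor processor : processors) {
                if (processor.isInterestedIn(key)) {
                    return processor;
                }
            }
            return NO_OP;
        });
    }

    boolean canBeImported(String docType) {
        return lookup(docType) != NO_OP;
    }
}

class ProcessorRegistryDemo {
    public static void main(String[] args) {
        DocTypeProcessor vin = new DocTypeProcessor() {
            @Override
            public boolean isInterestedIn(String docType) {
                return "VIN".equals(docType);
            }

            @Override
            public boolean process(String payload) {
                return true;
            }
        };
        ProcessorRegistry registry = new ProcessorRegistry(Collections.singletonList(vin));
        System.out.println("VIN handled: " + registry.canBeImported("VIN"));
        System.out.println("OTHER handled: " + registry.canBeImported("OTHER"));
    }
}
```

One consequence of caching the sentinel is that a docType that once resolved to "no processor" keeps doing so until the entry is evicted, which in the Guava version happens after 100 minutes without access or when the cache exceeds 1000 entries.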

package com.air.tqb.message.redis;

import com.air.tqb.common.AppConstant;
import com.air.tqb.message.MessageSender;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisMessageSender implements MessageSender {
    @Autowired
    @Qualifier(value = "redisTemplate")
    private RedisTemplate template;

    @Override
    public void sendMessage(Message message, Priority priority) {
        // Producers push to the tail; the dispatcher pops from the head (FIFO per priority).
        template.opsForList().rightPush(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name(), message);
    }

    @Override
    public void sendMessage(Message message) {
        this.sendMessage(message, Priority.middle);
    }

    @Override
    public void sendFailMessage(Message message) {
        // Count how many times this message has failed before parking it.
        message.setTimes(message.getTimes() == null ? 1 : message.getTimes() + 1);
        template.opsForList().rightPush(AppConstant.MESSAGE_FAILLETTER, message);
    }

    @Override
    public void sendDeadMessage(Message message) {
        template.opsForList().rightPush(AppConstant.MESSAGE_DEADLETTER, message);
    }
}

This class sends each message to the corresponding Redis queue.
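The times field is what makes the fail queue useful later, since it records how often a message has already failed. The null-safe increment from sendFailMessage is shown in isolation below. The shouldRetry check is purely hypothetical: the article does not say what, if anything, consumes the fail queue, so treat it as one possible use of the counter rather than part of the original design.

```java
class FailCounter {
    // Same rule as sendFailMessage: null means the message has never failed before.
    static Integer nextTimes(Integer times) {
        return times == null ? 1 : times + 1;
    }

    // Hypothetical policy, not from the article: allow another attempt
    // while the failure count is below maxTimes.
    static boolean shouldRetry(Integer times, int maxTimes) {
        return times == null || times < maxTimes;
    }
}

class FailCounterDemo {
    public static void main(String[] args) {
        Integer times = null;
        for (int attempt = 0; attempt < 3; attempt++) {
            times = FailCounter.nextTimes(times);
            System.out.println("failures so far: " + times
                    + ", retry allowed: " + FailCounter.shouldRetry(times, 3));
        }
    }
}
```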

A typical processor that is interested in VIN messages looks like this:

package com.air.tqb.message;

import com.air.tqb.model.Message;
import org.springframework.stereotype.Component;

@Component
public class VinMessageProcessor implements MessageProcessor {
    @Override
    public boolean process(Message message) {
        System.out.println(message);
        return true;
    }

    @Override
    public boolean isInterestedIn(String docType) {
        return "VIN".equals(docType);
    }
}

Published on September 15, 2017, 08:35