上文说到了如何用Redis实现优先级队列,
  那么代码的实现呢也比较简单
   接口概述
  定义几个消息类型
  package com.air.tqb.model;   import java.io.Serializable;   public class Message implements Serializable{     public Message(Distributable messageBean, Long timeStamp, String docType, Integer times, String from, String to) {         this.messageBean = messageBean;         this.timeStamp = timeStamp;         this.docType = docType;         this.times = times;         this.from = from;         this.to = to;     }       public Message() {     }       public Message(String docType) {         this.docType = docType;     }       public Message(Distributable messageBean, String docType) {         this.messageBean = messageBean;         this.docType = docType;     }       private static final long serialVersionUID = -3005565413488441986L;     private Distributable messageBean;     private Long timeStamp;     private String docType;     private Integer times;     private String from;     private String to;       public Distributable getMessageBean() {         return messageBean;     }       public void setMessageBean(Distributable messageBean) {         this.messageBean = messageBean;     }       public Long getTimeStamp() {         return timeStamp;     }       public void setTimeStamp(Long timeStamp) {         this.timeStamp = timeStamp;     }       public String getDocType() {         return docType;     }       public void setDocType(String docType) {         this.docType = docType;     }       public Integer getTimes() {         return times;     }       public void setTimes(Integer times) {         this.times = times;     }       public String getFrom() {         return from;     }       public void setFrom(String from) {         this.from = from;     }       public String getTo() {         return to;     }       public void setTo(String to) {         this.to = to;     }       @Override     public String toString() {         final StringBuilder sb = new StringBuilder("Message{");         sb.append("messageBean=").append(messageBean);         sb.append(", 
timeStamp=").append(timeStamp);         sb.append(", docType='").append(docType).append('\'');         sb.append(", times=").append(times);         sb.append(", from='").append(from).append('\'');         sb.append(", to='").append(to).append('\'');         sb.append('}');         return sb.toString();     } }
- messageBean为业务bean,必须为Distributable类型
- docType为业务类型,不可为空
定义如下接口
  package com.air.tqb.message;   public interface MessageDispatcher {     void consume(); }
  该接口主要完成任务的转发,会一直监听对应的队列
  package com.air.tqb.message;

  import com.air.tqb.model.Message;

  /**
   * Business handler for messages of one or more document types.
   */
  public interface MessageProcessor {

      /**
       * Handles a single message.
       *
       * @param message the message to handle
       * @return {@code true} when the message was handled successfully,
       *         {@code false} when handling failed
       */
      boolean process(Message message);

      /**
       * Tells whether this processor wants messages of the given document type.
       *
       * @param docType document type taken from a message
       * @return {@code true} if this processor handles that document type
       */
      boolean isInterestedIn(String docType);
  }
  该接口定义了processor如何处理业务逻辑(process),以及它关注哪些docType(isInterestedIn)
  package com.air.tqb.message;   import com.air.tqb.model.Message;   public interface MessageSender {     void sendMessage(Message message);       void sendMessage(Message message, Priority priority);       void sendFailMessage(Message message);       void sendDeadMessage(Message message); }
  该接口表示发送消息,分别对应:发送接口、发送失败信息接口(处理失败)、发送死信队列接口(无processor处理)
  package com.air.tqb.message;   public enum Priority {     high, middle, low; }
  优先级枚举
   实现
  package com.air.tqb.message.redis;   import com.air.tqb.common.AppConstant; import com.air.tqb.helper.MessageHelper; import com.air.tqb.message.AbstartactMessageDispatcher; import com.air.tqb.message.Priority; import com.air.tqb.model.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.stereotype.Component;   import javax.annotation.PostConstruct; import java.util.List;   @Component public class RedisMessageDispatcher extends AbstartactMessageDispatcher {     @Autowired     private RedisConnectionFactory redisConnectionFactory;     @Autowired     private MessageHelper messageHelper;       private byte[][] messageQueueNames = new byte[Priority.values().length][];     private RedisSerializer redisSerializer = new JdkSerializationRedisSerializer();       @PostConstruct     public void init() {         int i = 0;         for (Priority priority : Priority.values()) {             messageQueueNames[i++] = redisSerializer.serialize(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name());         }       }         @Override     public void consume() {         RedisConnection redisConnection = redisConnectionFactory.getConnection();         while (true) {             List<byte[]> rlts = redisConnection.bLPop(0, messageQueueNames);             if (rlts == null || rlts.isEmpty()) {                 //impossible                 logger.warn("blpop list null or empty");             } else {                 try {                     final Message message = (Message) redisSerializer.deserialize(rlts.get(1));                     if (messageHelper.canBeImported(message)) {                         submitNewTask((new Runnable() {                             
@Override                             public void run() {                                 boolean rlt = messageHelper.doProcess(message);                                 if (!rlt) {                                     messageHelper.sendFailMessage(message);                                 }                             }                         }));                       } else {                         messageHelper.sendDeadMessage(message);                     }                 } catch (Exception ex) {                     logger.warn(ex.getMessage(), ex);                 }             }         }         }     }
  利用Redis的blpop特性(阻塞)完成监听 
  
  当处理失败后直接丢入failLetter,无人处理则丢入deadLetter
  package com.air.tqb.helper;   import com.air.tqb.message.MessageProcessor; import com.air.tqb.message.MessageSender; import com.air.tqb.message.NoOpMessageProcessor; import com.air.tqb.message.Priority; import com.air.tqb.model.Message; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;   import javax.annotation.PostConstruct; import java.util.List; import java.util.concurrent.TimeUnit;   @Component public class MessageHelper {     @Autowired     private List<MessageProcessor> processorList = Lists.newArrayList();     @Autowired     private MessageSender messageSender;     private static final MessageProcessor NO_OP_MESSAGE_PROCESSOR = new NoOpMessageProcessor();     private LoadingCache<String, MessageProcessor> docTypeCache = null;         @PostConstruct     public void init() {         docTypeCache = CacheBuilder.newBuilder()                 .maximumSize(1000)                 .expireAfterAccess(100, TimeUnit.MINUTES)                 .build(                         new CacheLoader<String, MessageProcessor>() {                             @Override                             public MessageProcessor load(String key) throws Exception {                                 if (Strings.isNullOrEmpty(key)) return NO_OP_MESSAGE_PROCESSOR;                                 for (MessageProcessor processor : processorList) {                                     if (processor.isInterestedIn(key)) {                                         return processor;                                     }                                 }                                 return NO_OP_MESSAGE_PROCESSOR;                             }                         });     }         public 
boolean canBeImported(Message message) {         Preconditions.checkArgument(message != null && message.getDocType() != null);         MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());         return !unchecked.equals(NO_OP_MESSAGE_PROCESSOR);     }       public boolean doProcess(Message message) {         MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());         return unchecked.process(message);     }       public void sendMessage(Message message, Priority priority) {         messageSender.sendMessage(message, priority);     }       public void sendMessage(Message message) {         this.sendMessage(message, Priority.middle);     }         public void sendFailMessage(Message message) {         messageSender.sendFailMessage(message);     }       public void sendDeadMessage(Message message) {         messageSender.sendDeadMessage(message);     } }
  对应消息发送&处理均在此
  package com.air.tqb.message.redis;   import com.air.tqb.common.AppConstant; import com.air.tqb.message.MessageSender; import com.air.tqb.message.Priority; import com.air.tqb.model.Message; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;   @Component public class RedisMessageSender implements MessageSender {     @Autowired     @Qualifier(value = "redisTemplate")     private RedisTemplate template;       public void sendMessage(Message message, Priority priority) {         template.opsForList().rightPush(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name(), message);     }       public void sendMessage(Message message) {         this.sendMessage(message, Priority.middle);     }         public void sendFailMessage(Message message) {         message.setTimes(message.getTimes() == null ? 1 : message.getTimes() + 1);         template.opsForList().rightPush(AppConstant.MESSAGE_FAILLETTER,message);     }       public void sendDeadMessage(Message message) {         template.opsForList().rightPush(AppConstant.MESSAGE_DEADLETTER, message);     } }
  将对应信息发送到Redis的队列中
  
  一个正常的VIN码关注Processor如下
  package com.air.tqb.message;

  import com.air.tqb.model.Message;
  import org.springframework.stereotype.Component;

  /**
   * Sample processor that handles messages whose document type is {@code "VIN"}.
   */
  @Component
  public class VinMessageProcessor implements MessageProcessor {
      /** The only document type this processor accepts. */
      private static final String VIN_DOC_TYPE = "VIN";

      /**
       * Prints the message to standard output and reports success.
       *
       * @param message message to handle
       * @return always {@code true}
       */
      @Override
      public boolean process(Message message) {
          System.out.println(message);
          return true;
      }

      /**
       * @param docType document type taken from a message
       * @return {@code true} only for the exact string {@code "VIN"}
       */
      @Override
      public boolean isInterestedIn(String docType) {
          return VIN_DOC_TYPE.equals(docType);
      }
  }