上文说到了如何用Redis实现优先级队列,
那么代码的实现呢也比较简单
接口概述
定义几个消息类型
package com.air.tqb.model;

import java.io.Serializable;

/**
 * Serializable envelope that carries a business payload through the message queues.
 *
 * <p>This is a plain mutable bean: every field stays {@code null} until a constructor
 * or setter assigns it.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -3005565413488441986L;

    /** Business payload. */
    private Distributable messageBean;

    private Long timeStamp;

    /** Business type of the payload. */
    private String docType;

    private Integer times;

    private String from;

    private String to;

    /** Creates an empty message; all fields are {@code null}. */
    public Message() {
    }

    /**
     * Creates a message that only carries a business type.
     *
     * @param docType business type of the message
     */
    public Message(String docType) {
        this(null, docType);
    }

    /**
     * Creates a message with a payload and its business type.
     *
     * @param messageBean business payload
     * @param docType     business type of the message
     */
    public Message(Distributable messageBean, String docType) {
        this(messageBean, null, docType, null, null, null);
    }

    /**
     * Creates a fully populated message.
     *
     * @param messageBean business payload
     * @param timeStamp   timestamp value stored as-is
     * @param docType     business type of the message
     * @param times       counter value stored as-is
     * @param from        origin stored as-is
     * @param to          destination stored as-is
     */
    public Message(Distributable messageBean, Long timeStamp, String docType,
                   Integer times, String from, String to) {
        this.messageBean = messageBean;
        this.timeStamp = timeStamp;
        this.docType = docType;
        this.times = times;
        this.from = from;
        this.to = to;
    }

    public Distributable getMessageBean() {
        return messageBean;
    }

    public void setMessageBean(Distributable messageBean) {
        this.messageBean = messageBean;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getDocType() {
        return docType;
    }

    public void setDocType(String docType) {
        this.docType = docType;
    }

    public Integer getTimes() {
        return times;
    }

    public void setTimes(Integer times) {
        this.times = times;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    /**
     * Renders all six fields in the form {@code Message{messageBean=..., ..., to='...'}}.
     *
     * @return human-readable dump of this message
     */
    @Override
    public String toString() {
        return "Message{"
                + "messageBean=" + messageBean
                + ", timeStamp=" + timeStamp
                + ", docType='" + docType + '\''
                + ", times=" + times
                + ", from='" + from + '\''
                + ", to='" + to + '\''
                + '}';
    }
}
- messageBean为业务bean,必须为Distributable类型
- docType为业务类型,不可为空
定义如下接口
package com.air.tqb.message;

/**
 * Pulls queued messages and forwards them to whatever handles them.
 */
public interface MessageDispatcher {
    /**
     * Starts consuming messages.
     *
     * <p>NOTE(review): the Redis-based implementation loops forever inside this call,
     * so callers should presumably run it on a dedicated thread - confirm for other
     * implementations.
     */
    void consume();
}
该接口主要完成任务的转发,会一直监听对应的队列
package com.air.tqb.message;

import com.air.tqb.model.Message;

/**
 * Business handler for messages of the document types it declares interest in.
 */
public interface MessageProcessor {
    /**
     * Handles one message.
     *
     * @param message the message to handle
     * @return {@code true} when handling succeeded; the Redis dispatcher forwards the
     *         message to the fail queue when this returns {@code false}
     */
    boolean process(Message message);

    /**
     * Tells whether this processor wants messages of the given document type.
     *
     * @param docType document type taken from {@link Message#getDocType()}
     * @return {@code true} if this processor handles that type
     */
    boolean isInterestedIn(String docType);
}
该接口表示processor如何处理业务逻辑(process),以及它关注哪些业务类型(isInterestedIn)
package com.air.tqb.message;

import com.air.tqb.model.Message;

/**
 * Publishes messages to the regular, fail and dead-letter queues.
 */
public interface MessageSender {
    /**
     * Sends a message without an explicit priority.
     *
     * <p>NOTE(review): the Redis implementation uses {@link Priority#middle} here;
     * other implementations may differ.
     *
     * @param message the message to enqueue
     */
    void sendMessage(Message message);

    /**
     * Sends a message to the queue of the given priority.
     *
     * @param message  the message to enqueue
     * @param priority priority queue to use
     */
    void sendMessage(Message message, Priority priority);

    /**
     * Sends a message whose processing failed to the fail queue.
     *
     * @param message the message that failed
     */
    void sendFailMessage(Message message);

    /**
     * Sends a message that no processor is interested in to the dead-letter queue.
     *
     * @param message the unhandled message
     */
    void sendDeadMessage(Message message);
}
该接口用于发送消息,分别对应:普通发送接口、发送失败消息接口(处理失败时使用)、发送死信队列接口(无processor处理时使用)
package com.air.tqb.message;

/**
 * Message priority levels.
 *
 * <p>The constant names are appended to the queue-name prefix to form the per-priority
 * queue names, and the Redis dispatcher builds its BLPOP key list from {@code values()},
 * so both the names and the declaration order (highest first) are significant.
 */
public enum Priority {
    high, middle, low;
}
优先级枚举
实现
package com.air.tqb.message.redis;

import com.air.tqb.common.AppConstant;
import com.air.tqb.helper.MessageHelper;
import com.air.tqb.message.AbstartactMessageDispatcher;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * Dispatcher that listens on the per-priority Redis lists with a blocking {@code BLPOP}
 * and hands each received message to a processing task.
 *
 * <p>Queue keys are listed in {@link Priority} declaration order; BLPOP serves the first
 * non-empty key in the order given, which is what yields the priority behaviour.
 *
 * <p>NOTE(review): queue names are serialized with the JDK serializer, so the sender's
 * {@code RedisTemplate} must use the same key serializer - confirm against its config.
 */
@Component
public class RedisMessageDispatcher extends AbstartactMessageDispatcher {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageHelper messageHelper;

    /** Serialized queue keys, one per priority, in {@code Priority.values()} order. */
    private final byte[][] messageQueueNames = new byte[Priority.values().length][];

    private final RedisSerializer<Object> redisSerializer = new JdkSerializationRedisSerializer();

    /** Pre-computes the serialized queue key of every priority. */
    @PostConstruct
    public void init() {
        Priority[] priorities = Priority.values();
        for (int i = 0; i < priorities.length; i++) {
            messageQueueNames[i] =
                    redisSerializer.serialize(AppConstant.MESSAGE_QUEUE_PREFIX + priorities[i].name());
        }
    }

    /**
     * Blocks on the priority queues forever and dispatches every message received.
     *
     * <p>Messages nobody is interested in go to the dead-letter queue; messages whose
     * processing fails (returns {@code false} or throws) go to the fail queue. The
     * method only returns by throwing, e.g. when the Redis connection breaks; in that
     * case the connection is released before the exception propagates.
     */
    @Override
    public void consume() {
        RedisConnection redisConnection = redisConnectionFactory.getConnection();
        try {
            while (true) {
                // Timeout 0 = block until one of the queues has an element.
                List<byte[]> rlts = redisConnection.bLPop(0, messageQueueNames);
                // A BLPOP reply is [key, value]; anything shorter cannot be handled.
                if (rlts == null || rlts.size() < 2) {
                    logger.warn("blpop list null or empty");
                    continue;
                }
                try {
                    final Message message = (Message) redisSerializer.deserialize(rlts.get(1));
                    if (messageHelper.canBeImported(message)) {
                        submitNewTask(newProcessingTask(message));
                    } else {
                        messageHelper.sendDeadMessage(message);
                    }
                } catch (Exception ex) {
                    // One bad message must not stop the listener loop.
                    logger.warn(ex.getMessage(), ex);
                }
            }
        } finally {
            // Reached only when the loop is aborted by an exception: do not leak the connection.
            redisConnection.close();
        }
    }

    /** Builds the task that processes one message and routes it to the fail queue on failure. */
    private Runnable newProcessingTask(final Message message) {
        return new Runnable() {
            @Override
            public void run() {
                boolean processed;
                try {
                    processed = messageHelper.doProcess(message);
                } catch (RuntimeException ex) {
                    // A throwing processor counts as a failed attempt rather than a lost message.
                    logger.warn(ex.getMessage(), ex);
                    processed = false;
                }
                if (!processed) {
                    messageHelper.sendFailMessage(message);
                }
            }
        };
    }
}
利用Redis的blpop特性(阻塞)完成监听

当处理失败后直接丢入failLetter,无人处理则丢入deadLetter
package com.air.tqb.helper;

import com.air.tqb.message.MessageProcessor;
import com.air.tqb.message.MessageSender;
import com.air.tqb.message.NoOpMessageProcessor;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Facade that resolves the {@link MessageProcessor} for a message's document type and
 * forwards send requests to the configured {@link MessageSender}.
 */
@Component
public class MessageHelper {

    /** Sentinel returned when no registered processor is interested in a document type. */
    private static final MessageProcessor NO_OP_MESSAGE_PROCESSOR = new NoOpMessageProcessor();

    @Autowired
    private List<MessageProcessor> processorList = Lists.newArrayList();

    @Autowired
    private MessageSender messageSender;

    /** Document type to processor lookup, filled lazily by {@link #findProcessor(String)}. */
    private LoadingCache<String, MessageProcessor> docTypeCache = null;

    /** Builds the document-type cache (at most 1000 entries, 100 minutes idle expiry). */
    @PostConstruct
    public void init() {
        CacheLoader<String, MessageProcessor> loader = new CacheLoader<String, MessageProcessor>() {
            @Override
            public MessageProcessor load(String key) throws Exception {
                return findProcessor(key);
            }
        };
        docTypeCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .expireAfterAccess(100, TimeUnit.MINUTES)
                .build(loader);
    }

    /**
     * Returns the first registered processor interested in {@code docType}, or the no-op
     * sentinel when the type is null/empty or nobody is interested.
     */
    private MessageProcessor findProcessor(String docType) {
        if (!Strings.isNullOrEmpty(docType)) {
            for (MessageProcessor candidate : processorList) {
                if (candidate.isInterestedIn(docType)) {
                    return candidate;
                }
            }
        }
        return NO_OP_MESSAGE_PROCESSOR;
    }

    /**
     * Tells whether some real processor handles the message's document type.
     *
     * @param message message to check; must be non-null with a non-null document type
     * @return {@code true} unless the lookup resolved to the no-op sentinel
     * @throws IllegalArgumentException if the message or its document type is {@code null}
     */
    public boolean canBeImported(Message message) {
        boolean hasDocType = message != null && message.getDocType() != null;
        Preconditions.checkArgument(hasDocType);
        MessageProcessor resolved = docTypeCache.getUnchecked(message.getDocType());
        return !resolved.equals(NO_OP_MESSAGE_PROCESSOR);
    }

    /**
     * Runs the processor resolved for the message's document type.
     *
     * @param message message to process
     * @return whatever the resolved processor's {@code process} call returns
     */
    public boolean doProcess(Message message) {
        String docType = message.getDocType();
        MessageProcessor resolved = docTypeCache.getUnchecked(docType);
        return resolved.process(message);
    }

    /** Forwards to the sender with an explicit priority. */
    public void sendMessage(Message message, Priority priority) {
        messageSender.sendMessage(message, priority);
    }

    /** Sends with {@link Priority#middle}. */
    public void sendMessage(Message message) {
        sendMessage(message, Priority.middle);
    }

    /** Forwards a failed message to the sender's fail queue. */
    public void sendFailMessage(Message message) {
        messageSender.sendFailMessage(message);
    }

    /** Forwards an unhandled message to the sender's dead-letter queue. */
    public void sendDeadMessage(Message message) {
        messageSender.sendDeadMessage(message);
    }
}
对应消息发送&处理均在此
package com.air.tqb.message.redis; import com.air.tqb.common.AppConstant; import com.air.tqb.message.MessageSender; import com.air.tqb.message.Priority; import com.air.tqb.model.Message; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @Component public class RedisMessageSender implements MessageSender { @Autowired @Qualifier(value = "redisTemplate") private RedisTemplate template; public void sendMessage(Message message, Priority priority) { template.opsForList().rightPush(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name(), message); } public void sendMessage(Message message) { this.sendMessage(message, Priority.middle); } public void sendFailMessage(Message message) { message.setTimes(message.getTimes() == null ? 1 : message.getTimes() + 1); template.opsForList().rightPush(AppConstant.MESSAGE_FAILLETTER,message); } public void sendDeadMessage(Message message) { template.opsForList().rightPush(AppConstant.MESSAGE_DEADLETTER, message); } }
将对应信息发送到Redis的队列中

一个关注VIN码消息的普通Processor示例如下
package com.air.tqb.message;

import com.air.tqb.model.Message;
import org.springframework.stereotype.Component;

/**
 * Sample processor that handles messages whose document type is {@code "VIN"}.
 */
@Component
public class VinMessageProcessor implements MessageProcessor {

    /** The only document type this processor accepts. */
    private static final String SUPPORTED_DOC_TYPE = "VIN";

    /**
     * Prints the message to standard output.
     *
     * @param message message to handle
     * @return always {@code true}
     */
    @Override
    public boolean process(Message message) {
        System.out.println(message);
        return true;
    }

    /**
     * @param docType document type to test; may be {@code null}
     * @return {@code true} only for the exact string {@code "VIN"}
     */
    @Override
    public boolean isInterestedIn(String docType) {
        return SUPPORTED_DOC_TYPE.equals(docType);
    }
}