t-io对半包和粘包的处理


声明:本文转载自https://my.oschina.net/talenttan/blog/1610690,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

先上一下代码,再简单解说一下

package org.tio.core.task;import java.nio.ByteBuffer;import java.util.List;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.tio.core.Aio;import org.tio.core.ChannelAction;import org.tio.core.ChannelContext;import org.tio.core.GroupContext;import org.tio.core.PacketHandlerMode;import org.tio.core.exception.AioDecodeException;import org.tio.core.intf.AioListener;import org.tio.core.intf.Packet;import org.tio.core.stat.ChannelStat;import org.tio.core.stat.IpStat;import org.tio.core.utils.ByteBufferUtils;import org.tio.utils.SystemTimer;/**
 * 解码任务对象,一个连接对应一个本对象
 *
 * @author 谭耀武
 * 2012-08-09
 */public class DecodeRunnable implements Runnable {	private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class);	/**
	 *
	 * @param channelContext
	 * @param packet
	 * @param byteCount
	 * @author tanyaowu
	 */
	public static void handler(ChannelContext channelContext, Packet packet, int byteCount) {
		GroupContext groupContext = channelContext.getGroupContext();
		PacketHandlerMode packetHandlerMode = groupContext.getPacketHandlerMode();

		HandlerRunnable handlerRunnable = channelContext.getHandlerRunnable();		if (packetHandlerMode == PacketHandlerMode.QUEUE) {
			handlerRunnable.addMsg(packet);
			groupContext.getTioExecutor().execute(handlerRunnable);
		} else {
			handlerRunnable.handler(packet);
		}
	}	private ChannelContext channelContext = null;	/**
	 * 上一次解码剩下的数据
	 */
	private ByteBuffer lastByteBuffer = null;	/**
	 * 新收到的数据
	 */
	private ByteBuffer newByteBuffer = null;	/**
	 *
	 */
	public DecodeRunnable(ChannelContext channelContext) {		this.channelContext = channelContext;
	}	/**
	 * 清空处理的队列消息
	 */
	public void clearMsgQueue() {
		lastByteBuffer = null;
		newByteBuffer = null;
	}	/**
	 * @see java.lang.Runnable#run()
	 *
	 * @author tanyaowu
	 * 2017年3月21日 下午4:26:39
	 *
	 */
	@Override
	public void run() {
		ByteBuffer byteBuffer = newByteBuffer;		if (byteBuffer != null) {			if (lastByteBuffer != null) {
				byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
				lastByteBuffer = null;
			}
		} else {			return;
		}

		label_2: while (true) {			try {				int initPosition = byteBuffer.position();
				GroupContext groupContext = channelContext.getGroupContext();
				Packet packet = null;
				Integer packetNeededLength = channelContext.getPacketNeededLength();				if (packetNeededLength != null) {
					log.info("{}, 解码所需长度:{}", channelContext, packetNeededLength);					int readableLength = byteBuffer.limit() - initPosition;					if (readableLength >= packetNeededLength) {
						packet = groupContext.getAioHandler().decode(byteBuffer, channelContext);
					}
				} else {
					packet = groupContext.getAioHandler().decode(byteBuffer, channelContext);
				}				if (packet == null)// 数据不够,解不了码
				{
					lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, byteBuffer.limit());
					ChannelStat channelStat = channelContext.getStat();					int decodeFailCount = channelStat.getDecodeFailCount() + 1;
					channelStat.setDecodeFailCount(decodeFailCount);					int len = byteBuffer.limit() - initPosition;
					log.info("{} 解码失败, 本次共失败{}次,参与解码的数据长度共{}字节", channelContext, decodeFailCount, len);					if (decodeFailCount > 5) {						if (packetNeededLength == null) {
							log.warn("{} 解码失败, 本次共失败{}次,参与解码的数据长度共{}字节,请考虑要不要拉黑这个ip", channelContext, decodeFailCount, len);
						}
					}					return;
				} else //解码成功
				{
					channelContext.setPacketNeededLength(null);
					channelContext.getStat().setLatestTimeOfReceivedPacket(SystemTimer.currentTimeMillis());

					ChannelStat channelStat = channelContext.getStat();
					channelStat.setDecodeFailCount(0);					int afterDecodePosition = byteBuffer.position();					int len = afterDecodePosition - initPosition;

					channelContext.getGroupContext().getGroupStat().getReceivedPackets().incrementAndGet();
					channelContext.getStat().getReceivedPackets().incrementAndGet();

					List<Long> list = groupContext.ipStats.durationList;					for (Long v : list) {
						IpStat ipStat = groupContext.ipStats.get(v, channelContext.getClientNode().getIp());
						ipStat.getReceivedPackets().incrementAndGet();
					}

					channelContext.traceClient(ChannelAction.RECEIVED, packet, null);

					packet.setByteCount(len);

					AioListener aioListener = channelContext.getGroupContext().getAioListener();					try {						if (log.isDebugEnabled()) {
							log.debug("{} 收到消息 {}", channelContext, packet.logstr());
						}
						aioListener.onAfterReceived(channelContext, packet, len);
					} catch (Throwable e) {
						log.error(e.toString(), e);
					}					if (log.isDebugEnabled()) {
						log.debug("{}, 解包获得一个packet:{}", channelContext, packet.logstr());
					}
					handler(channelContext, packet, len);					int remainingLength = byteBuffer.limit() - byteBuffer.position();					if (remainingLength > 0)//组包后,还剩有数据
					{						if (log.isDebugEnabled()) {
							log.debug("{},组包后,还剩有数据:{}", channelContext, remainingLength);
						}						continue label_2;
					} else//组包后,数据刚好用完
					{
						lastByteBuffer = null;
						log.debug("{},组包后,数据刚好用完", channelContext);						return;
					}
				}
			} catch (Throwable e) {
				channelContext.setPacketNeededLength(null);
				log.error(channelContext + ", " + byteBuffer + ", 解码异常:" + e.toString(), e);
				Aio.close(channelContext, e, "解码异常:" + e.getMessage());				if (e instanceof AioDecodeException) {
					GroupContext groupContext = channelContext.getGroupContext();
					List<Long> list = groupContext.ipStats.durationList;					for (Long v : list) {
						IpStat ipStat = groupContext.ipStats.get(v, channelContext.getClientNode().getIp());
						ipStat.getDecodeErrorCount().incrementAndGet();
					}
				}				return;
			}
		}
	}	/**
	 * @param newByteBuffer the newByteBuffer to set
	 */
	public void setNewByteBuffer(ByteBuffer newByteBuffer) {		this.newByteBuffer = newByteBuffer;
	}	@Override
	public String toString() {		return this.getClass().getSimpleName() + ":" + channelContext.toString();
	}

}


对于半包

业务端需要在AioHandler.decode()里返回一个null对象给框架,框架拿到null后,就会认为这是个半包,进而把收到的数据暂存到DecodeRunnable.lastByteBuffer,当后面再收到数据时,把DecodeRunnable.lastByteBuffer和新收到的数据组成一个新的bytebuffer给业务端,如此循环,直到业务端能组成一个packet对象给框架层。

对于粘包

业务端在AioHandler.decode()方法中,解码一个packet对象返回给框架后,框架会判断是否有多余的byte没有被处理,如果有,则拿剩下的byte(bytebuffer)让业务端继续解码,直到业务端返回null或是返回packet但没有剩余byte为止。

小结

框架层已经做好半包和粘包的工作,业务层只需要按着业务协议解码即可,框架会处理好剩下的byte或是上次没处理完的byte的。

不好意思,又是一篇不太长的博客,希望对大家有帮助!

本文发表于2018年01月22日 12:32
(c)注:本文转载自https://my.oschina.net/talenttan/blog/1610690,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 3196 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1