nsq 源码分析之tcp协议部分


声明:本文转载自https://my.oschina.net/u/2950272/blog/1816971,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

    最近刚好看到其他几个项目有socket 编程,然后,想了下,在golang 中还没用过socket tcp 编程,于是看了一些im 的协议解析过程,都不是太满意,不是太通用,刚好看到nsq,发现nsq 这部分真是简单粗暴。还用的是标准库的一些东西,非常通用。

    该文章后续仍在不断的更新修改中, 请移步到原文地址http://dmwan.cc

    nsq 的协议文档地址:https://nsq.io/clients/tcp_protocol_spec.html。

    首先,tcp 编程,最麻烦的地方是处理粘包,这个玩意怎么处理,一般是通过分隔符和两次读处理。httppaser 基本会封装这两种,比如tornado 自己封装的readbytes,readuntil,一个是定长读,一个是读到某个标识符为止,中间会为读写分配个缓冲区。这里道理很好理解,定长读,先读约定的header,比如先读4个字节,得到后面有n个字节后,一直循环向buffer 放,而readuntil 每次read 放到buffer,不断find 分隔符。这里的边界条件,不管是python 还是c 写起来都是比较麻烦的事情。

    其实golang 里算是比较简单的,看个例子,下面这个是典型,第一行是command,'\n'分隔符结尾,第二行,因为这个command 有参数,用4个字节约定后面body 长度。

Publish a message to a topic:

PUB <topic_name>\n [ 4-byte size in bytes ][ N-byte binary data ]  <topic_name> - a valid string (optionally having #ephemeral suffix)

    我们做的时候,只需要有两个函数,一个能readuntil,一个能readbytes就ok,golang 有没?其实golang标准库是有的,分别是bufio.ReadSlice ,对应readuntil, 一个是io.ReadFull, 对应readbytes。

    nsq tcp server  流程算是比较清晰的。

    1,tcp accept 后,由一个 handle 处理,每个conn 一个IOLoop, 这个IOLoop 会维持这个链接,进行读写。

 2, IoLoop 为每个conn 新建一个client 对象,这个对象的Reader 实例化的时候用的是*bufio.Reader,而bufio 封装了一系列的io 方法,这里比较核心的是ReadSlice ,能读到后缀为止。读到的line 按空格解析成command 和 参数 , 由后面反射成不同命令,由Exec 执行。

    3, p.Exec 会根据不同的命令,执行不同方法。

    4,有的命令会附带body ,这里body 按照协议,是由4 byte 的size 加具体body 构成。这里解析,分两次读取。以pub 为例:

    readlen 其实就是ReadFull,client.lenSlice 就是一个[4]byte 的字节数组,所以,其实很明显,两次读取,一个读长度,按长度分配buffer,二次读取body。

func readLen(r io.Reader, tmp []byte) (int32, error) { 	_, err := io.ReadFull(r, tmp) 	if err != nil { 		return 0, err 	} 	return int32(binary.BigEndian.Uint32(tmp)), nil }

    上面过程基本就可以 理解整个nsq tcp server 的数据流向了,这里的标准库函数可以很快移植到其他的tcp server 项目中去,只需要构建好自己的protocol,自己的命令反射就ok。

    下面分析下 ReadFull 的源码,相比其他语言,按定长读取,放到buffer,这里还是比较有意思的。这样,读到指定长度err 为nil,读不到,数据为bad data。同理,readslice 方式类似。

// ReadAtLeast reads from r into buf until it has read at least min bytes. // It returns the number of bytes copied and an error if fewer bytes were read. // The error is EOF only if no bytes were read. // If an EOF happens after reading fewer than min bytes, // ReadAtLeast returns ErrUnexpectedEOF. // If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer. // On return, n >= min if and only if err == nil. func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) { 	if len(buf) < min { 		return 0, ErrShortBuffer 	} 	for n < min && err == nil { 		var nn int 		nn, err = r.Read(buf[n:])// 这里不会按定长读取1024,而是按照传参,变长读,循环放倒buf,不会出现读多的问题  		n += nn 	} 	if n >= min { 		err = nil 	} else if n > 0 && err == EOF { 		err = ErrUnexpectedEOF 	} 	return }  // ReadFull reads exactly len(buf) bytes from r into buf. // It returns the number of bytes copied and an error if fewer bytes were read. // The error is EOF only if no bytes were read. // If an EOF happens after reading some but not all the bytes, // ReadFull returns ErrUnexpectedEOF. // On return, n == len(buf) if and only if err == nil. func ReadFull(r Reader, buf []byte) (n int, err error) { 	return ReadAtLeast(r, buf, len(buf)) }

     如何防止client 端恶意不传完整参数?设置超时属性就ok。以上。

本文发表于2018年05月23日 12:00
(c)注:本文转载自https://my.oschina.net/u/2950272/blog/1816971,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2569 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1