rabbitmq 客户端golang实战


声明:本文转载自https://my.oschina.net/ureyishere/blog/1801876,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

rabbitmq消息模式

rabbitmq中进行消息控制的组建可以分为以下几部分:

  1. exchange:rabbitmq中的路由部件,控制消息的转发路径;
  2. queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
  3. consumer:消息的消费者;

rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。

rabbitmq

我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:

[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]        |                             /|\        ------->[exchange] ->topic------ 

不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。

rabbitmq客户端

rabbitmq客户端本质上是实现amqp协议的通信过程,golang的基础package使用的是github.com/streadway/amqp

在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go

创建queue

exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):

func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {  	var num int 	ch := clt.ch  	if ifFresh { 		num, err = ch.QueueDelete( 			server.quePrefix+"."+clt.device, 			false, 			false, 			false, 		) 		if err != nil { 			return 		} 		log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged") 	}  	args := make(amqp.Table) 	args["x-message-ttl"] = messageTTL 	args["x-expires"] = queueExpire 	q, err := ch.QueueDeclare( 		server.quePrefix+"."+clt.device, // name 		true,  // durable 		false, // delete when usused 		false, // exclusive 		false, // no-wait 		args,  // arguments 	)     // 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html 	if err != nil { 		return 	}  	for _, topic := range server.topics { 		err = ch.QueueBind( 			q.Name, 			topic.keyPrefix+"."+clt.device, 			topic.chanName, 			false, 			nil, 		) 		if err != nil { 			return 		} 	}  	clt.que = q 	return } 

消息接收

对于消费者消息的接收过程如下所示:

msgs, err := clt.ch.Consume( 		clt.que.Name, // queue 		clt.device,   // consumer 		false,        // auto ack 		false,        // exclusive 		false,        // no local 		false,        // no wait 		nil,          // args 	) 	if err != nil { 		clt.Close() 		log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err) 		return nil 	}  	clt.msgs = msgs 	clt.pubChan = make(chan *publishMsg, 4)  	go func() { 		cc := make(chan *amqp.Error) 		e := <-clt.ch.NotifyClose(cc) 		log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error()) 		clt.cancel() 	}()  	go func() { 		for d := range msgs { 			msg := d.Body 			msgProcess(d.Exchange, msg) 			d.Ack(false) 		} 	}()  

通过ch.Consume调用可以得到一个接收消息的msgs channel,在此没有配置auto ack,而是在消息处理结束之后,通过调用d.Ack(false)反馈ACK,这样可以保证消息在被处理之后,再进行确认。消费过程中,还调用ch.NotifyClose(cc)amqp.Channel的关闭进行侦听。

注意:在一个gorontinue中同时对msgs和notifyClose两个channel进行读取可能会导致死锁。因为msgs被关闭就会结束相应的gorontinue,此时notifyClose因为没有接收者,而在amqp.channel关闭的过程中出现死锁。

消息发送

在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:

func (clt *Client) publishProc() { 	ticker := time.NewTicker(tickTime) 	deliveryMap := make(map[uint64]*publishMsg)  	defer func() { 		atomic.AddInt32(&clt.onPublish, -1) 		ticker.Stop() 		for _, msg := range deliveryMap { 			msg.ackErr = errCancel 			msg.cancel() 		} 	}()  	var deliveryTag uint64 = 1 	var ackTag uint64 = 1 	var pMsg *publishMsg 	for { 		select {  		case <-clt.ctx.Done(): 			return  		case pMsg = <-clt.pubChan: 			pMsg.startTime = time.Now() 			err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire) 			if err != nil { 				pMsg.ackErr = err 				pMsg.cancel() 			} 			deliveryMap[deliveryTag] = pMsg 			deliveryTag++  		case c, ok := <-clt.confirm: 			if !ok { 				log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error") 				return 			} 			pMsg = deliveryMap[c.DeliveryTag] 			// fmt.Println("DeliveryTag:", c.DeliveryTag) 			delete(deliveryMap, c.DeliveryTag) 			if c.Ack { 				pMsg.ackErr = nil 				pMsg.cancel() 			} else { 				pMsg.ackErr = errNack 				pMsg.cancel() 			} 		case <-ticker.C: 			now := time.Now() 			for { 				if len(deliveryMap) == 0 { 					break 				} 				pMsg = deliveryMap[ackTag] 				if pMsg != nil { 					if now.Sub(pMsg.startTime.Add(pubTime)) > 0 { 						pMsg.ackErr = errTimeout 						pMsg.cancel() 						delete(deliveryMap, ackTag) 					} else { 						break 					} 				} 				ackTag++ 			} 		} 	} } 

发送过程的构造要点:

  1. 使用一个map[uint64]*publishMsg存储已经发送的消息,map的键为消息的id;
  2. 接收到确认消息后,通过消息的反馈机制反馈确认信息,并从map中删除消息;
  3. 在每一个tick,按照递增的id检查map中是否有超时消息,通过消息的反馈机制反馈超时信息;
  4. 在协程退出时向每个消息发送反馈信息,并删除消息。

需要注意的是,消息反馈并没有使用channel,因为消息的接收者可能因为超时不再侦听channel,从而导致发送过程出现阻塞。可以用长度不为0的反馈channel使得发送过程不阻塞,但是着需要等待gc后才能释放反馈channel的内存。因此在此并没有使用channel接收反馈,而是通过context的事件来告知发送方消息发送过程结束,反馈信息则提前写在publishMsgackErr中。

总结

作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。

本文发表于2018年04月27日 22:38
(c)注:本文转载自https://my.oschina.net/ureyishere/blog/1801876,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2054 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1