我的第一个问题是设计问题。我试图确定是否应该以这样的方式编写我的worker,即它只是从队列中弹出一条消息,处理它,然后关闭它。像Kubernetes这样的东西看起来相当微不足道。但是如果它消失(由于错误或事故),它会重新启动?
我问这个问题的原因是,为了实现前者,它感觉“被黑了”,因为我必须使用常见的AMQP库来编写以下内容streadway/amqp(阅读评论):
// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
msgs, err := v.Channel.Consume(
v.QueueName, // queue
v.ConsmerID, // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
// We have to use for .. range because Consume returns
// "<-chan Delivery" but if we only want ONE message popped off
// we return on the first one
for data := range msgs {
return data.Body, nil
}
// We should never get this far...
return nil, errors.New("Something went wrong")
}
此外,<-chan Delivery在这种情况下是什么?它似乎可以插入某种“流”或对象。有没有办法不必为这些数据类型编写for循环?
编辑:我还发现,这个代码似乎会使ENTIRE队列出列,即使它只进行一次for循环迭代(如上面的代码所示)。我不知道为什么会发生这种情况?
要简单地从a中获取单个对象<-chan Delivery,请不要使用range循环,而是使用channel operator <-:
data := <- msgs
return data.Body, nil
至于为什么在获取一条消息后整个队列被清空的原因:这很可能是由于Consumer预取。在使用消息时,客户端实际上不会逐个从代理中弹出它们,而是以可配置的大小批量弹出(如果我没记错的话,默认情况下大约是32或64条消息的顺序)。一旦经纪人向您的消费者发布了这批消息,他们就会进入您的msgs渠道; 如果你在收到第一条消息后不再从那个频道读取,其余的将会消失(至少,auto-ack启用 - 否则,它们将在频道关闭后重新排队)。
要一次只获取一条消息,请使用通道的QoS功能(第一个参数是预取计数):
err := v.Channel.Qos(1, 0, false)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。