开发者社区> 问答> 正文

正确Go / RabbitMQ方式从队列中“弹出”一条消息?

我的第一个问题是设计问题。我试图确定是否应该以这样的方式编写我的worker,即它只是从队列中弹出一条消息,处理它,然后关闭它。像Kubernetes这样的东西看起来相当微不足道。但是如果它消失(由于错误或事故),它会重新启动?

我问这个问题的原因是,为了实现前者,它感觉“被黑了”,因为我必须使用常见的AMQP库来编写以下内容streadway/amqp(阅读评论):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {

msgs, err := v.Channel.Consume(
    v.QueueName, // queue
    v.ConsmerID, // consumer
    true,        // auto-ack
    false,       // exclusive
    false,       // no-local
    false,       // no-wait
    nil,         // args
)
if err != nil {
    return nil, err
}

// We have to use for .. range because Consume returns
// "<-chan Delivery" but if we only want ONE message popped off
// we return on the first one
for data := range msgs {
    return data.Body, nil
}

// We should never get this far...
return nil, errors.New("Something went wrong")

}
此外,<-chan Delivery在这种情况下是什么?它似乎可以插入某种“流”或对象。有没有办法不必为这些数据类型编写for循环?

编辑:我还发现,这个代码似乎会使ENTIRE队列出列,即使它只进行一次for循环迭代(如上面的代码所示)。我不知道为什么会发生这种情况?

展开
收起
k8s小能手 2018-12-14 15:01:31 3680 0
1 条回答
写回答
取消 提交回答
  • 整合最优质的专家资源和技术资料,问答解疑

    要简单地从a中获取单个对象<-chan Delivery,请不要使用range循环,而是使用channel operator <-:

    data := <- msgs
    return data.Body, nil
    至于为什么在获取一条消息后整个队列被清空的原因:这很可能是由于Consumer预取。在使用消息时,客户端实际上不会逐个从代理中弹出它们,而是以可配置的大小批量弹出(如果我没记错的话,默认情况下大约是32或64条消息的顺序)。一旦经纪人向您的消费者发布了这批消息,他们就会进入您的msgs渠道; 如果你在收到第一条消息后不再从那个频道读取,其余的将会消失(至少,auto-ack启用 - 否则,它们将在频道关闭后重新排队)。

    要一次只获取一条消息,请使用通道的QoS功能(第一个参数是预取计数):

    err := v.Channel.Qos(1, 0, false)

    2019-07-17 23:20:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
阿里云栖开发者沙龙PHP技术专场-RabbitMQ 的延时队列和镜像队列原理与实战-钱文品 立即下载