如何让消息队列达到最大吞吐量?

简介: 如何让消息队列达到最大吞吐量?

你在使用消息队列的时候关注过吞吐量吗?

思考过吞吐量的影响因素吗?

考虑过怎么提高吗?

总结过最佳实践吗?

本文带你一起探讨下消息队列消费端高吞吐的 Go 框架实现。Let’s go!

关于吞吐量的一些思考

  • 写入消息队列吞吐量取决于以下两个方面
    最佳吞吐量是让其中之一打满,而一般情况下内网带宽都会非常高,不太可能被打满,所以自然就是讲消息队列的写入速度打满,这就就有两个点需要平衡
    go-zero 的 PeriodicalExecutorChunkExecutor 就是为了这种情况设计的
  • 批量写入的消息量大小或者字节数多少
  • 延迟多久写入
  • 网络带宽
  • 消息队列(比如Kafka)写入速度
  • 从消息队列里消费消息的吞吐量取决于以下两个方面
    这里有个核心问题是不能不考虑业务处理速度,而读取过多的消息到内存里,否则可能会引起两个问题:
  • 内存占用过高,甚至出现OOM,pod 也是有 memory limit
  • 停止 pod 时堆积的消息来不及处理而导致消息丢失
  • 消息队列的读取速度,一般情况下消息队列本身的读取速度相比于处理消息的速度都是足够快的
  • 处理速度,这个依赖于业务

解决方案和实现

借用一下 Rob Pike 的一张图,这个跟队列消费异曲同工。左边4个 gopher 从队列里取,右边4个 gopher 接过去处理。比较理想的结果是左边和右边速率基本一致,没有谁浪费,没有谁等待,中间交换处也没有堆积。

我们来看看 go-zero 是怎么实现的:

  • Producer
for {
  select {
  case <-q.quit:
   logx.Info("Quitting producer")
   return
  default:
   if v, ok := q.produceOne(producer); ok {
    q.channel <- v
   }
  }
 }

没有退出事件就会通过 produceOne 去读取一个消息,成功后写入 channel。利用 chan 就可以很好的解决读取和消费的衔接问题。

  • Consumer
for {
  select {
  case message, ok := <-q.channel:
   if ok {
    q.consumeOne(consumer, message)
   } else {
    logx.Info("Task channel was closed, quitting consumer...")
    return
   }
  case event := <-eventChan:
   consumer.OnEvent(event)
  }
 }

这里如果拿到消息就去处理,当 okfalse 的时候表示 channel 已被关闭,可以退出整个处理循环了。同时我们还在 redis queue 上支持了 pause/resume,我们原来在社交场景里大量使用这样的队列,可以通知 consumer 暂停和继续。

  • 启动 queue,有了这些我们就可以通过控制 producer/consumer 的数量来达到吞吐量的调优了
func (q *Queue) Start() {
 q.startProducers(q.producerCount)
 q.startConsumers(q.consumerCount)
 q.producerRoutineGroup.Wait()
 close(q.channel)
 q.consumerRoutineGroup.Wait()
}

这里需要注意的是,先要停掉 producer,再去等 consumer 处理完。

到这里核心控制代码基本就讲完了,其实看起来还是挺简单的,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整实现。

如何使用

基本的使用流程:

  1. 创建 producer 或  consumer
  2. 启动 queue
  3. 生产消息 / 消费消息

对应到 queue 中,大致如下:

创建 queue

// 生产者创建工厂
producer := newMockedProducer()
// 消费者创建工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创建工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我们看看 NewQueue 需要什么参数:

  1. producer 工厂方法
  2. consumer 工厂方法

producer & consumer 的工厂函数传递  queue ,由它去负责创建。框架提供了 ProducerConsumer 的接口以及工厂方法定义,然后整个流程的控制 queue 实现会自动完成。

生产 message

我们通过自定义一个 mockedProducer 来模拟:

type mockedProducer struct {
 total int32
 count int32
  // 使用waitgroup来模拟任务的完成
 wait  sync.WaitGroup
}
// 实现 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
 if atomic.AddInt32(&p.count, 1) <= p.total {
  p.wait.Done()
  return "item", true
 }
 time.Sleep(time.Second)
 return "", false
}

queue 中的生产者编写都必须实现:

  • Produce():由开发者编写生产消息的逻辑
  • AddListener():添加事件 listener

消费 message

我们通过自定义一个 mockedConsumer 来模拟:

type mockedConsumer struct {
 count  int32
}
func (c *mockedConsumer) Consume(string) error {
 atomic.AddInt32(&c.count, 1)
 return nil
}

启动  queue

启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:

func main() {
 // 创建 queue
 q := NewQueue(func() (Producer, error) {
  return newMockedProducer(), nil
 }, func() (Consumer, error) {
  return newMockedConsumer(), nil
 })
  // 启动panic了也可以确保stop被执行以清理资源
  defer q.Stop()
 // 启动
 q.Start()
}

以上就是 queue 最简易的实现示例。我们通过这个 core/queue 框架实现了基于 rediskafka 等的消息队列服务,在不同业务场景中经过了充分的实践检验。你也可以根据自己的业务实际情况,实现自己的消息队列服务。

整体设计

整体流程如上图:

  1. 全体的通信都由 channel 进行
  2. ProducerConsumer 的数量可以设定以匹配不同业务需求
  3. ProduceConsume 具体实现由开发者定义,queue 负责整体流程

总结

本篇文章讲解了如何通过 channel 来平衡从队列中读取和处理消息的速度,以及如何实现一个通用的消息队列处理框架,并通过 mock 示例简单展示了如何基于 core/queue 实现一个消息队列处理服务。你可以通过类似的方式实现一个基于 rocketmq 等的消息队列处理服务。

关于 go-zero 更多的设计和实现文章,可以关注『微服务实践』公众号。

项目地址

https://github.com/tal-tech/go-zero

相关文章
|
2月前
|
消息中间件 存储 负载均衡
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
|
2月前
|
消息中间件 监控
异步处理和消息队列的结合使用
异步处理和消息队列的结合使用
|
2月前
|
消息中间件 监控 JavaScript
异步处理和消息队列
异步处理和消息队列
|
3月前
|
消息中间件 监控 Kubernetes
消息队列 MQ使用问题之没有实际的消息流量,磁盘和带宽的使用情况较高,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之没有实际的消息流量,磁盘和带宽的使用情况较高,是什么原因
|
2月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之现代消息队列处理消息顺序性的问题如何解决
现代消息队列与云存储问题之现代消息队列处理消息顺序性的问题如何解决
|
3月前
|
消息中间件 负载均衡 Java
消息队列 MQ使用问题之负载不均衡是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件
RabbitMQ 实现消息队列延迟
RabbitMQ 实现消息队列延迟
168 0
|
5月前
|
消息中间件 Kafka 数据库
【后端面经】【消息队列】22 | 消息队列:消息队列可以用来解决什么问题?-02 超时场景+性能问题
【5月更文挑战第7天】 本文介绍了电商中订单超时取消的处理方法,通过使用消息队列实现延时消息。当订单30分钟后未支付,消息队列将触发取消操作,但需注意并发问题,如采用分布式锁或乐观锁避免并发更新订单状态。乐观锁确保只有订单状态为未支付时才允许支付。主流消息队列如RocketMQ支持延迟消息,而Kafka不支持。 使用消息队列的好处在于解耦和提高系统性能、扩展性和可用性。同步调用会导致性能下降,因为必须等待所有调用完成。并发调用虽可提升性能,但仍逊于消息队列,且无法解决扩展性和可用性问题。
97 1
|
5月前
|
消息中间件 存储 监控
中间件消息队列堆积
中间件消息队列堆积是一个常见的问题
41 1
下一篇
无影云桌面