Kafka/RocketMQ 多线程消费时如何保证消费顺序?

简介: kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。

上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下:


640.png

这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/RocketMQ 消费线程模型。


Kafka



kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。


1、每个线程维护一个 KafkaConsumer


这样相当于一个进程内拥有多个消费者,也可以说消费组内成员是有多个线程内的 KafkaConsumer 组成的。

640.png

但其实这个消费模型是存在很大问题的,从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。


2、单 KafkaConsumer 实例 + 多 worker 线程


针对第一个线程模型的缺点,我们可采取 KafkaConsumer 实例与消息消费逻辑解耦,把消息消费逻辑放入单独的线程中去处理,线程模型如下:

640.png

从消费线程模型可看出,当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。


但这个消费模型由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,在这里我们可以引入阻塞队列的模型,一个 woker 线程对应一个阻塞队列,线程不断轮训从阻塞队列中获取消息进行消费,对具有相同 key 的消息进行取模,并放入相同的队列中,实现顺序消费, 消费模型如下:

640.png


但是以上两个消费线程模型,存在一个问题:


在消费过程中,如果 Kafka 消费组发生重平衡,此时的分区被分配给其它消费组了,如果拉取回来的消息没有被消费,虽然 Kakfa 可以实现 ConsumerRebalanceListener 接口,在新一轮重平衡前主动提交消费偏移量,但这貌似解决不了未消费的消息被打乱顺序的可能性?


因此在消费前,还需要主动进行判断此分区是否被分配给其它消费者处理,并且还需要锁定该分区在消费当中不能被分配到其它消费者中(但 kafka 目前做不到这一点)。

参考 RocketMQ 的做法:


在消费前主动调用 ProcessQueue#isDropped 方法判断队列是否已过期,并且对该队列进行加锁处理(向 broker 端请求该队列加锁)。


RocketMQ



RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你准备好了你的需求,它本身的消费模型就是单 consumer 实例 + 多 worker 线程模型,有兴趣的小伙伴可以从以下方法观摩 RocketMQ 的消费逻辑:


org.apache.rocketmq.client.impl.consumer.PullMessageService#run


RocketMQ 会为每个队列分配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会不断轮询从 pullRequestQueue 中取出 PullRequest 去拉取消息,接着将拉取到的消息给到 ConsumeMessageService 处理,

ConsumeMessageService 有两个子接口:

// 并发消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 顺序消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

其中,ConsumeMessageConcurrentlyService 内部有一个线程池,用于并发消费,同样地,如果需要顺序消费,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 类进行顺序消息消费处理。


经过对 Kafka 消费线程模型的思考之后,从 ConsumeMessageOrderlyService 源码中能够看出 RocketMQ 能够实现局部消费顺序,我认为主要有以下两点:


1)RocketMQ 会为每个消息队列建一个对象锁,这样只要线程池中有该消息队列在处理,则需等待处理完才能进行下一次消费,保证在当前 Consumer 内,同一队列的消息进行串行消费。

2)向 Broker 端请求锁定当前顺序消费的队列,防止在消费过程中被分配给其它消费者处理从而打乱消费顺序。


总结



经过这篇文章的分析后,尝试回答文章开头的那个问题:


1)多分区的情况下:


如果想要保证 Kafka 在消费时要保证消费的顺序性,可以使用每个线程维护一个 KafkaConsumer 实例,并且是一条一条地去拉取消息并进行消费(防止重平衡时有可能打乱消费顺序);对于能容忍消息短暂乱序的业务(话说回来, Kafka 集群也不能保证严格的消息顺序),可以使用单 KafkaConsumer 实例 + 多 worker 线程 + 一条线程对应一个阻塞队列消费线程模型。


1)单分区的情况下:


由于单分区不存在重平衡问题,以上两个线程模型的都可以保证消费的顺序性。

另外如果是 RocketMQ,使用 MessageListenerOrderly 监听消费可保证消息消费顺序。


很多人也有这个疑问:既然 Kafka 和 RocketMQ 都不能保证严格的顺序消息,那么顺序消费还有意义吗?


一般来说普通的的顺序消息能够满足大部分业务场景,如果业务能够容忍集群异常状态下消息短暂不一致的情况,则不需要严格的顺序消息。


如果你对文章还有什么疑问和补充或者发现文中有错误的地方,欢迎留言,我们一起探讨。


相关文章
|
26天前
|
消息中间件 运维 Java
招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
45岁资深架构师尼恩针对一线互联网企业面试题,特别是招商银行的高阶Java后端面试题,进行了系统化梳理。本文重点讲解如何根据应用场景选择合适的消息中间件(如RabbitMQ、RocketMQ和Kafka),并对比三者的性能、功能、可靠性和运维复杂度,帮助求职者在面试中充分展示技术实力,实现“offer直提”。此外,尼恩还提供了《尼恩Java面试宝典PDF》等资源,助力求职者提升架构、设计、开发水平,应对高并发、分布式系统的挑战。更多内容及技术圣经系列PDF,请关注【技术自由圈】获取。
|
6月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
86 3
|
3月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
3月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
139 1
|
4月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
142 1
|
5月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
428 4
|
6月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
197 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
800 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67814 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
733 1
5张图带你理解 RocketMQ 顺序消息实现机制