分享一些 Kafka 消费数据的小经验

简介: 之前写过一篇《从源码分析如何优雅的使用 Kafka 生产者》 ,有生产者自然也就有消费者。建议对 Kakfa 还比较陌生的朋友可以先看看。就我的使用经验来说,大部分情况都是处于数据下游的消费者角色。也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据。

单线程消费


以之前生产者中的代码为例,事先准备好了一个 Topic:data-push,3个分区。


先往里边发送 100 条消息,没有自定义路由策略,所以消息会均匀的发往三个分区。


先来谈谈最简单的单线程消费,如下图所示:



由于数据散列在三个不同分区,所以单个线程需要遍历三个分区将数据拉取下来。


单线程消费的示例代码:



这段代码大家在官网也可以找到:将数据取出放到一个内存缓冲中最后写入数据库的过程。


先不讨论其中的 offset 的提交方式。



通过消费日志可以看出:


取出的 100 条数据确实是分别遍历了三个分区。


单线程消费虽然简单,但存在以下几个问题:


  • 效率低下。如果分区数几十上百个,单线程无法高效的取出数据。


  • 可用性很低。一旦消费线程阻塞,甚至是进程挂掉,那么整个消费程序都将出现问题。


多线程消费


既然单线程有诸多问题,那是否可以用多线程来提高效率呢?


在多线程之前不得不将消费模式分为两种进行探讨:消费组、独立消费者。


这两种消费模式对应的处理方式有着很大的不同,所以很有必要单独来讲。


独立消费者模式


先从独立消费者模式谈起,这种模式相对于消费组来说用的相对小众一些。


看一个简单示例即可知道它的用法:



值得注意的是:独立消费者可以不设置 group.id 属性。


也是发送100条消息,消费结果如下:



通过 API 可以看出:我们可以手动指定需要消费哪些分区。


比如 data-push Topic 有三个分区,我可以手动只消费其中的 1 2 分区,第三个可以视情况来消费。


同时它也支持多线程的方式,每个线程消费指定分区进行消费。



为了直观,只发送了 10 条数据。



根据消费结果可以看出:


c1 线程只取 0 分区;c2 只取 1 分区;c3 只取 2 分区的数据。


甚至我们可以将消费者多进程部署,这样的消费方式如下:



假设 Topic:data-push 的分区数为 4 个,那我们就可以按照图中的方式创建两个进程。


每个进程内有两个线程,每个线程再去消费对应的分区。


这样当我们性能不够新增 Topic 的分区数时,消费者这边只需要这样水平扩展即可,非常的灵活。


这种自定义分区消费的方式在某些场景下还是适用的,比如生产者每次都将某一类的数据只发往一个分区。这样我们就可以只针对这一个分区消费。


但这种方式有一个问题:可用性不高,当其中一个进程挂掉之后;该进程负责的分区数据没法转移给其他进程处理。


消费组模式


消费组模式应当是使用最多的一种消费方式。


我们可以创建 N 个消费者实例(new KafkaConsumer()),当这些实例都用同一个 group.id 来创建时,他们就属于同一个消费组。


在同一个消费组中的消费实例可以收到消息,但一个分区的消息只会发往一个消费实例。


还是借助官方的示例图来更好的理解它。



某个 Topic 有四个分区 p0 p1 p2 p3,同时创建了两个消费组 groupA,groupB


  • A 消费组中有两个消费实例 C1、C2


  • B 消费组中有四个消费实例 C3、C4、C5、C6


这样消息是如何划分到每个消费实例的呢?


通过图中可以得知:


  • A 组中的 C1 消费了 P0 和 P3 分区;C2 消费 P1、P2 分区。


  • B 组有四个实例,所以每个实例消费一个分区;也就是消费实例和分区是一一对应的。


需要注意的是:


这里的消费实例简单的可以理解为 new KafkaConsumer它和进程没有关系


比如说某个 Topic 有三个分区,但是我启动了两个进程来消费它。


其中每个进程有两个消费实例,那其实就相当于有四个实例了。


这时可能就会问 4 个实例怎么消费 3 个分区呢?


消费组自平衡


这个 Kafka 已经帮我做好了,它会来做消费组里的 Rebalance


比如上面的情况,3 个分区却有 4 个消费实例;最终肯定只有三个实例能取到消息。但至于是哪三个呢,这点 Kakfa 会自动帮我们分配好。


看个例子,还在之前的 data-push 这个 Topic,其中有三个分区。


当其中一个进程(其中有三个线程,每个线程对应一个消费实例)时,消费结果如下:



里边的 20 条数据都被这个进程的三个实例消费掉。


这时我新启动了一个进程,程序和上面那个一模一样;这样就相当于有两个进程,同时就是 6 个实例。


我再发送 10 条消息会发现:


进程1 只取到了分区 1 里的两条数据(之前是所有数据都是进程1里的线程获取的)。




同时进程2则消费了剩下的 8 条消息,分别是分区 0、2 的数据(总的还是只有三个实例取到了数据,只是分别在不同的进程里)。



当我关掉进程2,再发送10条数据时会发现所有数据又被进程1里的三个线程消费了。


通过这些测试相信大家已经可以看到消费组的优势了。


我们可以在一个消费组中创建多个消费实例来达到高可用、高容错的特性,不会出现单线程以及独立消费者挂掉之后数据不能消费的情况。同时基于多线程的方式也极大的提高了消费效率。


而当新增消费实例或者是消费实例挂掉时 Kakfa 会为我们重新分配消费实例与分区的关系就被称为消费组 Rebalance


发生这个的前提条件一般有以下几个:


  • 消费组中新增消费实例。


  • 消费组中消费实例 down 掉。


  • 订阅的 Topic 分区数发生变化。


  • 如果是正则订阅 Topic 时,匹配的 Topic 数发生变化也会导致 Rebalance


所以推荐使用这样的方式消费数据,同时扩展性也非常好。当性能不足新增分区时只需要启动新的消费实例加入到消费组中即可。


总结


本次只分享了几个不同消费数据的方式,并没有着重研究消费参数、源码;这些内容感兴趣的话可以在下次分享。


文中提到的部分源码可以在这里查阅:


github.com/crossoverJi…


相关文章
|
30天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
44 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
261 9
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
128 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
66 3
|
4月前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
50 3
|
3月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。