聊聊 Kafka: Consumer 源码解析之 poll 模型

本文涉及的产品
云解析DNS,个人版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 聊聊 Kafka: Consumer 源码解析之 poll 模型

一、前言

前面的 Kafka 系列文章,我们讲过一篇:聊聊 Kafka:Consumer 源码解析之 ConsumerNetworkClient,那一篇主要讲的是 KafkaConsumer 类以及这个类里最重要的一个属性类 ConsumerNetworkClient。那这一篇我们来讲一下 KafkaConsumer 是怎么去拉取消息的,也就是本篇的的 Poll 的网络模型。

二、Consumer 的示例

下面我们来看一个 KafkaConsumer 的示例程序:

/**
 * @author: 微信公众号【老周聊架构】
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // kafka地址,列表格式为host1:port1,host2:port2,..
        props.put("bootstrap.servers", "localhost:9092");
        // key序列化方式 必须设置
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化方式 必须设置
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("group.id", "consumer_riemann_test");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可消费多个topic,组成一个list
        String topic = "riemann_kafka_test";
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

不难看出,上面 Consumer 拉取消息的主要几个步骤:

  • 构造 Consumer 的相关 Properties 配置
  • 创建 KafkaConsumer 的对象 consumer
  • 订阅相应的 topic 列表
  • 调用 consumer 的 poll 方法拉取订阅的消息

前面三个步骤只是创建了一个 consumer 对象并订阅了 topic 主题,真正的逻辑是在第四步,也就是 poll 方法,这一步是理解 consumer 设计的关键所在。

三、poll 的网络模型

3.1 consumer poll

当一个 consumer 对象创建之后,只有 poll 方法调用时,consumer 才会真正去连接 kafka 集群,进行相关的操作,其 poll 方法具体实现如下:

consumer poll 主要做了如下几件事情:

  • ① 检查这个 consumer 是否可以拉取消息
  • ② 检查这个 consumer 是否订阅了相应的 topic-partition
  • ③ 调用 pollForFetches 方法获取相应的 records
  • ④ 在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在 pollForFetches 方法中。
  • ⑤ 如果在给定的时间内(notExpired)获取不到可用的 records,返回空数据。

3.1.1 ①

3.1.2 ②

3.1.3 ③

pollForFetches 方法主要做了以下几个事情:

  • ① 计算本次拉取的超时时间
  • ② 如果数据已经拉回到本地,直接返回数据。
  • ③ 说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。
  • ④ 通过调用 NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)
  • ⑤ 将从 broker 读取到的数据返回(即封装成消息)

3.1.3.1 ①

3.1.3.2 ②


重点看下 CompletedFetch 是 completedFetches 处理后的类型或者是 initializeCompletedFetch 初始化后的类型,几个重要的成员变量如下:


consumer 的 Fetcher 处理从 server 获取的 fetch response 大致分为以下几个过程:

  • 通过 completedFetches.peek() 获取已经成功的 fetch response(在 fetcher.sendFetches() 方法中会把成功的结果放在这个集合中,是拆分为 topic-partition 的粒度放进去的)
  • 获取下一个要处理的 nextInLineFetch,判断 CompletedFetch 是否未初始化,没有的话,则初始化。
  • 通过 fetchRecords(nextInLineFetch, recordsRemaining) 方法处理 CompletedFetch 对象,在这个里面会去验证 nextFetchOffset 是否能对得上,只有 nextFetchOffset 是一致的情况下才会去处理相应的数据,并更新 the fetch offset 的信息,如果 nextFetchOffset 不一致,这里就不会处理,the fetch offset 就不会更新,下次 fetch 请求时是会接着 the fetch offset 的位置去请求相应的数据。
  • 返回相应的 ConsumerRecord 数据。

3.1.3.3 ③

说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。

在发送的 fetch 的过程中,总共分为以下两步:

  • prepareFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该 topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的。
  • client.send(fetchTarget, request):发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到 completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。

从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches(),拉取到的数据,可需要多次 pollForFetches 循环才能处理完,因为 Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的。

3.1.3.4 ④

调用底层 NetworkClient 提供的接口去发送相应的请求,可以看这一篇:聊聊 Kafka:Producer 的网络模型,只不过之前写的是关于 Producer 的网络模型,现在的是 Consumer 的 poll 模型,最终都会统一与提供的 NetworkClient#poll 进行交互。

3.1.3.5 ⑤

拉取器提供已经拉取到的记录集给 KafkaConsumer 调用,并更新 the consumed position。

3.1.4 ④

如果拉取到的消息集合不为空,再返回该批消息之前,如果还有积压的拉取请求,可以继续发送拉取请求,但此时会禁用 wakeup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向 broker 拉取消息。

3.1.5 ⑤

四、总结

相信大家跟着老周上面的思路及源码分析,对 Consumer 的 poll 模型应该有个清晰的认识了。下面这张图代表了 Consumer 的整体网络模型的封装,我们把主要的一些组件了解后,再从整体到局部,我相信你会喜欢这样一层层剥洋葱的感觉的。




欢迎大家关注我的公众号老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


相关文章
|
1天前
|
关系型数据库 分布式数据库 数据库
PolarDB-X源码解析:揭秘分布式事务处理
【7月更文挑战第3天】**PolarDB-X源码解析:揭秘分布式事务处理** PolarDB-X,应对大规模分布式事务挑战,基于2PC协议确保ACID特性。通过预提交和提交阶段保证原子性与一致性,使用一致性快照隔离和乐观锁减少冲突,结合故障恢复机制确保高可用。源码中的事务管理逻辑展现了优化的分布式事务处理流程,为开发者提供了洞察分布式数据库核心技术的窗口。随着开源社区的发展,更多创新实践将促进数据库技术进步。
10 3
|
5天前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
12 1
|
6天前
|
消息中间件 监控 Kafka
深入解析:Kafka 为何不支持全面读写分离?
**Kafka 2.4 引入了有限的读写分离,允许Follower处理只读请求,以缓解Leader压力。但这不适用于所有场景,特别是实时数据流和日志分析,因高一致性需求及PULL同步方式导致的复制延迟,可能影响数据实时性和一致性。在设计系统时需考虑具体业务需求。**
7 1
|
7天前
|
NoSQL Java Redis
【源码解析】自动配置的这些细节都不知道,别说你会 springboot
【源码解析】自动配置的这些细节都不知道,别说你会 springboot
|
2天前
|
前端开发 开发者
深入解析Vite.js源码
【7月更文挑战第1天】Vite.js 深入解析:以其无bundle开发、动态ES模块加载提升开发效率;本地HTTP服务器配合WebSocket实现热更新;按需加载减少资源占用;预构建优化生产环境性能;基于Rollup的插件系统增强灵活性。Vite,一个创新且高效的前端构建工具。
11 0
|
7天前
|
存储 关系型数据库 MySQL
深入探索MySQL:成本模型解析与查询性能优化
深入探索MySQL:成本模型解析与查询性能优化
|
7天前
|
Java 容器 Spring
Spring5源码解析5-ConfigurationClassPostProcessor (上)
Spring5源码解析5-ConfigurationClassPostProcessor (上)
|
18天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
18天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
17天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
775 0

推荐镜像

更多