聊聊 Kafka: Consumer 源码解析之 poll 模型

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 聊聊 Kafka: Consumer 源码解析之 poll 模型

一、前言

前面的 Kafka 系列文章,我们讲过一篇:聊聊 Kafka:Consumer 源码解析之 ConsumerNetworkClient,那一篇主要讲的是 KafkaConsumer 类以及这个类里最重要的一个属性类 ConsumerNetworkClient。那这一篇我们来讲一下 KafkaConsumer 是怎么去拉取消息的,也就是本篇的的 Poll 的网络模型。

二、Consumer 的示例

下面我们来看一个 KafkaConsumer 的示例程序:

/**
 * @author: 微信公众号【老周聊架构】
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // kafka地址,列表格式为host1:port1,host2:port2,..
        props.put("bootstrap.servers", "localhost:9092");
        // key序列化方式 必须设置
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化方式 必须设置
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("group.id", "consumer_riemann_test");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可消费多个topic,组成一个list
        String topic = "riemann_kafka_test";
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

不难看出,上面 Consumer 拉取消息的主要几个步骤:

  • 构造 Consumer 的相关 Properties 配置
  • 创建 KafkaConsumer 的对象 consumer
  • 订阅相应的 topic 列表
  • 调用 consumer 的 poll 方法拉取订阅的消息

前面三个步骤只是创建了一个 consumer 对象并订阅了 topic 主题,真正的逻辑是在第四步,也就是 poll 方法,这一步是理解 consumer 设计的关键所在。

三、poll 的网络模型

3.1 consumer poll

当一个 consumer 对象创建之后,只有 poll 方法调用时,consumer 才会真正去连接 kafka 集群,进行相关的操作,其 poll 方法具体实现如下:

consumer poll 主要做了如下几件事情:

  • ① 检查这个 consumer 是否可以拉取消息
  • ② 检查这个 consumer 是否订阅了相应的 topic-partition
  • ③ 调用 pollForFetches 方法获取相应的 records
  • ④ 在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在 pollForFetches 方法中。
  • ⑤ 如果在给定的时间内(notExpired)获取不到可用的 records,返回空数据。

3.1.1 ①

3.1.2 ②

3.1.3 ③

pollForFetches 方法主要做了以下几个事情:

  • ① 计算本次拉取的超时时间
  • ② 如果数据已经拉回到本地,直接返回数据。
  • ③ 说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。
  • ④ 通过调用 NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)
  • ⑤ 将从 broker 读取到的数据返回(即封装成消息)

3.1.3.1 ①

3.1.3.2 ②


重点看下 CompletedFetch 是 completedFetches 处理后的类型或者是 initializeCompletedFetch 初始化后的类型,几个重要的成员变量如下:


consumer 的 Fetcher 处理从 server 获取的 fetch response 大致分为以下几个过程:

  • 通过 completedFetches.peek() 获取已经成功的 fetch response(在 fetcher.sendFetches() 方法中会把成功的结果放在这个集合中,是拆分为 topic-partition 的粒度放进去的)
  • 获取下一个要处理的 nextInLineFetch,判断 CompletedFetch 是否未初始化,没有的话,则初始化。
  • 通过 fetchRecords(nextInLineFetch, recordsRemaining) 方法处理 CompletedFetch 对象,在这个里面会去验证 nextFetchOffset 是否能对得上,只有 nextFetchOffset 是一致的情况下才会去处理相应的数据,并更新 the fetch offset 的信息,如果 nextFetchOffset 不一致,这里就不会处理,the fetch offset 就不会更新,下次 fetch 请求时是会接着 the fetch offset 的位置去请求相应的数据。
  • 返回相应的 ConsumerRecord 数据。

3.1.3.3 ③

说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。

在发送的 fetch 的过程中,总共分为以下两步:

  • prepareFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该 topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的。
  • client.send(fetchTarget, request):发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到 completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。

从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches(),拉取到的数据,可需要多次 pollForFetches 循环才能处理完,因为 Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的。

3.1.3.4 ④

调用底层 NetworkClient 提供的接口去发送相应的请求,可以看这一篇:聊聊 Kafka:Producer 的网络模型,只不过之前写的是关于 Producer 的网络模型,现在的是 Consumer 的 poll 模型,最终都会统一与提供的 NetworkClient#poll 进行交互。

3.1.3.5 ⑤

拉取器提供已经拉取到的记录集给 KafkaConsumer 调用,并更新 the consumed position。

3.1.4 ④

如果拉取到的消息集合不为空,再返回该批消息之前,如果还有积压的拉取请求,可以继续发送拉取请求,但此时会禁用 wakeup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向 broker 拉取消息。

3.1.5 ⑤

四、总结

相信大家跟着老周上面的思路及源码分析,对 Consumer 的 poll 模型应该有个清晰的认识了。下面这张图代表了 Consumer 的整体网络模型的封装,我们把主要的一些组件了解后,再从整体到局部,我相信你会喜欢这样一层层剥洋葱的感觉的。




欢迎大家关注我的公众号老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


相关文章
|
23小时前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
23小时前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
23小时前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
22天前
|
机器学习/深度学习 人工智能 PyTorch
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
本文探讨了Transformer模型中变长输入序列的优化策略,旨在解决深度学习中常见的计算效率问题。文章首先介绍了批处理变长输入的技术挑战,特别是填充方法导致的资源浪费。随后,提出了多种优化技术,包括动态填充、PyTorch NestedTensors、FlashAttention2和XFormers的memory_efficient_attention。这些技术通过减少冗余计算、优化内存管理和改进计算模式,显著提升了模型的性能。实验结果显示,使用FlashAttention2和无填充策略的组合可以将步骤时间减少至323毫秒,相比未优化版本提升了约2.5倍。
41 3
Transformer模型变长序列优化:解析PyTorch上的FlashAttention2与xFormers
|
2天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
14 1
|
24天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
51 12
|
20天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
1天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
59 1

推荐镜像

更多