聊聊 Kafka: Consumer 源码解析之 poll 模型

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 聊聊 Kafka: Consumer 源码解析之 poll 模型

一、前言

前面的 Kafka 系列文章,我们讲过一篇:聊聊 Kafka:Consumer 源码解析之 ConsumerNetworkClient,那一篇主要讲的是 KafkaConsumer 类以及这个类里最重要的一个属性类 ConsumerNetworkClient。那这一篇我们来讲一下 KafkaConsumer 是怎么去拉取消息的,也就是本篇的的 Poll 的网络模型。

二、Consumer 的示例

下面我们来看一个 KafkaConsumer 的示例程序:

/**
 * @author: 微信公众号【老周聊架构】
 */
public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // kafka地址,列表格式为host1:port1,host2:port2,..
        props.put("bootstrap.servers", "localhost:9092");
        // key序列化方式 必须设置
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化方式 必须设置
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("group.id", "consumer_riemann_test");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可消费多个topic,组成一个list
        String topic = "riemann_kafka_test";
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

不难看出,上面 Consumer 拉取消息的主要几个步骤:

  • 构造 Consumer 的相关 Properties 配置
  • 创建 KafkaConsumer 的对象 consumer
  • 订阅相应的 topic 列表
  • 调用 consumer 的 poll 方法拉取订阅的消息

前面三个步骤只是创建了一个 consumer 对象并订阅了 topic 主题,真正的逻辑是在第四步,也就是 poll 方法,这一步是理解 consumer 设计的关键所在。

三、poll 的网络模型

3.1 consumer poll

当一个 consumer 对象创建之后,只有 poll 方法调用时,consumer 才会真正去连接 kafka 集群,进行相关的操作,其 poll 方法具体实现如下:

consumer poll 主要做了如下几件事情:

  • ① 检查这个 consumer 是否可以拉取消息
  • ② 检查这个 consumer 是否订阅了相应的 topic-partition
  • ③ 调用 pollForFetches 方法获取相应的 records
  • ④ 在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在 pollForFetches 方法中。
  • ⑤ 如果在给定的时间内(notExpired)获取不到可用的 records,返回空数据。

3.1.1 ①

3.1.2 ②

3.1.3 ③

pollForFetches 方法主要做了以下几个事情:

  • ① 计算本次拉取的超时时间
  • ② 如果数据已经拉回到本地,直接返回数据。
  • ③ 说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。
  • ④ 通过调用 NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)
  • ⑤ 将从 broker 读取到的数据返回(即封装成消息)

3.1.3.1 ①

3.1.3.2 ②


重点看下 CompletedFetch 是 completedFetches 处理后的类型或者是 initializeCompletedFetch 初始化后的类型,几个重要的成员变量如下:


consumer 的 Fetcher 处理从 server 获取的 fetch response 大致分为以下几个过程:

  • 通过 completedFetches.peek() 获取已经成功的 fetch response(在 fetcher.sendFetches() 方法中会把成功的结果放在这个集合中,是拆分为 topic-partition 的粒度放进去的)
  • 获取下一个要处理的 nextInLineFetch,判断 CompletedFetch 是否未初始化,没有的话,则初始化。
  • 通过 fetchRecords(nextInLineFetch, recordsRemaining) 方法处理 CompletedFetch 对象,在这个里面会去验证 nextFetchOffset 是否能对得上,只有 nextFetchOffset 是一致的情况下才会去处理相应的数据,并更新 the fetch offset 的信息,如果 nextFetchOffset 不一致,这里就不会处理,the fetch offset 就不会更新,下次 fetch 请求时是会接着 the fetch offset 的位置去请求相应的数据。
  • 返回相应的 ConsumerRecord 数据。

3.1.3.3 ③

说明上次 fetch 到的数据已经全部拉取了,需要再次发送 fetch 请求,从 broker 拉取数据。

在发送的 fetch 的过程中,总共分为以下两步:

  • prepareFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该 topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的。
  • client.send(fetchTarget, request):发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到 completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。

从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches(),拉取到的数据,可需要多次 pollForFetches 循环才能处理完,因为 Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的。

3.1.3.4 ④

调用底层 NetworkClient 提供的接口去发送相应的请求,可以看这一篇:聊聊 Kafka:Producer 的网络模型,只不过之前写的是关于 Producer 的网络模型,现在的是 Consumer 的 poll 模型,最终都会统一与提供的 NetworkClient#poll 进行交互。

3.1.3.5 ⑤

拉取器提供已经拉取到的记录集给 KafkaConsumer 调用,并更新 the consumed position。

3.1.4 ④

如果拉取到的消息集合不为空,再返回该批消息之前,如果还有积压的拉取请求,可以继续发送拉取请求,但此时会禁用 wakeup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向 broker 拉取消息。

3.1.5 ⑤

四、总结

相信大家跟着老周上面的思路及源码分析,对 Consumer 的 poll 模型应该有个清晰的认识了。下面这张图代表了 Consumer 的整体网络模型的封装,我们把主要的一些组件了解后,再从整体到局部,我相信你会喜欢这样一层层剥洋葱的感觉的。




欢迎大家关注我的公众号老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
30天前
|
开发框架 供应链 监控
并行开发模型详解:类型、步骤及其应用解析
在现代研发环境中,企业需要在有限时间内推出高质量的产品,以满足客户不断变化的需求。传统的线性开发模式往往拖慢进度,导致资源浪费和延迟交付。并行开发模型通过允许多个开发阶段同时进行,极大提高了产品开发的效率和响应能力。本文将深入解析并行开发模型,涵盖其类型、步骤及如何通过辅助工具优化团队协作和管理工作流。
57 3
|
3天前
|
存储 网络协议 安全
30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场
本文精选了 30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场。
13 2
|
3天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
16天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
36 3
|
18天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
49 2
|
22天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
15 1
|
10天前
|
安全 测试技术 Go
Go语言中的并发编程模型解析####
在当今的软件开发领域,高效的并发处理能力是提升系统性能的关键。本文深入探讨了Go语言独特的并发编程模型——goroutines和channels,通过实例解析其工作原理、优势及最佳实践,旨在为开发者提供实用的Go语言并发编程指南。 ####
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0

推荐镜像

更多