聊聊 Kafka: Producer 的网络模型

简介: 聊聊 Kafka: Producer 的网络模型

一、Producer 的网络模型

我们前面几篇有说 Producer 发送流程的源码分析,但那个是大的轮廓,涉及到发送很多相关的内容,比如:

  • 获取 topic 的 metadata 信息
  • key 和 value 的序列化
  • 获取该 record 要发送到的 partition
  • 向 RecordAccmulator 中追加 record 数据
  • 唤醒 sender 线程发送 RecordBatch



那这篇老周主要来说下 Producer 的网络模型,这里直接给出 Producer 的网络模型图,如下:


从图中可以看出,KafkaProducer 相当于客户端,与 Sender 调用层交互,Sender 调用 NetworkClient,NetworkClient 调用 Selector,而 Selector 底层封装了 Java NIO 的相关接口。心中有了 Producer 的网络模型大致轮廓后,我们接下来就可以来分析 Producer 的网络模型。

二、Producer 与 Broker 的交互流程

我们在业务代码通过生产者 producer 调用 send 方法来发送消息,不难发现都是通过走 Producer 的实现类 KafkaProducer 的 send 方法:

2.1 org.apache.kafka.clients.producer.KafkaProducer#doSend

上面的两个 send 方法最终会走到 doSend 方法里来:

这块的源码老周在前两篇的 Producer 源码解析那一篇分析了的哈,这里主要说下与 Broker 通信的交互分析。主要有两点:

  • waitOnMetadata():请求 tp(topic-partition)元数据 metadata 更新,中间会调用 sender.wakeup()。
  • accumulator.append():将 record 对应的 tp 写入到 deque 中,如果该 tp 对应的 deque batch 是满了或者新建了一个 batch,则会调用 sender.wakeup()。


主要看下 sender.wakeup() 方法,主要作用就是将 Sender 线程从阻塞中唤醒。

2.2 org.apache.kafka.clients.producer.internals.Sender#wakeup

/**
 * Wake up the selector associated with this send thread
 */
public void wakeup() {
    this.client.wakeup();
}
/**
 * Interrupt the client if it is blocked waiting on I/O.
 */
@Override
public void wakeup() {
    this.selector.wakeup();
}
/**
 * Interrupt the nioSelector if it is blocked waiting to do I/O.
 */
@Override
public void wakeup() {
    this.nioSelector.wakeup();
}

不难发现,调用链是:

Sender -> NetworkClient -> Selector(Kafka 封装的)-> Selector(java.nio.channels.Selector Java NIO)

wakeup() 的主要作用就是唤醒阻塞在 select()/select(long) 上的线程,为什么要唤醒?因为注册了新的 channel 或者事件。

再回到 Kafka 这里,KafkaProducer 中 dosend() 方法调用 sender.wakeup() 方法作用就很明显了。作用就是:当有新的 RecordBatch 创建后,旧的 RecordBatch 就可以发送了,如果线程阻塞在 select() 方法中,就将其唤醒,Sender 重新开始运行 run() 方法,在这个方法中,旧的 RecordBatch 将会被选中,进而可以及时将这些请求发送出去。

2.3 org.apache.kafka.clients.producer.internals.Sender#run

跟到 runOnce 方法里去:

继续跟,核心是 Sender 线程每次循环具体执行的地方,即 sendProducerData() 方法:

最后调用 client.poll() 方法,关于 socket 的一些实际的读写操作。

我们来小结一下 Sender.run() 方法的大致流程,主要分为以下五步:

  • accumulator.ready():遍历整个 batches(key:TopicPartition,value: Deque>),如果 ProducerBatch 不为空,就将其对应的 leader 选出来,最后会返回一个可以发送 ReadyCheckResult 实例,readyNodes 是主要的成员变量。


  • 如果有 tp 的 leader 是未知的,就强制 metadata 更新。遍历未知 leader 的主题(包含 leader 选举未决的主题以及可能已经过期的主题),再次将主题添加到元数据以确保它被包含并请求元数据更新,因为有消息要发送到该主题,调用 requestUpdate() 方法来更新。
  • accumulator.drain():遍历每个 leader (第一步中选出)节点,获取该节点上所有的 tp,如果该 tp 对应的 ProducerBatch 不在 backoff 期间(没有重试过或者重试了但是间隔已经达到了 retryBackoffMs),并且 ProducerBatch 的大小不大于 maxSize(一个 request 的最大限制默认 1 MB)或 ProducerBatch 的集合是空的,那么就把这个 ProducerBatch 添加 list 中,最终返回的类型为 Map<Integer, List>,key 为 leader.id,value 为要发送的 ProducerBatch 的列表。
  • sendProduceRequests():发送 Producer 请求,这个方法会调用 NetworkClient.send() 来发送 clientRequest。
  • NetworkClient.poll():关于 socket 的一些实际的读写操作,这个方法会继续调用 Kafka 封装的 Selector.poll(),跟进去底层是调用的 Java NIO 的 Selector.poll()。

2.4 org.apache.kafka.clients.NetworkClient#poll

主要分为以下三步:

  • metadataUpdater.maybeUpdate():判断是否需要更新 metadata 元数据。选择具有最少未完成请求且至少符合连接条件的节点。此方法将更喜欢具有现有连接的节点,但如果所有现有连接都在使用,则可能会选择我们还没有连接的节点。如果不存在连接,则此方法将首选最近连接尝试最少的节点。这种方法永远不会选择一个没有现有连接的节点,并且我们在重新连接回退期间断开了连接,或者选择了一个被限制的活动连接。
  • selector.poll():进行 socket 相关的 IO 操作。
  • 处理完成后的操作:在一个 select() 过程之后的相关处理
  • handleAbortedSends(responses):如果由于不受支持的版本异常或断开连接而中止发送,请立即处理,无需等待 Selector#poll。
  • handleCompletedSends(responses, updatedNow):处理任何已完成的请求发送。特别是如果预期没有响应,则认为请求已完成。
  • handleCompletedReceives(responses, updatedNow):处理任何已完成的接收并使用接收到的响应更新响应列表。(MetadataResponse、ApiVersionsResponse 都是在这处理的)
  • handleDisconnections(responses, updatedNow):处理连接失败的那些连接,重新请求 metadata。
  • handleConnections():处理新建立的那些连接(还不能发送请求,比如:还未认证)
  • handleInitiateApiVersionRequests(updatedNow):对那些新建立的连接,发送 apiVersionRequest(默认情况:第一次建立连接时,需要向 Broker 发送 ApiVersionRequest 请求)
  • handleTimedOutRequests(responses, updatedNow):处理 timeout 的连接,关闭该连接,并刷新 metadata。

2.5 org.apache.kafka.common.network.Selector#poll

Kafka 中的 Selector 类主要是 Java NIO 相关接口的封装,Socket 相关 IO 操作都是在这个类中完成的。主要操作都是在下面这个方法中调用的:

2.5.1 org.apache.kafka.common.network.Selector#clear

2.5.2 org.apache.kafka.common.network.Selector#select

Selector.select() 方法底层还是调用的 Java NIO 的原生接口,这里的 nioSelector 其实就是 java.nio.channels.Selector 的实例对象,这个方法最坏情况下,会阻塞 ms 的时间,如果在一次轮询,只要有一个 Channel 的事件就绪,它就会立马返回。

/**
 * Check for data, waiting up to the given timeout.
 *
 * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
 * @return The number of keys ready
 */
private int select(long timeoutMs) throws IOException {
    if (timeoutMs < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");
    if (timeoutMs == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(timeoutMs);
}

2.5.3 org.apache.kafka.common.network.Selector#pollSelectionKeys

这部分代码是 socket IO 的主要部分,发送 Send 及接收 Receive 都是在这里完成的,在 poll() 方法中,这个方法会调用三次:

  • 处理从具有缓冲数据的通道的轮询事件
  • 处理已经就绪的事件,进行相应的 IO 操作。
  • 处理新建立的那些连接,添加缓存及传输层的握手与认证。

2.5.4 org.apache.kafka.common.network.Selector#addToCompletedReceives

/**
 * adds a receive to completed receives
 */
private void addToCompletedReceives(KafkaChannel channel, NetworkReceive networkReceive, long currentTimeMs) {
    if (hasCompletedReceive(channel))
        throw new IllegalStateException("Attempting to add second completed receive to channel " +                      channel.id());
    // 添加到 completedReceives 中
    this.completedReceives.put(channel.id(), networkReceive);
    sensors.recordCompletedReceive(channel.id(), networkReceive.size(), currentTimeMs);
}

接收到的所有 Receive 都会被放入到 completedReceives 的集合中等待后续处理。

三、总结

总结的话我就不复述了,上面的源码流程说的很清楚了。最后再来一张 Producer 网络模型的时序图:

希望对你有所帮助,我们下期再见。



Java

相关文章
|
4月前
|
C++
基于Reactor模型的高性能网络库之地址篇
这段代码定义了一个 InetAddress 类,是 C++ 网络编程中用于封装 IPv4 地址和端口的常见做法。该类的主要作用是方便地表示和操作一个网络地址(IP + 端口)
282 58
|
4月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
166 2
|
4月前
基于Reactor模型的高性能网络库之Tcpconnection组件
TcpConnection 由 subLoop 管理 connfd,负责处理具体连接。它封装了连接套接字,通过 Channel 监听可读、可写、关闭、错误等
154 1
|
4月前
|
JSON 监控 网络协议
干货分享“对接的 API 总是不稳定,网络分层模型” 看电商 API 故障的本质
本文从 OSI 七层网络模型出发,深入剖析电商 API 不稳定的根本原因,涵盖物理层到应用层的典型故障与解决方案,结合阿里、京东等大厂架构,详解如何构建高稳定性的电商 API 通信体系。
|
6月前
|
域名解析 网络协议 安全
计算机网络TCP/IP四层模型
本文介绍了TCP/IP模型的四层结构及其与OSI模型的对比。网络接口层负责物理网络接口,处理MAC地址和帧传输;网络层管理IP地址和路由选择,确保数据包准确送达;传输层提供端到端通信,支持可靠(TCP)或不可靠(UDP)传输;应用层直接面向用户,提供如HTTP、FTP等服务。此外,还详细描述了数据封装与解封装过程,以及两模型在层次划分上的差异。
1142 13
|
6月前
|
网络协议 中间件 网络安全
计算机网络OSI七层模型
OSI模型分为七层,各层功能明确:物理层传输比特流,数据链路层负责帧传输,网络层处理数据包路由,传输层确保端到端可靠传输,会话层管理会话,表示层负责数据格式转换与加密,应用层提供网络服务。数据在传输中经过封装与解封装过程。OSI模型优点包括标准化、模块化和互操作性,但也存在复杂性高、效率较低及实用性不足的问题,在实际中TCP/IP模型更常用。
889 10
|
1月前
|
机器学习/深度学习 数据采集 人工智能
深度学习实战指南:从神经网络基础到模型优化的完整攻略
🌟 蒋星熠Jaxonic,AI探索者。深耕深度学习,从神经网络到Transformer,用代码践行智能革命。分享实战经验,助你构建CV、NLP模型,共赴二进制星辰大海。
|
2月前
|
机器学习/深度学习 传感器 算法
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
193 2
|
2月前
|
机器学习/深度学习 并行计算 算法
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
|
4月前
基于Reactor模型的高性能网络库之Poller(EpollPoller)组件
封装底层 I/O 多路复用机制(如 epoll)的抽象类 Poller,提供统一接口支持多种实现。Poller 是一个抽象基类,定义了 Channel 管理、事件收集等核心功能,并与 EventLoop 绑定。其子类 EPollPoller 实现了基于 epoll 的具体操作,包括事件等待、Channel 更新和删除等。通过工厂方法可创建默认的 Poller 实例,实现多态调用。
288 60

热门文章

最新文章