Kafka 生产者的网络架构
初学一个技术,怎么了解该技术的源码至关重要。
对我而言,最佳的阅读源码的方式,那就是:不求甚解,观其大略
你如果进到庐山里头,二话不说,蹲下头来,弯下腰,就对着某棵树某棵小草猛研究而不是说先把庐山的整体脉络研究清楚了,那么你的学习方法肯定效率巨低而且特别痛苦。
最重要的还是慢慢地打击你的积极性,说我的学习怎么那么不 happy 啊,怎么那么没劲那,因为你的学习方法错了,大体读明白,先拿来用,用着用着,很多道理你就明白了。
先从整体上把关源码,再去扣一些细节问题。
举个简单的例子:
如果你刚接触 HashMap,你刚有兴趣去看其源码,在看 HashMap 的时候,有一个知识:当链表长度达到 8 之后,就变为了红黑树,小于 6 就变成了链表,当然,还和当前的长度有关。
这个时候,如果你去深究红黑树、为什么是 8 不是别的,又去查 泊松分布,最终会慢慢的搞死自己。
所以,正确的做法,我们先把这一部分给略过去,知道这个概念即可,等后面我们把整个庐山看完之后,再回过头抠细节。
当然,本章我们讲述Kafka 生产者的网络架构
一、引言
kafka生产端的组成主要由以下几方面构成:
- 生产端的初始化
- 元数据的更新
- 缓存池(BufferPool)机制
- 网络架构模型
- 消息发送
其中,我们 生产端的初始化、元数据的更新、缓存池(BufferPool)机制已经介绍完毕,今天我们来看看 网络架构模型
废话不多说,老司机开始发车
二、网络架构模型
从我们之前的讲解中,我们可以知道,生产端最重要的几个技术点:
- KafkaProducer:主要将消息发送至
RecordAccumulator
并唤醒Sender
- Sender:调用
NetworkClient
将RecordAccumulator
的消息发送至Broker
- NetworkClient:
Kafka
对Java NIO
的封装
而正是它们几个组成了 Kafka 生产者的网络架构,其网络模型如下:
不难看出,我们 Kafka
生产者最终的网络架构也是使用的 Java NIO
,和我们的 Netty
殊途同归。
至于 kafka
为什么不用 Netty
做通信组件,
,这个之间在 【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛 已经讲过,此处不再叙述,有兴趣的同学可以跳转阅读。
三、网络架构整体流程
上面我们了解了 Kafka
生产端的几个网络组件及其对应的关系
我们深入的看一下,这几个组件之间到底是如何进行数据的处理及业务的处理的
网络架构整体流程如下所示:
这里涉及的主要几个方法:
- KafkaProducer
- waitOnMetadata:等待更新元数据
- accumulator.append:消息发送到缓冲区
- sender.wakeup:唤醒
Sender
线程
- Sender
- accumulator.ready:得到符合发送规定的节点
- metadata.requestUpdate:是否更新元数据
- remove any nodes:删除尚未建立连接的节点
- accumulator.drain:得到每个节点需要发送的消息批次
- createProduceRequests:组装成客户端请求
- client.send:调用
NetworkClient
设置事件类型 - client.poll:调用
NetworkClient
发送消息
- NetworkClient
- send:调用
Selector
设置事件类型
- poll:调用
Selector
发送消息
- Kafka-Selector
- send:设置事件类型
- poll:发送消息
可能大多数的小伙伴这个时候已经有点晕了,没关系,我们本篇文章就是解决你晕的问题的
我们会从 Producer
的源码一直会讲到 Selector
的源码并最终通过打日志的方式验证我们的猜想
戴好安全带,我们发车了
四、网络架构源码剖析
1、KafkaProducer
对于 KafkaProducer
来说,其最重要的功能就是将 record
发送至我们的 RecordAccumulator
中去
1.1 waitOnMetadata
这个方法相信看过上篇博客:【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛,已经有印象
对,没错,这个就是我们 kafka
在发送消息时,会优先请求 Broker
获取元数据信息,然后再去发送消息
具体细节的话,这里也不叙述了
总之:第一次发送消息时,这里会判断当前是否拿到了元数据。如果没有拿到元数据信息,这里会堵塞循环并唤醒 Sender
线程,让其帮忙更新元数据。
1.2 accumulator.append
这个其实我们这篇博客中也讲过:【Kafka从成神到升仙系列 二】生产者如何将消息放入到内存缓冲区
具体的细节如上,更多的细节可以参考上面那篇博客
1.3 sender.wakeup
当我们 **首次获取元数据 **或者 当前的 batch 满了 或者 一个新的 batch 创建了,我们都可以去唤醒我们的 Sender
,让这个线程执行我们的业务。
- 首次获取元数据:让
Sender
去更新元数据信息 - 当前的
batch
满了 或者 一个新的batch
创建:让Sender
将batch
发送至Broker
那这个 sender.wakeup
到底执行了什么呢,我们一起来看看其执行流程与执行代码
// 类 = KafkaProducer sender.wakeup(); // 类 = Sender public void wakeup() { this.client.wakeup(); } // 类 = NetworkClient public void wakeup() { this.selector.wakeup(); } // 类 = Selector public void wakeup() { this.nioSelector.wakeup(); } // 类 = WindowsSelectorImpl public Selector wakeup() { // Java NIO 包里面的操作 }
这里可以看到,整体的调用流程和我们上面的 网络架构 是一样的,也侧面验证了我们上面的 网络架构 是正确的。
不难看出,sender.wakeup()
实际上是唤醒了 Java NIO
里面的 Selector
,让其能够接受所有的 keys
,从而完成通信的链接与发送
2. Sender
Sender 线程的东西稍微有点多,但核心只有两个:
- 更新元数据消息
- 将消息发送至
Broker
当 Sender
线程启动时,会启动如下代码:
public void run() { while (running) { run(time.milliseconds()); } } void run(long now) { // 业务代码 }
从代码中不难看出,当我们启动 Sender
线程之后,Sender
线程会不断的轮询调用 run(long now)
该方法,执行其业务。
那 run(long now)
方法到底做了些什么呢,我们一起来看一下
2.1 accumulator.ready
遍历所有的 TopicPartition,获取每一个 TopicPartition 的 Leader 节点
弹出每一个 TopicPartition 的第一个 batch,校验该 batch 有没有符合发送的规定
如果该 batch 符合了发送的规定后,将节点放至 readyNodes 中,标识该节点已经可以发送数据了
public ReadyCheckResult ready(Cluster cluster, long nowMs) { // 准备好的节点 Set<Node> readyNodes = new HashSet<>(); // 遍历所有的 TopicPartition for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { TopicPartition part = entry.getKey(); Deque<RecordBatch> deque = entry.getValue(); // 获取当前Partition的leader节点 Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; } else if (!readyNodes.contains(leader) && !muted.contains(part)) { synchronized (deque) { // 弹出每一个 TopicPartition 的第一个batch RecordBatch batch = deque.peekFirst(); if (batch != null) { // bactch 满足 batch.size() 或者 时间达到 linger.ms、 boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { // 将当前的节点添加至准备好的队列中 readyNodes.add(leader); } else { nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } // 最终返回该节点(这里最重要的还是 Set<String> 也就是准备好的节点集合) return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); } public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) { this.readyNodes = readyNodes; this.nextReadyCheckDelayMs = nextReadyCheckDelayMs; this.unknownLeadersExist = unknownLeadersExist; }
2.2 metadata.requestUpdate
- 如果发现有
TopicPartition
没有 leader,那么这里就调用requestUpdate()
方法更新 metadata
// 如果这个地方是 True,说明我们上面有的 TopicPartition 的 leader 节点为 null if (result.unknownLeadersExist){ // 更新元数据 this.metadata.requestUpdate(); } // 设置标记位为true,后续进行更新 public synchronized int requestUpdate() { this.needUpdate = true; return this.version; }
2.3 remove any nodes
- 遍历所有准备好的节点,利用
NetworkClient
来判断改节点是不是已经准备完毕 - 如果该节点未准备完毕,则从
readyNodes
中剔除 - 节点未准备完毕,会初始化链接该节点,便于下一次的消息发送
PS:这里可能会有同学对上面已经准备好了,下面为什么还有准备好的逻辑筛选有疑问
- 第一步筛选的是
TopicPartition
对应的batch
已经满足了发送的必要 - 第二步筛选的是
TopicPartition
对应的Broker
是否建立了链接,如果不是则初始化链接
// 遍历所有准备好的节点 Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); // 利用 NetworkClient 来判断改节点是不是已经准备完毕 // 如果还未准备好,从准备好的队列中剔除掉 if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } // 判断节是否准备好发送 // 如果没有准备好发送,则会与该节点初始化链接,便于下一次的消息发送 public boolean ready(Node node, long now) { // 已经准备好 if (isReady(node, now)){ return true; } // 与该节点的初始化 if (connectionStates.canConnect(node.idString(), now)){ initiateConnect(node, now); } return false; }
2.4 accumulator.drain
- 遍历所有准备好的
readyNodes
,得到该Broker
上所有的PartitionInfo
信息,判断该Partition
是否被处理中,如果没有在处理中则获取其对应的Deque
- 弹出队列中的
First
,判断其是否在backoff (没有重试过,或者重试了但是间隔已经达到了retryBackoffMs)
且加上该 batch 的大小 < maxRequestSize
,该batch
符合规定 - 将该
batch
放进readyRecordBatchList
中,最终放进Map
,这样我们一个Broker
可以发送的batch
就已经整理完毕。 - 最终我们得到
Map>
,key
代表当前已经连接好的Broker
,value
代表当前需要发送的batch
// 生成节点对应的batch消息 Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize, now); public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes,int maxSize,long now) { Map<Integer, List<RecordBatch>> batches = new HashMap<>(); // 遍历所有准备好的node节点 for (Node node : nodes) { int size = 0; // 通过node节点获取其所有的Partition List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // 存储该节点需要发送的Batch List<RecordBatch> ready = new ArrayList<>(); int start = drainIndex = drainIndex % parts.size(); do { // 取Partition PartitionInfo part = parts.get(drainIndex); TopicPartition tp = new TopicPartition(part.topic(), part.partition()); // 当分区没有正在进行的批处理时 if (!muted.contains(tp)) { // 获取该分区的所有的RecordBatch Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition())); if (deque != null) { synchronized (deque) { // 查看队列第一个 RecordBatch first = deque.peekFirst(); if (first != null) { // 判断其重试与时间 boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; if (!backoff) { // 判断是否超越最大发送限制 if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { break; } else { // 取出队列第一个 RecordBatch batch = deque.pollFirst(); batch.records.close(); // 当前发送的大小累积 size += batch.records.sizeInBytes(); // 放入准备好的列表中 ready.add(batch); batch.drainedMs = now; } } } } } } this.drainIndex = (this.drainIndex + 1) % parts.size(); } while (start != drainIndex); // 将节点与准备好的batch列表对应 batches.put(node.id(), ready); } // 最终返回:所有准备好的节点与对应的batch列表 return batches; }
2.5 createProduceRequests
- 遍历刚刚我们得到的
Map,组装成客户端请求
List<ClientRequest> requests = createProduceRequests(batches, now); // 组装客户端请求 private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) { List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size()); for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet()) requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; }
2.6 client.send
遍历每一个客户端请求并进行发送
PS:这里的发送是通过
KafkaClient
提供的接口,具体由 NetworkClient
实现,我们后面会讲
for (ClientRequest request : requests){ client.send(request, now); }
2.7 client.poll
发送消息
PS:这里也同样是通过
KafkaClient
提供的接口,具体由 NetworkClient
实现,我们后面会讲
this.client.poll(pollTimeout, now);
3. NetworkClient
我们的
Sender
将 Producer
发送的消息进行 校验、筛选、组装,让我们的 NetworkClient
进一步的将消息发送
3.1 send
拿到当前客户端请求的
node
,校验其是否有权限如果有权限的话,我们设置下时间并添加到到
inFlightRequests
,调用selector
进行发送(这里提前剧透一下,send
方法虽然叫发送,实际上并没有发送,只是注册了写事件,后面会讲到)
inFlightRequeests 的作用:
缓存已经发出去但还没有收到响应的请求,保存对象的具体形式为
Map>
配置参数
max.in.flight.requests.per.connection
,默认值为5,即每个连接最多只能缓存5个未收到响应的请求,超过这个数值之后便不能再往这个连接发送更多的请求了
public void send(ClientRequest request, long now) { // 拿到当前客户端请求的node String nodeId = request.request().destination(); // 是否可以发送请求(我们前面已经校验过,一般情况下都能够发送) if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); doSend(request, now); } private void doSend(ClientRequest request, long now) { // 设置时间 request.setSendTimeMs(now); // 将当前请求添加到 inFlightRequests this.inFlightRequests.add(request); selector.send(request.request()); }
3.2 poll
判断当前需要更新元数据,如果需要则更新元数据
调用
selector
的poll
方法进行Socket IO
的操作(这里也在后面会讲到)处理完成之后的操作
处理已经完成的 send
处理从 server 端接收到 Receive
处理连接失败那些连接
处理新建立的那些连接
处理超时的连接
如果回调的话,处理回调的信息
public List<ClientResponse> poll(long timeout, long now) { // 判断当前需要更新元数据,如果需要则更新元数据 long metadataTimeout = metadataUpdater.maybeUpdate(now); // 调用 selector 的 poll 方法进行 Socket IO 的操作 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); // 处理完成之后的操作 long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); // 处理已经完成的 send(不需要 response 的 request,如 send) handleCompletedSends(responses, updatedNow); // 处理从 server 端接收到 Receive(如 Metadata 请求) handleCompletedReceives(responses, updatedNow); // 处理连接失败那些连接,重新请求 meta handleDisconnections(responses, updatedNow); // 处理新建立的那些连接(还不能发送请求,比如:还未认证) handleConnections(); // 处理超时的连接 handleTimedOutRequests(responses, updatedNow); // 处理回调的信息 for (ClientResponse response : responses) { if (response.request().hasCallback()) { try { response.request().callback().onComplete(response); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } } // 返回响应结果 return responses; }
4. Selector
终于来到了我们的最后一步,
Kafka
自己封装的 Selector
,这个哥们就是真正发送消息的地方
激动的心,颤抖的手,跟着我一起看看
Selector
到底是怎么发送消息的
4.1send
根据当前节点的编号拿到当前客户端的
channel
向当前的
KafkaChannel
注册写事件
写事件触发的时间:当 Scoket缓冲区 有空闲时,触发该事件‘
从这里可以看出来,我们的
send
方法其实也没有真正的发送消息,只是向 KafkaChannel
注册了 写事件
,保障后面 poll
轮旋事件发送的正确性。
public void send(Send send) { // 根据当前节点的编号拿到当前客户端的channel KafkaChannel channel = channelOrFail(send.destination()); try { // 向当前的 KafkaChannel 注册写事件 channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(send.destination()); close(channel); } } public void setSend(Send send) { this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
4.2 poll
清除相关记录
获取就绪事件
处理 io 操作
将处理得到的
stagedReceives
添加到completedReceives
中(NetworkClient
处理响应)关闭老的连接
由于这个方法比较重要,所以我们一个一个的讲,跟着我们的思路来
public void poll(long timeout) throws IOException { // 清除相关缓存记录 clear(); // 获取就绪事件 long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); // 处理 io 操作 if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { pollSelectionKeys(this.nioSelector.selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true); } // 将处理得到的 stagedReceives 添加到 completedReceives 中 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // 关闭老的连接 maybeCloseOldestConnection(); }
4.2.1 clear
clear()
方法是在每次 poll()
执行的第一步,它作用的就是清理上一次 poll 过程产生的部分缓存。
这里的缓存是不是感觉有点熟悉,他就是我们之前在
NetworkClient
的 **处理完成之后的操作 **对应的缓存,忘了的小伙伴可以回去看一下
private void clear() { this.completedSends.clear(); this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); this.disconnected.addAll(this.failedSends); this.failedSends.clear(); }
4.2.2 select
select(ms)
方法主要通过调用 nioSelector
的 select
方法,返回我们就绪事件的数量
这里的
nioSelector
是属于 java.nio.channels.Selector
的,也就是我们 Java NIO
包里面的
nioSelector.selectNow
:非阻塞的,当前操作没有通道准备好立即返回,返回是0nioSelector.select
:阻塞的,当前没有通道准备好会阻塞住,最长时间为long ms
private int select(long ms) throws IOException { if (ms == 0L) { return this.nioSelector.selectNow(); } else { return this.nioSelector.select(ms); } }
4.2.3 pollSelectionKeys
pollSelectionKeys(this.nioSelector.selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true);
这部分是
socket IO
的主要部分,发送 Send
及接收 Receive
都是在这里完成的,在 poll()
方法中,这个方法会调用两次:
第一次调用的目的是:处理已经就绪的事件,进行相应的
IO
操作;第二次调用的目的是:处理新建立的那些连接,
添加缓存及传输层(
Kafka
又封装了一次,这里后续文章会讲述)的握手与认证。
我们来剖析下
pollSelectionKeys
整理的步骤:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) { // 拿到当前所有准备好的keys Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 获取key并删除它,防止重复使用 SelectionKey key = iterator.next(); iterator.remove(); // 根据 key 拿到对应的附件 KafkaChannel KafkaChannel channel = channel(key); sensors.maybeRegisterConnectionMetrics(channel.id()); lruConnections.put(channel.id(), currentTimeNanos); try { // 处理所有已经完成握手(Tcp)的连接(正常或立即) if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { this.connected.add(channel.id()); this.sensors.connectionCreated.record(); } else continue; } // 如果通道未准备好,请完成准备 if (channel.isConnected() && !channel.ready()) channel.prepare(); // 如果通道已准备好从任何具有可读数据的连接中读取 if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } // 如果通道准备好了,就向缓冲区中有空间且我们有数据的任何套接字写入 if (channel.ready() && key.isWritable()) { Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } // 取消所有失效的套接字 if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } } }
拿到所有准备好的
keys
,获取keys
并删除它(防止重复使用),根据key
拿到对应的附件KafkaChannel
处理以下几种情况:
所有已经完成握手的连接
通道未准备好的
key
通道准备好的数据
可写入的
key
取消所有失效的套接字
其中我们不难看出,最重要的当属 处理可写入的
key
,我们有必要来详细说说 Send send = channel.write();
的实现
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; } // 是否发送成功 private boolean send(Send send) throws IOException { // 写入消息 send.writeTo(transportLayer); // 写完之后取消写事件,防止无限触发写事件 if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); } // 通过客户端的channel向服务端发送信息 public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); remaining -= written; if (channel instanceof TransportLayer) pending = ((TransportLayer) channel).hasPendingWrites(); return written; }
最终还是调用了我们
Java NIO
中的 channel.write(buffers)
方法完成发送消息。
4.2.4 addToCompletedReceives
client
的时序性而是通过 InFlightRequests
和 RecordAccumulator
的 mutePartition
来保证的。因此对于 Client
端而言,这里接收到的所有 Receive
都会被放入到 completedReceives
的集合中等待后续处理。
这里面的数据主要我们上面 ```pollSelectionKeys
中添加的,然后在这放入到
completedReceives,随后被我们
NetworkClient` 中被处理
// 处理响应放入到 completedReceives 中 private void addToCompletedReceives() { if (!this.stagedReceives.isEmpty()) { Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) { Deque<NetworkReceive> deque = entry.getValue(); NetworkReceive networkReceive = deque.poll(); this.completedReceives.add(networkReceive); this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); if (deque.isEmpty()) iter.remove(); } } } }
五、总结
终于写完了,其实最开始学
kafka
的时候是今年 2
月份,那时候还不懂什么是 IO
,看源码的通信基本看不懂
后来,花了几个月的时间学了 操作系统 --> 计算机网络 --> Linux 通信 --> Java NIO --> Netty,现在看
Kafka
的通信就变得通透了。
另外,基本现在所有源码的通信都有
Netty
架构的影子
所以,如果你也想学源码的话,最好是先看看
Netty
的相关知识,学完之后,你会发现,通信架构不过如此。
如果你能看到这,想必已经跟完了整个
Producer
的网络架构部分,你有没有感觉到一个事情:网络架构就是整个生产者运行的全部流程
的,本来我以为讲网络架构就是网络架构,越写越发觉,这不就是整个生产者发送的全部流程嘛
所以,我们生产者全部的文章就结束了,总体如下: