聊聊 Kafka:Producer Metadata 读取与更新机制

简介: 聊聊 Kafka:Producer Metadata 读取与更新机制

一、前言

我们上一篇说了 聊聊 Kafka:Producer 源码解析,这一篇我们来说下 Producer Metadata 的读取与更新机制。上一篇从宏观上介绍了 Producer 的宏观模型,其中通过 waitOnMetadata() 方法获取 topic 的 metadata 信息这一块东西很多,所以单独拎一篇出来讲。

二、Metadata

2.1 什么是 Metadata

Metadata 是指 Kafka 集群的元数据,包含了 Kafka 集群的各种信息,直接看源码便可知:

public class Metadata implements Closeable {
    private final Logger log;
    // retry.backoff.ms: 默认值为100ms,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
    private final long refreshBackoffMs;
    // metadata.max.age.ms: 默认值为300000,如果在这个时间内元数据没有更新的话会被强制更新。
    private final long metadataExpireMs;
    // 更新版本号,每更新成功1次,version自增1,主要是用于判断metadata是否更新
    private int updateVersion;
    // 请求版本号,每发送一次请求,version自增1
    private int requestVersion;
    // 上一次更新的时间(包含更新失败)
    private long lastRefreshMs;
    // 上一次更新成功的时间
    private long lastSuccessfulRefreshMs;
    private KafkaException fatalException;
    // 非法的topics
    private Set<String> invalidTopics;
    // 未认证的topics
    private Set<String> unauthorizedTopics;
    // 元数据信息的Cache缓存
    private MetadataCache cache = MetadataCache.empty();
    private boolean needFullUpdate;
    private boolean needPartialUpdate;
    // 会收到metadata updates的Listener列表
    private final ClusterResourceListeners clusterResourceListeners;
    private boolean isClosed;
    // 存储Partition最近一次的leaderEpoch
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
}

MetadataCache:Kafka 集群中关于 node、topic 和 partition 的信息。(是只读的)

public class MetadataCache {
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private Cluster clusterInstance;
}

关于 topic 的详细信息(leader 所在节点、replica 所在节点、isr 列表)都是在 Cluster 实例中保存的。

// 保存了Kafka集群中部分nodes、topics和partitions的信息
public final class Cluster {
    private final boolean isBootstrapConfigured;
    // node 列表
    private final List<Node> nodes;
    // 未认证的topics
    private final Set<String> unauthorizedTopics;
    // 非法的topics
    private final Set<String> invalidTopics;
    // kafka内置的topics
    private final Set<String> internalTopics;
    private final Node controller;
    // partition对应的信息,如:leader所在节点、所有的副本、ISR中的副本、offline的副本
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    // topic和partition信息的对应关系
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    // topic和可用partition(leader不为null)的对应关系
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    // node和partition信息的对应关系
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    // 节点id与节点的对应关系
    private final Map<Integer, Node> nodesById;
    // 集群信息,里面只有一个clusterId
    private final ClusterResource clusterResource;
}
// topic-partition: 包含 topic、partition、leader、replicas、isr
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}

看源码不难理解 Metadata 的主要数据结构,我们大概总结下包含哪些信息:

  • 集群中有哪些节点;
  • 集群中有哪些 topic,这些 topic 有哪些 partition;
  • 每个 partition 的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上;
  • 每个 partition 的 AR 有哪些副本,ISR 有哪些副本;

2.2 Metadata 的应用场景

Metadata 在 Kafka 中非常重要,很多场景中都需要从 Metadata 中获取数据或更新数据,例如:

  • KafkaProducer 发送一条消息到指定的 topic 中,需要知道分区的数量,要发送的目标分区,目标分区的 leader,leader 所在的节点地址等,这些信息都要从 Metadata 中获取。
  • 当 Kafka 集群中发生了 leader 选举,节点中 partition 或副本发生了变化等,这些场景都需要更新Metadata 中的数据。

三、Producer 的 Metadata 更新流程

Producer 在调用 doSend() 方法时,第一步就是通过 waitOnMetadata 方法获取该 topic 的 metadata 信息。

总结一下以上代码:

  • 首先会从缓存中获取 cluster 信息,并从中获取 partition 信息,如果可以取到则返回当前的 cluster 信息,如果不含有所需要的 partition 信息时就会更新 metadata;
  • 更新 metadata 的操作会在一个 do ….while 循环中进行,直到 metadata 中含有所需 partition 的信息,该循环中主要做了以下事情:
  • 调用 metadata.requestUpdateForTopic() 方法来获取 updateVersion,即上一次更新成功时的 version,并将 needUpdate 设为 true,强制更新;
  • 调用 sender.wakeup() 方法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 请求进行操作,待会下面会详细介绍;
  • 调用 metadata.awaitUpdate(version, remainingWaitMs) 方法来等待 metadata 的更新,通过比较当前的 updateVersion 与步骤 1 中获取的 updateVersion 来判断是否更新成功;

3.1 org.apache.kafka.clients.NetworkClient#poll

上面提到调用 sender.wakeup() 方法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 请求进行操作。在 NetworkClient 中真正处理请求的是 NetworkClient.poll() 方法,接下来让我们通过分析源码来看下 NetworkClient 是如何处理请求的。


3.2 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long)

我们来看下 metadata 是如何更新的


这里你可能会问,老周啊,最小负载节点是啥呀?

别急,我们来看下面这张图,你就理解了。

LeastLoadedNode 指 Kafka 集群中所有 Node 中负载最小的那一个 Node,它是由每个 Node 在 InFlightRequests 中还未确定的请求数决定的,未确定的请求越少则负载越小。如上图所示,Node1 即为 LeastLoadedNode。

3.3 org.apache.kafka.clients.Metadata#updateRequested

下次更新元数据信息的时间:当前 metadata 信息即将到期的时间即 timeToExpire 和 距离允许更新 metadata 信息的时间 即 timeToAllowUpdate 中的最大值。

timeToExpire:needUpdate 为 true,表示强制更新,此时该值为 0;否则的话,就按照定时更新时间,即元数据信息过期时间(默认是 300000 ms 即 5 分钟)进行周期性更新。

timeToAllowUpdate:默认就是 refreshBackoffMs 的默认值,即 100 ms。

3.4 org.apache.kafka.clients.NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)

我们继续跟一下 maybeUpdate 方法:

因此,每次 producer 请求更新 metadata 时,会有以下几种情况:

  • 通道已经 ready,node 可以发送请求,那么就直接发送请求。
  • 如果该 node 正在建立连接,则直接返回。
  • 如果该 node  还没建立连接,则向 broker 初始化连接。

而 KafkaProducer 线程一直是阻塞在两个 while 循环中的,直到 metadata 更新:

  • sender 线程第一次调用 poll,初始化与 node 的连接。
  • sender 线程第二次调用 poll,发送 metadata 请求。
  • sender 线程第三次调用 poll,获取 metadataResponse,并更新 metadata。

3.5 接收 Server 端的响应,更新 Metadata 信息

handleCompletedReceives 是如何处理任何已完成的接收响应,如下:

之后进一步调用 handleSuccessfulResponse。

四、总结

Metadata 会在下面两种情况下进行更新:

  • 强制更新:调用 Metadata.requestUpdate() 将 needFullUpdate 置为 true 来强制更新。
  • 周期性更新:通过 Metadata 的 lastSuccessfulRefreshMs 和 metadataExpireMs 来实现,一般情况下,默认周期时间就是 metadataExpireMs,5 分钟时长。

在 NetworkClient 的 poll() 方法调用时,会去检查两种更新机制,只要达到一种,就会触发更新操作。

Metadata 的强制更新会在以下几种情况下进行:

  • initConnect 方法调用时,初始化连接;
  • poll() 方法中对 handleDisconnections() 方法调用来处理连接断开的情况,这时会触发强制更新;
  • poll() 方法中对 handleTimedOutRequests() 来处理请求超时时;
  • 发送消息时,如果无法找到 partition 的 leader;
  • 处理 Producer 响应(handleProduceResponse),如果返回关于 Metadata 过期的异常,比如:没有 topic-partition 的相关 meta 或者 client 没有权限获取其 metadata。

强制更新主要是用于处理各种异常情况。

好了,Producer Metadata 读取与更新机制就说到这,我们下一期再见。



Java

相关文章
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
96 4
|
3月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
117 0
Kafka ISR机制详解!
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
73 0
|
3月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
269 0
|
5月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
214 1
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
162 4
|
5月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
62 3
|
5月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
192 3
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
123 2