Kafka 消费者之 findCoordinator源码解析

本文涉及的产品
云解析DNS,个人版 1个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: Kafka 消费者之 findCoordinator源码解析

1在这里我们先来梳理一下consumeGroup的相关知识

1、首先,我们会给每个consume设置groupId,对于相同groupId且订阅相同topic的consume,会组成consumeGroup,如图一所示

2、对于Server端的topic来说,会有partition这个概念,如图二所示

图二

3、现在我们有多个consume及多个partition,到底由哪个consume来消费哪个partition呢?就由consume启动时的分区分配策略来决定。

  • 如果consume数量小于partition的数量,则一个consume有可能消费多个分区,如图三所示
  • 如果consume数量大于partition的数量,则会有consume线程空跑,如图四所示
    4、kafka的内置topic:consumer_offsets专门记录消费位点信息,既然是内置topic,那自然也会有partition及partition leader的概念,对于同一个groupId的消费位点都会记录在同一个partition中,在这篇文章中findCoordinator即是找到该groupId对应的partition的leader节点,我们知道这个节点才能将位移信息提交到这里保存,如果该partition还有其他副本,则该节点还会与其他副本同步位移信息。与该节点交互都是由GroupCoordinator完成的。

在这里插入图片描述

2findCoordinator流程展示

在这里插入图片描述

3客户端源码分析

这里还是放一下findCoordinator的代码,看其他consume的代码就发现客户端跟kafkaServer通信的格式大多是这样的,如果通信一次发现该GroupCoordinator的信息还未获取到则继续重试,直到超时,这里的超时时间即为poll时传入的超时时间,这个时间设置贯穿了整个consume的运行代码。

   protectedsynchronizedbooleanensureCoordinatorReady(final Timer timer) {
       //如果还未加入group则与group通信
       if (!coordinatorUnknown())
           returntrue;

       do {
           if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {
               final RuntimeException fatalException = findCoordinatorException;
               findCoordinatorException = null;
               throw fatalException;
           }
           final RequestFuture<Void> future = lookupCoordinator();
           client.poll(future, timer);
         //如果还没回调完成则说明是超时的
           if (!future.isDone()) {
               // ran out of time
               break;
           }

           if (future.failed()) {
               if (future.isRetriable()) {
                   log.debug("Coordinator discovery failed, refreshing metadata");
                   client.awaitMetadataUpdate(timer);
               } else
                   throw future.exception();
               //获取group的信息之后client会与group对应的节点建立连接,如果不可用则还会重试
           } elseif (coordinator != null && client.isUnavailable(coordinator)) {
               // we found the coordinator, but the connection has failed, so mark
               // it dead and backoff before retrying discovery
               markCoordinatorUnknown();
               timer.sleep(rebalanceConfig.retryBackoffMs);
           }
           //如果与group通信成功则会跳出循环
       } while (coordinatorUnknown() && timer.notExpired());

       return !coordinatorUnknown();
   }

这里还有一点,跟踪代码可以看到以下代码在每次check以及与Server端通信完成之后都会有一样的逻辑,可以仔细思考一下,coordinator即获取到的group节点对象,client.isUnavailable(coordinator)是在与group建立连接,每次判断coordinator不为空且client与group连接失败,则将coordinator置空,为什么会这样呢?很有可能是请求到group的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将coordinator清空,待服务端选举完成后再次通信。

 protectedsynchronized Node checkAndGetCoordinator() {
       if (coordinator != null && client.isUnavailable(coordinator)) {
           markCoordinatorUnknown(true);
           returnnull;
       }
       returnthis.coordinator;
   }

org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

这段代码有个亮点就是先寻找的负载最小节点,然后与该节点通信获取group节点的信息。

  protectedsynchronized RequestFuture<Void> lookupCoordinator() {
       if (findCoordinatorFuture == null) {
           // find a node to ask about the coordinator
           //与最小负载的node通信
           Node node = this.client.leastLoadedNode();
           if (node == null) {
               log.debug("No broker available to send FindCoordinator request");
               return RequestFuture.noBrokersAvailable();
           } else {
               findCoordinatorFuture = sendFindCoordinatorRequest(node);
               // remember the exception even after the future is cleared so that
               // it can still be thrown by the ensureCoordinatorReady caller
               findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
                   @Override
                   publicvoidonSuccess(Void value) {} // do nothing

                   @Override
                   publicvoidonFailure(RuntimeException e) {
                       findCoordinatorException = e;
                   }
               });
           }
       }
       return findCoordinatorFuture;
   }

org.apache.kafka.clients.NetworkClient#leastLoadedNode

我们先来看看是如何寻找负载最小节点的,这里代码还是挺讲究的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的request则直接返回该节点,否则取在途request最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为null的节点则返回该节点,否则返回null。

public Node leastLoadedNode(long now) {
       List<Node> nodes = this.metadataUpdater.fetchNodes();
       if (nodes.isEmpty())
           thrownew IllegalStateException("There are no nodes in the Kafka cluster");
       int inflight = Integer.MAX_VALUE;

       Node foundConnecting = null;
       Node foundCanConnect = null;
       Node foundReady = null;
       //随机取一个节点
       int offset = this.randOffset.nextInt(nodes.size());
       for (int i = 0; i < nodes.size(); i++) {
           int idx = (offset + i) % nodes.size();
           Node node = nodes.get(idx);
           //如果该节点是可连接的,且selector空闲,且发送队列空闲则可以发送请求
           if (canSendRequest(node.idString(), now)) {
               //inFlightRequests记录了已发送请求但还未收到response的request,这里判定如果该节点没有这种数据则直接作为最小负载节点返回
               int currInflight = this.inFlightRequests.count(node.idString());
               if (currInflight == 0) {
                   // if we find an established connection with no in-flight requests we can stop right away
                   log.trace("Found least loaded node {} connected with no in-flight requests", node);
                   return node;
                   //否则取inFlightRequests中最小count的节点作为最小负载节点
               } elseif (currInflight < inflight) {
                   // otherwise if this is the best we have found so far, record that
                   inflight = currInflight;
                   foundReady = node;
               }
           } elseif (connectionStates.isPreparingConnection(node.idString())) {
               foundConnecting = node;
           } elseif (canConnect(node, now)) {
               //如果该节点未被记录或者断连之后超过重试时间,则允许设置该节点
               foundCanConnect = node;
           } else {
               log.trace("Removing node {} from least loaded node selection since it is neither ready " +
                       "for sending or connecting", node);
           }
       }

       // We prefer established connections if possible. Otherwise, we will wait for connections
       // which are being established before connecting to new nodes.
       //优先取状态良好的节点
       if (foundReady != null) {
           log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
           return foundReady;
       } elseif (foundConnecting != null) {
           log.trace("Found least loaded connecting node {}", foundConnecting);
           return foundConnecting;
       } elseif (foundCanConnect != null) {
           log.trace("Found least loaded node {} with no active connection", foundCanConnect);
           return foundCanConnect;
       } else {
           log.trace("Least loaded node selection failed to find an available node");
           returnnull;
       }
   }

拆解FindCoordinatorRequest

通过下图我们来看看发送了哪些数据,key_type有两种枚举,一种是GROUP,另一种是TRANSACTION,如果type为GROUP的话那key就是groupId

在这里插入图片描述

4服务端源码分析

kafka.server.KafkaApis#handleFindCoordinatorRequest

服务端还是通过KafkaApi来处理请求,代码也比较简单。

defhandleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
   val findCoordinatorRequest = request.body[FindCoordinatorRequest]
   //校验数据
   //……省略部分代码
     // get metadata (and create the topic if necessary)
     val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
       caseCoordinatorType.GROUP =>
           //4.1 找到对应发分区
         val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
           //4.2 获取对应的元数据
         val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
         (partition, metadata)

       caseCoordinatorType.TRANSACTION =>
         val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
         val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
         (partition, metadata)

       case _ =>
         thrownewInvalidRequestException("Unknown coordinator type in FindCoordinator request")
     }
     //组装返回参数
    //……省略部分代码
   }
 }

kafka.coordinator.group.GroupMetadataManager#partitionFor

我们知道consume消费后对应的位点是保存在kafka的内部名为"__consumer_offsets"的内置topic中,内置topic初始化时由offsets.topic.num.partitions 参数来决定分区数,默认值是50,相同consumeGroup的offset最终会保存在其中一个分区中,而保存在哪个分区就由下面这段代码来决定,可以看到逻辑很简单,就是取groupId的hashCode,然后对总的分区数取模。比如groupId为"consume_group",最终就会在34号分区保存位点。

 defpartitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

kafka.server.KafkaApis#getOrCreateInternalTopic

这里是先从当前node的元数据缓存中拿到对应topic的数据,如果没有,则创建。从这段代码也可以猜想kafka内置topic的创建原理,是一种懒加载的思想,当第一个consume接入之后才会创建对应topicPartition文件。

 privatedefgetOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {
   val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
   topicMetadata.headOption.getOrElse(createInternalTopic(topic))
 }

这里的topicMetadata就是对应入参topic返回类似列表的对象,因为入参只有一个topic,所以直接取第一个数据,数据结构见下图,可以更直观的理解返回参数。

在这里插入图片描述在这里插入图片描述

kafka.server.KafkaApis#createTopic

Topic创建的流程如下图所示,详情请看 Topic创建流程源码分析

拆解FindCoordinatorResponse

通过下图我们来看看返回了哪些数据,可以看到前面取了很多数据,最终拼到返回参数里面的只有leader所在的节点信息

在这里插入图片描述

5总结

这块代码本身不是很复杂,主要是有一些细节需要考虑,通过仔细思量这些细节对我们今后分析consume异常会大有好处。流程总结如下 1、寻找最小负载节点信息 2、向最小负载节点发送FindCoordinatorRequest 3、最小负载节点处理该请求。

  • 首先找到该groupId对应的分区
  • 通过内存中缓存的metaData获取该分区的信息,如果不存在则创建topic
  • 返回查找到的分区leader信息

  👇🏻  扫描 下方 关注公众号 参与每周福利👇🏻

相关文章
|
11天前
|
存储 安全 Java
深度长文解析SpringWebFlux响应式框架15个核心组件源码
以上是Spring WebFlux 框架核心组件的全部介绍了,希望可以帮助你全面深入的理解 WebFlux的原理,关注【威哥爱编程】,主页里可查看V哥每天更新的原创技术内容,让我们一起成长。
|
12天前
|
关系型数据库 分布式数据库 数据库
PolarDB-X源码解析:揭秘分布式事务处理
【7月更文挑战第3天】**PolarDB-X源码解析:揭秘分布式事务处理** PolarDB-X,应对大规模分布式事务挑战,基于2PC协议确保ACID特性。通过预提交和提交阶段保证原子性与一致性,使用一致性快照隔离和乐观锁减少冲突,结合故障恢复机制确保高可用。源码中的事务管理逻辑展现了优化的分布式事务处理流程,为开发者提供了洞察分布式数据库核心技术的窗口。随着开源社区的发展,更多创新实践将促进数据库技术进步。
18 3
|
16天前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
24 1
|
17天前
|
消息中间件 监控 Kafka
深入解析:Kafka 为何不支持全面读写分离?
**Kafka 2.4 引入了有限的读写分离,允许Follower处理只读请求,以缓解Leader压力。但这不适用于所有场景,特别是实时数据流和日志分析,因高一致性需求及PULL同步方式导致的复制延迟,可能影响数据实时性和一致性。在设计系统时需考虑具体业务需求。**
18 1
|
13天前
|
前端开发 开发者
深入解析Vite.js源码
【7月更文挑战第1天】Vite.js 深入解析:以其无bundle开发、动态ES模块加载提升开发效率;本地HTTP服务器配合WebSocket实现热更新;按需加载减少资源占用;预构建优化生产环境性能;基于Rollup的插件系统增强灵活性。Vite,一个创新且高效的前端构建工具。
20 0
|
14天前
|
消息中间件 Java Kafka
使用Java编写Kafka生产者和消费者示例
使用Java编写Kafka生产者和消费者示例
16 0
|
19天前
|
消息中间件 SQL 存储
ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
ClickHouse的Kafka表引擎允许直接从Apache Kafka流中消费数据,支持多种数据格式如JSONEachRow。创建Kafka表时需指定参数如brokers、topics、group和format。关键参数包括`kafka_broker_list`、`kafka_topic_list`、`kafka_group_name`和`kafka_format`。Kafka特性包括发布/订阅、容错存储和流处理。通过设置`kafka_num_consumers`可以调整并行消费者数量。Kafka引擎还支持Kerberos认证。虚拟列如`_topic`、`_offset`等提供元数据信息。
48 0
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0

推荐镜像

更多