寻找协调器FindCoordinatorRequest请求流程

简介: 寻找协调器FindCoordinatorRequest请求流程

提示:本文可能已过期,请点击原文查看:寻找协调器FindCoordinatorRequest请求流程

@

1客户端发起请求

我们在分析消费者的时候, 有看到调用FindCoordinatorRequest的请求


   private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
       // initiate the group metadata request
       log.debug("Sending FindCoordinator request to broker {}", node);
       FindCoordinatorRequest.Builder requestBuilder =
               new FindCoordinatorRequest.Builder(
                       new FindCoordinatorRequestData()
                           .setKeyType(CoordinatorType.GROUP.id())
                           .setKey(this.rebalanceConfig.groupId));
       return client.send(node, requestBuilder)
               .compose(new FindCoordinatorResponseHandler());
   }
   

2Broker处理请求



defhandleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
   val findCoordinatorRequest = request.body[FindCoordinatorRequest]

// 根据协调器类型判断是否授权过
   if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
       !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
     sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
   elseif (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
       !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
     sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
   else {
     // get metadata (and create the topic if necessary)
     val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
       caseCoordinatorType.GROUP =>
         val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
         val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
         (partition, metadata)

       caseCoordinatorType.TRANSACTION =>
         val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
         val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
         (partition, metadata)

       case _ =>
         thrownewInvalidRequestException("Unknown coordinator type in FindCoordinator request")
     }

     defcreateResponse(requestThrottleMs: Int): AbstractResponse = {
       defcreateFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
         newFindCoordinatorResponse(
             newFindCoordinatorResponseData()
               .setErrorCode(error.code)
               .setErrorMessage(error.message)
               .setNodeId(node.id)
               .setHost(node.host)
               .setPort(node.port)
               .setThrottleTimeMs(requestThrottleMs))
       }
       val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
         createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
       } else {
         val coordinatorEndpoint = topicMetadata.partitions.asScala
           .find(_.partitionIndex == partition)
           .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
           .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
           .flatMap(_.getNode(request.context.listenerName))
           .filterNot(_.isEmpty)

         coordinatorEndpoint match {
           caseSome(endpoint) =>
             createFindCoordinatorResponse(Errors.NONE, endpoint)
           case _ =>
             createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
         }
       }
       trace("Sending FindCoordinator response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
       responseBody
     }
     sendResponseMaybeThrottle(request, createResponse)
   }
 }



简单校验

根据协调器类型判断是否有被授权。协调器类型有  GROUP((byte) 0), TRANSACTION((byte) 1)两种

获取分区号和元信息

这里的接口分两种情况,一个是协调列席为GROUP  一个是 TRANSACTION他们的处理逻辑都是一样的,只是处理的Topic不一样

GROUP 对应的Topic是  __consumer_offsets

TRANSACTION 对应的Topic是__transaction_state

这里我们主要分析一下 GROUP的情况

  1. 去zk获取/brokers/topic/__consumer_offsets 数据  找到消费者Topic的分区总数。默认是50.  (由offsets.topic.num.partitions 控制)找到分区数之和后, 则计算 Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount(groupID按分区数取模运算)获取到了分区号partition;
  2. 然后接着获取该Topic的元信息, 这里需要注意的是 去获取元信息应该走的是什么 监听协议(listenerName) 呢?这个主要是看当前处理请求的Broker是通过哪个入口来的。比如说该Broker有两个监听口,listeners = INTER://xxx.xx.xx.100:9091, OUTSIDE://xxx.xx.xx.101:9092 .如果客户端发起请求的时候是对xxx.xx.xx.101:9092发起的请求,那么这个对应的监听器就是 OUTSIDE .  那么Broker去获取__consumer_offsets元信息发起请求的时候也是会用的 OUTSIDE 协议。
  3. 如果发现没有这个Topic的元信息,则需要去创建__consumer_offsetsTopic 。 注意:创建这个Topic的的几个特殊属性:
属性 描述
cleanup.policy compact 日志清理策略为 :紧缩
segment.bytes 10010241024 一个日志段的大小
compression.type producer 压缩类型 为跟生产者保持一致

构建返回数据 createResponse

这里才是真正的找到协调器的主要逻辑, 这里的判断逻辑是

上面我们获取到的分区号是partition, 我们同样获取到了__consumer_offsets的元信息Metadata。

那我们就可以获取到这个分区号, 并且就能够找到该分区的LeaderId所属在哪个Broker上。

知道了哪个Broker, 那我们就能够获取到对应的EndPoint, 一个Broker可能同时有多个EndPoint(配置了多个监听器),那么我们应该使用哪个EndPoint呢?

这个的判断逻辑与上面说过的一样,客户端发起请求时候的监听器是哪个,那么这里就应该用哪个监听器。

在这里插入图片描述

注意:如果找到的分区Leader不存在 那么这个协调器就不存在

然后会返回异常:


The coordinator is not available

3问题

  1. 如果客户端走的外网监听器访问的集群,那么在客户端发起请求之后到集群内部,触发内部调用链的请求,那么内部这个调用链是用什么监听器访问的呢?

从客户端 -> Broker -> 其他Broker. 这是一个调用链路,从最开始用的是什么监听器那么这条链路上都是用的这个监听器!具体请看:多网络情况下,Kafka客户端如何选择合适的网络发起请求

相关文章
|
2月前
|
XML Java 数据库
在微服务架构中,请求常跨越多个服务,涉及多组件交互,问题定位因此变得复杂
【9月更文挑战第8天】在微服务架构中,请求常跨越多个服务,涉及多组件交互,问题定位因此变得复杂。日志作为系统行为的第一手资料,传统记录方式因缺乏全局视角而难以满足跨服务追踪需求。本文通过一个电商系统的案例,介绍如何在Spring Boot应用中手动实现日志链路追踪,提升调试效率。我们生成并传递唯一追踪ID,确保日志记录包含该ID,即使日志分散也能串联。示例代码展示了使用过滤器设置追踪ID,并在日志记录及配置中自动包含该ID。这种方法不仅简化了问题定位,还具有良好的扩展性,适用于各种基于Spring Boot的微服务架构。
50 3
|
6月前
修正flowable的发起流程中根据用户信息流转不同的流程
修正flowable的发起流程中根据用户信息流转不同的流程
63 0
|
4月前
|
监控 中间件 数据处理
中间件流程协调与调度
【7月更文挑战第6天】
73 2
|
6月前
|
监控 安全 持续交付
【专栏】Webhook是服务器主动发送事件通知的机制,打破传统客户端轮询模式,实现数据实时高效传递。
【4月更文挑战第29天】Webhook是服务器主动发送事件通知的机制,打破传统客户端轮询模式,实现数据实时高效传递。常用于持续集成部署、第三方服务集成、实时数据同步和监控告警。具有实时性、高效性和灵活性优势,但也面临安全风险和调试挑战。理解并善用Webhook能提升系统性能,广泛应用于现代软件开发和集成。
415 0
|
6月前
|
消息中间件 存储 负载均衡
|
6月前
"发起审批流"和"发起审批实例"是两个相关的概念
"发起审批流"和"发起审批实例"是两个相关的概念【1月更文挑战第9天】【1月更文挑战第43篇】
62 2
|
存储 缓存 监控
聊聊消息中心的设计与实现逻辑
消息通知的流程设计,在各个业务线中通过消息中心提供的接口方法,将不同场景下的消息内容提交到消息中心,消息中心进行统一维护管理,并根据消息的来源和去向,适配相应的推送逻辑。
1173 0
聊聊消息中心的设计与实现逻辑
|
消息中间件 监控 调度
业务状态流转
业务状态流转
|
人工智能 负载均衡 监控
支付宝定时任务怎么做?三层分发任务处理框架介绍
本文将从单机定时调度开始,循序渐进地带领大家了解五福定制三层分发任务处理框架。
24690 3
支付宝定时任务怎么做?三层分发任务处理框架介绍
|
前端开发
多个异步之间的协作方案
业务逻辑可能依赖两个通过回调或事件传递
74 0