多网络情况下,Kafka客户端如何选择合适的网络发起请求

简介: 我们都知道, 每个Broker都可以配置多个监听器, 用来用于网络分流。 相关知识请看:一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置

提示:本文可能已过期,请点击原文查看:多网络情况下,Kafka客户端如何选择合适的网络发起请求


最近有个同学问了我一个非常有意思的问题, 今天我根据这个问题来给大家好好分析一下。

在这里插入图片描述

1前提知识

我们都知道, 每个Broker都可以配置多个监听器, 用来用于网络分流。 相关知识请看:一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置

然后, 我们客户端中需要配置bootstrap.servers=xxxx:port 来连接到集群中。 然后当Kafka集群Broker数量很多的时候,我们不可能在bootstrap.servers配置所有的地址

所以Kafka是允许你只配置其中部分地址的, 它会通过自身的元信息更新机制,去获取Kafka集群中的所有地址。然后如果需要去跟某一台Broker发起连接的话,就去元信息里面获取地址。

2问题图述

那么问题来了, 既然一台Broker能够配置多个Listener, 也就意味着有多个地址, 那么客户端在跟具体的Broker发起请求的话, 应该选择哪一个Listener?是遍历吗?

在这里插入图片描述

上图客户端部分获取到的 Broker列表EndPoint应该是什么呢?

3问题源码探究

首先, 客户端(生产者、消费者)去获取集群元信息是通过元信息更新器MetadataUpdater

具体的元信息更新器流程请看 客户端发起元信息更新请求.

我们重点看一下, 获取元信息返回之后,是如何解析Broker集群列表的,确定一下是不是把集群所有的 EndPoint都获取了,还是只获取了一部分。

解析返回的元信息

直接定位到关键代码Metadata#handleMetadataResponse

 

private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, boolean isPartialUpdate, long nowMs) {
 //省略部分
  Map<Integer, Node> nodes = metadataResponse.brokersById();
        if (isPartialUpdate)
            return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions,
                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(),
                (topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
        else
            return new MetadataCache(metadataResponse.clusterId(), nodes, partitions,
                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller());
}

源码调试

本次启动的Kafka集群网络相关配置如下

①.  server0.properties


listeners = PLAINTEXT://localhost:9090


②. server1.properties


listeners = PLAINTEXT://localhost:9091,TEXT://localhost:9099
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,TEXT:PLAINTEXT


③. server2.properties


listeners = PLAINTEXT://localhost:9090


上面的配置, 只有 server1. 中的监听器配置了2个。

  1. PLAINTEXT://localhost:9091
  2. TEXT://localhost:9099

然后启动一个KafkaProducer客户端, bootstrap.servers=localhost:9099 .

通过Debug发现, KafkaProducer客户端获取到的元信息集群列表只有

localhost:9099 (id: 1 rack: null)

在这里插入图片描述

也就是说,客户端拿到的信息是在对应Broker处理请求那里就已经做好的筛选了。 为了搞清楚为什么这里只拿到了一个EndPoint信息, 我们需要去看看Broker是如何处理请求的。 注意:这里发出去的请求是 UPDATE_METADATA

所以, 从客户端发出UPDATE_METADATA请求之后, 服务端是如何处理的呢?

处理handleTopicMetadataRequest请求

在分析这个请求之前, 我想再补充一点关于服务端网络通信模型的知识.

网络通信模型

了解更多请看:图解Kafka的服务端的网络通信模型

先看一张服务端网络模型架构图

服务端网络模型架构图

在Kafka启动的时候, 会根据Listener配置,启动对应个数的 AcceptorProcessor

比如在我们这个例子中, 有2个Listener配置, 那么就如下图所示(简化)

在这里插入图片描述

Acceptor:是专门用来监听连接过来的新链接请求的。

Kafka启动的时候会创建对应个数的Acceptor,这个Acceptor持有很多的信息, 比如ChannelBuilder,  这个ChannelBuilder持有 ListenerName, 如下图所示

在这里插入图片描述

也就是说, 不管哪个客户端从哪个监听器访问到服务端, 都是可以确定它对应的监听器名称的。

比如, 配置了下面2个监听器, 如果我客户端通过localhost:9099访问到了Broker, 那么跟这个客户端建立链接的Acceptor就是监听器名为:TEXT 的那个。

  1. PLAINTEXT://localhost:9091
  2. TEXT://localhost:9099

知道这么一个前提之后, 我们再来分析如何处理handleTopicMetadataRequest请求

服务端接受请求入队

当对应的Processor监听到请求过来的时候,会将请求解析一下并组装成Request,然后入队

Processor#processCompletedReceives

在这里插入图片描述

我们可以看到, 在组装Request的时候, 是有把listenerName传入的。

所以:Request持有ListenerName。

处理元信息更新请求 handleTopicMetadataRequest

KafkaApis#handleTopicMetadataRequest

关键代码

在这里插入图片描述

从代码中可以看到, 我们拿到了所有的Brokers的EndPoint, 包括多个监听器也都拿到了。

在这里插入图片描述

例子中, server1 配置了2个listener, 这里就有2个EndPoint。

但是真正把数据发往会客户端的时候, 是有根据listenerName做过滤的!

brokers.flatMap(_.getNode(request.context.listenerName)).asJava,

这个listenerName也就是我们上面一开始分析过的, Processor对应着监听器。 所以我们这个例子中, 监听器名是 TEXT

过滤完了之后,是不是发现只有一个复合要求,也就是他自己有一个TEXT监听器。

假如我们客户端发起请求的时候,**bootstrap.servers=localhost:9091, 是不是就命中的PLAINTEXT**监听器。

因为这个例子中每个Broker都配置了PLAINTEXT监听器, 所以最终会返回3个EndPoint。

4结论

客户端对服务端发起请求的时候, 会根据命中的服务端的监听器, 然后根据这个命中的服务端监听器名listenerName,过滤集群中其他Broker同样是配置了这个监听器名称的EndPoint。

同样用一张图来回答一下最开始的问题图述的问题

在这里插入图片描述

因为图片里面的case,是从listener2 监听到的请求, 那么所有Broker的EndPoint也要用Listener2的监听器名称来进行过滤,也就会得到图中的结果。

5问题

如果客户端bootstrap.servers配置了多个地址,并且这些地址对应的监听器名字还不一样会有啥后果?

举个例子:

服务端配置:

server1



listeners = PLAINTEXT1://IP1:9090,PLAINTEXT3://IP1:9092

server2



listeners = PLAINTEXT1://IP2:9090,PLAINTEXT2://IP2:9091,PLAINTEXT3://IP2:9092

server3




listeners = PLAINTEXT1://IP3:9090,PLAINTEXT2://IP3:9091,PLAINTEXT3://IP3:9092

客户端配置

bootstrap.servers=IP1:9090,IP2:9090,IP3:9091

首先,客户端发起请求的时候,是去bootstrap.servers获取一个最小负载的IP, 然后去获取元数据。

比如第一次更新的时候,我们去 IP1:9090请求元数据了。拿到的listenerName=PLAINTEXT1

这个时候我们拿到的Brokers是 {IP1:9090、IP2:9090、IP3:9090} .

当后续更新的时候,如果去IP3:9091获取数据的时候,拿到的listenerName=PLAINTEXT2

这个时候我们拿到的Brokers是 {IP2:9091、IP3:9091} .

所以:客户端配置bootstrap.servers的时候,尽量配置的地址都是属于同一个ListenerName 。

目录
相关文章
|
7月前
|
JSON 中间件 Go
Go 网络编程:HTTP服务与客户端开发
Go 语言的 `net/http` 包功能强大,可快速构建高并发 HTTP 服务。本文从创建简单 HTTP 服务入手,逐步讲解请求与响应对象、URL 参数处理、自定义路由、JSON 接口、静态文件服务、中间件编写及 HTTPS 配置等内容。通过示例代码展示如何使用 `http.HandleFunc`、`http.ServeMux`、`http.Client` 等工具实现常见功能,帮助开发者掌握构建高效 Web 应用的核心技能。
403 61
|
5月前
|
XML JSON JavaScript
从解决跨域CSOR衍生知识 Network 网络请求深度解析:从快递系统到请求王国-优雅草卓伊凡
从解决跨域CSOR衍生知识 Network 网络请求深度解析:从快递系统到请求王国-优雅草卓伊凡
149 0
从解决跨域CSOR衍生知识 Network 网络请求深度解析:从快递系统到请求王国-优雅草卓伊凡
|
7月前
|
运维 网络协议 Go
Go网络编程:基于TCP的网络服务端与客户端
本文介绍了使用 Go 语言的 `net` 包开发 TCP 网络服务的基础与进阶内容。首先简述了 TCP 协议的基本概念和通信流程,接着详细讲解了服务端与客户端的开发步骤,并提供了简单回显服务的示例代码。同时,文章探讨了服务端并发处理连接的方法,以及粘包/拆包、异常检测、超时控制等进阶技巧。最后通过群聊服务端的实战案例巩固知识点,并总结了 TCP 在高可靠性场景中的优势及 Go 并发模型带来的便利性。
|
Ubuntu 网络协议 Unix
02理解网络IO:实现服务与客户端通信
网络IO指客户端与服务端通过网络进行数据收发的过程,常见于微信、QQ等应用。本文详解如何用C语言实现一个支持多客户端连接的TCP服务端,涉及socket编程、线程处理及通信流程,并分析“一消息一线程”模式的优缺点。
369 0
|
11月前
|
监控 Linux PHP
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
399 20
|
12月前
|
前端开发 小程序 Java
uniapp-网络数据请求全教程
这篇文档介绍了如何在uni-app项目中使用第三方包发起网络请求
870 3
|
缓存 网络协议 CDN
在网页请求到显示的过程中,如何优化网络通信速度?
在网页请求到显示的过程中,如何优化网络通信速度?
341 59
|
11月前
|
监控 关系型数据库 MySQL
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
343 0
|
数据采集 Web App开发 开发工具
|
数据安全/隐私保护