Kafka - advertised.listeners and listeners


listeners

Listener List - Comma-separated list of URIs we will listen on and their protocols. 
Specify hostname as 0.0.0.0 to bind to all interfaces. 
Leave hostname empty to bind to default interface. 
Examples of legal listener lists: PLAINTEXT://myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093

 

advertised.listeners

Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. 
In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for `listeners` will be used.
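
As a minimal illustration (the hostname below is made up), a broker in an IaaS environment might bind to all local interfaces but advertise the address that is actually reachable from outside:

# server.properties -- illustrative values only
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.public.example.com:9092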

 

listeners

This is the address that Kafka actually binds to.

/**
 * An NIO socket server. The threading model is
 *   1 Acceptor thread that handles new connections
 *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
 */
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

  private val endpoints = config.listeners
    
  /**
   * Start the socket server
   */
  def startup() {
    
    
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
  }

In SocketServer you can see that it is indeed the listeners endpoints that are accepted:

an Acceptor and its own group of Processor threads are created for each endpoint.
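
For instance (an assumed configuration), two entries in listeners mean two endpoints, and SocketServer starts one Acceptor plus its own set of Processor threads for each of them:

# assumed example: two endpoints -> two Acceptors
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
# Processor threads created per endpoint (numProcessorThreads in the code above)
num.network.threads=3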

 

advertised.listeners

These are the listeners exposed to the outside; if not set, the value of listeners is used.

KafkaServer.startup

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

Here advertisedListeners is read from the config and passed to KafkaHealthcheck.
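
Note that an advertised endpoint with port 0 is replaced by the port the socket server actually bound. For example (an assumed, test-style configuration), binding to port 0 lets the OS pick a free port, and the endpoint registered for clients carries that real port:

listeners=PLAINTEXT://0.0.0.0:0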

/**
 * This class registers the broker in zookeeper to allow 
 * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
 *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
 *   
 * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
 * we are dead.
 */
class KafkaHealthcheck(brokerId: Int,
                       advertisedEndpoints: Map[SecurityProtocol, EndPoint],
                       zkUtils: ZkUtils,
                       rack: Option[String],
                       interBrokerProtocolVersion: ApiVersion) extends Logging {

As the comment says,

KafkaHealthcheck registers the broker in an ephemeral znode in ZooKeeper; other brokers and consumers detect a failure when that znode disappears.

So what gets registered in ZooKeeper is always the advertisedListeners.

/**
   * Register this broker as "alive" in zookeeper
   */
  def register() {
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
        EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) // if no host is configured, default to InetAddress.getLocalHost.getCanonicalHostName
      else
        endpoint
    )

    // the default host and port are here for compatibility with older client
    // only PLAINTEXT is supported as default
    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) // build the plaintextEndpoint for compatibility with older clients
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack, // newer clients only read updatedEndpoints
      interBrokerProtocolVersion)
  }
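
For illustration (all values are made up), with advertised.listeners=PLAINTEXT://broker1.public.example.com:9092 the ephemeral znode at /brokers/ids/0 ends up holding roughly the following, matching the version-3 registration schema shown further below:

{"version":3,
 "host":"broker1.public.example.com",
 "port":9092,
 "jmx_port":-1,
 "timestamp":"1477555766000",
 "endpoints":["PLAINTEXT://broker1.public.example.com:9092"],
 "rack":"dc1"}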

 

 

The question is: which listener do brokers actually use when replicating from each other?

ReplicaManager.makeFollowers

is where the fetcher threads are created:

        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
            partition.getReplica().get.logEndOffset.messageOffset)).toMap
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

This is what happens when a broker becomes a follower and its fetcher threads are created:

the broker information is still taken from metadataCache;

the corresponding broker is looked up in metadataCache, and getBrokerEndPoint(config.interBrokerSecurityProtocol) is called on it to obtain the endpoint for the inter-broker security protocol.

security.inter.broker.protocol: Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
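
As a hedged sketch of how these pieces fit together (addresses are made up): if replication should run over SSL while plain clients stay on PLAINTEXT, each broker advertises both endpoints and pins the inter-broker protocol to SSL, so the replica fetchers call getBrokerEndPoint(SSL) and connect to port 9093:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://broker1.example.com:9092,SSL://broker1.example.com:9093
security.inter.broker.protocol=SSL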

 

As for the broker information that clients receive,

KafkaApis.handleTopicMetadataRequest

val brokers = metadataCache.getAliveBrokers
    
    val responseBody = new MetadataResponse(
      brokers.map(_.getNode(request.securityProtocol)).asJava,
      metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
      completeTopicMetadata.asJava,
      requestVersion
    )

Which addresses are returned depends on the security protocol of the request, request.securityProtocol.

public enum SecurityProtocol {
    /** Un-authenticated, non-encrypted channel */
    PLAINTEXT(0, "PLAINTEXT", false),
    /** SSL channel */
    SSL(1, "SSL", false),
    /** SASL authenticated, non-encrypted channel */
    SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
    /** SASL authenticated, SSL channel */
    SASL_SSL(3, "SASL_SSL", false),
    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
    TRACE(Short.MAX_VALUE, "TRACE", true);

As you can see, different protocols can advertise different addresses.
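
On the client side this means (assumed values) that the listener you bootstrap against determines which addresses the metadata response contains; a client connecting to the SSL listener is handed the SSL endpoints of every broker:

# client configuration, illustrative only
bootstrap.servers=broker1.example.com:9093
security.protocol=SSL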

 

Broker

/**
    * Create a broker object from id and JSON string.
    *
    * @param id
    * @param brokerInfoString
    *
    * Version 1 JSON schema for a broker is:
    * {
    *   "version":1,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666"
    * }
    *
    * Version 2 JSON schema for a broker is:
    * {
    *   "version":2,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
    * }
    *
    * Version 3 (current) JSON schema for a broker is:
    * {
    *   "version":3,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
    *   "rack":"dc1"
    * }
    */
  def createBroker(id: Int, brokerInfoString: String): Broker = {
    if (brokerInfoString == null)
      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
    try {
      Json.parseFull(brokerInfoString) match {
        case Some(m) =>
          val brokerInfo = m.asInstanceOf[Map[String, Any]]
          val version = brokerInfo("version").asInstanceOf[Int]
          val endpoints =
            if (version < 1)
              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
            else if (version == 1) {
              val host = brokerInfo("host").asInstanceOf[String]
              val port = brokerInfo("port").asInstanceOf[Int]
              Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
            }
            else {
              val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
              listeners.map { listener =>
                val ep = EndPoint.createEndPoint(listener)
                (ep.protocolType, ep)
              }.toMap
            }
          val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
          new Broker(id, endpoints, rack)
        case None =>
          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
      }
    } catch {
      case t: Throwable =>
        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
    }
  }
}

You can see that the old (version 1) format only carries a host and a port,

while newer versions use endpoints, which can define a listener for each security protocol.

 

ZkUtils

/**
   * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
   * or throws an exception if the broker dies before the query to zookeeper finishes
   *
   * @param brokerId The broker id
   * @return An optional Broker object encapsulating the broker metadata
   */
  def getBrokerInfo(brokerId: Int): Option[Broker] = {
    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
      case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
      case None => None
    }
  }

ZkUtils simply reads the corresponding data from ZooKeeper and calls createBroker.

 

Conclusion:

listeners is what the server actually binds to.

advertised.listeners is what is exposed to clients; if it is not set, listeners is used directly.

 

Currently Kafka does not distinguish internal from external traffic: once advertised.listeners is set, all traffic, inter-broker replication included, goes through it, which is clearly not ideal.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic

will address this problem.
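
As a sketch of what that KIP enables (implemented in later Kafka versions; the listener names below are only examples), listeners get names that are mapped to security protocols, and inter-broker traffic is pinned to one of them, so internal and external clients can be given different addresses:

listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.public.example.com:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL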
