【Kafka源码】万字长文详解Kafka网络模型、副本机制(上)

简介: 【Kafka源码】万字长文详解Kafka网络模型、副本机制

背景

驱动学习:

kafka 是业界消息中间件之楷模,他从网络设计、副本同步机制设计的很优秀,业界也很多公司都使用了 kafka,当初我在腾讯的内部后端 serve 有些场景也是通过 Kafka 解耦和实现一次性语义,我们公司目前 kafka 使用的场景也比较多例如 tracking 收集数据,前端能效组采集接口数据,埋点、大数据平台实时流计算,基本都使用 kafka 作为采集端,完成数据上报。

心得感受:

第一次看 kafka broker 的源码的时候,我是一个不懂 scala 语言的人,刚开始看这个代码很难受,过来这个难受时期,你会发现写 kafka 的作者水平相当厉害,代码写的很好,网络模型简直业界楷模,包括他的注释都写的相当不错,所以学习 kafka 一定会有收获,今天把他分享的大家。

技术分析

一、Kafka Borker 网络机制

1.1 总体架构分析

640.png

总结下,这里大概原理: accept 组件监听 9092 端口,当一个客户端发起建立连接请求时,accept 会完成一个新连接的建立,拿到对应的 channel,然后将这个 channel 交给某个 processor 线程,由它来监听处理这个连接的 read ,write 事件,就是请求与响应。当这个客户端发送一个请求的时候,processor 线程就能监听到,拿到对应的请求信息后,将请求信息塞到 RequestChannel 组件里面的请求队列中。然后一堆 RequestHandler 线程不停的从这个请求队列中获取请求信息,然后进行相应的业务逻辑处理,比如说发送消息的请求,它就会找到对应的 parittion,写到对应 partition 在磁盘的文件中。处理完成业务逻辑后会有个处理结果需要告诉客户端,这个时候 handler 线程会将处理结果塞到对应 processor 的响应队列中。processor 会不停的从自己对应的那个响应队列中获取响应,然后写回给对应的客户端。

1.2 网络接收端设计

1.2.1 接收端网络架构图

640.png

首先我们全局的分析下borker启动到监听总个过程是个怎么样的呢?kafka broker 在启动的时候,会根据你配置的 listeners 初始化它的网络组件,用来接收外界的请求,这个 listeners 你可能没配置过,它默认的配置是 listeners=PLAINTEXT://:9092 就是告诉 kafka 使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。这个 listeners 是支持配置多套的,就是你可以监听多个端口,一个 listener 就对应着内部这么一套网络模型,我们就介绍一个 listener 的,多个其实都是一样的,就是对应着多套网络模型而已首先会创建一个 accept 组件,这个组件对应着一个线程运行它,它主要是负责监听这个端口,打开一个 selector,使用的是 java 原生的 nio,打开一个 serverSocketChannel,然后专门监听 accept 事件,建立网络连接的接着会为这个 accept 组件建立创建几个 processor 组件,每个 processor 都对应这个一个线程运行,默认是 3 个,是由 num.network.threads 这个参数配置的,这几个 processor 专门是接收请求,发送响应的,每个 processor 都会打开一个 selector 用于事件监听。当 accept 组件收到一个新连接请求的时候会建立一个新连接,就会拿到一个 socketChannel,将这个连接交给 processor,processor 拿到这个 channel 之后就会注册到对应的 selector 上面,监听它的 read 事件,然后后续关于这个连接发送的请求就由某个 processor 线程来处理,处理完之后再将响应写回到这个连接对应的 channel 中。

1.2.2 接收端源码分析

kafkaServerStartable.startup

启动代码入口 main

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()

创建 kafka socket

def startup() {
    this.synchronized {
      .........
      endpoints.values.foreach { endpoint =>
        for (i <- processorBeginIndex until processorEndIndex) {
          processors(i) = new Processor(i,
            time,
            maxRequestSize,
            requestChannel,
            connectionQuotas,
            connectionsMaxIdleMs,
            protocol,
            config.values,
            metrics
          )
        }
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()
       processorBeginIndex = processorEndIndex
      }
    }

这里就创建 processors 线程和 acceptor 线程

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }
  // TODO:  这里是重写run methods
  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want the
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }
  /*
   * Create a server socket to listen for connections on.
   */
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
    }
    serverChannel
  }
  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }
  /**
   * Wakeup the thread for selection.
   */
  @Override
  def wakeup = nioSelector.wakeup()
}

sacla 跟其他语言不太一样,他可以到处创建函数,然后再函数内部调用外部函数, 首先我们看看这段代码干了一些什么事情,他主要创建 acceptor 线程,然后注册 OP_READ 事件,将接收的线程交给 processor 线程处理,写框架主要在于细节看看是否处理完美,我们首先来看看连接过多怎么办,那么肯定有一个阈值,可以设置,如果超过阈值抛出异常就可以 jvm 自动会帮你捕获到异常,交给业务系统。

/**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

弄个一容器缓存起来连接通道,然后唤醒线程进度处理。

override def run() {
    startupComplete()
    while(isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        try {
          selector.poll(300)
        } catch {
          case e @ (_: IllegalStateException | _: IOException) =>
            error("Closing processor %s due to illegal state or IO exception".format(id))
            swallow(closeAll())
            shutdownComplete()
            throw e
        }
        selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
          selector.mute(receive.source)
        }
        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination)
        }
        selector.disconnected.asScala.foreach { connectionId =>
          val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
          }.remoteHost
          // the channel has been closed by the selector but the quotas still need to be updated
          connectionQuotas.dec(InetAddress.getByName(remoteHost))
        }
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
        case e : ControlThrowable => throw e
        case e : Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

这个方法就是真正要干事的工人, 他主要实现了发送线程与接收线程解耦

apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)

这里主要是 RequestHadler 默认参数# The number of threads doing disk I/O num.io.threads=8 配置这个参数接收与处理线程池的大小

def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

这里就是等待消费线程进行消费,请看下面网络消费端的设计

相关文章
|
1月前
|
安全 网络安全 数据安全/隐私保护
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限。它通过设置一系列规则,控制谁可以访问特定资源、在什么条件下访问以及可以执行哪些操作。ACL 可以应用于路由器、防火墙等设备,分为标准、扩展、基于时间和基于用户等多种类型,广泛用于企业网络和互联网中,以增强安全性和精细管理。
183 7
|
4月前
|
缓存 应用服务中间件 nginx
Web服务器的缓存机制与内容分发网络(CDN)
【8月更文第28天】随着互联网应用的发展,用户对网站响应速度的要求越来越高。为了提升用户体验,Web服务器通常会采用多种技术手段来优化页面加载速度,其中最重要的两种技术就是缓存机制和内容分发网络(CDN)。本文将深入探讨这两种技术的工作原理及其实现方法,并通过具体的代码示例加以说明。
445 1
|
2月前
|
网络协议 Java 应用服务中间件
深入浅出Tomcat网络通信的高并发处理机制
【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用
深入浅出Tomcat网络通信的高并发处理机制
|
2月前
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
77 4
|
2月前
|
消息中间件 SQL 分布式计算
大数据-74 Kafka 高级特性 稳定性 - 控制器、可靠性 副本复制、失效副本、副本滞后 多图一篇详解
大数据-74 Kafka 高级特性 稳定性 - 控制器、可靠性 副本复制、失效副本、副本滞后 多图一篇详解
29 2
|
2月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
94 0
Kafka ISR机制详解!
|
2月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
244 0
|
4月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
56 3
|
4月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
171 3
|
4月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
140 4