【Kafka源码】万字长文详解Kafka网络模型、副本机制(上)

简介: 【Kafka源码】万字长文详解Kafka网络模型、副本机制

背景

驱动学习:

kafka 是业界消息中间件之楷模,他从网络设计、副本同步机制设计的很优秀,业界也很多公司都使用了 kafka,当初我在腾讯的内部后端 serve 有些场景也是通过 Kafka 解耦和实现一次性语义,我们公司目前 kafka 使用的场景也比较多例如 tracking 收集数据,前端能效组采集接口数据,埋点、大数据平台实时流计算,基本都使用 kafka 作为采集端,完成数据上报。

心得感受:

第一次看 kafka broker 的源码的时候,我是一个不懂 scala 语言的人,刚开始看这个代码很难受,过来这个难受时期,你会发现写 kafka 的作者水平相当厉害,代码写的很好,网络模型简直业界楷模,包括他的注释都写的相当不错,所以学习 kafka 一定会有收获,今天把他分享的大家。

技术分析

一、Kafka Borker 网络机制

1.1 总体架构分析

640.png

总结下,这里大概原理: accept 组件监听 9092 端口,当一个客户端发起建立连接请求时,accept 会完成一个新连接的建立,拿到对应的 channel,然后将这个 channel 交给某个 processor 线程,由它来监听处理这个连接的 read ,write 事件,就是请求与响应。当这个客户端发送一个请求的时候,processor 线程就能监听到,拿到对应的请求信息后,将请求信息塞到 RequestChannel 组件里面的请求队列中。然后一堆 RequestHandler 线程不停的从这个请求队列中获取请求信息,然后进行相应的业务逻辑处理,比如说发送消息的请求,它就会找到对应的 parittion,写到对应 partition 在磁盘的文件中。处理完成业务逻辑后会有个处理结果需要告诉客户端,这个时候 handler 线程会将处理结果塞到对应 processor 的响应队列中。processor 会不停的从自己对应的那个响应队列中获取响应,然后写回给对应的客户端。

1.2 网络接收端设计

1.2.1 接收端网络架构图

640.png

首先我们全局的分析下borker启动到监听总个过程是个怎么样的呢?kafka broker 在启动的时候,会根据你配置的 listeners 初始化它的网络组件,用来接收外界的请求,这个 listeners 你可能没配置过,它默认的配置是 listeners=PLAINTEXT://:9092 就是告诉 kafka 使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。这个 listeners 是支持配置多套的,就是你可以监听多个端口,一个 listener 就对应着内部这么一套网络模型,我们就介绍一个 listener 的,多个其实都是一样的,就是对应着多套网络模型而已首先会创建一个 accept 组件,这个组件对应着一个线程运行它,它主要是负责监听这个端口,打开一个 selector,使用的是 java 原生的 nio,打开一个 serverSocketChannel,然后专门监听 accept 事件,建立网络连接的接着会为这个 accept 组件建立创建几个 processor 组件,每个 processor 都对应这个一个线程运行,默认是 3 个,是由 num.network.threads 这个参数配置的,这几个 processor 专门是接收请求,发送响应的,每个 processor 都会打开一个 selector 用于事件监听。当 accept 组件收到一个新连接请求的时候会建立一个新连接,就会拿到一个 socketChannel,将这个连接交给 processor,processor 拿到这个 channel 之后就会注册到对应的 selector 上面,监听它的 read 事件,然后后续关于这个连接发送的请求就由某个 processor 线程来处理,处理完之后再将响应写回到这个连接对应的 channel 中。

1.2.2 接收端源码分析

kafkaServerStartable.startup

启动代码入口 main

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()

创建 kafka socket

def startup() {
    this.synchronized {
      .........
      endpoints.values.foreach { endpoint =>
        for (i <- processorBeginIndex until processorEndIndex) {
          processors(i) = new Processor(i,
            time,
            maxRequestSize,
            requestChannel,
            connectionQuotas,
            connectionsMaxIdleMs,
            protocol,
            config.values,
            metrics
          )
        }
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()
       processorBeginIndex = processorEndIndex
      }
    }

这里就创建 processors 线程和 acceptor 线程

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }
  // TODO:  这里是重写run methods
  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want the
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }
  /*
   * Create a server socket to listen for connections on.
   */
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
    }
    serverChannel
  }
  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }
  /**
   * Wakeup the thread for selection.
   */
  @Override
  def wakeup = nioSelector.wakeup()
}

sacla 跟其他语言不太一样,他可以到处创建函数,然后再函数内部调用外部函数, 首先我们看看这段代码干了一些什么事情,他主要创建 acceptor 线程,然后注册 OP_READ 事件,将接收的线程交给 processor 线程处理,写框架主要在于细节看看是否处理完美,我们首先来看看连接过多怎么办,那么肯定有一个阈值,可以设置,如果超过阈值抛出异常就可以 jvm 自动会帮你捕获到异常,交给业务系统。

/**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

弄个一容器缓存起来连接通道,然后唤醒线程进度处理。

override def run() {
    startupComplete()
    while(isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        try {
          selector.poll(300)
        } catch {
          case e @ (_: IllegalStateException | _: IOException) =>
            error("Closing processor %s due to illegal state or IO exception".format(id))
            swallow(closeAll())
            shutdownComplete()
            throw e
        }
        selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
          selector.mute(receive.source)
        }
        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination)
        }
        selector.disconnected.asScala.foreach { connectionId =>
          val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
          }.remoteHost
          // the channel has been closed by the selector but the quotas still need to be updated
          connectionQuotas.dec(InetAddress.getByName(remoteHost))
        }
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
        case e : ControlThrowable => throw e
        case e : Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

这个方法就是真正要干事的工人, 他主要实现了发送线程与接收线程解耦

apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)

这里主要是 RequestHadler 默认参数# The number of threads doing disk I/O num.io.threads=8 配置这个参数接收与处理线程池的大小

def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

这里就是等待消费线程进行消费,请看下面网络消费端的设计

相关文章
|
4月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
170 2
|
4月前
基于Reactor模型的高性能网络库之Tcpconnection组件
TcpConnection 由 subLoop 管理 connfd,负责处理具体连接。它封装了连接套接字,通过 Channel 监听可读、可写、关闭、错误等
156 1
|
4月前
|
JSON 监控 网络协议
干货分享“对接的 API 总是不稳定,网络分层模型” 看电商 API 故障的本质
本文从 OSI 七层网络模型出发,深入剖析电商 API 不稳定的根本原因,涵盖物理层到应用层的典型故障与解决方案,结合阿里、京东等大厂架构,详解如何构建高稳定性的电商 API 通信体系。
|
1月前
|
机器学习/深度学习 数据采集 人工智能
深度学习实战指南:从神经网络基础到模型优化的完整攻略
🌟 蒋星熠Jaxonic,AI探索者。深耕深度学习,从神经网络到Transformer,用代码践行智能革命。分享实战经验,助你构建CV、NLP模型,共赴二进制星辰大海。
|
2月前
|
机器学习/深度学习 传感器 算法
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
【无人车路径跟踪】基于神经网络的数据驱动迭代学习控制(ILC)算法,用于具有未知模型和重复任务的非线性单输入单输出(SISO)离散时间系统的无人车的路径跟踪(Matlab代码实现)
195 2
|
2月前
|
机器学习/深度学习 并行计算 算法
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
【CPOBP-NSWOA】基于豪冠猪优化BP神经网络模型的多目标鲸鱼寻优算法研究(Matlab代码实现)
|
4月前
基于Reactor模型的高性能网络库之Poller(EpollPoller)组件
封装底层 I/O 多路复用机制(如 epoll)的抽象类 Poller,提供统一接口支持多种实现。Poller 是一个抽象基类,定义了 Channel 管理、事件收集等核心功能,并与 EventLoop 绑定。其子类 EPollPoller 实现了基于 epoll 的具体操作,包括事件等待、Channel 更新和删除等。通过工厂方法可创建默认的 Poller 实例,实现多态调用。
289 60
|
4月前
|
安全 调度
基于Reactor模型的高性能网络库之核心调度器:EventLoop组件
它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。
284 57
|
4月前
基于Reactor模型的高性能网络库之时间篇
是一个用于表示时间戳(精确到微秒)**的简单封装类
197 57
|
3月前
|
算法 安全 网络安全
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
【多智能体系统】遭受DoS攻击的网络物理多智能体系统的弹性模型预测控制MPC研究(Simulink仿真实现)
181 0
下一篇
oss云网关配置