【Kafka源码】万字长文详解Kafka网络模型、副本机制(上)

简介: 【Kafka源码】万字长文详解Kafka网络模型、副本机制

背景

驱动学习:

kafka 是业界消息中间件之楷模,他从网络设计、副本同步机制设计的很优秀,业界也很多公司都使用了 kafka,当初我在腾讯的内部后端 serve 有些场景也是通过 Kafka 解耦和实现一次性语义,我们公司目前 kafka 使用的场景也比较多例如 tracking 收集数据,前端能效组采集接口数据,埋点、大数据平台实时流计算,基本都使用 kafka 作为采集端,完成数据上报。

心得感受:

第一次看 kafka broker 的源码的时候,我是一个不懂 scala 语言的人,刚开始看这个代码很难受,过来这个难受时期,你会发现写 kafka 的作者水平相当厉害,代码写的很好,网络模型简直业界楷模,包括他的注释都写的相当不错,所以学习 kafka 一定会有收获,今天把他分享的大家。

技术分析

一、Kafka Borker 网络机制

1.1 总体架构分析

640.png

总结下,这里大概原理: accept 组件监听 9092 端口,当一个客户端发起建立连接请求时,accept 会完成一个新连接的建立,拿到对应的 channel,然后将这个 channel 交给某个 processor 线程,由它来监听处理这个连接的 read ,write 事件,就是请求与响应。当这个客户端发送一个请求的时候,processor 线程就能监听到,拿到对应的请求信息后,将请求信息塞到 RequestChannel 组件里面的请求队列中。然后一堆 RequestHandler 线程不停的从这个请求队列中获取请求信息,然后进行相应的业务逻辑处理,比如说发送消息的请求,它就会找到对应的 parittion,写到对应 partition 在磁盘的文件中。处理完成业务逻辑后会有个处理结果需要告诉客户端,这个时候 handler 线程会将处理结果塞到对应 processor 的响应队列中。processor 会不停的从自己对应的那个响应队列中获取响应,然后写回给对应的客户端。

1.2 网络接收端设计

1.2.1 接收端网络架构图

640.png

首先我们全局的分析下borker启动到监听总个过程是个怎么样的呢?kafka broker 在启动的时候,会根据你配置的 listeners 初始化它的网络组件,用来接收外界的请求,这个 listeners 你可能没配置过,它默认的配置是 listeners=PLAINTEXT://:9092 就是告诉 kafka 使用哪个协议,监听哪个端口,如果我们没有特殊的要求的话,使用它默认的配置就可以了,顶多是修改下端口这块。这个 listeners 是支持配置多套的,就是你可以监听多个端口,一个 listener 就对应着内部这么一套网络模型,我们就介绍一个 listener 的,多个其实都是一样的,就是对应着多套网络模型而已首先会创建一个 accept 组件,这个组件对应着一个线程运行它,它主要是负责监听这个端口,打开一个 selector,使用的是 java 原生的 nio,打开一个 serverSocketChannel,然后专门监听 accept 事件,建立网络连接的接着会为这个 accept 组件建立创建几个 processor 组件,每个 processor 都对应这个一个线程运行,默认是 3 个,是由 num.network.threads 这个参数配置的,这几个 processor 专门是接收请求,发送响应的,每个 processor 都会打开一个 selector 用于事件监听。当 accept 组件收到一个新连接请求的时候会建立一个新连接,就会拿到一个 socketChannel,将这个连接交给 processor,processor 拿到这个 channel 之后就会注册到对应的 selector 上面,监听它的 read 事件,然后后续关于这个连接发送的请求就由某个 processor 线程来处理,处理完之后再将响应写回到这个连接对应的 channel 中。

1.2.2 接收端源码分析

kafkaServerStartable.startup

启动代码入口 main

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()

创建 kafka socket

def startup() {
    this.synchronized {
      .........
      endpoints.values.foreach { endpoint =>
        for (i <- processorBeginIndex until processorEndIndex) {
          processors(i) = new Processor(i,
            time,
            maxRequestSize,
            requestChannel,
            connectionQuotas,
            connectionsMaxIdleMs,
            protocol,
            config.values,
            metrics
          )
        }
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()
       processorBeginIndex = processorEndIndex
      }
    }

这里就创建 processors 线程和 acceptor 线程

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }
  // TODO:  这里是重写run methods
  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want the
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }
  /*
   * Create a server socket to listen for connections on.
   */
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if(host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
    }
    serverChannel
  }
  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }
  /**
   * Wakeup the thread for selection.
   */
  @Override
  def wakeup = nioSelector.wakeup()
}

sacla 跟其他语言不太一样,他可以到处创建函数,然后再函数内部调用外部函数, 首先我们看看这段代码干了一些什么事情,他主要创建 acceptor 线程,然后注册 OP_READ 事件,将接收的线程交给 processor 线程处理,写框架主要在于细节看看是否处理完美,我们首先来看看连接过多怎么办,那么肯定有一个阈值,可以设置,如果超过阈值抛出异常就可以 jvm 自动会帮你捕获到异常,交给业务系统。

/**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

弄个一容器缓存起来连接通道,然后唤醒线程进度处理。

override def run() {
    startupComplete()
    while(isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        try {
          selector.poll(300)
        } catch {
          case e @ (_: IllegalStateException | _: IOException) =>
            error("Closing processor %s due to illegal state or IO exception".format(id))
            swallow(closeAll())
            shutdownComplete()
            throw e
        }
        selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req)
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
          selector.mute(receive.source)
        }
        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination)
        }
        selector.disconnected.asScala.foreach { connectionId =>
          val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
          }.remoteHost
          // the channel has been closed by the selector but the quotas still need to be updated
          connectionQuotas.dec(InetAddress.getByName(remoteHost))
        }
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
        case e : ControlThrowable => throw e
        case e : Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

这个方法就是真正要干事的工人, 他主要实现了发送线程与接收线程解耦

apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)

这里主要是 RequestHadler 默认参数# The number of threads doing disk I/O num.io.threads=8 配置这个参数接收与处理线程池的大小

def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

这里就是等待消费线程进行消费,请看下面网络消费端的设计

相关文章
|
11天前
|
安全 网络安全 数据安全/隐私保护
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限
访问控制列表(ACL)是网络安全中的一种重要机制,用于定义和管理对网络资源的访问权限。它通过设置一系列规则,控制谁可以访问特定资源、在什么条件下访问以及可以执行哪些操作。ACL 可以应用于路由器、防火墙等设备,分为标准、扩展、基于时间和基于用户等多种类型,广泛用于企业网络和互联网中,以增强安全性和精细管理。
62 7
|
1月前
|
消息中间件 分布式计算 算法
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
49 5
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
|
1月前
|
机器学习/深度学习 算法 数据安全/隐私保护
基于BP神经网络的苦瓜生长含水量预测模型matlab仿真
本项目展示了基于BP神经网络的苦瓜生长含水量预测模型,通过温度(T)、风速(v)、模型厚度(h)等输入特征,预测苦瓜的含水量。采用Matlab2022a开发,核心代码附带中文注释及操作视频。模型利用BP神经网络的非线性映射能力,对试验数据进行训练,实现对未知样本含水量变化规律的预测,为干燥过程的理论研究提供支持。
|
11天前
|
存储 网络协议 安全
30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场
本文精选了 30 道初级网络工程师面试题,涵盖 OSI 模型、TCP/IP 协议栈、IP 地址、子网掩码、VLAN、STP、DHCP、DNS、防火墙、NAT、VPN 等基础知识和技术,帮助小白们充分准备面试,顺利踏入职场。
36 2
|
12天前
|
运维 网络协议 算法
7 层 OSI 参考模型:详解网络通信的层次结构
7 层 OSI 参考模型:详解网络通信的层次结构
34 1
|
1月前
|
网络协议 前端开发 Java
网络协议与IO模型
网络协议与IO模型
网络协议与IO模型
|
1月前
|
机器学习/深度学习 网络架构 计算机视觉
目标检测笔记(一):不同模型的网络架构介绍和代码
这篇文章介绍了ShuffleNetV2网络架构及其代码实现,包括模型结构、代码细节和不同版本的模型。ShuffleNetV2是一个高效的卷积神经网络,适用于深度学习中的目标检测任务。
75 1
目标检测笔记(一):不同模型的网络架构介绍和代码
|
1月前
|
网络协议 Java 应用服务中间件
深入浅出Tomcat网络通信的高并发处理机制
【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用
深入浅出Tomcat网络通信的高并发处理机制
|
23天前
|
网络协议 算法 网络性能优化
计算机网络常见面试题(一):TCP/IP五层模型、TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议
计算机网络常见面试题(一):TCP/IP五层模型、应用层常见的协议、TCP与UDP的区别,TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议、ARP协议
|
28天前
|
机器学习/深度学习 人工智能 算法
【车辆车型识别】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+算法模型
车辆车型识别,使用Python作为主要编程语言,通过收集多种车辆车型图像数据集,然后基于TensorFlow搭建卷积网络算法模型,并对数据集进行训练,最后得到一个识别精度较高的模型文件。再基于Django搭建web网页端操作界面,实现用户上传一张车辆图片识别其类型。
72 0
【车辆车型识别】Python+卷积神经网络算法+深度学习+人工智能+TensorFlow+算法模型
下一篇
无影云桌面