【Kafka从成神到升仙系列 六】kafka 不能失去网络通信,就像西方不能失去耶路撒冷

简介: 【Kafka从成神到升仙系列 六】kafka 不能失去网络通信,就像西方不能失去耶路撒冷

服务端的网络架构

初学一个技术,怎么了解该技术的源码至关重要。


对我而言,最佳的阅读源码的方式,那就是:不求甚解,观其大略


你如果进到庐山里头,二话不说,蹲下头来,弯下腰,就对着某棵树某棵小草猛研究而不是说先把庐山的整体脉络研究清楚了,那么你的学习方法肯定效率巨低而且特别痛苦。


最重要的还是慢慢地打击你的积极性,说我的学习怎么那么不 happy 啊,怎么那么没劲那,因为你的学习方法错了,大体读明白,先拿来用,用着用着,很多道理你就明白了。


先从整体上把关源码,再去扣一些细节问题。


举个简单的例子:


如果你刚接触 HashMap,你刚有兴趣去看其源码,在看 HashMap 的时候,有一个知识:当链表长度达到 8 之后,就变为了红黑树,小于 6 就变成了链表,当然,还和当前的长度有关。


这个时候,如果你去深究红黑树、为什么是 8 不是别的,又去查 泊松分布,最终会慢慢的搞死自己。


所以,正确的做法,我们先把这一部分给略过去,知道这个概念即可,等后面我们把整个庐山看完之后,再回过头抠细节。


当然,本章我们讲述Kafka 服务端的网络架构

一、服务端网络整体架构

我们讲过了生产者整个的调用流程及发送流程,今天我们来讲一下服务端到底是怎么样处理客户端的连接的

我们都知道,Kafka 作为一个吞吐量极高的中间件,其通信过程自然而然也起到了关键性的作用

之前我们聊过,Kafka 并未用 Netty 作为其通信框架,而是自己自研的

那么,这个自研的框架到底怎么做的呢?和 Netty 相比又如何?

同样,还有一个前提的问题,希望大家在读本篇博客的时候,能够思考一下:Kafka 如何在高吞吐的状态下仍然能保证单 Partition 的有序性?

废话不多说,我们直接开车!

首先,我们看一下服务端的网络架构图:

  • Acceptor 初始化的时候会注册 OP_ACCEPT 事件,当有客户端连接进来时,会触发该事件并将该事件 轮询 的方式分发给 Processor 处理。
  • Processor 收到 Acceptor 分发的连接时,会注册OP_READ 事件并与内部的 selector 绑定,当下次客户端发送信息时,直接触发 ProcessorOP_READ 事件进行处理。
  • Processor 将客户端的连接请求放入 RequestQueue(仅有一个) 里面,
  • 所有的 Processor 共用一个 RequestQueue
  • KafkaRequestHandlerRequestQueue 中取出请求,通过调用 KafkaApis 得到响应结果,将响应结果放入到 responseQueues ,这里需要注意一点:
  • rocessor 有几个 responseQueue
  • Processor 从对应的 responseQueue 中取出 response,将其通过 SockerChannel 发送给对应的客户端、

这些就是 Kafka 服务端网络的整体架构

下面我们详细的拆解每一部分的实现细节

二、服务端源代码剖析

kafka 服务端的启动类为 kafka.scala,主要启动 KafkaServer 服务端

KafkaServer 的启动代码如下:

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()

实际上真正的服务启动是 SocketServer

1、SocketServer

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
  private val endpoints = config.listeners  // 开放的端口数
  private val numProcessorThreads = config.numNetworkThreads // 默认为 3个,即 processor
  private val maxQueuedRequests = config.queuedMaxRequests // request 队列中允许的最多请求数,默认是500
  private val totalProcessorThreads = numProcessorThreads * endpoints.size // 每个端口会对应 N 个 processor
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  private val processors = new Array[Processor](totalProcessorThreads)
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
}
// requestQueue:只有一个
// responseQueues:每个 Processor 都对应一个
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
  )

1.1 初始化

对于初始化来说,主要完成 Processoracceptor 的创建

def startup() {
  this.synchronized {
    // 发送和接受的缓存区大小
    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId
    var processorBeginIndex = 0
    // endpoint:开放的端口数,默认一个 Broker 开放一个
    endpoints.values.foreach { endpoint =>
      val protocol = endpoint.protocolType
      val processorEndIndex = processorBeginIndex + numProcessorThreads
      // Processor:默认为三个
      for (i <- processorBeginIndex until processorEndIndex 默认为 3
        processors(i) = newProcessor(i, connectionQuotas, protocol)
      // Acceptor: 默认一个
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      // 等待线程的启动
      acceptor.awaitStartup()
      processorBeginIndex = processorEndIndex
    }
  }

1.2 Acceptor 处理

上面我们创建完了AcceptorProcessor,首先看一下 Acceptor 的处理

  • 首先向 nioSelector 注册接受 OP_ACCEPT 事件,监听是否有新的连接请求
  • 如果有新的连接请求接入,将该连接的 SocketChannel 交于 processors 进行处理
  • 由于 processor 存在多个,以轮询的方式去交付,保证 processor 的负载均衡
def run() {
  // 注册OP_ACCEPT事件
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // 线程启动完成
  startupComplete()
  try {
    var currentProcessor = 0
    // 死循环
    while (isRunning) {
      try {
        // 查看有没有注册的连接进来
        val ready = nioSelector.select(500)
        if (ready > 0) {
          // 拿出所有的keys并遍历
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              // 用完即删
              iter.remove()
              // 如果当前的是接受事件,则进行接受事件相应的处理
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              // 轮询的方式选择下一个Processor线程
              currentProcessor = (currentProcessor + 1) % processors.length
            }
          }
        }
      }
    }
  }
}

交于 processors 处理的逻辑:

  • 拿到当前 ServerSocketChannel 上的 socketChannel 并进行一些对应的配置
  • socketChannel 放入 newConnections 中并唤醒我们的 processor
/*
 * 接受一个新连接
 */
def accept(key: SelectionKey, processor: Processor) {
  // accept 事件发生时,获取注册到 selector 上的 ServerSocketChannel
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    // socketChannel的各种配置
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)
    processor.accept(socketChannel)
  }
}
// 将新的 SocketChannel放入到newConnections中去
def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    // 唤醒 Processor 的 selector(如果此时在阻塞的话)
    wakeup()
}

1.3 Processor 处理

前面我们讲过,AcceptorsocketChannel 放到了 newConnections 队列中并唤醒我们的 Processor 线程

我们可以猜测到,Processor 肯定是从 newConnections 中拿出 socketChannel 去处理

我们的猜测正不正确呢?来看看源码怎么说

override def run() {
    startupComplete()
    while (isRunning) {
      try {
        /**
         * 从 newConnections 弹出当前的 channel
         * 将当前的 channel 绑定到 nioSelector 并注册 OP_READ 事件
         */
        configureNewConnections()
        /**
         * 拿到属于自己的 responseQueues 并处理其中的 response
         * 其中 response 分为三类:
         * NoOpAction:如果这个请求不需要返回 response,再次注册 OP_READ 监听事件
         * SendAction:需要发送,后续注册 OP_WRITE 监听事件,最终通过 poll 发送(类似我们的生产者消息发送)
         * CloseConnectionAction:需要关闭的 response
         */
        processNewResponses()
        /**
         * 选择器轮询各种事件,请求和发送响应
         * 比如上面我们需要发送的 response,就通过 poll 发送出去(代码逻辑和生产者类似,不再细讲)
         */
        poll()
        /**
         * 服务端处理器的运行方法在调用选择器的轮询后,处理已经完成的请求接收
         * 请求接受:将请求放入到 requestQueue 中并删除掉 OP_READ 事件注册
         */
        processCompletedReceives()
        /**
         * 服务端处理器的运行方法在调用选择器的轮询后,处理已经完成的响应发送
         * 响应发送:当有写请求时加入inflightResponses,当写请求完成后删除并添加 OP_READ 事件监听
         */
        processCompletedSends()
        processDisconnected()
      }
    }
    swallowError(closeAll())
    shutdownComplete()
  }

我们通过源码可以看到,总共分五个步骤:

  • ProcessornewConnections 取出 socketChannel 并注册 OP_READ 事件监听
  • 处理 responseQueuesresponse,总共分三个类型:
  • NoOpAction:如果这个请求不需要返回 response,再次注册 OP_READ 监听事件
  • SendAction:需要发送,后续注册 OP_WRITE 监听事件,最终通过 poll 发送(类似我们的生产者消息发送)
  • seConnectionAction:需要关闭的 response
  • 上面我们注册了 OP_WRITE 事件,在 poll 阶段会被监听到并发送至客户端
  • 处理客户端的一些请求,将其放入到 requestQueue 并删除掉 OP_READ 事件监听
  • 处理响应请求,当有写请求时加入 inflightResponses,当写请求完成后删除并添加 OP_READ 事件监听

当然,我们可以简单的理解一下整个流程:


这里给大家留一个小问题:为什么要频繁的删除掉 OP_READ 事件监听、增加 OP_READ 事件监听?

2、KafkaRequestHandlerPool

按照我们架构图所示,不出所料的话,应该到 KafkaRequestHandlerPool 这一部分了


通过架构图我们可以得知这部分的主要功能:

  • 获取 requestQueue 中的请求,通过 KafkaApis 得到对应的结果
  • 将结果放入到响应队列(responseQueues)中

2.1 初始化

  • KafkaRequestHandlerPool 中创建 numThreadsKafkaRequestHandler 并启动
  • 在初始化 KafkaRequestHandler的时候,
  • 我们发现其入参有个 requestChannel,这个入参是 Processor 存放 request 请求的地方,也是 Handler 处理完请求存放 response 的地方
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    // 开启 numThreads 个 KafkaRequestHandler 并启动
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }
}

2.2 KafkaRequestHandler

  • RequestChannel 得到 Requests 并交由 KafkaApis 去处理
def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        val startSelectTime = SystemTime.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = SystemTime.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }
      req.requestDequeueTimeMs = SystemTime.milliseconds
      apis.handle(req)
    }
  }
}

3、KafkaApis

上面讲到 KafkaRequestHandlerRequestChannel 得到 Requests 并交由 KafkaApis 去处理

那么到底是一个怎么样的处理逻辑呢?

def handle(request: RequestChannel.Request) {
  try {
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
  • 根据 ApiKeys 不同的类别,走不同的处理方式
  • 这里的类别
  PRODUCE(0, "Produce"),
    FETCH(1, "Fetch"),
    LIST_OFFSETS(2, "Offsets"),
    METADATA(3, "Metadata"),
    LEADER_AND_ISR(4, "LeaderAndIsr"),
    STOP_REPLICA(5, "StopReplica"),
    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
    OFFSET_COMMIT(8, "OffsetCommit"),
    OFFSET_FETCH(9, "OffsetFetch"),
    GROUP_COORDINATOR(10, "GroupCoordinator"),
    JOIN_GROUP(11, "JoinGroup"),
    HEARTBEAT(12, "Heartbeat"),
    LEAVE_GROUP(13, "LeaveGroup"),
    SYNC_GROUP(14, "SyncGroup"),
    DESCRIBE_GROUPS(15, "DescribeGroups"),
    LIST_GROUPS(16, "ListGroups"),
    SASL_HANDSHAKE(17, "SaslHandshake"),
    API_VERSIONS(18, "ApiVersions");

3.1 响应返回

当我们的 ApiKeys 处理完相对应的请求时,会执行以下方法:

// 将响应发送回套接字服务器,以便通过网络发送
def sendResponse(response: RequestChannel.Response) {
  // 将得到的响应放入到 responseQueues 中
  responseQueues(response.processor).put(response)
  for(onResponse <- responseListeners)
    // 调用对应 processor 的 wakeup 方法
    onResponse(response.processor)
}

至于每个类型的请求是如何处理的,这一章我们暂时不讲

我们继续完善一下上面的图片:

三、问题解析

经过我们上面的讲述,相信大家对整个 服务端网络整体架构 有了更深的认识

还记得我在文中提到的两个问题嘛?

  • Kafka 如何在高吞吐的状态下仍然能保证单 Partition 的有序性?
  • 接下来就是见证奇迹的时刻,也是面试的时候装逼的时刻,这一刻,你就是天选!

首先,我们从生产者的发送讲起,众所周知,生产者在发送服务端时会将相同 Partition 的放到一起,具体可见:Kafka 生产者全流程

所以我们的客户端与服务端的请求如下:


从上面我们可以看到,客户端(Producer)向服务端发送了 1、2、3 总共三条数据且三条数据处于一个 Partition

对于这三条数据来说,发送时是有序的,按照 1、2、3 的顺序,服务端落日志肯定也是有序的 1、2、3

问题来了,我们上面讲了客户端的请求都会被扔到 requestQueue 中,让 KafkaRequestHandler 去通过 KafkaApis 处理并将响应扔到 responseQueues

假如,我们全程没有不去删除 OP_READ 事件监听,会发生什么情况?大家可以想一下,给个提示:KafkaRequestHandler是多线程的

如上图所示,如果我们 不去删除 OP_READ 事件监听的话,我们的 1、2、3 三条信息会都放入到requestQueue 中,那么我们的 KafkaRequestHandler 去拉取的时候,会出现乱序的现象。

比如,我们三个 KafkaRequestHandler 分别拉取到一条消息:

这个时候,三个 KafkaRequestHandler 线程同时去调用 KafkaApis 落日志,那么这种方式怎么可能保证有序性呢?

kafka 的开发者采取了 mute 的解决方式,将所有接受的事件先放到 kernel 中,

每次只取一个请求,取完就关闭,等该请求的 response 过来后,再重新增加 OP_READ 事件的监听。

通过上述的方式,kafka 做到了分区落日志的有序性。


四、总结

这一篇文章主要从 Kafka 服务端的网络架构入手,剖析了服务端网络如何连接、如何处理、如何返回的。

1 + N + M 的架构思想

  • 1Acceptor
  • NProcessor
  • MKafkaRequestHandler

其实,看过博主 Netty 系列的读者应该可以感觉到,Kafka 服务端的这种网络架构正是著名的 Reactor模型

对应关系如下:

  • boss ====》Acceptor ===》前台
  • work ====》Processor ===》服务员

这里讲一个故事更形象化一些:

  • 当你去酒店住宿的时候,首先需要去前台登记入住手续,

登记完成后,前台会给你一个房间的钥匙。这个就相当于我们连接初始化连接的时候,boss 为刚连接进来的客户端分配 SocketChannel。

之后,前台会让服务员领你去房间,如果你有什么需要,都可以跟这个服务员说。这个相当于我们的 boss 将该客户端的连接交给了 work 线程,任何的业务处理都交由 work 线程去做。









相关文章
|
1天前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
123 2
|
1天前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
51 1
|
8月前
|
消息中间件 缓存 Java
聊聊 Kafka: Producer 的网络模型
聊聊 Kafka: Producer 的网络模型
|
消息中间件 缓存 Java
【Kafka从成神到升仙系列 五】面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......
【Kafka从成神到升仙系列 五】面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......
【Kafka从成神到升仙系列 五】面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......
|
消息中间件 存储 缓存
【Kafka源码】万字长文详解Kafka网络模型、副本机制(下)
【Kafka源码】万字长文详解Kafka网络模型、副本机制
140 0
【Kafka源码】万字长文详解Kafka网络模型、副本机制(下)
|
消息中间件 缓存 前端开发
【Kafka源码】万字长文详解Kafka网络模型、副本机制(上)
【Kafka源码】万字长文详解Kafka网络模型、副本机制
177 0
【Kafka源码】万字长文详解Kafka网络模型、副本机制(上)
|
消息中间件 Kafka
多网络情况下,Kafka客户端如何选择合适的网络发起请求
我们都知道, 每个Broker都可以配置多个监听器, 用来用于网络分流。 相关知识请看:一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置
多网络情况下,Kafka客户端如何选择合适的网络发起请求
|
消息中间件 Kafka
Kafka是如何应用NIO实现网络通信的?(下)
Kafka是如何应用NIO实现网络通信的?
102 0
Kafka是如何应用NIO实现网络通信的?(下)
|
消息中间件 缓存 监控
Kafka是如何应用NIO实现网络通信的?(上)
Kafka是如何应用NIO实现网络通信的?
283 0
Kafka是如何应用NIO实现网络通信的?(上)
|
消息中间件 缓存 网络协议

热门文章

最新文章