Kafka网络模型和通信流程剖析

简介:

Kafka网络模型和通信流程剖析
1.概述
最近有同学在学习Kafka的网络通信这块内容时遇到一些疑问,关于网络模型和通信流程的相关内容,这里笔者将通过这篇博客为大家来剖析一下这部分内容。

2.内容
Kafka系统作为一个Message Queue,涉及到的网络通信主要包含以下两个方面:

Pull:Consumer从消息队列中拉取消息数据;
Push:Producer往消息队列中推送消息数据。
要实现高性能的网络通信,可以使用更加底层的TCP协议或者UDP协议来实现。Kafka在Producer、Broker、Consumer之间设计了一套基于TCP层的通信协议,这套协议完全是为了Kafka系统自身需求而定制实现的。

提示:
这里需要注意的是,由于UDP协议是一种不可靠的传输协议,所以Kafka系统采用TCP协议作为服务间的通信协议。
2.1 基本数据类型
通信协议中的基本数据类型分为以下几种:

定长数据类型:例如,int8、int16、int32和、int64,对应到Java语言中,分别是byte、short、int和long
可变数据类型:例如,Java语言中Map、List等
数组:例如,Java语言中的int[]、String[]等
2.2 通信模型
Kafka系统采用的是Reactor多线程模型,即通过一个Acceptor线程处理所有的新连接,通过多个Processor线程对请求进行处理(比如解析协议、封装请求、、转发等)。

提示:
Reactor是一种事件模型,可以将请求提交到一个或者多个服务程序中进行处理。
当收到Client的请求后,Server处理程序使用多路分发策略,由一个非阻塞的线程来接收所有的请求,然后将这些请求转发到对应的工作线程中进行处理。
之后,在Kafka的版本迭代中,新增了一个Handler模块,它通过指定的线程数对请求进行处理。Handler和Processor之间通过一个Block Queue进行连接。如下图所示:

这里 Acceptor是一个继承于AbstractServerThread的线程类,Acceptor的主要目的是监听并且接收Client的请求,同时,建立数据传输通道(SocketChannel),然后通过轮询的方式交给一个Processor处理。其核心代码在Acceptor的run方法中,代码如下:

def run() {

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
  var currentProcessor = 0
  while (isRunning) {
    try {
      val ready = nioSelector.select(500)
      if (ready > 0) {
        val keys = nioSelector.selectedKeys()
        val iter = keys.iterator()
        while (iter.hasNext && isRunning) {
          try {
            val key = iter.next
            iter.remove()
            if (key.isAcceptable)
              accept(key, processors(currentProcessor))
            else
              throw new IllegalStateException("Unrecognized key state for acceptor thread.")

            // round robin to the next processor thread
            currentProcessor = (currentProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    catch {
      // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
      // to a select operation on a specific channel or a bad request. We don't want
      // the broker to stop responding to requests from other clients in these scenarios.
      case e: ControlThrowable => throw e
      case e: Throwable => error("Error occurred", e)
    }
  }
} finally {
  debug("Closing server socket and selector.")
  swallowError(serverChannel.close())
  swallowError(nioSelector.close())
  shutdownComplete()
}

}

这里还有一个块通道(BlockingChannel),用于连接Processor和Handler,其代码如下所示:

class BlockingChannel( val host: String,

                   val port: Int, 
                   val readBufferSize: Int, 
                   val writeBufferSize: Int, 
                   val readTimeoutMs: Int ) extends Logging {

private var connected = false
private var channel: SocketChannel = null
private var readChannel: ReadableByteChannel = null
private var writeChannel: GatheringByteChannel = null
private val lock = new Object()
private val connectTimeoutMs = readTimeoutMs
private var connectionId: String = ""

def connect() = lock synchronized {

if(!connected) {
  try {
    channel = SocketChannel.open()
    if(readBufferSize > 0)
      channel.socket.setReceiveBufferSize(readBufferSize)
    if(writeBufferSize > 0)
      channel.socket.setSendBufferSize(writeBufferSize)
    channel.configureBlocking(true)
    channel.socket.setSoTimeout(readTimeoutMs)
    channel.socket.setKeepAlive(true)
    channel.socket.setTcpNoDelay(true)
    channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

    writeChannel = channel
    // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
    // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
    readChannel = Channels.newChannel(channel.socket().getInputStream)
    connected = true
    val localHost = channel.socket.getLocalAddress.getHostAddress
    val localPort = channel.socket.getLocalPort
    val remoteHost = channel.socket.getInetAddress.getHostAddress
    val remotePort = channel.socket.getPort
    connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
    // settings may not match what we requested above
    val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
    debug(msg.format(channel.socket.getSoTimeout,
                     readTimeoutMs,
                     channel.socket.getReceiveBufferSize, 
                     readBufferSize,
                     channel.socket.getSendBufferSize,
                     writeBufferSize,
                     connectTimeoutMs))

  } catch {
    case _: Throwable => disconnect()
  }
}

}

def disconnect() = lock synchronized {

if(channel != null) {
  swallow(channel.close())
  swallow(channel.socket.close())
  channel = null
  writeChannel = null
}
// closing the main socket channel *should* close the read channel
// but let's do it to be sure.
if(readChannel != null) {
  swallow(readChannel.close())
  readChannel = null
}
connected = false

}

def isConnected = connected

def send(request: RequestOrResponse): Long = {

if(!connected)
  throw new ClosedChannelException()

val send = new RequestOrResponseSend(connectionId, request)
send.writeCompletely(writeChannel)

}

def receive(): NetworkReceive = {

if(!connected)
  throw new ClosedChannelException()

val response = readCompletely(readChannel)
response.payload().rewind()

response

}

private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {

val response = new NetworkReceive
while (!response.complete())
  response.readFromReadableChannel(channel)
response

}

}

3.通信过程
Kafka系统的通信框架也是经过了不同的版本迭代的。例如,在Kafka老的版本中,以NIO作为网络通信的基础,通过将多个Socket连接注册到一个Selector上进行监听,只用一个线程就能管理多个连接,这极大的节省了多线程的资源开销。

在Kafka之后的新版本中,依然以NIO作为网络通信的基础,也使用了Reactor多线程模型,不同的是,新版本将具体的业务处理模块(Handler模块)独立出去了,并用单独的线程池进行控制。如下图所示:

通过上图,我们可以总结一下Kafka的通信流程:

Client向Server发送请求时,Acceptor负责接收TCP请求,连接成功后传递给Processor线程;
Processor线程接收到新的连接后,将其注册到自身的Selector中,并监听READ事件
当Client在当前连接对象上写入数据时,会触发READ事件,根据TCP协议调用Handler进行处理
Handler处理完成后,可能会有返回值给Client,并将Handler返回的结果绑定Response端进行发送
通过总结和分析,我们可以知道Kafka新版中独立Handler模块,用这样以下几点优势:

能够单独指定Handler的线程数,便于调优和管理
防止一个过大的请求阻塞一个Processor线程
Request、Handler、Response之间都是通过队列来进行连接的,这样它们彼此之间不存在耦合现象,对提升Kafka系统的性能很有帮助
这里需要注意的是,在Kafka的网络通信中,RequestChannel为Processor线程与Handler线程之间数据交换提供了一个缓冲区,是通信中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局队列(requestQueue)中,Handler线程从请求队列中获取并处理,处理完成后将Response添加至RequestChannel的响应队列(responseQueues)中,通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送到Client。实现代码如下:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueueRequestChannel.Request
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)

responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

newGauge(

"RequestQueueSize",
new Gauge[Int] {
  def value = requestQueue.size
}

)

newGauge("ResponseQueueSize", new Gauge[Int]{

def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}

})

for (i <- 0 until numProcessors) {

newGauge("ResponseQueueSize",
  new Gauge[Int] {
    def value = responseQueues(i).size()
  },
  Map("processor" -> i.toString)
)

}

/* Send a request to be handled, potentially blocking until there is room in the queue for the request /
def sendRequest(request: RequestChannel.Request) {

requestQueue.put(request)

}

/* Send a response back to the socket server to be sent over the network /
def sendResponse(response: RequestChannel.Response) {

responseQueues(response.processor).put(response)
for(onResponse <- responseListeners)
  onResponse(response.processor)

}

/* No operation to take for the request, need to read more over the network /
def noOperation(processor: Int, request: RequestChannel.Request) {

responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
for(onResponse <- responseListeners)
  onResponse(processor)

}

/* Close the connection for the request /
def closeConnection(processor: Int, request: RequestChannel.Request) {

responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
for(onResponse <- responseListeners)
  onResponse(processor)

}

/* Get the next request or block until specified time has elapsed /
def receiveRequest(timeout: Long): RequestChannel.Request =

requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

/* Get the next request or block until there is one /
def receiveRequest(): RequestChannel.Request =

requestQueue.take()

/* Get a response for the given processor if there is one /
def receiveResponse(processor: Int): RequestChannel.Response = {

val response = responseQueues(processor).poll()
if (response != null)
  response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
response

}

def addResponseListener(onResponse: Int => Unit) {

responseListeners ::= onResponse

}

def shutdown() {

requestQueue.clear()

}
}

4.总结
通过认真阅读和分析Kafka的网络通信层代码,可以收获不少关于NIO的网络通信知识。通过对Kafka的源代码进行阅读和学习,这对大规模Kafka集群性能的调优和问题定位排查是很有帮助的。

原文地址https://www.cnblogs.com/smartloli/p/12287130.html

相关文章
|
14天前
|
安全 测试技术 网络架构
【专栏】编写网络设备割接方案的七个步骤,包括明确割接目标、收集信息、制定计划、设计流程、风险评估、准备测试环境和编写文档。
【4月更文挑战第28天】本文介绍了编写网络设备割接方案的七个步骤,包括明确割接目标、收集信息、制定计划、设计流程、风险评估、准备测试环境和编写文档。通过实际案例分析,展示了如何成功完成割接,确保业务连续性和稳定性。遵循这些步骤,可提高割接成功率,为公司的网络性能和安全提供保障。
|
2天前
|
网络协议 算法 Java
【Java网络编程】网络编程概述、UDP通信(DatagramPacket 与 DatagramSocket)
【Java网络编程】网络编程概述、UDP通信(DatagramPacket 与 DatagramSocket)
15 3
|
3天前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
11 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
3天前
|
安全
AC/DC电源模块在通信与网络设备中的应用的研究
AC/DC电源模块在通信与网络设备中的应用的研究
AC/DC电源模块在通信与网络设备中的应用的研究
|
3天前
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究
|
6天前
|
机器学习/深度学习 数据可视化 算法
R语言神经网络与决策树的银行顾客信用评估模型对比可视化研究
R语言神经网络与决策树的银行顾客信用评估模型对比可视化研究
|
6天前
|
机器学习/深度学习 数据可视化 数据挖掘
R语言神经网络模型金融应用预测上证指数时间序列可视化
R语言神经网络模型金融应用预测上证指数时间序列可视化
|
6天前
|
机器学习/深度学习 数据可视化 算法
SPSS Modeler决策树和神经网络模型对淘宝店铺服装销量数据预测可视化|数据分享
SPSS Modeler决策树和神经网络模型对淘宝店铺服装销量数据预测可视化|数据分享
|
6天前
|
监控 网络协议 安全
计算机网络概述及 参考模型
计算机网络概述及 参考模型
|
12天前
|
机器学习/深度学习 PyTorch 算法框架/工具
Python用GAN生成对抗性神经网络判别模型拟合多维数组、分类识别手写数字图像可视化
Python用GAN生成对抗性神经网络判别模型拟合多维数组、分类识别手写数字图像可视化

热门文章

最新文章