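Background
Motivation:
Kafka is a model among messaging middleware. Its network design and its replica synchronization mechanism are both excellent, and many companies in the industry use it. When I was at Tencent, some internal backend server scenarios used Kafka for decoupling and for exactly-once semantics. Our company also uses Kafka in many scenarios. Examples include tracking data collection, API data collection by the frontend engineering-efficiency team, event instrumentation (埋点), and real-time stream computing on the big data platform. In almost all of them, Kafka is the collection layer that data is reported to.
Takeaways:
The first time I read the Kafka broker source code, I did not know Scala, and the code was painful to read at first. Once you get past that stage, you find that Kafka's authors are very skilled. The code is well written, the network model is exemplary, and even the comments are very good. Studying Kafka is well worth the effort, so today I am sharing it with you.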
Technical Analysis
1. Kafka Broker Network Mechanism
1.1 Overall Architecture
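To summarize, the general principle is as follows.

The Acceptor component listens on port 9092. When a client initiates a connection, the Acceptor completes the new connection and obtains the corresponding channel. It then hands that channel to one of the Processor threads, which from then on monitors the connection's read and write events, that is, its requests and responses.

When the client sends a request, the Processor thread detects it, reads the request, and puts it into the request queue inside the RequestChannel component. A pool of RequestHandler threads continuously takes requests off this queue and runs the corresponding business logic. For a produce (send message) request, for example, the handler finds the target partition and writes the data to that partition's file on disk.

When the business logic finishes, the result has to be returned to the client. The handler thread puts it into the response queue of the Processor that owns the connection. Each Processor continuously takes responses from its own response queue and writes them back to the corresponding client.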
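The flow above is a pipeline of queues, and it can be modelled in memory without any sockets. The sketch below is an illustration under stated assumptions, not Kafka's code:

* `ToyRequest`, `ToyResponse` and `runPipeline` are hypothetical names.
* One `LinkedBlockingQueue` stands in for RequestChannel's request queue.
* One queue per processor stands in for the per-processor response queues.
* Upper-casing the payload stands in for the business logic.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types. A request remembers which processor read it, so the
// handler knows whose response queue to answer on.
final case class ToyRequest(processorId: Int, connectionId: String, payload: String)
final case class ToyResponse(connectionId: String, payload: String)

// Toy pipeline: processors -> one shared request queue -> handler threads ->
// per-processor response queues. Returns the responses each processor would write back.
def runPipeline(numProcessors: Int, numHandlers: Int, requests: Seq[ToyRequest]): Map[Int, Seq[ToyResponse]] = {
  val requestQueue = new LinkedBlockingQueue[ToyRequest]() // shared by all processors
  val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[ToyResponse]()) // one per processor
  val done = new CountDownLatch(requests.size)

  // Handler threads: take a request, run the "business logic" (upper-casing here),
  // and put the result on the response queue of the processor that owns the connection.
  val handlers = Seq.fill(numHandlers) {
    val t = new Thread(() => {
      try {
        while (true) {
          val req = requestQueue.take()
          responseQueues(req.processorId).put(ToyResponse(req.connectionId, req.payload.toUpperCase))
          done.countDown()
        }
      } catch {
        case _: InterruptedException => () // shutdown signal
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }

  requests.foreach(r => requestQueue.put(r)) // processors enqueue decoded requests
  done.await(5, TimeUnit.SECONDS)
  handlers.foreach(_.interrupt())

  // Each processor drains only its own response queue; Kafka would write these to the sockets.
  (0 until numProcessors).map { id =>
    val out = ArrayBuffer.empty[ToyResponse]
    var resp = responseQueues(id).poll()
    while (resp != null) { out += resp; resp = responseQueues(id).poll() }
    id -> out.toSeq
  }.toMap
}
```

Because the request records which processor read it, a response always returns to the thread that owns the connection. This holds even though any handler thread may process the request.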
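1.2 Receive-Side Network Design
1.2.1 Receive-Side Network Architecture Diagram
First, let's look at the whole process, from broker startup to listening on the port.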
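When the Kafka broker starts, it initializes its network components according to the configured listeners, which receive requests from the outside world. You may never have set listeners yourself. Its default is listeners=PLAINTEXT://:9092, which tells Kafka which protocol to use and which port to listen on. Unless you have special requirements, the default is fine, and at most you change the port. Several listeners can be configured, so the broker can listen on several ports. Each listener gets its own copy of the network model described here, so we only walk through one listener; the others work the same way.

First, an Acceptor is created and run on its own thread. It is responsible for listening on the port. It opens a selector (plain Java NIO) and a ServerSocketChannel, and it listens only for accept events, that is, it only establishes network connections.

Next, several Processors are created for this Acceptor, each running on its own thread. The default is 3, configured by num.network.threads. The Processors are dedicated to receiving requests and sending responses, and each one opens its own selector for event monitoring.

When the Acceptor receives a new connection request, it establishes the connection, obtains a SocketChannel, and hands it to a Processor. The Processor registers the channel on its own selector and listens for its read events. From then on, every request sent over that connection is handled by that one Processor thread, which also writes the responses back to the connection's channel.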
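A listener value has the shape PROTOCOL://host:port, and a comma-separated list produces one network stack per entry. The sketch below shows how such a value breaks down. `ListenerSpec` and `parseListeners` are hypothetical helpers, not Kafka's EndPoint parsing, which handles more cases. The empty-host rule mirrors the check in the broker's openServerSocket: no host means bind the wildcard address.

```scala
import java.net.InetSocketAddress

// Hypothetical representation of one listener entry; not Kafka's EndPoint class.
final case class ListenerSpec(protocol: String, host: String, port: Int) {
  // Same rule as openServerSocket: an empty host binds the wildcard address,
  // i.e. all network interfaces.
  def bindAddress: InetSocketAddress =
    if (host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port)
}

// PROTOCOL://host:port where host may be empty; greedy (.*) backtracks to the last ':'.
val ListenerPattern = """([A-Za-z_]+)://(.*):(\d+)""".r

// Splits a comma-separated listeners value into one spec per listener; in the broker,
// each entry would get its own Acceptor and set of Processors.
def parseListeners(value: String): Seq[ListenerSpec] =
  value.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map {
    case ListenerPattern(protocol, host, port) => ListenerSpec(protocol, host, port.toInt)
    case other => throw new IllegalArgumentException(s"Malformed listener: $other")
  }
```

With the default value, the host part is empty, so the broker listens on 9092 on every interface.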
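1.2.2 Receive-Side Source Code Analysis
Startup entry point: main calls kafkaServerStartable.startup, which starts the broker (KafkaServer.startup). There the socket server is created and started: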
```scala
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
```
SocketServer.startup() then creates the network threads for each endpoint:
```scala
def startup() {
  this.synchronized {
    .........
    endpoints.values.foreach { endpoint =>
      for (i <- processorBeginIndex until processorEndIndex) {
        processors(i) = new Processor(i,
          time,
          maxRequestSize,
          requestChannel,
          connectionQuotas,
          connectionsMaxIdleMs,
          protocol,
          config.values,
          metrics
        )
      }

      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
      acceptor.awaitStartup()

      processorBeginIndex = processorEndIndex
    }
  }
}
```
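This is where the Processor objects and the Acceptor thread are created. Each endpoint gets its own slice of the processors array, from processorBeginIndex to processorEndIndex. The Processor threads themselves are started inside the Acceptor's constructor: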
```scala
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

  // Author's note: run() is overridden here
  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        } catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }

  /*
   * Create a server socket to listen for connections on.
   */
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if (host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
    try {
      serverChannel.socket.bind(socketAddress)
      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
    } catch {
      case e: SocketException =>
        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
    }
    serverChannel
  }

  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
        .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
          socketChannel.socket.getSendBufferSize, sendBufferSize,
          socketChannel.socket.getReceiveBufferSize, recvBufferSize))

      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }

  /**
   * Wakeup the thread for selection.
   */
  @Override
  def wakeup = nioSelector.wakeup()
}
```
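Scala differs from many other languages in two ways that matter here:

* You can define functions almost anywhere, including nested inside other functions, and call functions of the enclosing scope from inside them.
* Statements written directly in a class body run as part of the constructor. This is why the this.synchronized block above starts the Processor threads as soon as an Acceptor is created.

Now let's look at what this code does. The Acceptor's run() registers the server channel for OP_ACCEPT, not OP_READ. Read interest is registered later, by the Processor. The loop accepts each new connection and hands the resulting SocketChannel to the processors in round-robin order.

A framework is judged by how carefully it handles details, so take too many connections as an example. There has to be a configurable threshold. accept() first calls connectionQuotas.inc, which throws TooManyConnectionsException once a single IP address exceeds its limit (max.connections.per.ip). Nothing is left for the JVM to catch: accept() catches the exception itself, logs the rejection, and closes the socket, while the broker keeps serving all other connections.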
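The quota check can be modelled as a synchronized per-IP counter. The sketch below assumes hypothetical names (`ToyConnectionQuotas`, `TooManyConnections`, `tryAdmit`) and is only in the spirit of Kafka's ConnectionQuotas, not its implementation. Like the acceptor code above, it counts the connection first and throws if the limit is now exceeded. The caller then undoes the count and drops the socket. In Kafka, closing the channel also decrements the count.

```scala
import java.net.InetAddress
import scala.collection.mutable

final class TooManyConnections(val ip: InetAddress, val count: Int)
  extends RuntimeException(s"Too many connections from $ip (max $count)")

// Hypothetical, simplified per-IP connection counter; maxPerIp plays the role of
// max.connections.per.ip.
final class ToyConnectionQuotas(maxPerIp: Int) {
  private val counts = mutable.Map.empty[InetAddress, Int]

  def inc(ip: InetAddress): Unit = synchronized {
    val n = counts.getOrElse(ip, 0) + 1
    counts(ip) = n
    if (n > maxPerIp) throw new TooManyConnections(ip, maxPerIp)
  }

  def dec(ip: InetAddress): Unit = synchronized {
    val n = counts.getOrElse(ip, 0) - 1
    if (n <= 0) counts.remove(ip) else counts(ip) = n
  }

  def get(ip: InetAddress): Int = synchronized { counts.getOrElse(ip, 0) }
}

// What an acceptor would do: count first; on rejection undo the count and refuse the
// connection (Kafka logs the rejection and closes the channel at this point).
def tryAdmit(quotas: ToyConnectionQuotas, ip: InetAddress): Boolean =
  try { quotas.inc(ip); true }
  catch { case _: TooManyConnections => quotas.dec(ip); false }
```

Making the rejection an ordinary caught exception keeps one noisy client from affecting anyone else: only its own socket is closed.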
```scala
/**
 * Queue up a new connection for reading
 */
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}
```
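Processor.accept only puts the channel into a concurrent queue (newConnections) and wakes up the Processor's selector. The actual registration happens on the Processor thread itself, in configureNewConnections() at the top of its run loop, where the channel is registered for read events. Handing off through a queue keeps every operation on a selector on the thread that owns that selector. wakeup() makes a blocked poll return promptly, so the new connection is not left waiting.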
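The acceptor-to-processor hand-off can be shown end to end with plain Java NIO. The sketch below is hypothetical and uses its own names:

* `ToyAcceptor` and `ToyProcessor` are illustrative classes.
* The acceptor binds 127.0.0.1 on an OS-chosen port.
* Echoing bytes back stands in for request parsing and responses.
* There is no RequestChannel, framing or quota handling.

What it does show are the mechanics described above. The acceptor selector is interested only in OP_ACCEPT and assigns connections round robin. Each processor queues incoming channels and wakes its selector. The processor registers OP_READ on its own thread.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical processor: owns one selector; only its own thread registers channels on it.
final class ToyProcessor extends Runnable {
  private val selector = Selector.open()
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
  @volatile var running = true

  // Called on the acceptor thread: queue the channel, then wake the selector.
  def accept(ch: SocketChannel): Unit = {
    newConnections.add(ch)
    selector.wakeup()
  }

  override def run(): Unit = {
    val buf = ByteBuffer.allocate(1024)
    while (running) {
      // Register queued connections for OP_READ on this thread.
      var ch = newConnections.poll()
      while (ch != null) {
        ch.configureBlocking(false)
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      selector.select(300)
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove()
        if (key.isReadable) {
          val sc = key.channel().asInstanceOf[SocketChannel]
          buf.clear()
          if (sc.read(buf) < 0) { key.cancel(); sc.close() } // peer closed the connection
          else { buf.flip(); while (buf.hasRemaining) sc.write(buf) } // echo stands in for request/response
        }
      }
    }
    selector.close()
  }
}

// Hypothetical acceptor: one selector interested only in OP_ACCEPT, round-robin hand-off.
final class ToyAcceptor(processors: Array[ToyProcessor]) extends Runnable {
  private val selector = Selector.open()
  private val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  serverChannel.bind(new InetSocketAddress("127.0.0.1", 0)) // port 0: let the OS pick a free port
  @volatile var running = true

  def port: Int = serverChannel.socket().getLocalPort

  override def run(): Unit = {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    var next = 0
    while (running) {
      if (selector.select(300) > 0) {
        val it = selector.selectedKeys().iterator()
        while (it.hasNext) {
          val key = it.next()
          it.remove()
          if (key.isAcceptable) {
            val ch = serverChannel.accept() // may be null if no connection is pending
            if (ch != null) {
              processors(next).accept(ch)           // never read on the acceptor thread
              next = (next + 1) % processors.length // round robin
            }
          }
        }
      }
    }
    selector.close()
    serverChannel.close()
  }
}
```

Because a processor only touches its selector from its own thread, the acceptor never blocks on another thread's select() call. The cost of a hand-off is one queue insert plus one wakeup.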
```scala
override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()

      try {
        selector.poll(300)
      } catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error("Closing processor %s due to illegal state or IO exception".format(id))
          swallow(closeAll())
          shutdownComplete()
          throw e
      }

      selector.completedReceives.asScala.foreach { receive =>
        try {
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
            buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)
        } catch {
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            // note that even though we got an exception, we can assume that receive.source is valid.
            // Issues with constructing a valid receive object were handled earlier
            error("Closing socket for " + receive.source + " because of error", e)
            close(selector, receive.source)
        }
        selector.mute(receive.source)
      }

      selector.completedSends.asScala.foreach { send =>
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        selector.unmute(send.destination)
      }

      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable => error("Processor got uncaught exception.", e)
    }
  }
}
```
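This run() loop is where the Processor does its real work. Each iteration does the following:

1. It registers the connections queued by the Acceptor (configureNewConnections) and queues any pending responses for writing (processNewResponses).
2. It calls selector.poll(300).
3. It wraps every completed receive in a RequestChannel.Request and puts it onto the request queue with requestChannel.sendRequest. The channel is then muted, so no further request is read from that connection until the current response has been sent.
4. Each completed send removes its response from inflightResponses and unmutes the channel.
5. Each disconnection decrements the connection quota for the remote host.

This is how Kafka decouples the network threads from the request-processing threads. Processors only do network I/O, while the RequestChannel queues carry the work to and from the RequestHandler threads.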
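The mute/unmute step guarantees at most one in-flight request per connection. Responses therefore go back in request order, even though several handler threads work concurrently. The sketch below models only that rule for a single connection. `ToyConnection` and its methods are hypothetical names. A queue of strings stands in for requests the client has already pipelined onto the socket.

```scala
import scala.collection.mutable

// Hypothetical model of one connection on a processor. `onSocket` holds requests the
// client has already pipelined; `muted` mirrors selector.mute / selector.unmute.
final class ToyConnection(pipelined: Seq[String]) {
  private val onSocket = mutable.Queue.from(pipelined)
  private var muted = false
  val handedOff = mutable.ArrayBuffer.empty[String] // what reached the request queue, in order

  // One poll() pass: a muted channel is skipped; otherwise read one request,
  // hand it off (sendRequest in Kafka) and mute the channel.
  def poll(): Option[String] =
    if (muted || onSocket.isEmpty) None
    else {
      val req = onSocket.dequeue()
      handedOff += req
      muted = true
      Some(req)
    }

  // The response for the in-flight request has been fully written: start reading again.
  def completeSend(): Unit = { muted = false }
}
```

The second poll returns nothing even though "r2" is already waiting. Only after the first response is written does the connection become readable again.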
```scala
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
  kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
```
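This creates KafkaApis, which dispatches each request to its handling logic, and the request handler pool (KafkaRequestHandlerPool). The pool size is the number of RequestHandler threads that take requests off the queue and process them. It is set by num.io.threads; the default configuration contains:

```properties
# The number of threads doing disk I/O
num.io.threads=8
```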
```scala
def sendRequest(request: RequestChannel.Request) {
  requestQueue.put(request)
}
```
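sendRequest simply puts the request onto requestQueue. The queue is bounded by queued.max.requests, so when the handlers fall behind and the queue fills up, put() blocks the Processor thread. The request then waits for a RequestHandler thread to take it. See the request-handling (consumer) side of the network design below.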
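What a bounded request queue does under load can be seen with a plain `ArrayBlockingQueue`. This is a minimal sketch with these assumptions:

* A capacity of 2 plays the role of queued.max.requests.
* Strings stand in for requests.
* A timed offer is used where a Processor would call put(), so the example shows the "full" condition without hanging.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Bounded request queue; capacity 2 stands in for queued.max.requests.
val requestQueue = new ArrayBlockingQueue[String](2)

requestQueue.put("req-1") // a processor enqueues requests
requestQueue.put("req-2")

// The queue is now full. A processor calling put() here would block; a timed offer
// reports the same condition by returning false after the timeout.
val acceptedWhileFull = requestQueue.offer("req-3", 100, TimeUnit.MILLISECONDS)

// A handler thread takes one request, which frees a slot for the waiting producer.
val firstTaken = requestQueue.take()
val acceptedAfterTake = requestQueue.offer("req-3", 100, TimeUnit.MILLISECONDS)
```

Blocking the producer is what turns slow request handling into backpressure on the network side, instead of letting the queue grow without limit.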