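When Kafka runs, the broker starts first. Then the producer runs and its messages are written to storage on the broker, which is where the producer and the broker interact. After that, the consumer sends fetch requests and interacts with the broker in the same way. We saw earlier that Kafka handles the concrete requests in KafkaApis. Looking at KafkaApis.handle again, its parameter is a RequestChannel.Request, so we need to find out where RequestChannel gets its requests from. (Recall that in RocketMQ, a Processor likewise receives the request together with a ChannelHandlerContext.) Inside handle, request.header.apiKey is pattern-matched against the case clauses. On the client side, Sender's sendProducerData and poll methods build the requests and complete the responses. Since KafkaApis only ever works on requests, there must be a processor that reads them off the network and hands them over.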
So, on the producer side, Sender.sendProducerData first turns the outgoing messages into a ClientRequest and caches it in InFlightRequests. When the response arrives or an error occurs, the RequestCompletionHandler callback is invoked. The send then goes through KafkaChannel.setSend, which registers interest in writes on the transport layer. On the broker side, the SocketServer's Processor threads poll for I/O and pass each request, via RequestChannel, to KafkaApis, where it is matched and processed. Each poll also performs the actual socket writes (Send.writeTo) and reads (KafkaChannel.read).
/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    // Scala pattern matching dispatches each request to its handler
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)                   // produce requests
      case ApiKeys.FETCH => handleFetchRequest(request)                       // fetch messages
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)           // list offsets
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)            // topic metadata
      // leader and ISR (ISR: in-sync replicas; AR: all assigned replicas)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)          // stop replication
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)    // update metadata
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)  // controlled broker shutdown
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)        // commit offsets
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)          // fetch committed offsets
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)  // find the coordinator
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)              // join group
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)               // heartbeat
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)            // leave group
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)              // sync group
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)     // describe groups
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)            // list groups
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)      // SASL handshake
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)        // create topics
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)        // delete topics
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)      // delete records
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)   // initialize producer id
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)  // offsets for a leader epoch
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)      // add partitions to a transaction
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)           // add offsets to a transaction
      case ApiKeys.END_TXN => handleEndTxnRequest(request)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)  // create new partitions
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)        // delete groups
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(request, e)
  } finally {
    request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
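The following Java analogue of handle makes the dispatch pattern concrete. It is a minimal sketch that assumes a hypothetical four-value ApiKey enum, with string payloads standing in for real requests. It only shows the idea: look up a handler by API key, run it, and turn ordinary failures into an error result. It leaves out the FatalExitError rethrow and everything else the real KafkaApis does.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical Java analogue of KafkaApis.handle: dispatch by API key, convert failures into an error result. */
final class ApiDispatcher {
    enum ApiKey { PRODUCE, FETCH, METADATA, HEARTBEAT }

    private final Map<ApiKey, Function<String, String>> handlers = new EnumMap<>(ApiKey.class);

    ApiDispatcher() {
        handlers.put(ApiKey.PRODUCE, body -> "produced:" + body);
        handlers.put(ApiKey.FETCH, body -> "fetched:" + body);
        // a handler that fails, to exercise the error path
        handlers.put(ApiKey.METADATA, body -> { throw new IllegalStateException("metadata unavailable"); });
        // HEARTBEAT deliberately has no handler in this sketch
    }

    String handle(ApiKey key, String body) {
        try {
            Function<String, String> handler = handlers.get(key);
            if (handler == null) throw new UnsupportedOperationException("no handler for " + key);
            return handler.apply(body);
        } catch (RuntimeException e) {   // plays the role of `case e: Throwable => handleError(request, e)`
            return "error:" + e.getMessage();
        }
    }
}
```

An EnumMap keeps the lookup as cheap as a switch while letting handlers be registered as data.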
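What we need to understand is how producers, consumers, and the broker relate to one another. Let's start with the producer and the broker. In SocketServer, an Acceptor works with several Processor threads. Each Processor has its own Selector, which it uses to read requests from its connections and write responses back. Processor threads and request-handler threads exchange data through RequestChannel. Following sendProducerData(now), the produce request itself is built in sendProduceRequest: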
/**
 * Create a produce request from the given record batches
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic (message format) version used by the batches
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    // loop over the batches
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        /*
         * Down-convert to the minimum magic used if necessary. In general, there can be a delay between the
         * time the producer starts building a batch and the time we send the request, and we may have chosen
         * the message format based on outdated metadata. In the worst case, we optimistically chose the new
         * message format but found that the broker does not support it, so we need to down-convert on the
         * client before sending. This is meant to handle edge cases around cluster upgrades where brokers may
         * not all support the same message format version. For example, if a partition migrates from a broker
         * that supports the new magic version to one that doesn't, we need to convert.
         */
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // important: build the request (similar to createProduceRequest)
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // callback that handles the produce response
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // important: create the new ClientRequest
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // send it
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
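The magic-version logic at the top of sendProduceRequest can be isolated into a small sketch. MagicSelection, minUsedMagic, and batchesToDownConvert are hypothetical names. Magic versions are plain bytes here rather than real ProducerBatch objects, and maxUsableMagic stands for the value apiVersions.maxUsableProduceMagic() supplies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how one magic (message format) version is chosen for a whole produce request. */
final class MagicSelection {
    /** Start from the highest version the brokers support, then lower it to the oldest batch's magic. */
    static byte minUsedMagic(byte maxUsableMagic, byte[] batchMagics) {
        byte min = maxUsableMagic;
        for (byte m : batchMagics) {
            if (m < min) min = m;
        }
        return min;
    }

    /** Indices of batches whose magic does not match the chosen one, i.e. the ones to down-convert. */
    static List<Integer> batchesToDownConvert(byte chosenMagic, byte[] batchMagics) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < batchMagics.length; i++) {
            if (batchMagics[i] != chosenMagic) result.add(i);
        }
        return result;
    }
}
```

The chosen version is the minimum, so every batch this flags uses a newer format than the target. Those batches are the ones that would go through downConvert before sending.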
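On its way to the network, each ClientRequest is wrapped in an InFlightRequest, which adds the request header, the serialized Send, the callback, and the send timestamp: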
// send the request
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    // destination node id
    String nodeId = clientRequest.destination();
    // request header
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), nodeId);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
        }
    }
    // serialize header and body into a Send
    Send send = request.toSend(nodeId, header);
    // wrap everything in an InFlightRequest
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    // every client request is tracked in inFlightRequests
    this.inFlightRequests.add(inFlightRequest);
    // hand the Send to the selector
    selector.send(inFlightRequest.send);
}
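Below is a simplified model of the bookkeeping InFlightRequests does. It assumes one deque per node, with the newest request at the head. A broker answers requests on a connection in order, so each response completes the oldest entry, which sits at the tail. The class is hypothetical. The real client's canSendMore also requires that the previous request on the connection has been fully written, and the cap comes from max.in.flight.requests.per.connection.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified in-flight request tracker: newest at the head, oldest at the tail, capped per node. */
final class MiniInFlightRequests<R> {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<R>> requests = new HashMap<>();

    MiniInFlightRequests(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** Record a request that has just been handed to the selector. */
    void add(String nodeId, R request) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
    }

    /** Responses arrive in order per connection, so the oldest request is the one being answered. */
    R completeNext(String nodeId) {
        Deque<R> q = requests.get(nodeId);
        if (q == null || q.isEmpty()) throw new IllegalStateException("no in-flight request for node " + nodeId);
        return q.pollLast();
    }

    /** Allow another request to this node only while it is below the cap. */
    boolean canSendMore(String nodeId) {
        Deque<R> q = requests.get(nodeId);
        return q == null || q.size() < maxInFlightPerConnection;
    }

    int count(String nodeId) {
        Deque<R> q = requests.get(nodeId);
        return q == null ? 0 : q.size();
    }
}
```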
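KafkaChannel.setSend registers interest in OP_WRITE on the transport layer. Kafka uses plain Java NIO here rather than Netty, but the event-driven idea is the same: this tells the selector that data is waiting, so the next poll writes it out once the socket is writable. Those bytes then travel to the broker. The broker is normally started before any producer or consumer, so it is already listening when a request arrives. The Acceptor accepts the new connection and hands it to one of the Processors. That Processor reads requests from the connection and puts them into the RequestChannel. From there, the KafkaRequestHandler threads of KafkaRequestHandlerPool take each request and pass it to KafkaApis, the business-logic layer.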
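The Processor → RequestChannel → handler hand-off can be sketched with standard blocking queues. This is a hypothetical, simplified model with one bounded request queue shared by all handler threads and one response queue per Processor. A response therefore always returns to the Processor that owns the connection. All class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified RequestChannel: shared bounded request queue, one response queue per Processor. */
final class MiniRequestChannel {
    static final class Request {
        final int processorId;   // the Processor that owns the connection this request came from
        final String payload;
        Request(int processorId, String payload) { this.processorId = processorId; this.payload = payload; }
    }

    static final class Response {
        final Request request;
        final String payload;
        Response(Request request, String payload) { this.request = request; this.payload = payload; }
    }

    private final BlockingQueue<Request> requestQueue;
    private final List<BlockingQueue<Response>> responseQueues = new ArrayList<>();

    MiniRequestChannel(int numProcessors, int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++) responseQueues.add(new LinkedBlockingQueue<>());
    }

    /** Processor side: enqueue a fully read request, blocking while the queue is full (back-pressure). */
    void sendRequest(Request r) {
        try {
            requestQueue.put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueuing request", e);
        }
    }

    /** Handler side: take the next request, or null if none is waiting. */
    Request receiveRequest() { return requestQueue.poll(); }

    /** Handler side: route the response back to the Processor that owns the connection. */
    void sendResponse(Response resp) { responseQueues.get(resp.request.processorId).add(resp); }

    /** Processor side: take one pending response without blocking, or null. */
    Response dequeueResponse(int processorId) { return responseQueues.get(processorId).poll(); }
}
```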
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    // register interest in OP_WRITE on the transport layer
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
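The contract of setSend can be modeled without real sockets: at most one pending send per channel, with OP_WRITE switched on while that send is pending. PendingSendChannel is hypothetical. It keeps its interest set in a plain int built from the SelectionKey.OP_* constants, whereas Kafka updates the real SelectionKey through its transport layer. Starting with OP_READ already set is an assumption of this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

/** Hypothetical stand-in for KafkaChannel.setSend: one pending send, with OP_WRITE tracked as a bitmask. */
final class PendingSendChannel {
    private final String id;
    private ByteBuffer send;                          // the single send in progress, if any
    private int interestOps = SelectionKey.OP_READ;   // assumption: the channel already listens for reads

    PendingSendChannel(String id) { this.id = id; }

    void setSend(ByteBuffer newSend) {
        if (send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        send = newSend;
        interestOps |= SelectionKey.OP_WRITE;         // like transportLayer.addInterestOps(OP_WRITE)
    }

    /** Called once the send has been fully written: clear it and stop asking for write readiness. */
    void sendCompleted() {
        send = null;
        interestOps &= ~SelectionKey.OP_WRITE;        // like transportLayer.removeInterestOps(OP_WRITE)
    }

    boolean wantsWrite() { return (interestOps & SelectionKey.OP_WRITE) != 0; }
    boolean wantsRead() { return (interestOps & SelectionKey.OP_READ) != 0; }
}
```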
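Now into SocketServer. It contains an Acceptor (one per listener) and several Processors, which are already running at this point. Processor.run first calls startupComplete(). Then, while isRunning, it repeats the same cycle: configure newly queued connections, register new responses for writing, poll, process completed receives and sends, and process disconnections: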
SocketServer#Processor#run
// important: the Processor's main loop
override def run() {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug("Closing selector - processor " + id)
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
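The catch-all in run exists so that one bad connection or request cannot kill the Processor thread. Below is a hypothetical sketch of that policy. A bounded number of iterations stands in for while (isRunning), and plain Runnables stand in for the six steps.

```java
import java.util.List;

/** Hypothetical sketch of the Processor loop's error policy: a failing step must not end the loop. */
final class ResilientLoop {
    /** Runs all steps once per iteration; a failure is counted and the loop carries on. Returns the error count. */
    static int run(int iterations, List<Runnable> steps) {
        int errors = 0;
        for (int i = 0; i < iterations; i++) {   // stands in for `while (isRunning)`
            try {
                for (Runnable step : steps) step.run();
            } catch (RuntimeException e) {        // Kafka catches Throwable and calls processException(...)
                errors++;
            }
        }
        return errors;
    }
}
```

As in the real loop, a failure abandons the remaining steps of that iteration, but the next iteration starts normally.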
Within processNewResponses, each response is sent through sendResponse:
processNewResponses-> sendResponse
// send a response
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
  val connectionId = response.request.context.connectionId
  trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
  // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
  if (channel(connectionId).isEmpty) {
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
    response.request.updateRequestMetrics(0L, response)
  }
  // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
  // removed from the Selector after discarding any pending staged receives.
  // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
  if (openOrClosingChannel(connectionId).isDefined) {
    // hand the response to the selector
    selector.send(responseSend)
    inflightResponses += (connectionId -> response)
  }
}
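The bookkeeping in sendResponse comes down to a guard plus a map. The hypothetical sketch below assumes a plain set of open connection ids. It ignores the open-versus-closing distinction the real code uses to make a send to a closing channel fail cleanly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of sendResponse's bookkeeping: send only on known connections, track until completed. */
final class ResponseSender {
    private final Set<String> openConnections;
    final Map<String, String> inflightResponses = new HashMap<>();
    int droppedResponses = 0;

    ResponseSender(Set<String> openConnections) { this.openConnections = openConnections; }

    boolean sendResponse(String connectionId, String response) {
        if (!openConnections.contains(connectionId)) {
            droppedResponses++;                        // closed remotely or expired as idle: nowhere to write
            return false;
        }
        inflightResponses.put(connectionId, response); // like `inflightResponses += (connectionId -> response)`
        return true;
    }

    /** When the selector reports the send as complete, the response is no longer in flight. */
    String onSendCompleted(String connectionId) { return inflightResponses.remove(connectionId); }
}
```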
Whether producing or consuming, everything goes through the crucial poll operation, and that is where the actual reads and writes happen.
On the client side, NetworkClient.poll performs the I/O and completes the responses:
/**
 * Do actual reads and writes to sockets.
 */
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        // handle aborted sends
        handleAbortedSends(responses);
        // complete the responses
        completeResponses(responses);
        return responses;
    }

    // maybe update metadata
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // the selector's poll does the actual I/O
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // completed sends
    handleCompletedSends(responses, updatedNow);
    // completed receives
    handleCompletedReceives(responses, updatedNow);
    // disconnections
    handleDisconnections(responses, updatedNow);
    // new connections
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    // invoke the callbacks of all completed responses
    completeResponses(responses);

    return responses;
}
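The last step of poll, completeResponses, runs every callback. Below is a hypothetical sketch that uses java.util.function.Consumer in place of RequestCompletionHandler. It shows why each callback is isolated in its own try/catch: one failing callback does not stop the others from running.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of completeResponses: run every callback, isolating failures from one another. */
final class CallbackDispatcher {
    static final class Response {
        final String body;
        final Consumer<String> callback;   // plays the role of RequestCompletionHandler.onComplete
        Response(String body, Consumer<String> callback) { this.body = body; this.callback = callback; }
    }

    /** Returns how many callbacks threw. */
    static int completeResponses(List<Response> responses) {
        int failures = 0;
        for (Response r : responses) {
            try {
                r.callback.accept(r.body);
            } catch (RuntimeException e) {
                failures++;                  // the real client logs the error and moves on to the next response
            }
        }
        return failures;
    }
}
```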
The reads and writes themselves happen in Selector.poll, which calls pollSelectionKeys:
/**
 * handle any ready I/O on a set of selection keys
 *
 * @param selectionKeys set of keys to handle
 * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
 * @param currentTimeNanos time at which set of keys was determined
 */
// package-private for testing
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        boolean sendFailed = false;
        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            // if the channel is not ready yet, finish preparing it (transport handshake and authentication)
            if (channel.isConnected() && !channel.ready()) {
                try {
                    channel.prepare();
                } catch (AuthenticationException e) {
                    sensors.failedAuthentication.record();
                    throw e;
                }
                if (channel.ready())
                    // authentication succeeded
                    sensors.successfulAuthentication.record();
            }

            // try to read
            attemptRead(key, channel);

            if (channel.hasBytesBuffered()) {
                keysWithBufferedRead.add(key);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = null;
                try {
                    // write
                    send = channel.write();
                } catch (Exception e) {
                    sendFailed = true;
                    throw e;
                }
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, CloseMode.GRACEFUL);

        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
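The read-then-write order of pollSelectionKeys can be tried out on a real java.nio Selector. This single-threaded sketch uses a Pipe in place of a socket so that it runs without a network. That substitution, and the class name, are assumptions for illustration. The sink is registered for OP_WRITE and the source for OP_READ. Each ready key is handled by reading first and then writing if it is writable, and write interest is cleared once everything has been sent.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical selector loop over a Pipe: for each ready key, read first, then write if writable. */
final class SelectorLoopSketch {
    static String roundTrip(String msg) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                ByteBuffer in = ByteBuffer.allocate(out.remaining());
                sink.register(selector, SelectionKey.OP_WRITE);   // "there is data to send"
                source.register(selector, SelectionKey.OP_READ);  // "tell us when bytes arrive"

                while (in.hasRemaining()) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();                                // selected keys must be removed by hand
                        if (key.isReadable()) {                     // like attemptRead(key, channel)
                            source.read(in);
                        }
                        if (key.isValid() && key.isWritable()) {    // like channel.write()
                            sink.write(out);
                            if (!out.hasRemaining()) key.interestOps(0);   // like removeInterestOps(OP_WRITE)
                        }
                    }
                }
                return new String(in.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```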
The write path in KafkaChannel:
// perform the write
public Send write() throws IOException {
    Send result = null;
    // key step: send(send)
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

// write the pending send to the transport layer
private boolean send(Send send) throws IOException {
    // write as much as the socket accepts; once the send is complete, drop interest in OP_WRITE
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}
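A socket may accept only part of a send at a time, so write() returns null until the whole send has gone out. Below is a hypothetical sketch that uses a fake channel accepting at most a fixed number of bytes per call. A boolean stands in for OP_WRITE interest.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical sketch of KafkaChannel.write(): return the send only once it is fully written. */
final class PartialWriteSketch {
    /** A fake socket that accepts at most `limit` bytes per write call, like a nearly full send buffer. */
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream written = new ByteArrayOutputStream();
        private final int limit;
        ThrottledChannel(int limit) { this.limit = limit; }
        @Override public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) written.write(src.get());
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }

    private ByteBuffer send;      // pending send, as set by setSend
    boolean writeInterest;        // stands in for OP_WRITE

    void setSend(ByteBuffer s) { send = s; writeInterest = true; }

    /** Returns the completed send, or null if bytes remain (the selector will call again when writable). */
    ByteBuffer write(WritableByteChannel ch) {
        if (send == null) return null;
        try {
            ch.write(send);                        // like send.writeTo(transportLayer)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (send.hasRemaining()) return null;      // not completed: keep OP_WRITE and wait
        writeInterest = false;                     // completed: removeInterestOps(OP_WRITE)
        ByteBuffer done = send;
        send = null;
        return done;
    }
}
```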
The read path, from Selector.attemptRead into KafkaChannel.read:
// try to read
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    //if channel is ready and has bytes to read from socket or buffer, and has no
    //previous receive(s) already staged or otherwise in progress then read from it
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;
        // keep reading while read() returns complete receives
        while ((networkReceive = channel.read()) != null) {
            madeReadProgressLastPoll = true;
            // stage the receive for this channel
            addToStagedReceives(channel, networkReceive);
        }
        if (channel.isMute()) {
            outOfMemory = true; //channel has muted itself due to memory pressure.
        } else {
            madeReadProgressLastPoll = true;
        }
    }
}

// the read operation (in KafkaChannel)
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;

    if (receive == null) {
        // create a new NetworkReceive
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }

    // read from the transport layer
    receive(receive);
    // if the receive is complete, return it
    if (receive.complete()) {
        receive.payload().rewind();
        result = receive;
        receive = null;
    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        //pool must be out of memory, mute ourselves.
        mute();
    }
    return result;
}

// receive from the transport layer
private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}
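On the wire, each Kafka request and response is framed as a 4-byte size followed by that many bytes. That is why a NetworkReceive may need several reads before complete() is true. Below is a hypothetical sketch of that framing, with a fake channel that returns only a few bytes per read. The size check mirrors the idea of maxReceiveSize, and the memory pool is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical size-prefixed receive: 4-byte big-endian length, then the payload, across several reads. */
final class SizePrefixedReceive {
    /** A fake socket that hands out at most `chunk` bytes per read, to show partial reads. */
    static final class ChunkedChannel implements ReadableByteChannel {
        private final ByteBuffer data;
        private final int chunk;
        ChunkedChannel(byte[] bytes, int chunk) { this.data = ByteBuffer.wrap(bytes); this.chunk = chunk; }
        @Override public int read(ByteBuffer dst) {
            if (!data.hasRemaining()) return -1;
            int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
            for (int i = 0; i < n; i++) dst.put(data.get());
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }

    private final int maxSize;
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;   // allocated only once the size is known

    SizePrefixedReceive(int maxSize) { this.maxSize = maxSize; }

    /** Reads whatever is available; returns the number of bytes read, or -1 at end of stream. */
    int readFrom(ReadableByteChannel ch) {
        try {
            int read = 0;
            if (size.hasRemaining()) {
                int n = ch.read(size);
                if (n < 0) return -1;
                read += n;
                if (!size.hasRemaining()) {
                    size.flip();
                    int len = size.getInt();
                    if (len < 0 || len > maxSize) throw new IllegalStateException("invalid receive size " + len);
                    payload = ByteBuffer.allocate(len);
                }
            }
            if (payload != null) {
                int n = ch.read(payload);
                if (n < 0) return -1;
                read += n;
            }
            return read;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean complete() { return payload != null && !payload.hasRemaining(); }

    ByteBuffer payload() { return payload; }
}
```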
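Reads and writes go through NIO channel interfaces. On the read side, NetworkReceive.readFrom accepts a ScatteringByteChannel but simply delegates to readFromReadableChannel. On the write side, ByteBufferSend.writeTo performs a gathering write: it hands its whole ByteBuffer array to GatheringByteChannel.write in a single call.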
public long readFrom(ScatteringByteChannel channel) throws IOException {
    return readFromReadableChannel(channel);
}

// write using ByteBuffers
public long writeTo(GatheringByteChannel channel) throws IOException {
    // gathering write of all buffers into the channel
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    // are there writes still pending in the transport layer?
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}
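A gathering write can be demonstrated with a java.nio Pipe, whose SinkChannel implements GatheringByteChannel. This hypothetical sketch frames a payload as a 4-byte size header plus the body, mirroring the size-delimited sends Kafka uses. It passes both buffers to write(ByteBuffer[]) and loops on the byte count, as the remaining counter in writeTo does. It reads the data back on the same thread through a blocking pipe, so it is only meant for small payloads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

/** Hypothetical sketch: frame a payload as [4-byte size][payload] and write both buffers in one gathering call. */
final class GatherWriteSketch {
    static ByteBuffer[] frame(byte[] payload) {
        ByteBuffer size = ByteBuffer.allocate(4);
        size.putInt(payload.length);
        size.flip();
        return new ByteBuffer[]{size, ByteBuffer.wrap(payload)};
    }

    /** Writes the buffers into a blocking Pipe and reads them back; small frames only (single thread). */
    static byte[] writeAndReadBack(ByteBuffer[] buffers) {
        long total = 0;
        for (ByteBuffer b : buffers) total += b.remaining();
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                long remaining = total;
                while (remaining > 0) {
                    long written = sink.write(buffers);   // one call can drain several buffers
                    if (written < 0) throw new IOException("Wrote negative bytes to channel");
                    remaining -= written;
                }
                ByteBuffer in = ByteBuffer.allocate((int) total);
                while (in.hasRemaining()) {
                    if (source.read(in) < 0) throw new IOException("unexpected end of stream");
                }
                return in.array();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```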