A question to keep in mind: when does Kafka use its delayed-operation component, and why does it need one at all?
In KafkaApis, each request type is dispatched to the handler that implements its logic. A produce request, for example, goes here:
```scala
case ApiKeys.PRODUCE => handleProduceRequest(request)
```
Taking the produce request as the example, the processing flow is:
```scala
/**
 * Handle a produce request
 */
def handleProduceRequest(request: RequestChannel.Request) {
  // Extract the body of the producer's request
  val produceRequest = request.body[ProduceRequest]
  // Size of the request (header + body), used for quota accounting
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

  // ... (authorization checks that build authorizedRequestInfo, unauthorizedTopicResponses
  //      and nonExistingTopicResponses are omitted in this excerpt)

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    var errorInResponse = false
    // ... (the loop that sets errorInResponse when a partition reports an error is omitted in this excerpt)

    def produceResponseCallback(bandwidthThrottleTimeMs: Int) {
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          // Close the connection so the producer notices the error
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          // No error: send a no-op response that is exempt from request throttling
          sendNoOpResponseExemptThrottle(request)
        }
      } else {
        // acks = 1 or acks = -1: send the real response (possibly throttled)
        sendResponseMaybeThrottle(request, requestThrottleMs =>
          new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    // Record the bytes against the produce quota; produceResponseCallback runs now or after a throttle delay
    quotas.produce.maybeRecordAndThrottle(
      request.session.sanitizedUser,
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  // Callback that updates per-partition record-processing statistics
  def processingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordsProcessingStats(request, tp, info)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    // ... (definition of internalTopicsAllowed omitted in this excerpt)

    // call the replica manager to append messages to the replicas (the key step)
    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      processingStatsCallback = processingStatsCallback)

    // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
    // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}
```
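The part to watch is how the acks setting is handled. It takes three values: 0, 1 and -1. With acks=0 the producer expects no response, so the broker sends none; if an error occurred, it closes the connection instead. With acks=1 the broker responds as soon as the leader has appended the messages to its local log. The focus here is acks=-1, because that is the case that goes through a delayed operation. Note that in produceResponseCallback, acks=1 and acks=-1 share the same else branch; what differs is when sendResponseCallback is invoked: immediately by appendRecords, or later by DelayedProduce.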
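To make the three acks cases concrete, here is a minimal Scala sketch of when a response is sent. `AcksSketch`, `handleProduce` and `onReplicated` are hypothetical names, and the `pending` buffer is only a stand-in for the purgatory. The sketch also ignores the case where every local append fails under acks=-1, which Kafka answers immediately.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of how acks decides WHEN the broker answers
// a produce request. Not Kafka's API; names are illustrative only.
object AcksSketch {
  // Callbacks parked until replication completes (stand-in for the purgatory)
  val pending = mutable.ListBuffer.empty[() => Unit]

  /** Returns true if a response was sent right away. */
  def handleProduce(acks: Short, hadError: Boolean,
                    respond: String => Unit, closeConnection: () => Unit): Boolean =
    acks.toInt match {
      case 0 =>
        // acks=0: the producer expects no response; on error, close the socket
        if (hadError) closeConnection()
        false
      case 1 =>
        // acks=1: respond once the leader has appended to its local log
        respond("appended on leader")
        true
      case -1 =>
        // acks=-1: park the callback until the ISR has caught up
        pending += (() => respond("replicated to ISR"))
        false
      case _ =>
        // any other value is rejected with an error response
        respond("INVALID_REQUIRED_ACKS")
        true
    }

  /** Stand-in for "the ISR has caught up": fire every parked callback. */
  def onReplicated(): Unit = {
    pending.foreach(cb => cb())
    pending.clear()
  }
}
```

Calling `onReplicated()` plays the role of the followers catching up; only then does the acks=-1 callback produce a response.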
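sendResponseCallback passes the response through the produce quota manager via maybeRecordAndThrottle:

```scala
/**
 * Records that a user/clientId changed some metric being throttled (produced/consumed bytes,
 * request processing time etc.). If quota has been violated, the callback is invoked after a delay,
 * otherwise the callback is invoked immediately. Throttle time calculation may be overridden by sub-classes.
 */
def maybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int => Unit): Int = {
  if (quotasEnabled) {
    // Get or create the quota sensors for this user/client
    val clientSensors = getOrCreateQuotaSensors(sanitizedUser, clientId)
    recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
  } else {
    // Don't record any metrics if quotas are not enabled at any level
    val throttleTimeMs = 0
    callback(throttleTimeMs)
    throttleTimeMs
  }
}
```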
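The immediate-versus-delayed behavior of maybeRecordAndThrottle can be sketched as follows. `QuotaSketch` and `tick` are hypothetical; a single fixed quota and window replace Kafka's per-user/client sensors, and the throttle formula (excess rate over quota, scaled by the window length) is a simplified assumption, not Kafka's exact calculation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for maybeRecordAndThrottle. Assumptions: one client,
// one fixed quota in bytes/sec, one fixed measurement window.
class QuotaSketch(quotasEnabled: Boolean, quotaBytesPerSec: Double, windowMs: Long) {
  private var bytesInWindow = 0.0
  // Stand-in for the delay queue: (due time in ms, callback, throttle time)
  val delayed = mutable.ListBuffer.empty[(Long, Int => Unit, Int)]

  def maybeRecordAndThrottle(value: Double, nowMs: Long, callback: Int => Unit): Int = {
    if (!quotasEnabled) {
      // Quotas disabled: record nothing, respond immediately with no throttle
      callback(0)
      0
    } else {
      bytesInWindow += value
      val rate = bytesInWindow * 1000.0 / windowMs
      if (rate <= quotaBytesPerSec) {
        callback(0) // within quota: respond immediately
        0
      } else {
        // Over quota: defer the callback; the delay grows with the excess rate
        val throttleTimeMs = ((rate - quotaBytesPerSec) / quotaBytesPerSec * windowMs).toInt
        delayed += ((nowMs + throttleTimeMs, callback, throttleTimeMs))
        throttleTimeMs
      }
    }
  }

  // Fire every deferred callback whose delay has elapsed
  def tick(nowMs: Long): Unit = {
    val (due, notYet) = delayed.partition(_._1 <= nowMs)
    due.foreach { case (_, cb, t) => cb(t) }
    delayed.clear()
    delayed ++= notYet
  }
}
```

With quotas disabled the callback always receives 0; with quotas enabled, an over-quota request only gets its response once `tick` reaches the due time.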
ReplicaManager.appendRecords then performs the append:
```scala
/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // Append to the local log of each leader partition
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime,
            result.info.logStartOffset)) // response status
    }

    // Report per-partition processing statistics
    processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))

    // Is a delayed produce needed?
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    // Invoke the response callback with the error
    responseCallback(responseStatus)
  }
}
```
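The excerpt does not show delayedProduceRequestRequired. As far as I can tell from the Kafka source, it returns true only when acks is -1, there is data to append, and at least one partition's local append succeeded. A sketch of that predicate with simplified types (plain strings for partitions and an optional error string per append result, both assumptions; `DelayedProduceCheck` is a hypothetical name):

```scala
// Simplified sketch of the check ReplicaManager makes before creating a DelayedProduce.
// Assumption: partitions are plain strings and each local append result is reduced
// to an optional error, instead of Kafka's TopicPartition / LogAppendResult types.
object DelayedProduceCheck {
  def delayedProduceRequestRequired(
      requiredAcks: Short,
      entriesPerPartition: Map[String, Int],           // partition -> record count (stand-in for MemoryRecords)
      localProduceResults: Map[String, Option[String]] // partition -> error, if the local append failed
  ): Boolean =
    requiredAcks == -1 &&             // 1. the producer asked for acks=-1
      entriesPerPartition.nonEmpty && // 2. there is data to append
      localProduceResults.values.count(_.isDefined) < entriesPerPartition.size // 3. at least one append succeeded
}
```

If every append failed there is nothing to wait for, so the error response can go out immediately even under acks=-1.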
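The class to focus on is DelayedProduce; see the figure below.
The figure shows that DelayedProduce is one member of the delayed-operation (DelayedOperation) family.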
```scala
class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     lockOpt: Option[Lock] = None)
  extends DelayedOperation(delayMs, lockOpt)
```
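DelayedProduce extends DelayedOperation, so it inherits all of DelayedOperation's methods and implements its abstract ones (tryComplete, onComplete and onExpiration).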
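DelayedOperation is a TimerTask, and the purgatory's timer detects expired operations with a timing wheel. DelayedProduce assists ReplicaManager with its delayed work. ReplicaManager is mainly responsible for writing producer messages to the leader replica, managing synchronization between follower and leader replicas, and handling replica role transitions.

In the produce path above, the write to the leader itself happens synchronously in appendToLocalLog; DelayedProduce handles the wait that follows it. When a produce request has acks=-1, the broker may answer only after every replica in the partition's ISR (in-sync replica set) has caught up with the leader, so the response is necessarily delayed.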
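To illustrate how a timing wheel detects expiration, here is a single-level wheel. Kafka's own timer (`kafka.utils.timer.TimingWheel`) is hierarchical and pushes far-off deadlines into overflow wheels; this sketch simply rejects them. `SimpleTimingWheel` is a hypothetical name.

```scala
import scala.collection.mutable

// Minimal single-level timing wheel. Each bucket covers tickMs milliseconds;
// a task is placed in the bucket for its deadline and fires when the clock
// reaches that bucket. No overflow wheels, no thread safety.
class SimpleTimingWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val buckets = Array.fill(wheelSize)(mutable.ListBuffer.empty[(Long, () => Unit)])
  private var currentTimeMs = startMs - (startMs % tickMs) // rounded down to a tick
  val intervalMs: Long = tickMs * wheelSize

  /** Returns false if the deadline is already due or beyond this wheel's span. */
  def add(expirationMs: Long, task: () => Unit): Boolean = {
    if (expirationMs < currentTimeMs + tickMs) false           // already expired
    else if (expirationMs >= currentTimeMs + intervalMs) false // would need an overflow wheel
    else {
      val virtualId = expirationMs / tickMs
      buckets((virtualId % wheelSize).toInt) += ((expirationMs, task))
      true
    }
  }

  /** Advance the clock one tick at a time, running the tasks of each bucket reached. */
  def advanceClock(timeMs: Long): Unit = {
    while (timeMs >= currentTimeMs + tickMs) {
      currentTimeMs += tickMs
      val bucket = buckets(((currentTimeMs / tickMs) % wheelSize).toInt)
      bucket.foreach { case (_, task) => task() }
      bucket.clear()
    }
  }
}
```

Adding or expiring a task costs O(1) per tick regardless of how many tasks are pending, which is why the purgatory can hold many delayed produce requests cheaply.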
In short, DelayedProduce lets ReplicaManager defer calling responseCallback, and thus the response to the producer, when acks=-1.
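DelayedProduce recognizes the following conditions and handles each as follows:
A write error occurred: the error code in the partition's ProducePartitionStatus.responseStatus (a PartitionResponse) is updated, and acksPending is set to false.
The partition's leader has moved to another broker: the partition's produce status is updated with the corresponding error, and acksPending is set to false.
The ISR has caught up: the leader's high watermark (HW) has reached requiredOffset (HW >= requiredOffset). Partition.checkEnoughReplicasReachOffset then reports success, which replaces the default error code (REQUEST_TIMED_OUT) that DelayedProduce set on the PartitionResponse at construction time.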
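These three conditions map onto the per-partition check in DelayedProduce.tryComplete. A simplified sketch, assuming leadership is modeled as a map from partition to the leader's HW and that errors are plain strings (`PartitionStatus` and `TryCompleteSketch` are hypothetical names); returning true stands for calling forceComplete().

```scala
// Simplified model of DelayedProduce.tryComplete; not Kafka's actual types.
final class PartitionStatus(val requiredOffset: Long) {
  @volatile var acksPending: Boolean = true
  var error: String = "REQUEST_TIMED_OUT" // default until the acks are satisfied
}

object TryCompleteSketch {
  /** leaderHw: partition -> HW, present only while this broker still leads it.
    * storageErrors: partitions whose local check failed. */
  def tryComplete(status: Map[String, PartitionStatus],
                  leaderHw: Map[String, Long],
                  storageErrors: Set[String]): Boolean = {
    status.foreach { case (tp, s) =>
      if (s.acksPending) { // skip partitions that are already settled
        val (hasEnough, err) =
          if (!leaderHw.contains(tp)) (false, "NOT_LEADER_FOR_PARTITION") // leader moved
          else if (storageErrors(tp)) (false, "KAFKA_STORAGE_ERROR")      // local error
          else if (leaderHw(tp) >= s.requiredOffset) (true, "NONE")       // ISR caught up
          else (false, "NONE")                                            // keep waiting
        if (err != "NONE" || hasEnough) {
          s.acksPending = false
          s.error = err
        }
      }
    }
    // Completable only when no partition is still waiting for acks
    !status.values.exists(_.acksPending)
  }
}
```

A partition that is still waiting keeps the REQUEST_TIMED_OUT default, so if the operation expires, that is the error the producer sees.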
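In other words, a delayed operation is always checked with tryComplete first, which evaluates the conditions above. If they hold, tryComplete calls forceComplete, which marks the operation as completed and invokes onComplete; for DelayedProduce, onComplete sends the response through responseCallback. If they do not hold yet, tryCompleteElseWatch registers the operation under its TopicPartitionOperationKeys and hands it to the timer. The operation is re-checked when a watched partition changes (for example, when the leader's HW advances); if the timeout fires first, the timer calls forceComplete and then onExpiration.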
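This life cycle can be sketched as follows. The names echo Kafka's DelayedOperation and DelayedOperationPurgatory, but `SketchDelayedOperation` and `SketchPurgatory` are hypothetical simplifications with no timer, locks or watcher purging; `expire()` stands in for the timer task.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Minimal sketch of the DelayedOperation life cycle (simplified, hypothetical).
abstract class SketchDelayedOperation {
  private val completed = new AtomicBoolean(false)
  def isCompleted: Boolean = completed.get()

  def tryComplete(): Boolean // check conditions; call forceComplete() if satisfied
  def onComplete(): Unit     // send the response (e.g. invoke responseCallback)
  def onExpiration(): Unit   // extra work when the operation times out

  // Only the first caller wins the compare-and-set, so onComplete runs exactly once
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  // What the timer task does when the delay elapses
  def expire(): Unit = if (forceComplete()) onExpiration()
}

class SketchPurgatory[T <: SketchDelayedOperation] {
  private val watchers = mutable.Map.empty[String, mutable.ListBuffer[T]]

  // Try once; if not done, watch the operation under each key and try again
  def tryCompleteElseWatch(op: T, keys: Seq[String]): Boolean =
    if (op.tryComplete()) true
    else {
      keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[T]) += op)
      // Re-check: the conditions may have changed while the watches were registered
      op.tryComplete()
    }

  // Called when something relevant to `key` changes (e.g. the HW advanced)
  def checkAndComplete(key: String): Int = {
    val ops = watchers.getOrElse(key, mutable.ListBuffer.empty[T])
    val done = ops.count(op => !op.isCompleted && op.tryComplete())
    ops --= ops.filter(_.isCompleted) // drop finished operations from this key
    done
  }
}
```

The compare-and-set in `forceComplete` is what guarantees the response is sent only once, even when a completing thread and the timeout race each other.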
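The Kafka developers call this the "produce purgatory", and the name fits: for the producer the wait is an ordeal, because it gets no response until the follower replicas have synchronized with the leader.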