kafka学习六-生产延迟操作

简介: 这里思考问题,什么时候会用到延迟组件,同时哪些时候会用到延迟组件,同时为什么要用延迟组件?从kafkaApi中,我们可以知道具体的逻辑实现都是在这里实现的:DelayedOperation调用过程同时基于时间轮进行时间过期的检测操作。也即从这里我们可以看到DelayedProduce是协助副本管理器完成相应的延迟操作的,而副本管理器则主要是完成将生产者发送的消息写入到leader副本、管理follwer副本与leader副本之间的同步以及副本角色之间的转换。在上面的生产延迟中,我们可以看到在消息写入leader副本时需要DelayedProdue的协助。同时我们也可以看到:当生产请求的

这里思考问题,什么时候会用到延迟组件,同时哪些时候会用到延迟组件,同时为什么要用延迟组件?

从kafkaApi中,我们可以知道具体的逻辑实现都是在这里实现的:

caseApiKeys.PRODUCE=>handleProduceRequest(request)

这里以处理生产请求为例,看其操作流程:

/*** Handle a produce request* 处理生产请求*/defhandleProduceRequest(request: RequestChannel.Request) {
//拿到生产者请求的内容valproduceRequest=request.body[ProduceRequest]
//追加大小valnumBytesAppended=request.header.toStruct.sizeOf+request.sizeOfBodyInBytes// the callback for sending a produce response//发送生产响应的回调defsendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
valmergedResponseStatus=responseStatus++unauthorizedTopicResponses++nonExistingTopicResponsesvarerrorInResponse=false//生产响应回调defproduceResponseCallback(bandwidthThrottleTimeMs: Int) {
//生产请求的ack = 0的请求/*** 如果生产者request.required.acks = 0,则不需要任何操作; 但是,如果在处理请求时出现任何错误,* 由于生产者不希望响应,则服务器将关闭套接字服务器,以便生产者客户端将知道发生了一些错误并刷新其元数据*/if (produceRequest.acks==0) { //acks = 0 即不需要acks,没啥需要特别做的// no operation needed if producer request.required.acks = 0; however, if there is any error in handling// the request, since no response is expected by the producer, the server will close socket server so that// the producer client will know that some error has happened and will refresh its metadataif (errorInResponse) {
valexceptionsSummary=mergedResponseStatus.map { case (topicPartition, status) =>topicPartition->status.error.exceptionName          }.mkString(", ")
info(
s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} "+s"from client id ${request.header.clientId} with ack=0\n"+s"Topic and partition to exceptions: $exceptionsSummary"          )
//关闭连接closeConnection(request, newProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
//发送没有操作响应的操作//发送NoOp响应豁免节流阀sendNoOpResponseExemptThrottle(request)
        }
      } else { //ack =1 //发送响应sendResponseMaybeThrottle(request, requestThrottleMs=>newProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs+requestThrottleMs))
      }
    }
//acks =-1的情况// When this callback is triggered, the remote API call has completedrequest.apiRemoteCompleteTimeNanos=time.nanosecondsquotas.produce.maybeRecordAndThrottle(
request.session.sanitizedUser,
request.header.clientId,
numBytesAppended,
produceResponseCallback)
  }
//处理统计回调defprocessingStatsCallback(processingStats: Map[TopicPartition, RecordsProcessingStats]): Unit= {
processingStats.foreach { case (tp, info) =>updateRecordsProcessingStats(request, tp, info)
    }
  }
// call the replica manager to append messages to the replicas//副本管理进行追加消息调用  重要replicaManager.appendRecords(
timeout=produceRequest.timeout.toLong,
requiredAcks=produceRequest.acks,
internalTopicsAllowed=internalTopicsAllowed,
isFromClient=true,
entriesPerPartition=authorizedRequestInfo,
responseCallback=sendResponseCallback,
processingStatsCallback=processingStatsCallback)
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log/*** 如果将请求放入炼狱延迟组件,它将具有保留的引用,因此无法进行垃圾收集;* 因此,我们在这里清除其数据是为了让GC回收其内存,因为它已被附加到日志中*/produceRequest.clearPartitionRecords()
  }
}

这里主要关注三个情况,对acks的操作,这里acks=0,1,-1的情况,其中0表示不需要响应,此时不做任何操作,也即免录,等于1的时候,立即进行响应。这里我们重点关注-1的情况,因为此时会涉及到延迟组件操作。

//记录用户/客户ID更改了一些被限制的指标(产生/消耗的字节,请求处理时间等)如果违反配额,//则在延迟后调用回调,否则立即调用回调。节流时间计算可能被覆盖 子类。defmaybeRecordAndThrottle(sanitizedUser: String, clientId: String, value: Double, callback: Int=>Unit): Int= {
if (quotasEnabled) {
//获取或创建QuotaSensorsvalclientSensors=getOrCreateQuotaSensors(sanitizedUser, clientId)
recordAndThrottleOnQuotaViolation(clientSensors, value, callback)
  } else {
// Don't record any metrics if quotas are not enabled at any levelvalthrottleTimeMs=0callback(throttleTimeMs)
throttleTimeMs  }
}

执行appendRecords追加消息操作

/*** Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;* the callback function will be triggered either when timeout or the required acks are satisfied;* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.* 将消息追加到分区的leader副本,然后等待它们被复制到其他副本; 当超时或所需的acks满足时,将触发回调函数;* 如果回调函数本身已经在某个对象上同步,则传递此对象以避免死锁。*/defappendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] =>Unit,
delayedProduceLock: Option[Lock] =None,
processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] =>Unit=_=> ()) {
if (isValidRequiredAcks(requiredAcks)) {
valsTime=time.milliseconds//追加到本地logvallocalProduceResults=appendToLocalLog(internalTopicsAllowed=internalTopicsAllowed,
isFromClient=isFromClient, entriesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(time.milliseconds-sTime))
valproduceStatus=localProduceResults.map { case (topicPartition, result) =>topicPartition->ProducePartitionStatus(
result.info.lastOffset+1, // required offsetnewPartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // respon     }
//处理统计回调函数processingStatsCallback(localProduceResults.mapValues(_.info.recordsProcessingStats))
//延迟生产请求要求if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation//创建延迟生产操作valproduceMetadata=ProduceMetadata(requiredAcks, produceStatus)
//创建新的延迟生产valdelayedProduce=newDelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation//创建(主题,分区)对的列表,以用作此延迟的生产操作的键valproducerRequestKeys=entriesPerPartition.keys.map(newTopicPartitionOperationKey(_)).toSeq// try to complete the request immediately, otherwise put it into the purgatory// this is because while the delayed produce operation is being created, new// requests may arrive and hence make this operation completable.//尝试立即完成请求,否则将其放入炼狱,这是因为在创建延迟的生产操作时,新的请求可能会到达并因此使该操作可完成。delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     } else {
// we can respond immediatelyvalproduceResponseStatus=produceStatus.mapValues(status=>status.responseStatus)
responseCallback(produceResponseStatus)
     }
   } else {
// If required.acks is outside accepted range, something is wrong with the client// Just return an error and don't handle the request at allvalresponseStatus=entriesPerPartition.map { case (topicPartition, _) =>topicPartition->newPartitionResponse(Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
     }
//响应回调responseCallback(responseStatus)
   }
 }

重点关注:DelayedProduce,这里可以看下图

从这张图中,我们可以看到延迟生产是属于延迟操作组件中的一部分。

微信图片_20221214025834.jpg


classDelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] =>Unit,
lockOpt: Option[Lock] =None)
extendsDelayedOperation(delayMs, lockOpt)

可以看到延迟生产继承了延迟操作,也即它用于延迟操作中的所有方法。

                                             微信图片_20221214025830.jpg


      DelayedOperation调用过程同时基于时间轮进行时间过期的检测操作。也即从这里我们可以看到DelayedProduce是协助副本管理器完成相应的延迟操作的,而副本管理器则主要是完成将生产者发送的消息写入到leader副本、管理follwer副本与leader副本之间的同步以及副本角色之间的转换。在上面的生产延迟中,我们可以看到在消息写入leader副本时需要DelayedProdue的协助。同时我们也可以看到:当生产请求的acks=-1时,意味着生产者需要等待该分区的所有副本都与leader副本同步完成之后再向生产者应答,此时必然会经历延迟操作。

     也即DelayedProduce的作用则是协助副本管理器在acks=-1时,延迟回调responseCallback向生产者做出响应。

同时可以看到:DelayedProduce能够知晓的条件以及逻辑处理:

写操作发送异常,此时会更新该分区的ProduceResponsePartitionStatus.PartitionResponse.errorCode,同时更新acksPending=false

当分区Leader发生迁移时,此时需要更新该分区的生产分区状态和acksPending=false

ISR副本同步完成,Leader副本的HW高水位已大于requiredOffset。通过Partition.checkEnoughReplicaReachOffset处理后会修改DelayedProduce初始化时对PartitionResponse.errorCode所设置的默认值

     也即如果需要调用延迟操作,需要先经过tryComplete,此时会进行相关条件的检查,如果满足,则执行forceComplete,然后完成延迟onComplete操作,然后执行响应。

从老外给生产延迟炼狱可以看到这个过程对于生产者来说是煎熬的,因为它需要等到主从副本同步再进行响应,这个过程确实有炼狱的感觉。


目录
相关文章
|
2月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
971 0
|
23天前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
36 4
|
27天前
|
消息中间件 存储 缓存
面试题Kafka问题之Kafka的生产消费基本流程如何解决
面试题Kafka问题之Kafka的生产消费基本流程如何解决
31 1
|
1月前
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
|
1月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
消息中间件 Kafka 程序员
彻底搞懂Kafka生产消费流程,这篇文章就够了!
```markdown 🚀 Kafka 生产消费流程揭秘:Producer 创建守护线程Sender,消息经拦截器→序列化器→分区器→缓冲区。批量发送基于batch.size或linger.ms条件。acks参数控制可靠性,从0(最快但不可靠)到all(最可靠)。消息重试和元数据返回确保不丢失。关注“软件求生”公众号,探索更多技术! ```
43 1
|
2月前
|
消息中间件 Java Kafka
kafka 磁盘扩容与数据均衡操作代码
Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议
33 1
|
2月前
|
消息中间件 Kafka
KafKa脚本操作
KafKa脚本操作
14 1
|
2月前
|
消息中间件 算法 Java
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
|
2月前
|
消息中间件 JSON Kafka
实时计算 Flink版操作报错合集之kafka源表没有指定group.id,遇到报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

热门文章

最新文章