Kafka生产者客户端几种异常Case详解

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 1生产者UserCallBack异常异常日志ERROR Error executing user-provided callback on message for topic-partition 'Topic1-0' (org.apache.kafka.clients.producer.internals.ProducerBatch)通常还会有具体的异常栈信息异常源码ProducerBatch#completeFutureAndFireCallbacks

提示:本文可能已过期,请点击原文查看:生产者客户端常见异常Case解决方案集锦

作者石臻臻, CSDN博客之星Top5Kafka Contributornacos Contributor华为云 MVP ,腾讯云TVP, 滴滴Kafka技术专家KnowStreaming


KnowStreaming  是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

1生产者UserCallBack异常

异常日志

ERROR Error executing user-provided callback on message for topic-partition 'Topic1-0' 
(org.apache.kafka.clients.producer.internals.ProducerBatch)
通常还会有具体的异常栈信息

异常源码

ProducerBatch#completeFutureAndFireCallbacks

 

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);
        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }
        produceFuture.done();
    }

这段代码是生产者调用回调接口, 每条消息都有一个thunk.callback, 当你看到这个异常日志的时候就应该知道是用户自定义的callback抛出异常了。

日志里面有提示你是哪个TopicPartition有问题。

有什么影响:

影响的是你自身的回调业务逻辑。

那么消息是发送成功还是失败了呢? 判断消息是否发送成不是UserCallBack决定的。就算你这里抛异常了,那么消息该成功还是成功。

解决办法

UserCallback这个回调很重要,它是在整个I/O线程里面的,它的性能会影响这个生产者发送消息的性能。


解决问题的办法也很简单,根据抛出来的具体异常日志进行处理,比如空指针啊什么的。或者直接这个接口里面的逻辑用异步线程处理。

2消息累加器内存不够

异常日志

Failed to allocate memory within the configured max blocking time 60000 ms.

异常源码

BufferPool#allocate

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        try {
            // check if we have a free buffer of the right size pooled
            if (size == poolableSize && !this.free.isEmpty()){
                return this.free.pollFirst();
            }
            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = freeSize() * this.poolableSize;
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                //省略...
            } else {
                // 当内存不够的时候将会被阻塞阻塞
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    // 当内存不够,则会阻塞,当有内存释放的时候会唤醒阻塞,继续内存分配
                    // 但是释放的内存不一定满足当前需要的内存size,则继续阻塞,等到下一次的内存继续释放,循环这个过程知道内存足够分配。
                    // 阻塞的最大时长maxTimeToBlockMs,注意:就算循环了多次唤醒分配,这个时候是从第一次开始算的,也就是累积时间超过这个时间就会超时
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            System.out.println("allocate开始阻塞,等待内存释放,剩余等待时间:="+remainingTimeToBlockNs);
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }
                        if (this.closed)
                            throw new KafkaException("Producer closed while allocating memory");
                        if (waitingTimeElapsed) {
                            this.metrics.sensor("buffer-exhausted-records").record();
                            throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }
                        remainingTimeToBlockNs -= timeNs;
                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            // 从内存池中获取第一个bytebuffer返回
                            buffer = this.free.pollFirst();
                            accumulated = size;
                            System.out.println("allocate 被唤醒,将bufferPool第一个pool返回并分配:size="+size);
                        } else {
                                //省略....
                        }
                    }
                } finally {
                    //省略....
                }
            }
        } finally {
                  //省略....
        }
    }



可以看到上面的代码是在尝试向消息累加器里面插入数据, 然后在分配内存的时候发现内存不够了, 不够怎么办呢?那么等呗, 等内存够了就可以插入了

什么叫够?为什么等了可能内存就够了?

关于这个你可以看下我之前写的文章 图解Kafka Producer中的消息缓存模型

消息累加器中的内存大小是配置buffer.memory(33554432 (32M))控制的 消息发送成功了之后,会将内存释放掉,那么累加器也就可以接着缓存消息了。

当然,也不能一直等待下去吧,所以这个最大等待时间是max.block.ms(6000)。

异常原因

消息累加器满了, 这个是最根本的原因,但是造成这个现象原因确有很多。

  1. 消息累加器设置的太小了
  2. 消息生产的速度太快了
  3. 消息发送的速度跟不上生产的速度

解决方案

上面的原因也只是笼统的描述, 比如说 消息发送速度跟不上生产速度,那么为何会造成这种情况呢,我们又得去深究。

解决方案也没有一个统一的办法,我们只能是根据具体现象去做调优尝试。

  1. buffer.memory 尝试设置大一点。具体调整多少我觉得看你能够分配给生产者多少内存,一般情况下越大越好,如果你想寻找一个比较合适的值,我建议你可以通过JMX指标buffer-available-bytes:当前消息累加器中可用缓冲区内存大小 来观察, 如果你经常看到这个值非常小,比如只有一个不到的batch.size的大小,说明内存设置太小了, 需要适当调整大一点。调整后如果在一个比较合理的范围,那么就差不多了。
  2. max.block.ms 调不调整我觉得问题不大, 默认等待1分钟感觉已经挺长了,当然如果你能接受更长的等待时间(延迟),适当的调整确实能够缓解一定的问题。但是笔者认为调整其他的参数更有意义。
  3. 检查一下自己的UserCallBack(用户回调)是不是有性能问题。因为用户回调接口是在I/O线程中执行的, 如果用户在该回调接口里面写的方法性能很差,会增加整个调用链路的时间, 链路不结束,消息了累加器中的消息就一直不能释放。
  4. 尝试将 max.in.flight.requests.per.connection设置大一点。 生产者针对每个Node的网络并发度是max.in.flight.requests.per.connection决定的。并发度小了消息发送速度就小, 累加器中的消息迟迟不能被发送。
  5. 检查是否被Broker限流了,适当调整限流值。可以通过JMX查看指标: ①.  produce-throttle-time-avg(请求被Broker限流的平均时间(毫秒)) ②. produce-throttle-time-max(请求被Broker限流的最长时间(毫秒))

监控

平时可以通过JMX指标(type=producer-metrics)buffer-exhausted-total(由于缓冲区耗尽而丢弃的记录发送总数)、buffer-exhausted-rate(由于缓冲区耗尽而丢弃的平均每秒记录发送数) 来监控是否出现了该种异常。

3Node网络异常

异常日志

[2022-04-25 13:42:23,457] WARN [Producer clientId=console-producer] Got error produce response with correlation id 47 on topic-partition Topic4-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2022-04-25 13:42:54,604] WARN [Producer clientId=console-producer] Received invalid metadata error in produce request on partition Topic4-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

异常源码

Sender#completeBatch

 

if (error.exception() instanceof InvalidMetadataException) {
                if (error.exception() instanceof UnknownTopicOrPartitionException) {
                    log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                            "topic-partition may not exist or the user may not have Describe access to it",
                        batch.topicPartition);
                } else {
                    log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                            "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
                }
                metadata.requestUpdate();
            }

异常原因

出现上面2行警告日志的原因是分别是

Received invalid metadata error in produce request on partition Topic4-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now (org.apache.kafka.clients.producer.internals.Sender)

Request因为网络异常,开始重试,并打印了警告日志,并尝试更新元信息。一般这个日志同时会伴随着重试的日志。如下



Got error produce response with correlation id 47 on topic-partition Topic4-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION

但是,如果已经没有伴随重试的日志了,说明该Batch已经不满足重试条件了,后续会被处理完发送消息失败. 相关的日志如下



org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for Topic4-0:4010 ms has passed since batch creation

解决方案

网络都异常了,排查哪为啥网络异常。

4Batch 超出交付时间

异常日志



org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for Topic4-0:4010 ms has passed since batch creation

异常源码

Sender#sendProducerData

 

List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }

注意: 这里是标记了一些异常信息,然后通过UserCallBack返回,如果你没有定义在UserCallBack打印或者拦截器中打印,一般是看不到的。例如ErrorLoggingCallback这个回调类就打印了异常信息。

异常解释

这个异常是因为有一部分的ProducerBatch一直没有发送结果, 导致超出了最大交付时间delivery.timeout.ms 。这个时候会将这一部分ProducerBatch以发送失败处理。

判断是否超时的条件是:

【现在的时间 - Batch的创建时间 > 最大交付时间(delivery.timeout.ms) 】

关于本部分异常,强烈建议你先了解一下相关知识图解Kafka Producer 消息缓存模型

异常原因

造成这异常分两种情况。

1  、Batch一直处于正在发送中(inFlightBatches)

Batches的生命周期是:创建Batch -> 准备发送(inFlightBatches) -> 发送Request -> 处理Response -> 用户回调 -> 释放Batch(同时从inFlightBatches移除)

Batch自从加入到 inFlightBatches 中之后一直迟迟没有完成整个请求链路。

发送/处理Request 时间(包括重试时间)+ 处理Response + UserCallBack 用户回调  > delivery.timeout.ms

假如,发起的Request的目标Node网络异常,也会造成这个情况

2、Batch一直停留在缓存区未被发送

一般这种情况出现的概率很低, 但是不是没有可能

①. 当你的消息生产速度很快, 你设置的buffer.memory 也很大, 一下子存放了非常多的消息。 然而消息发送的性能来不及将这些缓存中的消息在 delivery.timeout.ms最大交付时间内发送出去。

那也就会有一些Batch超时了,以发送失败处理。

②. 命中Bug. 这个Bug就是我之前提的 有个成为Kafka Contributor的机会不要错过...但是该Bug命中条件很苛刻,基本可以忽略不计。目前该Bug的PR已在Kafka_3.3版本提交合并

解决方案

  1. 看看是不是有NETWORK_EXCEPTION告警日志, 有可能某台Node挂掉了说不定。该异常伴随的日志请看上一个异常CASE!
  2. 优先查看UserCallBack回调接口是不是有性能问题, 建议用异步线程处理回调。
  3. 排查Request请求的性能问题,可以通过Jmx指标:request-latency-avg(请求延迟的平均值) request-latency-max(请求延迟的最大值)
    这个指标有两个可以监控的ytpe ①. type = producer-metrics 监控整个生产者②.type = producer-node-metrics 监控生产者与具体的Node的指标
  4. delivery.timeout.ms设置大一点

可以通过监控 producer-metrics 来判断是否存在Request请求性能问题,然后通过 producer-node-metrics排查具体哪个Node有问题


你好,我是彦祖,滴滴Kafka技术专家,LogiKM PMC, CSDN 年度博客之星Topc5、华为云MVP。现在在深度参与开源社区的建设。 想进滴滴交流群,欢迎加彦祖微信,拉你进群交流,跟众多大厂技术大佬一起交流学习~ 图片 最近整理了一份计算机类的书籍,包含python、java、大数据、人工智能、算法等,种类特别齐全。获取方式:关注公众号:石臻臻的杂货铺,回复:福利,就可以获得这份超级大礼!


5LogiKM

LogiKM源码

相关文章
|
8月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
8月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
85 0
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
93 2
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
49 1
|
4月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
5月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
6月前
|
消息中间件 Java Kafka
Java 客户端访问kafka
Java 客户端访问kafka
50 9
|
6月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
99 8
|
5月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
104 0

热门文章

最新文章