开发者社区> 问答> 正文

flink频繁报向kafka提交offset失败的错误

大家晚上好,我们这里用flink消费kafka,往mysql写。topic有9个分区,flink任务并行度是3。任务却一直报这个错:2019-08-21 19:33:07,058 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed. org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) 我已经尝试将session.timeout.ms设置成60s,但结果还是一样的。flink任务的逻辑是kafka->map->filter->split,最后select出来两个流,插到两个不同的mysql表。这里是因为并行度是3的原因,所以导致了触发了consumer的rebalance吗?*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 22:15:44 3137 0
2 条回答
写回答
取消 提交回答
  • 技术小白

    是你并发和分区数不一致导致的吧

    2022-03-16 09:57:27
    赞同 展开评论 打赏
  • 你这个用的是什么版本,我看社区版现在都是支持 kafka assign 接口,走 subscribe 接口的话会走 kafka group 管理,这个出现下面问题的几率会比较大,比如:consumer 超过一定时间没有调用 poll 接口就会触发。*来自志愿者整理的flink邮件归档

    2021-12-08 10:46:28
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载