开发者社区> 问答> 正文
1
0
分享

向kafka写数据,偶尔会报 FlinkKafka011Exception 导致Job停止问题

一个任务从Kafka读数据做统计, 将统计结果写回kafka, 偶尔会报 FlinkKafka011Exception 导致Job停止, 请问大家一般怎么处理的,是catch掉,日志输出吗?

生产者构造代码 FlinkKafkaProducer011 producer = new FlinkKafkaProducer011(kafkaOutputTopic, new KafkaSerializationSchema(KAFKA_OUTPUT_TYPE), producerConfig, Optional.of(new KafkaPartitionerByKey<>()), FlinkKafkaProducer011.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize); kafka配置如下: bootstrap.servers= enable.auto.commit=true max.poll.records=1000

偶尔发生下面异常:

2019-07-20 21:41:40,576 INFO org.apache.flink.runtime.taskmanager.Task - Window(SlidingEventTimeWindows(120000, 60000), EventTimeTrigger, InterfaceStatisticsAggregate, InterfaceStatisticsWindow) -> (Map, Sink: Unnamed, Sink: Unnamed) (4/6) (1a9a0b3a306ca983b2fc4227aa7a10fe) switched from RUNNING to FAILED. java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649) at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.lambda$apply$0(InterfaceStatisticsWindow.java:35) //这里代码是:WindowFunction 的 apply 方法,在向 Collector out 里输出内容: out.collect(item) at java.util.Collections$SingletonList.forEach(Collections.java:4822) at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:31) at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:16) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) ... 7 more Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) ... 25 more

想请问大家一都如何处理这种情况: 是我哪里配置有问题吗? 我现在处理方式是在输出的地方catch这个Exception, 不知道大家怎么处理: /** * @author butnet */ public class InterfaceStatisticsWindow implements WindowFunction<InterfaceStatisticsOutDto, InterfaceStatisticsOutDto, String, TimeWindow> { private static final Logger log = LoggerFactory.getLogger(InterfaceStatisticsWindow.class); private static final long serialVersionUID = 1L;

@Override public void apply(String key, TimeWindow window, Iterable input, Collector out) throws Exception { if (log.isDebugEnabled()) { log.debug("InterfaceStatisticsWindow apply:" + key + " start:" + window.getStart() + " " + window.getEnd()); } input.forEach((Consumer<? super InterfaceStatisticsOutDto>) ir -> { ir.setWindowsStartTime(window.getStart()); ir.setWindowsEndTime(window.getEnd()); ir.setInterfaceName(key); try { out.collect(ir); } catch (Exception ex) { log.info("输出异常: " + ex.toString(), ex); } if (log.isDebugEnabled()) { log.debug("InterfaceStatisticsWindow apply forEach:" + ir.getInterfaceName()); } }); } }

谢谢*来自志愿者整理的flink邮件归档

展开
收起
雪哥哥 2021-12-07 16:09:32 1451 0
举报
飞天免费试用计划
领取免费云资源,开启云上实践第一步
实时计算 Flink 版
5000CU*H 3个月
额度3个月内有效
日志服务 SLS
月写入数据量 50GB 1个月
额度1个月内有效
1 条回答
写回答
取消 提交回答
  • FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received.

    看下这个异常,好像是 sink 数据到 kafka 的时候与 kafka 服务器就断开连接了*来自志愿者整理的flink

    2021-12-07 16:30:07 举报
    赞同 评论 打赏

    评论

    全部评论 (0)

    登录后可评论
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等