I have a job that reads data from Kafka, computes statistics, and writes the results back to Kafka. Occasionally it throws a FlinkKafka011Exception, which stops the job. How do people usually handle this? Catch the exception and log it?
Producer construction code:

FlinkKafkaProducer011 producer = new FlinkKafkaProducer011(
        kafkaOutputTopic,
        new KafkaSerializationSchema(KAFKA_OUTPUT_TYPE),
        producerConfig,
        Optional.of(new KafkaPartitionerByKey<>()),
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
        kafkaProducersPoolSize);

Kafka configuration:

bootstrap.servers=
enable.auto.commit=true
max.poll.records=1000
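A side note on the configuration above: `enable.auto.commit` and `max.poll.records` are Kafka *consumer* settings and have no effect on the producer. For an EXACTLY_ONCE producer, the settings that usually matter for this kind of transient failure are on the producer side. The following is a minimal sketch, not the poster's actual config; the broker address and all values are illustrative assumptions:

```java
import java.util.Properties;

// Sketch of producer Properties for an EXACTLY_ONCE FlinkKafkaProducer011.
// All values (including the broker address) are illustrative assumptions.
public class ProducerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder address
        // Let the Kafka client itself retry transient failures such as
        // "The server disconnected before a response was received".
        props.setProperty("retries", "5");
        // Must not exceed the broker's transaction.max.timeout.ms
        // (broker default: 15 minutes). Flink's producer default of 1 hour
        // exceeds that and is commonly lowered for exactly-once sinks.
        props.setProperty("transaction.timeout.ms", "900000");
        return props;
    }
}
```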
The following exception occurs occasionally:
2019-07-20 21:41:40,576 INFO org.apache.flink.runtime.taskmanager.Task - Window(SlidingEventTimeWindows(120000, 60000), EventTimeTrigger, InterfaceStatisticsAggregate, InterfaceStatisticsWindow) -> (Map, Sink: Unnamed, Sink: Unnamed) (4/6) (1a9a0b3a306ca983b2fc4227aa7a10fe) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.lambda$apply$0(InterfaceStatisticsWindow.java:35) // this line is the WindowFunction's apply method emitting to the Collector out: out.collect(item)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:31)
    at com.xxxxxx.aiops.window.InterfaceStatisticsWindow.apply(InterfaceStatisticsWindow.java:16)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 25 more
I'd like to ask how everyone handles this situation. Is there a problem with my configuration somewhere? My current approach is to catch this exception at the point where I emit output, but I'm not sure what others do:

/**
 * @author butnet
 */
public class InterfaceStatisticsWindow implements WindowFunction<InterfaceStatisticsOutDto, InterfaceStatisticsOutDto, String, TimeWindow> {
    private static final Logger log = LoggerFactory.getLogger(InterfaceStatisticsWindow.class);
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(String key, TimeWindow window, Iterable<InterfaceStatisticsOutDto> input, Collector<InterfaceStatisticsOutDto> out) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("InterfaceStatisticsWindow apply:" + key + " start:" + window.getStart() + " " + window.getEnd());
        }
        input.forEach((Consumer<? super InterfaceStatisticsOutDto>) ir -> {
            ir.setWindowsStartTime(window.getStart());
            ir.setWindowsEndTime(window.getEnd());
            ir.setInterfaceName(key);
            try {
                out.collect(ir);
            } catch (Exception ex) {
                log.info("Output exception: " + ex.toString(), ex);
            }
            if (log.isDebugEnabled()) {
                log.debug("InterfaceStatisticsWindow apply forEach:" + ir.getInterfaceName());
            }
        });
    }
}
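One caution about the catch-and-log approach above: swallowing the exception inside `out.collect` keeps the task alive, but with EXACTLY_ONCE semantics the record that failed to enter the Kafka transaction is simply lost, which defeats the point of the transactional sink. A common alternative is to let the task fail and rely on checkpointing plus a restart strategy, so the failed transaction is rolled back and the data is replayed. A sketch of that setup (the checkpoint interval and retry counts here are assumptions, not values from the thread):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// EXACTLY_ONCE sinks require checkpointing; interval is an assumed example value.
env.enableCheckpointing(60_000);

// On a transient Kafka failure, restart the job instead of swallowing the error:
// up to 3 attempts, 10 seconds apart (assumed example values).
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, Time.of(10, TimeUnit.SECONDS)));
```

With this in place the try/catch around `out.collect(ir)` can be removed, and a transient broker disconnect costs one restart from the last checkpoint rather than silently dropped records.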
Thanks!

*From the volunteer-compiled Flink mailing-list archive.
FlinkKafka011Exception: Failed to send data to Kafka: The server disconnected before a response was received.
Looking at this exception, it appears the connection to the Kafka server was dropped before a response was received while the sink was writing data to Kafka.
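This "server disconnected before a response was received" error is typically transient (broker restart, network blip, idle-connection reaping), which is why the Kafka client's own `retries` setting usually absorbs it. Conceptually the mechanism is just retry-with-backoff; the following self-contained sketch illustrates the idea only, and all names in it are hypothetical rather than Kafka or Flink API:

```java
import java.util.concurrent.Callable;

// Hypothetical retry-with-backoff helper, sketching what the Kafka client's
// `retries` setting does internally for transient send failures.
public class RetrySketch {
    public static <T> T withRetries(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call(); // success: return immediately
            } catch (Exception e) {
                last = e;             // remember the failure and back off
                Thread.sleep(backoffMs);
            }
        }
        throw last;                   // all attempts exhausted: rethrow
    }
}
```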