
Flink NullPointerException warning

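I have the following code. It consumes data from Kafka and then keys the stream by the second (server clock) in which each record is processed. The helper that computes that time string: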

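import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtilMinutes {
    // One SimpleDateFormat per thread (it is not thread-safe), created once in a static field
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static String timeStampToDate(Long timestamp) {
        return FORMAT.get().format(new Date(timestamp)); // already 19 chars, no substring needed
    }
}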
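If a shared formatter without ThreadLocal is preferred, java.time's DateTimeFormatter is immutable and thread-safe. A sketch of the same helper under that assumption; the class name SecondBucket and the explicit ZoneId parameter are hypothetical additions (the original formats in the JVM's default time zone):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class SecondBucket {
    // Immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats an epoch-millisecond timestamp to second precision in the given zone.
    public static String timeStampToDate(long timestamp, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(timestamp).atZone(zone));
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        System.out.println(timeStampToDate(0L, utc));     // prints 1970-01-01 00:00:00
        System.out.println(timeStampToDate(1_999L, utc)); // prints 1970-01-01 00:00:01
    }
}
```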

After the keyBy, I apply a 1000 ms tumbling processing-time window with a trigger that fires every 500 ms:

. . .
// keyBy on the processing-time second the record arrives in
KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
    @Override
    public String getKey(ShareRealTimeData value) throws Exception {
        return DateUtilMinutes.timeStampToDate(new Date().getTime());
    }
});

SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
        // .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new MyProcessWindowFuncation());

// sink
topNforEveWindow.printToErr("topNforEveWindow====");
. . .
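For reference, a tumbling processing-time window assigns each record to the window whose start is the current processing time rounded down to a multiple of the window size. A minimal sketch of that arithmetic, using the formula of Flink's TimeWindow.getWindowStartWithOffset with offset 0; the class WindowAssignment is hypothetical, and the trigger and evictor are not modeled:

```java
public class WindowAssignment {
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 1000; // TumblingProcessingTimeWindows.of(Time.milliseconds(1000))
        long[] processingTimes = {1_638_964_161_999L, 1_638_964_162_000L, 1_638_964_162_250L};
        for (long t : processingTimes) {
            long start = windowStart(t, 0, size);
            // The window covers [start, start + size).
            System.out.println(t + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With offset 0 these windows are aligned to the epoch, so for present-day time zones each 1000 ms window covers exactly one wall-clock second, the same granularity as the key string.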

While the job runs, the following NullPointerException is logged at WARN level (the task switches to FAILED) at random, at certain whole-minute marks:

19:49:22,001 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(1000), ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) -> Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

Could someone help me find the cause? *From the Flink mailing-list archive, compiled by volunteers

毛毛虫雨 2021-12-08 11:24:48
1 answer
  • The problem is here:

    KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
        @Override
        public String getKey(ShareRealTimeData value) throws Exception {
            // This is the problem: you must not use the current time (new Date()) here.
            return DateUtilMinutes.timeStampToDate(new Date().getTime());
        }
    });

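    The fix: if you really need this behavior, first use a flatMap that calls new Date for each element and stores the result in a field of ShareRealTimeData (for example, one named key). Then keyBy on that field with keyBy(e -> e.getKey()). The effect is the same, but you will not run into this problem.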

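    The underlying mechanism is somewhat involved and has to do with how Flink distributes keys. Your KeySelector makes an element's key unstable, because it is effectively a random key. Flink evaluates the KeySelector once upstream to pick the target subtask and again inside the window operator; if the clock crosses a second boundary between the two calls, the record can arrive at a subtask that does not own the new key's key group, and the heap state backend then fails in StateTable.put.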

    2021-12-08 16:20:14
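The fix described in the answer can be modeled in plain Java without a Flink dependency. In a real job this would be a map (or the flatMap the answer mentions) that sets the field, followed by keyBy(e -> e.getKey()). In this sketch the ShareRealTimeData stand-in, the clockKey and stamp helpers, and the simulated clock are all hypothetical, and the assumption that Flink evaluates the key more than once per record is imitated by calling the selector twice:

```java
import java.util.function.LongSupplier;

public class StampedKeyDemo {
    // Hypothetical stand-in for ShareRealTimeData, with the extra `key` field suggested in the answer.
    public static class ShareRealTimeData {
        String key;
        public String getKey() { return key; }
    }

    // Clock-based key (the problematic pattern): the result depends on when it is called.
    public static String clockKey(LongSupplier clock) {
        return String.valueOf(clock.getAsLong() / 1000); // epoch second as the key
    }

    // The map/flatMap step: read the clock once and store the result on the record.
    public static ShareRealTimeData stamp(ShareRealTimeData value, LongSupplier clock) {
        value.key = clockKey(clock);
        return value;
    }

    public static void main(String[] args) {
        // Simulated clock that advances 600 ms per read, so consecutive reads can cross a second boundary.
        long[] now = {1_700L};
        LongSupplier clock = () -> { long t = now[0]; now[0] += 600; return t; };

        // Evaluating the clock-based key twice for one record, as routing and the window operator would.
        String routed = clockKey(clock);     // read at 1700 ms -> "1"
        String inOperator = clockKey(clock); // read at 2300 ms -> "2"
        System.out.println("clock-based key: " + routed + " vs " + inOperator); // prints "clock-based key: 1 vs 2"

        // Stamped once (read at 2900 ms -> "2"); later getKey() calls always agree.
        ShareRealTimeData record = stamp(new ShareRealTimeData(), clock);
        System.out.println("stamped key: " + record.getKey() + " vs " + record.getKey()); // prints "stamped key: 2 vs 2"
    }
}
```

The stamped key is still processing-time based, but it is fixed once per record, so every later evaluation of the key selector returns the same value.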
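Why an unstable key leads to the NullPointerException can be simulated without Flink, assuming the key selector runs once when the record is routed and again in the window operator. This sketch assumes parallelism 8 (as in the log's "(3/8)") and Flink's default maximum parallelism of 128 for that parallelism; the hash is modeled on Flink's MathUtils.murmurHash from memory, so treat exact key-group numbers as illustrative. It counts how often the key computed one second later belongs to a different subtask than the one the record was routed to:

```java
public class KeyGroupRouting {
    static final int PARALLELISM = 8;       // the log shows subtask "(3/8)"
    static final int MAX_PARALLELISM = 128; // assumed default max parallelism for parallelism 8

    // Murmur-style mix modeled on Flink's MathUtils.murmurHash (an approximation); result is non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche step (fmix32)
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        return code >= 0 ? code : (code != Integer.MIN_VALUE ? -code : 0);
    }

    static int keyGroup(String key) {
        return murmurHash(key.hashCode()) % MAX_PARALLELISM;
    }

    // Same formula as Flink's KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int subtaskFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    // Counts second boundaries where the routed subtask does not own the re-evaluated key.
    static int countMisrouted() {
        int misrouted = 0;
        for (int s = 0; s < 59; s++) {
            String routedKey = String.format("2021-12-08 19:49:%02d", s);       // key when the partitioner ran
            String operatorKey = String.format("2021-12-08 19:49:%02d", s + 1); // key when the operator re-ran getKey()
            if (subtaskFor(keyGroup(routedKey)) != subtaskFor(keyGroup(operatorKey))) {
                misrouted++; // the receiving subtask holds no heap state map for this key group
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        System.out.println(countMisrouted() + " of 59 boundary crossings reach the wrong subtask");
    }
}
```

Because neighboring second strings hash to unrelated key groups, most boundary crossings point at a different subtask. The failure only happens when a record is actually in flight across a boundary, which fits the intermittent errors in the question.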