I have the following code. It consumes data from Kafka and then does a keyBy on the second the data falls in (server wall clock). The code that derives the minute a piece of data falls in is:
public static String timeStampToDate(Long timestamp) {
    ThreadLocal<SimpleDateFormat> threadLocal =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    String format = threadLocal.get().format(new Date(timestamp));
    return format.substring(0, 19);
}
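A side note, not from the original post: SimpleDateFormat is not thread-safe, and the ThreadLocal above is re-created on every call, so it never actually caches a formatter. A minimal sketch of a thread-safe variant of the same helper, assuming java.time is acceptable (the class name DateUtilMinutes mirrors the one referenced later in the post; whether it is the same helper is an assumption):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateUtilMinutes {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    public static String timeStampToDate(Long timestamp) {
        // The pattern already yields exactly 19 characters, so no substring is needed.
        return FORMATTER.format(Instant.ofEpochMilli(timestamp));
    }
}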
After keying by the minute the data falls in, I use a 1-minute tumbling window with a trigger that fires every 500 ms, as follows:
. . .

// keyBy on the minute the data falls in (processing time)
KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
    @Override
    public String getKey(ShareRealTimeData value) throws Exception {
        return DateUtilMinutes.timeStampToDate(new Date().getTime());
    }
});
SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
// .evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFuncation());
// sink
topNforEveWindow.printToErr("topNforEveWindow====");
. . .
While the job runs, it randomly throws the following NullPointerException warning at certain exact minutes:

19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(1000), ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) -> Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
Could you please help me figure out what is causing this? *From the volunteer-compiled Flink mailing-list archive
This problem, hmm... comes from the following place:
KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
    @Override
    public String getKey(ShareRealTimeData value) throws Exception {
        return DateUtilMinutes.timeStampToDate(new Date().getTime()); // Here: you cannot use the current wall-clock time (new Date()) as the key.
    }
});
Fix: if you really need this behavior, first run the stream through a flatMap that calls new Date() for each element and writes that date into a field of ShareRealTimeData (say, a field called key). Then key by that field with keyBy(e -> e.getKey()). The effect is the same, but this problem will not occur; see the sketch right after this paragraph.
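For reference, a minimal sketch of that suggestion. It assumes ShareRealTimeData has (or can be given) a String field with setKey/getKey accessors; those accessor names are placeholders, not from the original post:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

import java.util.Date;

// Stamp the wall-clock key onto each element exactly once, then key by that field.
SingleOutputStreamOperator<ShareRealTimeData> stamped = signoutTimeAndWM
        .flatMap(new FlatMapFunction<ShareRealTimeData, ShareRealTimeData>() {
            @Override
            public void flatMap(ShareRealTimeData value, Collector<ShareRealTimeData> out) {
                // The current time is read once per element here, so the key travels
                // with the record and stays identical wherever it is re-read later.
                value.setKey(DateUtilMinutes.timeStampToDate(new Date().getTime())); // setKey is an assumed accessor
                out.collect(value);
            }
        });

// Keying on a field of the element is deterministic: the same element always yields the same key.
KeyedStream<ShareRealTimeData, String> keyByStream = stamped.keyBy(e -> e.getKey());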
The underlying reason is fairly involved and has to do with Flink's key-distribution mechanism: written this way, the key of an element is not stable, because it is effectively a <random> key. (Roughly speaking, the key selector can be evaluated more than once for the same record, for example when the record is partitioned to a downstream subtask and again inside the keyed window operator; with new Date() those evaluations can disagree around a minute boundary, so the record most likely lands on a subtask that does not own the key group of the recomputed key, and the keyed-state access fails with the NullPointerException shown above. The hypothetical snippet below illustrates the instability.)
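A hypothetical standalone snippet (names and timings invented for illustration, not from the original thread) showing that a wall-clock-based KeySelector can return different keys for the very same element when it is evaluated at different moments:

import org.apache.flink.api.java.functions.KeySelector;

import java.text.SimpleDateFormat;
import java.util.Date;

public class UnstableKeyDemo {
    public static void main(String[] args) throws Exception {
        // The key depends only on the wall clock, not on the element itself.
        KeySelector<String, String> wallClockKey =
                value -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        String element = "same-record";
        String keyWhenPartitioned = wallClockKey.getKey(element);
        Thread.sleep(1500); // stand-in for the gap between partitioning and window processing
        String keyInsideOperator = wallClockKey.getKey(element);

        // The two keys differ; when they fall into key groups owned by different subtasks,
        // the window operator's keyed-state access fails, as in the reported NPE.
        System.out.println(keyWhenPartitioned + "  vs  " + keyInsideOperator);
    }
}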