I have a question. My code keeps throwing java.lang.NullPointerException. The variable is definitely initialized, yet the error still occurs and I cannot find the cause. Any advice would be appreciated. Thanks.

The error is as follows:

2021-01-12 04:36:09,950 INFO org.apache.flink.runtime.taskmanager.Task - Window(TumblingEventTimeWindows(60000), EventTimeTrigger, WashDataDetectionFunction) -> Map -> Map -> Sink: Unnamed (1/1) (b015c7cebf71e744f6b50136cdc32e20) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1238)
    at java.util.ArrayList$SubList.size(ArrayList.java:1048)
    at java.util.AbstractList.add(AbstractList.java:108)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
    at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)
    at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.washDetection(WashDataDetectionFunction.java:206)
    at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:94)
    at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:33)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
    ... 10 more
The relevant code is as follows:

    private SingleOutputStreamOperator dataClean(DataStream<SockRawEvent> source, ParameterTool pt) {
        WindowedStream<SockRawEvent, String, TimeWindow> windowStream =
                source.keyBy(x -> x.mac).window(TumblingEventTimeWindows.of(Time.seconds(60)));

        SingleOutputStreamOperator afterWashDetection = windowStream.process(new WashDataDetectionFunction());

        SingleOutputStreamOperator afterIdleAndShortLiveClean = afterWashDetection.keyBy(x -> x.mac)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .process(new IdleAndShortLiveDataCleanFunction());

        DataStream washedData = afterWashDetection.getSideOutput(WashDataDetectionFunction.getWashOutputTag())
                .map(x -> SockRowV2.of(x, SockRawMarkEnum.WASH));
        writeToKinesis(pt, washedData);
        DataStream cleanedData = afterIdleAndShortLiveClean.getSideOutput(IdleAndShortLiveDataCleanFunction.getIdleShortLiveOutputTag())
                .map(x -> SockRowV2.of(x, SockRawMarkEnum.IDLE_SHORT_LIVE));
        writeToKinesis(pt, cleanedData);

        return afterIdleAndShortLiveClean;
    }

The process function of the WashDataDetectionFunction class is as follows:

    @Override
    public void process(String s, Context context, Iterable<SockRawEvent> elements, Collector out) throws Exception {
        ArrayList<SockRawEvent> eventList = new ArrayList<>(1600);
        for (SockRawEvent e : elements) {
            eventList.add(e);
        }
        // sort the window's events by timestamp, ascending
        eventList = eventList.stream().sorted(new Comparator<SockRawEvent>() {
            @Override
            public int compare(SockRawEvent o1, SockRawEvent o2) {
                return Long.compare(o1.utctime.getMillis(), o2.utctime.getMillis());
            }
        }).collect(Collectors.toCollection(ArrayList::new));
        // //System.out.println(String.format("windows(%d, %d) key: %s, data: %d", context.window().getStart(), context.window().getEnd(), s, eventList.size()));
        // group events into 180000 ms (3-minute) buckets
        Map<Long, List<SockRawEvent>> gmap = eventList.stream().collect(Collectors.groupingBy(x -> x.utctime.getMillis() / 180000));
        for (Map.Entry<Long, List<SockRawEvent>> entry : gmap.entrySet())
            washDetection(context, entry.getValue(), out);
    }

The temperatureFall method of the WashDataDetectionFunction class is shown below. The frame mentioned in the error, at com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264), is the line "if (null != las3MinuteData && null != (last3m = las3MinuteData.value())) {".

    private boolean temperatureFall(List<WashDetectionSockValue> washDetectionSockValueList) throws IOException {
        if (ObjectUtil.isEmpty(washDetectionSockValueList)) return false;

        final int temperatureFallThreshold = 3;

        List<WashDetectionSockValue> last3m = null;
        // sort by time, descending
        List<WashDetectionSockValue> socksSorted = washDetectionSockValueList.stream()
                .sorted(new WashDetectionComparator())
                .collect(Collectors.toList());
        double[] t = new double[socksSorted.size()];
        for (int i = 0; i < socksSorted.size(); i++) {
            t[i] = socksSorted.get(i).getAvgTemper();
        }
        double currVarTempe = StatisticsUtils.getStandardDeviation(t);
        double lastVarTempe = 0;
        long startTime;
        if (null != las3MinuteData && null != (last3m = las3MinuteData.value())) {
            last3m.addAll(socksSorted);
            // update las3MinuteData
            if (last3m.size() < 3 * dataNumPerMinute) {
                las3MinuteData.update(last3m);
                startTime = last3m.get(0).getUserTime();
            } else {
                List<WashDetectionSockValue> last3mTmp = last3m.subList(last3m.size() - 3 * dataNumPerMinute, last3m.size());
                las3MinuteData.update(last3mTmp);
                startTime = last3mTmp.get(0).getUserTime();
            }
            if (currVarTempe < temperatureFallThreshold) {
                double[] lltt = new double[last3m.size()];
                for (int i = 0; i < last3m.size(); i++) {
                    lltt[i] = last3m.get(i).getAvgTemper();
                }
                lastVarTempe = StatisticsUtils.getStandardDeviation(lltt);
            }
        } else {
            // las3MinuteData holds no value yet; initialize it
            if (socksSorted.size() < 3 * dataNumPerMinute) {
                last3m = socksSorted;
            } else {
                last3m = socksSorted.subList(socksSorted.size() - 3 * dataNumPerMinute, socksSorted.size());
            }
            las3MinuteData.update(last3m);
            startTime = last3m.get(0).getUserTime();
        }
        boolean full = lastVarTempe > temperatureFallThreshold || currVarTempe > temperatureFallThreshold;
        if (full) {
            sockCacheStartTime.update(startTime);
        }
        return full;
    }

*From the Flink mailing list archive, compiled by volunteers
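This error is actually thrown from inside Kryo; the trace shows it under KryoSerializer.deserialize. Your custom classes, such as SockRowV2 and WashDetectionSockValue, do not meet Flink's definition of a POJO, so Flink falls back to Kryo for serialization and deserialization. I suggest registering the relevant classes with Kryo [1]. In particular, Thrift or Protobuf classes need to be registered separately [2]. A better approach is to change your custom classes so that they satisfy Flink's POJO rules [3].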
[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#kryo
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#rules-for-pojo-types

*From the Flink mailing list archive, compiled by volunteers
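
As a complement to the serializer advice above: the innermost frames of the trace are in java.util.ArrayList$SubList, and temperatureFall passes the result of subList(...) to las3MinuteData.update(...). A subList is a view backed by its parent list, not an independent list, so it may be worth copying it into a plain ArrayList before storing it in state. The sketch below is plain Java with no Flink dependency. SockValue is a hypothetical stand-in for WashDetectionSockValue, shaped to follow the POJO rules in [3], and lastN is a hypothetical helper; neither name comes from the original code.

```java
import java.util.ArrayList;
import java.util.List;

class StateValueSketch {

    // Hypothetical stand-in for WashDetectionSockValue. It follows the POJO rules:
    // public class, public no-argument constructor, private fields with getters/setters.
    // In a real project this would be a public top-level class in its own file.
    public static class SockValue {
        private long userTime;
        private double avgTemper;

        public SockValue() {
        }

        public SockValue(long userTime, double avgTemper) {
            this.userTime = userTime;
            this.avgTemper = avgTemper;
        }

        public long getUserTime() {
            return userTime;
        }

        public void setUserTime(long userTime) {
            this.userTime = userTime;
        }

        public double getAvgTemper() {
            return avgTemper;
        }

        public void setAvgTemper(double avgTemper) {
            this.avgTemper = avgTemper;
        }
    }

    // Hypothetical helper: returns the last n elements as an independent ArrayList
    // instead of a subList view, so the stored value is an ordinary collection.
    static <T> List<T> lastN(List<T> items, int n) {
        if (items.size() <= n) {
            return new ArrayList<>(items);
        }
        return new ArrayList<>(items.subList(items.size() - n, items.size()));
    }

    public static void main(String[] args) {
        List<SockValue> values = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            values.add(new SockValue(i, 20.0 + i));
        }
        List<SockValue> tail = lastN(values, 3);
        // Print the concrete class and size of the copy.
        System.out.println(tail.getClass().getName() + " size=" + tail.size());
    }
}
```

In temperatureFall, this would correspond to passing something like lastN(last3m, 3 * dataNumPerMinute) to las3MinuteData.update(...) in place of the raw subList result. Whether that alone resolves the exception in this job is an assumption that would need to be verified.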