我有一个持续的JSONArray数据产生到Kafka主题,我想处理具有EventTime特征的记录。为了达到这个目标,我必须为包含在JSONArray中的每个记录分配水印。
我没有找到实现这一目标的便捷方法。我的解决方案是使用DataStreamSource>中的数据,然后迭代List并使用匿名ProcessFunction将Object收集到下游,最后为此下游分配水印。
主要代码如下:
DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
.process(new ProcessFunction<List<MockData>, MockData>() {
@Override
public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
throws Exception {
value.forEach(mockData -> out.collect(mockData));
}
});
convertToPojo.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
@Override
public long extractTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
代码似乎没问题,运行没有错误。但是ProcessWindowFunction从未触发过。我跟踪了Flink源代码,发现EventTimeTrigger永远不会返回TriggerResult.FIRE,由TriggerContext.getCurrentWatermark引起,一直返回Long.MIN_VALUE。
什么是在事件时间处理List的正确方法?
问题是您正在将keyBy和window操作应用于convertToPojo流,而不是具有时间戳和水印的流(您没有将其分配给变量)。
如果您或多或少地编写代码,它应该工作:
listDataStreamSource = KafkaSource ...
convertToPojo = listDataStreamSource.process ...
pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
countStream = pojoPlusWatermarks.keyBy ...
在convertToPojo流上调用assignTimestampsAndWatermarks不会修改该流,而是创建包含时间戳和水印的新数据流对象。您需要将窗口应用于该新数据流。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。