使用Flink为DateStreamSource <List <T >>分配水印的正确方法-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

使用Flink为DateStreamSource <List <T >>分配水印的正确方法

flink小助手 2018-12-13 14:17:27 1758

我有一个持续的JSONArray数据产生到Kafka主题,我想处理具有EventTime特征的记录。为了达到这个目标,我必须为包含在JSONArray中的每个记录分配水印。

我没有找到实现这一目标的便捷方法。我的解决方案是使用DataStreamSource>中的数据,然后迭代List并使用匿名ProcessFunction将Object收集到下游,最后为此下游分配水印。

主要代码如下:

DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
    .process(new ProcessFunction<List<MockData>, MockData>() {
      @Override
      public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
          throws Exception {
        value.forEach(mockData -> out.collect(mockData));
      }
    });
convertToPojo.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
      @Override
      public long extractTimestamp(MockData element) {
        return element.getTimestamp();
      }
    });
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
    .keyBy("country").window(
        SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
    .process(
        new FlinkEventTimeCountFunction()).name("count elements");

代码似乎没问题,运行没有错误。但是ProcessWindowFunction从未触发过。我跟踪了Flink源代码,发现EventTimeTrigger永远不会返回TriggerResult.FIRE,由TriggerContext.getCurrentWatermark引起,一直返回Long.MIN_VALUE。

什么是在事件时间处理List的正确方法?

消息中间件 Kafka 数据安全/隐私保护 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:20:36

    问题是您正在将keyBy和window操作应用于convertToPojo流,而不是具有时间戳和水印的流(您没有将其分配给变量)。

    如果您或多或少地编写代码,它应该工作:

    listDataStreamSource = KafkaSource ...
    convertToPojo = listDataStreamSource.process ...
    pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
    countStream = pojoPlusWatermarks.keyBy ...
    在convertToPojo流上调用assignTimestampsAndWatermarks不会修改该流,而是创建包含时间戳和水印的新数据流对象。您需要将窗口应用于该新数据流。

    0 0
微服务
使用钉钉扫一扫加入圈子
+ 订阅

构建可靠、高效、易扩展的技术基石

推荐文章
相似问题