开发者社区> 问答> 正文

flink timerservice注册的timer定时器只有少部分触发是什么原因?

由于业务需求,需要对一条流中的数据分4个步骤处理,处理完第一步后,再过15分钟进行第二步,第二步完成后再过30分钟处理第三步,然后60分钟后进行第四步,我这边通过timerservice设置定时器的方式进行的实现,但是运行过程中发现第二步15分钟的定时器只有非常少部分消息到时间触发了定时任务onTimer中的逻辑,部分代码实现如下:

DataStream dataStream = env.addSource(…); final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …; final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …;

SingleOutputStreamOperator missDs = dataStream .map(…) .filter(…) .assignTimestampsAndWatermarks(new AssignWaterMark()) .keyBy(0) .timeWindow(Time.seconds(winSize)) .process(new BatchMergeProcessFunction(missPingback));

SingleOutputStreamOperator level1MissedDs = missDs .getSideOutput(missPingback) .keyBy(0) // ********** 这一步中,处理类中15分钟的定时器只有非常少部分数据的定时器timer触发了,触发比例10%不到。 ******** .process(new HbaseTimerProcessFunc(level1Tag, 15));

SingleOutputStreamOperator level2MissedDs = level1MissedDs .keyBy(0) .process(new HbaseTimerProcessFunc(level2Tag, 30));

SingleOutputStreamOperator level3MissedDs = level2MissedDs .keyBy(0) .process(new HbaseTimerProcessFunc(level3Tag, 60)); DataStream l1Ds = level1MissedDs.getSideOutput(level1Tag); DataStream l2Ds = level2MissedDs.getSideOutput(level2Tag); DataStream l3Ds = level3MissedDs.getSideOutput(level3Tag);

missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);

——————————— —> HbaseTimerProcessFunc.class 中定时器设置方法 @Override public void processElement(SimplifiedPingbackMsg value, Context context, Collector out) throws Exception { long ts = System.currentTimeMillis() + 60 * 1000 * timeout; pbState.put(value.getEventId() + ts, value); context.timerService().registerEventTimeTimer(ts); MetricModifyUtils.modifyPerDay(timerTotalCounter, 1); } ———————————

// 水印提取类,类似于直接用了ProcessTime来提取的 class AssignWaterMark implements AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> {

private long maxOutOfOrderness = 60 * 1000 * 5; private long currentMaxTimestamp = 0L;

@Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxOutOfOrderness); }

@Override public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg> element, long previousElementTimestamp) { long timestamp = System.currentTimeMillis(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } }

所以为啥这里的实现会出现此类情况呢,这种情况该如何去排查为啥会有绝大多数timer不触发呢,我看了webui中水印生成是正常的。*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-08 11:02:05 1632 0
1 条回答
写回答
取消 提交回答
  • 具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。

    排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查, 可以比较明确的观察到timeService上的Timer状态。

    [1] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

    *来自志愿者整理的flink

    2021-12-08 11:23:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载