开发者社区> 问答> 正文

会话窗口关闭时间的问题

各位大神,你们好:

最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束 窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束 时发送这个数据,应该如何处理?万分感激

// 这里配置了kafka的信息,并进行数据流的输入

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",

new RfidRawDataSchema(), props);

kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

DataStream dataStream = env.addSource(kafkaSource);

// 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写 的是窗口融合的方法

DataStream outputStream = dataStream.keyBy("uniqueId")

.window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

RfidReduceFunction());

//通过kafka数据流的输出

outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(), props));

try {

env.execute("Flink add data source");

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 14:06:07 653 0
1 条回答
写回答
取消 提交回答
  • 您好,下面是个人理解:

    首先,这个问题不是Session窗口的问题,滚动和滑动是一样的。

    时间窗口的计算输出是由时间特性确定的,目前 1. 只有processing-time即处理时间(没有水位线处理乱序)能够满足及时输出窗口的结果。 2. 把eventtime的水位线时间戳设置为System.currentTimeMillis()也是可以及时输出窗口的结果,但是只会消费Flink程序启动后接收到的新消息,之前的消息是处理不到的,即便是新的消费者组group.id和earliest也无效【意思就是容错和重播失效,当然还可以再反复验证】。

    目前EventTime-事件时间做到实时正确性的前提:数据的事件时间间隔小,或者小于窗口时间间隔就可以了,保证数据流不中断,这样就把不及时输出窗口的时间点无限推到无穷大的未来,即程序最终崩溃或者下线那一刻。

    水位线是用来处理事件乱序的,水位线的增长依赖数据的输入,这个是很明显的咯,assignTimestampsAndWatermarks的时候根据事件时间推算的嘛,而且还会减掉一点时间,就是多掳一点数据,所以数据中断了,就是水位线停止增长了。

    然后再来看,事件时间窗口默认使用的窗口触发源码: onElement和onEventTime时才有机会TriggerResult.FIRE; onElement时会判断水位线。

    onEventTime时会根据水位线设置的时间戳定时器进行时间比较。 onEventTime往上找会找到InternalTimerServiceImpl#advanceWatermark 再往上找会到AbstractStreamOperator#processWatermark, 也就是和新的数据进来有关。

    结论就是,如果当前事件时间窗口的end时间还没到,然而水位线是小于这个end时间的,如果处理乱序的间隔比较大,甚至会有多个窗口的end时间都大于最近的水位线时间戳,那不就是把窗口往后退了嘛...只有更后面的数据到来,新的水位线增长上去,前面滞留的窗口数据才有机会输出。

    所以我的想法是,在每一个时间窗口上面加上一个判断,只要当前窗口未关闭未触发,窗口的end时间大于或等于自然时间点就触发【保证只触发一次就好】,不需要等到下一次水位线增长。

    另外,目前的事件时间是符合自然的实时流数据语义的,可是,业务数据有时候间隔还是蛮大的,毕竟有一些阶段数据比较密集,有一些阶段数据比较稀疏。

    以上为个人理解,也遇到同样的问题,甚至认为事件时间在Flink这里毫无意义,如有哪里不对的地方,做梦都想肯定是哪里不对,欢迎讨论,如果真的不对,希望能给出正确的demo,这样就可以完美的用于生产了。

    还有就是我默认为,窗口是根据事件已经确定好了的: 时间窗口的生成:

    模板方法-处理水位线:AbstractStreamOperator#processWatermark

    InternalTimerServiceImpl#advanceWatermark

    默认的事件时间触发器:

    在 2019-04-29 18:06:30,by1507118@buaa.edu.cn 写道:

    各位大神,你们好:

    最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束时发送这个数据,应该如何处理?万分感激

    // 这里配置了kafka的信息,并进行数据流的输入

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010 kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",

    new RfidRawDataSchema(), props);

    kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

    DataStream dataStream = env.addSource(kafkaSource);

    // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写的是窗口融合的方法

    DataStream outputStream = dataStream.keyBy("uniqueId")

    .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

    RfidReduceFunction());

    //通过kafka数据流的输出

    outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(), props));

    try {

    env.execute("Flink add data source");

    } catch (Exception e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }*来自志愿者整理的flink

    2021-12-07 15:25:59
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载