开发者社区> 问答> 正文

Flink - 事件时间滑动窗口,由于时间间隔,窗口中缺少数据

假设我有一系列股市交易事件,如下所示:

technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . .
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018
这样,技术N(其中N是某个数字)代表日终库存的第N个技术交易条目[开放(浮动),高(浮动),低(浮动),收盘(浮动),交易量(整数)]给定公司的市场交易数据。(即股票代码GOOG的技术1与股票代码MSFT的技术1不同。)如:

12.52, 19.25, 09.11, 17.54, 120532, GOOG, 1/1/2017
14.37, 29.52, 01.53, 12.96, 627156, MSFT, 1/1/2017
(请注意,这些交易价格/交易量完全是虚构的。)

假设我想创建一个大小为2的窗口,间隔为1天,这样我们的数据看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .
这样会很好,但这是有问题的,因为股市交易日期不是连续的...换句话说,如果我正确理解Flink的机制(我可能是错的),使用事件时滑动窗口的问题就像这个:

DataStream input = ...;

// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.();
在这样的数据上,日期值不是连续的(意味着它们遵循包含一个或多个缺失日的不连续性的离散系列),因为股票市场关闭的日期没有股票市场数据,例如假期或周末。因此,考虑到这一点,我们的流实际上最终会看起来更像这样(因为交易在12/30/2017,12/31/2017和01/01/2018关闭):

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
如何让我的Flink流忽略丢失的日期(而不是窗口或连接或映射连续的非缺失日期),以便我的流看起来像这样:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

(注意:请忽略我通过字符串“技术”(如technical1,technical2等)递增数字的方式,因为正如我已经提到的那样,该值仅用于本文中的描述性目的而不是实际存在于数据中。确定两个交易分录是否连续的唯一方法是通过股票代码对它们进行分组并按交易日期对它们进行排序。假设不存在重复事件。)

展开
收起
flink小助手 2018-12-06 18:07:43 5027 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    如果我理解正确你的问题是因为有一段时间你没有收到事件,那么窗户就不会表现得很好,因为他们不知道时间的流逝。

    你有一个选择是peridiocally发出这样的水印:

    streamEnvironment.addSource(new SourceFunction

        @Override
        public void run(final SourceContext<Object> ctx) {
            (...)
    
            ctx.emitWatermark(new Watermark(timestamp));
        }
    
        @Override
        public void cancel() {
    
        }
    })

    请记住,如果您在水印之前收到事件,它们将被忽略,因此水印发射的周期性是“窗口准确度”和容忍迟发事件之间的权衡。

    2019-07-17 23:18:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载