指示匹配过程后的跳过策略
今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配
即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西
flink cep pattern 代码
//定义一个匹配模式 val pattern: Pattern[weather, weather] = Pattern .begin[weather]("one") //一次或多次 .timesOrMore(12) //时间是12小时的范围 相当于窗口 .within(Time.hours(12))
out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]} out -> {[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ]} out -> {[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ],[1,May 10 20:00:00 ]}
然而你想要的是这样的匹配规则,则是匹配过一次之后就不再使用这条数据作为其他匹配的数据源
out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]} out -> {[1,May 10 19:00:00 ],[1,May 10 20:00:00 ],[1,May 10 21:00:00 ],[1,May 10 22:00:00 ]} out -> {[1,May 10 23:00:00 ],[1,May 11 00:00:00 ],[1,May 11 01:00:00 ],[1,May 11 02:00:00 ]}
修改匹配模式代码
/** * AfterMatchSkipStrategy 指示匹配过程后的跳过策略 */ //定义一个匹配模式 val pattern: Pattern[weather, weather] = Pattern //AfterMatchSkipStrategy 指示匹配过程后的跳过策略 //skipPastLastEvent 丢弃在发出的匹配结束之前开始的每个部分匹配 .begin[weather]("one" , AfterMatchSkipStrategy.skipPastLastEvent()) //一次或多次 .timesOrMore(1) //时间是12小时的范围 相当于窗口 .within(Time.hours(5))
得到臆想的数据
然后就可以得到我们想要的数据啦 看一下我的实际数据
==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0))) ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0))) ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0))) ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0))) ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0))) ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0))) ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0))) ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))
前方有危险
这个方法的应用实际上是有问题的,我们来分析一下
我们的规则是匹配过的数据不可以再次进行匹配,然而当我们设置了.timesOrMore(1)
我们会发现数据里每次只有一条数据,这是为什么呢?
我分析.timesOrMore(1)
会使模式匹配将所有可以匹配到的数据数据量达到1次时就进行收集,如果我们加上了.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())
,這個我们刚刚学了,他是使将所有匹配过的数据不再进行匹配,这个规则再加上.timesOrMore(1)
会默认有一条1条数据的匹配,所以造成了所有的数据匹配都是一条。!?
重点碧坑
如果我们需要用到.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())
和.timesOrMore(1)那末我们一定要考虑每次匹配的数据至少有几条,其实这里就已经固定了条数,而不再是或者更更多条数的匹配规则,这里可以考虑动态更改匹配条数,可以做数据库持久化,然后实时动态更新。动态更改所需匹配的固定条数,这里由于才疏学浅,尚未摸索到更好的方法,后续几天如果有解决方案我会更新这个博客。革命尚未成功-同志仍须努力
跳过策略
接下来我们讨论不同跳过策略对匹配结果的影响:
- 不跳过(NO_SKIP)
- 代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。
- 跳至下一个(SKIP_TO_NEXT)
- 代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。
- 跳过所有子匹配(SKIP_PAST_LAST_EVENT)
- 代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。
- 跳至第一个(SKIP_TO_FIRST[a])
- 代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。
- 跳至最后一个(SKIP_TO_LAST[a])
- 代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。