flink cep 跳过策略 AfterMatchSkipStrategy.skipPastLastEvent() 匹配过的不再匹配 碧坑指南

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 指示匹配过程后的跳过策略今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西flink cep pattern 代码

指示匹配过程后的跳过策略

今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配

即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西

flink cep pattern 代码

//定义一个匹配模式
val pattern: Pattern[weather, weather] = Pattern
      .begin[weather]("one")
      //一次或多次
      .timesOrMore(12)
      //时间是12小时的范围 相当于窗口
      .within(Time.hours(12))
out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}
out -> {[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ]}
out -> {[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ],[1,May 10 20:00:00 ]}

然而你想要的是这样的匹配规则,则是匹配过一次之后就不再使用这条数据作为其他匹配的数据源

out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}
out -> {[1,May 10 19:00:00 ],[1,May 10 20:00:00 ],[1,May 10 21:00:00 ],[1,May 10 22:00:00 ]}
out -> {[1,May 10 23:00:00 ],[1,May 11 00:00:00 ],[1,May 11 01:00:00 ],[1,May 11 02:00:00 ]}

修改匹配模式代码

    /**
     * AfterMatchSkipStrategy 指示匹配过程后的跳过策略
     */
     //定义一个匹配模式
    val pattern: Pattern[weather, weather] = Pattern
    //AfterMatchSkipStrategy 指示匹配过程后的跳过策略
    //skipPastLastEvent  丢弃在发出的匹配结束之前开始的每个部分匹配
      .begin[weather]("one" , AfterMatchSkipStrategy.skipPastLastEvent())
            //一次或多次
      .timesOrMore(1)
      //时间是12小时的范围 相当于窗口
      .within(Time.hours(5))

得到臆想的数据

然后就可以得到我们想要的数据啦 看一下我的实际数据

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))
 ==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))
 ---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

前方有危险

这个方法的应用实际上是有问题的,我们来分析一下

我们的规则是匹配过的数据不可以再次进行匹配,然而当我们设置了.timesOrMore(1)们会发现数据里每次只有一条数据,这是为什么呢?

我分析.timesOrMore(1)会使模式匹配将所有可以匹配到的数据数据量达到1次时就进行收集,如果我们加上了.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent()),這個我们刚刚学了,他是使将所有匹配过的数据不再进行匹配,这个规则再加上.timesOrMore(1)会默认有一条1条数据的匹配,所以造成了所有的数据匹配都是一条。!?

重点碧坑

如果我们需要用到.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())

.timesOrMore(1)那末我们一定要考虑每次匹配的数据至少有几条,其实这里就已经固定了条数,而不再是或者更更多条数的匹配规则,这里可以考虑动态更改匹配条数,可以做数据库持久化,然后实时动态更新。动态更改所需匹配的固定条数,这里由于才疏学浅,尚未摸索到更好的方法,后续几天如果有解决方案我会更新这个博客。革命尚未成功-同志仍须努力

跳过策略

接下来我们讨论不同跳过策略对匹配结果的影响:

  • 不跳过(NO_SKIP)
  • 代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。


  • 跳至下一个(SKIP_TO_NEXT)
  • 代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。


  • 跳过所有子匹配(SKIP_PAST_LAST_EVENT)
  • 代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。


  • 跳至第一个(SKIP_TO_FIRST[a])
  • 代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。


  • 跳至最后一个(SKIP_TO_LAST[a])
  • 代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
62 1
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
65 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
41 0
|
1月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
53 0
|
3月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
44 0
|
4月前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行全量数据初始化时,连接器一般会采用什么策略
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
6月前
|
资源调度 监控 Java
实时计算 Flink版产品使用合集之如何使用CEP库进行数据处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
67 3
|
6月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。