flink cep 如何实现 跳过策略

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink cep 如何实现 跳过策略

指示匹配过程后的跳过策略

今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配

即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西

flink cep pattern 代码


```scala

//定义一个匹配模式

val pattern: Pattern[weather, weather] = Pattern

     .begin[weather]("one")

     //一次或多次

     .timesOrMore(12)

     //时间是12小时的范围 相当于窗口

     .within(Time.hours(12))

```


```bash

out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}

out -> {[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ]}

out -> {[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ],[1,May 10 20:00:00 ]}

```

然而你想要的是这样的匹配规则,则是匹配过一次之后就不再使用这条数据作为其他匹配的数据源


```bash

out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}

out -> {[1,May 10 19:00:00 ],[1,May 10 20:00:00 ],[1,May 10 21:00:00 ],[1,May 10 22:00:00 ]}

out -> {[1,May 10 23:00:00 ],[1,May 11 00:00:00 ],[1,May 11 01:00:00 ],[1,May 11 02:00:00 ]}

```

修改匹配模式代码


```scala

   /**

    * AfterMatchSkipStrategy 指示匹配过程后的跳过策略

    */

    //定义一个匹配模式

   val pattern: Pattern[weather, weather] = Pattern

   //AfterMatchSkipStrategy 指示匹配过程后的跳过策略

   //skipPastLastEvent  丢弃在发出的匹配结束之前开始的每个部分匹配

     .begin[weather]("one" , AfterMatchSkipStrategy.skipPastLastEvent())

           //一次或多次

     .timesOrMore(1)

     //时间是12小时的范围 相当于窗口

     .within(Time.hours(5))

```

## 得到臆想的数据

然后就可以得到我们想要的数据啦  看一下我的实际数据


```bash

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

```

## **前方有危险**

这个方法的应用实际上是有问题的,我们来分析一下

我们的规则是匹配过的数据不可以再次进行匹配,然而当我们设置了`.timesOrMore(1)`我们会发现数据里每次只有一条数据,这是为什么呢?

我分析`.timesOrMore(1)`会使模式匹配将所有可以匹配到的数据数据量达到1次时就进行收集,如果我们加上了`.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())`,這個我们刚刚学了,他是使将所有匹配过的数据不再进行匹配,这个规则再加上`.timesOrMore(1)`会默认有一条1条数据的匹配,所以造成了所有的数据匹配都是一条。!?

## **重点碧坑**

如果我们需要用到`.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())`和`.timesOrMore(1)`那末我们一定要考虑每次匹配的数据至少有几条,其实这里就已经固定了条数,而不再是或者更更多条数的匹配规则,这里可以考虑动态更改匹配条数,可以做数据库持久化,然后实时动态更新。动态更改所需匹配的固定条数,这里由于才疏学浅,尚未摸索到更好的方法,后续几天如果有解决方案我会更新这个博客。



`革命尚未成功-同志仍须努力`



## **跳过策略**

接下来我们讨论不同跳过策略对匹配结果的影响:


- 不跳过(NO_SKIP)

代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。


- 跳至下一个(SKIP_TO_NEXT)

代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。


-  跳过所有子匹配(SKIP_PAST_LAST_EVENT)

代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。


- 跳至第一个(SKIP_TO_FIRST[a])

代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。


- 跳至最后一个(SKIP_TO_LAST[a])

代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
10月前
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
41 1
|
1月前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行全量数据初始化时,连接器一般会采用什么策略
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
3月前
|
资源调度 监控 Java
实时计算 Flink版产品使用合集之如何使用CEP库进行数据处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
59 3
|
3月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
存储 流计算
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
173 5
|
12月前
|
运维 监控 数据处理
Flink的正则表达式--CEP规则引擎
Flink的正则表达式--CEP规则引擎
|
Apache 流计算
Flink CEP 在抖音电商的业务实践|电商行业实践专栏上线
Flink-learning 学训平台和电商行业实践专栏来啦!
6450 0
Flink CEP 在抖音电商的业务实践|电商行业实践专栏上线
|
流计算
从Flink 重启策略机制能学习到什么?
最近在学习Flink ,在看到Flink的重启策略机制时感觉这个设计很好。
97 0