Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇

限定子类型

调用.subtype()方法可以为当前模式增加子类型限制条件。例如:

pattern.subtype(SubEvent.class);

这里SubEvent是流中数据类型Event的子类型。这时,只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件。

简单条件(Simple Conditions)

简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter操作。代码中我们为.where()方法传入一个SimpleCondition的实例作为参数。SimpleCondition是表示“简单条件”的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以当作FilterFunction来使用。下面是一个具体示例:

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.user.startsWith("A");
    }
});

迭代条件(Iterative Conditions)

简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)。

在Flink CEP中,提供了IterativeCondition抽象类。这其实是更加通用的条件表达,查看源码可以发现,.where()方法本身要求的参数类型就是IterativeCondition;而之前的SimpleCondition是它的一个子类。在IterativeCondition中同样需要实现一个filter()方法,不过与SimpleCondition中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。下面是一个具体示例:

middle.oneOrMore()
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // 事件中的user必须以A开头
            if (!value.user.startsWith("A")) {
                return false;
            }
            int sum = value.amount;
            // 获取当前模式之前已经匹配的事件,求所有事件amount之和
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.amount;
            }
            // 在总数量小于100时,当前事件满足匹配规则,可以匹配成功
            return sum < 100;
        }
    });

组合条件(Combining Conditions)

独立定义多个条件,然后在外部把它们连接起来,就可以构成一个“组合条件”(Combining Condition)。最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像是一个filter操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”(AND)。而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。

终止条件(Stop Conditions)

对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的.until()方法来实现的,同样传入一个IterativeCondition作为参数。需要注意的是,终止条件只与oneOrMore()或者oneOrMore().optional()结合使用。

3.2 组合模式

有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。一个组合模式有以下形式:

Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
        .next("next").where(...)
        .followedBy("follow").where(...)
        ...

可以看到,组合模式确实就是一个“模式序列”,是用诸如begin、next、followedBy等表示先后顺序的“连接词”将个体模式串连起来得到的。

1. 初始模式(Initial Pattern)

所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用Pattern的静态方法.begin()来创建。如下所示:Pattern<Event, ?> start = Pattern.begin("start"); 这里我们调用Pattern的.begin()方法创建了一个初始模式。传入的String类型的参数就是模式的名称;而begin方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,这里我们定义为Event。调用的结果返回一个Pattern的对象实例。

2. 近邻条件(Contiguity Conditions)

模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)。Flink CEP中提供了三种近邻关系:

严格近邻(Strict Contiguity)

匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。代码中对应的就是Pattern的.next()方法,名称上就能看出来,“下一个”自然就是紧挨着的。

宽松近邻(Relaxed Contiguity)

宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应.followedBy()方法,很明显这表示“跟在后面”就可以,不需要紧紧相邻。

640.png

非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)

这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多。代码中对应.followedByAny()方法。

95b9c428f86e7ba70eedfed93e2fe935.png

3. 其他限制条件

除了上面提到的next()、followedBy()、followedByAny()可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:

.notNext()
表示前一个模式匹配到的事件后面,不能紧跟着某种事件。
.notFollowedBy()
表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。

另外,Flink CEP中还可以为模式指定一个时间限制,这是通过调用.within()方法实现的。方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。下面是模式序列中所有限制条件在代码中的定义:

// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);
// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// 时间限制条件
middle.within(Time.seconds(10));

4. 循环模式中的近邻条件

在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于定义了量词(如oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻。也就是说,当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用followedBy()连接起来。

.consecutive()

为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()一样,需要与循环量词times()、oneOrMore()配合使用。于是,检测连续三次登录失败的代码可以改成:

// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
        .<LoginEvent>begin("fails")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        }).times(3).consecutive();

这样显得更加简洁;而且即使要扩展到连续100次登录失败,也只需要改动一个参数而已。

.allowCombinations()

除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。这需要调用.allowCombinations()方法来实现,实现的效果与.followedByAny()相同。

3.3 模式组

一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在有些非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了应对这样的需求,Flink CEP允许我们以“嵌套”的方式来定义模式。之前在模式序列中,我们用begin()、next()、followedBy()、followedByAny()这样的“连接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”(Groups of Patterns)。


四、模式的检测处理

利用Pattern API定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。

4.1 将模式应用到流上

将模式应用到事件流上的代码非常简单,只要调用CEP类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个PatternStream:

DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

这里的DataStream,也可以通过keyBy进行按键分区得到KeyedStream,接下来对复杂事件的检测就会针对不同的key单独进行了。模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。

4.2 处理匹配事件

基于PatternStream可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终得到一个正常的DataStream。PatternStream的转换操作主要可以分成两种:简单的选择提取(select)操作,和更加通用的处理(process)操作。与DataStream的转换类似,具体实现也是在调用API时传入一个函数类:选择操作传入的是一个PatternSelectFunction,处理操作传入的则是一个PatternProcessFunction。

1. 匹配事件的选择提取(select)

处理匹配事件最简单的方式,就是从PatternStream中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction

代码中基于PatternStream直接调用.select()方法,传入一个PatternSelectFunction作为参数。

PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

这里的MyPatternSelectFunction是PatternSelectFunction的一个具体实现。PatternSelectFunction是Flink CEP提供的一个函数类接口,它会将检测到的匹配事件保存在一个Map里,对应的key就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在Map里的value就是一个事件的列表(List)。下面是MyPatternSelectFunction的一个具体实现:

class MyPatternSelectFunction implements PatternSelectFunction<Event, String>{ 
@Override
    public String select(Map<String, List<Event>> pattern) throws Exception {
        Event startEvent = pattern.get("start").get(0);
        Event middleEvent = pattern.get("middle").get(0);
        return startEvent.toString() + " " + middleEvent.toString();
    }
}

可以通过名称从Map中选择提取出对应的事件。注意调用Map的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List中只有一个元素,直接调用.get(0)就可以把它取出。当然,如果个体模式是循环的,List中就有可能有多个元素了。例如我们对连续登录失败检测的改进,可以将匹配到的事件包装成String类型的报警信息输出,代码如下:

// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
        .<LoginEvent>begin("fails")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        }).times(3).consecutive();
// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream
        .select(new PatternSelectFunction<LoginEvent, String>() {
            @Override
            public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 只有一个模式,匹配到了3个事件,放在List中
                LoginEvent first = map.get("fails").get(0);
                LoginEvent second = map.get("fails").get(1);
                LoginEvent third = map.get("fails").get(2);
                return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
            }
        })
        .print("warning");

PatternFlatSelectFunction

除此之外,PatternStream还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction的“扁平化”版本;内部需要实现一个flatSelect()方法,它与之前select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用out.collet()方法就可以实现多次发送输出数据了。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
1025 1
Flink 2.0 状态管理存算分离架构演进
|
2月前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
2月前
|
资源调度 监控 Java
实时计算 Flink版产品使用合集之如何使用CEP库进行数据处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
101 1
|
2月前
|
SQL API 数据处理
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
本文整理自阿里云开源大数据平台吕宴全关于新一代实时数据集成框架 Flink CDC 3.0 的核心技术架构解析。
1054 0
新一代实时数据集成框架 Flink CDC 3.0 —— 核心技术架构解析
|
2月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
57 3
|
2月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
Java API 调度
Flink 原理与实现:架构和拓扑概览
## 架构 要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群启动后架构图。 ![](http://img3.tbcdn.cn/5476e8b07b923/TB1ObBnJFXXXXXt
7981 0
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0
|
28天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
718 0