Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(二)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇

限定子类型

调用.subtype()方法可以为当前模式增加子类型限制条件。例如:

pattern.subtype(SubEvent.class);

这里SubEvent是流中数据类型Event的子类型。这时,只有当事件是SubEvent类型时,才可以满足当前模式pattern的匹配条件。

简单条件(Simple Conditions)

简单条件是最简单的匹配规则,只根据当前事件的特征来决定是否接受它。这在本质上其实就是一个filter操作。代码中我们为.where()方法传入一个SimpleCondition的实例作为参数。SimpleCondition是表示“简单条件”的抽象类,内部有一个.filter()方法,唯一的参数就是当前事件。所以它可以当作FilterFunction来使用。下面是一个具体示例:

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.user.startsWith("A");
    }
});

迭代条件(Iterative Conditions)

简单条件只能基于当前事件做判断,能够处理的逻辑比较有限。在实际应用中,我们可能需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。这种需要依靠之前事件来做判断的条件,就叫作“迭代条件”(Iterative Condition)。

在Flink CEP中,提供了IterativeCondition抽象类。这其实是更加通用的条件表达,查看源码可以发现,.where()方法本身要求的参数类型就是IterativeCondition;而之前的SimpleCondition是它的一个子类。在IterativeCondition中同样需要实现一个filter()方法,不过与SimpleCondition中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。下面是一个具体示例:

middle.oneOrMore()
    .where(new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // 事件中的user必须以A开头
            if (!value.user.startsWith("A")) {
                return false;
            }
            int sum = value.amount;
            // 获取当前模式之前已经匹配的事件,求所有事件amount之和
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.amount;
            }
            // 在总数量小于100时,当前事件满足匹配规则,可以匹配成功
            return sum < 100;
        }
    });

组合条件(Combining Conditions)

独立定义多个条件,然后在外部把它们连接起来,就可以构成一个“组合条件”(Combining Condition)。最简单的组合条件,就是.where()后面再接一个.where()。因为前面提到过,一个条件就像是一个filter操作,所以每次调用.where()方法都相当于做了一次过滤,连续多次调用就表示多重过滤,最终匹配的事件自然就会同时满足所有条件。这相当于就是多个条件的“逻辑与”(AND)。而多个条件的逻辑或(OR),则可以通过.where()后加一个.or()来实现。

终止条件(Stop Conditions)

对于循环模式而言,还可以指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的.until()方法来实现的,同样传入一个IterativeCondition作为参数。需要注意的是,终止条件只与oneOrMore()或者oneOrMore().optional()结合使用。

3.2 组合模式

有了定义好的个体模式,就可以尝试按一定的顺序把它们连接起来,定义一个完整的复杂事件匹配规则了。这种将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),为了跟个体模式区分有时也叫作“模式序列”(Pattern Sequence)。一个组合模式有以下形式:

Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
        .next("next").where(...)
        .followedBy("follow").where(...)
        ...

可以看到,组合模式确实就是一个“模式序列”,是用诸如begin、next、followedBy等表示先后顺序的“连接词”将个体模式串连起来得到的。

1. 初始模式(Initial Pattern)

所有的组合模式,都必须以一个“初始模式”开头;而初始模式必须通过调用Pattern的静态方法.begin()来创建。如下所示:Pattern<Event, ?> start = Pattern.begin("start"); 这里我们调用Pattern的.begin()方法创建了一个初始模式。传入的String类型的参数就是模式的名称;而begin方法需要传入一个类型参数,这就是模式要检测流中事件的基本类型,这里我们定义为Event。调用的结果返回一个Pattern的对象实例。

2. 近邻条件(Contiguity Conditions)

模式之间的组合是通过一些“连接词”方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(Contiguity Conditions,也叫“连续性条件”)。Flink CEP中提供了三种近邻关系:

严格近邻(Strict Contiguity)

匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。代码中对应的就是Pattern的.next()方法,名称上就能看出来,“下一个”自然就是紧挨着的。

宽松近邻(Relaxed Contiguity)

宽松近邻只关心事件发生的顺序,而放宽了对匹配事件的“距离”要求,也就是说两个匹配的事件之间可以有其他不匹配的事件出现。代码中对应.followedBy()方法,很明显这表示“跟在后面”就可以,不需要紧紧相邻。

640.png

非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)

这种近邻关系更加宽松。所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多。代码中对应.followedByAny()方法。

95b9c428f86e7ba70eedfed93e2fe935.png

3. 其他限制条件

除了上面提到的next()、followedBy()、followedByAny()可以分别表示三种近邻条件,我们还可以用否定的“连接词”来组合个体模式。主要包括:

.notNext()
表示前一个模式匹配到的事件后面,不能紧跟着某种事件。
.notFollowedBy()
表示前一个模式匹配到的事件后面,不会出现某种事件。这里需要注意,由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。

另外,Flink CEP中还可以为模式指定一个时间限制,这是通过调用.within()方法实现的。方法传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。下面是模式序列中所有限制条件在代码中的定义:

// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);
// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// 时间限制条件
middle.within(Time.seconds(10));

4. 循环模式中的近邻条件

在循环模式中,近邻关系同样有三种:严格近邻、宽松近邻以及非确定性宽松近邻。对于定义了量词(如oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻。也就是说,当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用followedBy()连接起来。

.consecutive()

为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。也就是说,一旦中间出现了不匹配的事件,当前循环检测就会终止。这起到的效果跟模式序列中的next()一样,需要与循环量词times()、oneOrMore()配合使用。于是,检测连续三次登录失败的代码可以改成:

// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
        .<LoginEvent>begin("fails")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        }).times(3).consecutive();

这样显得更加简洁;而且即使要扩展到连续100次登录失败,也只需要改动一个参数而已。

.allowCombinations()

除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用已经匹配的事件。这需要调用.allowCombinations()方法来实现,实现的效果与.followedByAny()相同。

3.3 模式组

一般来说,代码中定义的模式序列,就是我们在业务逻辑中匹配复杂事件的规则。不过在有些非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了应对这样的需求,Flink CEP允许我们以“嵌套”的方式来定义模式。之前在模式序列中,我们用begin()、next()、followedBy()、followedByAny()这样的“连接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”(Groups of Patterns)。


四、模式的检测处理

利用Pattern API定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。

4.1 将模式应用到流上

将模式应用到事件流上的代码非常简单,只要调用CEP类的静态方法.pattern(),将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个PatternStream:

DataStream<Event> inputStream = ...
Pattern<Event, ?> pattern = ...
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

这里的DataStream,也可以通过keyBy进行按键分区得到KeyedStream,接下来对复杂事件的检测就会针对不同的key单独进行了。模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;如果是处理时间语义,那么所谓先后就是数据到达的顺序。

4.2 处理匹配事件

基于PatternStream可以调用一些转换方法,对匹配的复杂事件进行检测和处理,并最终得到一个正常的DataStream。PatternStream的转换操作主要可以分成两种:简单的选择提取(select)操作,和更加通用的处理(process)操作。与DataStream的转换类似,具体实现也是在调用API时传入一个函数类:选择操作传入的是一个PatternSelectFunction,处理操作传入的则是一个PatternProcessFunction。

1. 匹配事件的选择提取(select)

处理匹配事件最简单的方式,就是从PatternStream中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction

代码中基于PatternStream直接调用.select()方法,传入一个PatternSelectFunction作为参数。

PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);
DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

这里的MyPatternSelectFunction是PatternSelectFunction的一个具体实现。PatternSelectFunction是Flink CEP提供的一个函数类接口,它会将检测到的匹配事件保存在一个Map里,对应的key就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在Map里的value就是一个事件的列表(List)。下面是MyPatternSelectFunction的一个具体实现:

class MyPatternSelectFunction implements PatternSelectFunction<Event, String>{ 
@Override
    public String select(Map<String, List<Event>> pattern) throws Exception {
        Event startEvent = pattern.get("start").get(0);
        Event middleEvent = pattern.get("middle").get(0);
        return startEvent.toString() + " " + middleEvent.toString();
    }
}

可以通过名称从Map中选择提取出对应的事件。注意调用Map的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List中只有一个元素,直接调用.get(0)就可以把它取出。当然,如果个体模式是循环的,List中就有可能有多个元素了。例如我们对连续登录失败检测的改进,可以将匹配到的事件包装成String类型的报警信息输出,代码如下:

// 1. 定义Pattern,登录失败事件,循环检测3次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
        .<LoginEvent>begin("fails")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        }).times(3).consecutive();
// 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream
        .select(new PatternSelectFunction<LoginEvent, String>() {
            @Override
            public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 只有一个模式,匹配到了3个事件,放在List中
                LoginEvent first = map.get("fails").get(0);
                LoginEvent second = map.get("fails").get(1);
                LoginEvent third = map.get("fails").get(2);
                return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
            }
        })
        .print("warning");

PatternFlatSelectFunction

除此之外,PatternStream还有一个类似的方法是.flatSelect(),传入的参数是一个PatternFlatSelectFunction。从名字上就能看出,这是PatternSelectFunction的“扁平化”版本;内部需要实现一个flatSelect()方法,它与之前select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用out.collet()方法就可以实现多次发送输出数据了。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
机器学习/深度学习 运维 监控
实时异常检测实战:Flink+PAI 算法模型服务化架构设计
本文深入探讨了基于 Apache Flink 与阿里云 PAI 构建的实时异常检测系统。内容涵盖技术演进、架构设计、核心模块实现及金融、工业等多领域实战案例,解析流处理、模型服务化、状态管理等关键技术,并提供性能优化与高可用方案,助力企业打造高效智能的实时异常检测平台。
346 1
|
6月前
|
SQL 运维 Java
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
本文介绍了对Flink实时计算编译任务的Koupleless架构改造。为解决进程模型带来的响应慢、资源消耗大等问题,团队将进程模型改为线程模型,并借助Koupleless的类加载隔离能力实现版本和包的隔离。通过动态装配Plugin及其Classpath,以及Biz运行时仅对依赖Plugin可见的设计,大幅优化了编译任务的性能。结果表明,新架构使编译耗时降低50%,吞吐量提升5倍以上。
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
|
8月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1230 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
7月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
232 5
|
7月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
188 4
|
13天前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
3月前
|
缓存 Cloud Native Java
Java 面试微服务架构与云原生技术实操内容及核心考点梳理 Java 面试
本内容涵盖Java面试核心技术实操,包括微服务架构(Spring Cloud Alibaba)、响应式编程(WebFlux)、容器化(Docker+K8s)、函数式编程、多级缓存、分库分表、链路追踪(Skywalking)等大厂高频考点,助你系统提升面试能力。
156 0
|
10月前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
11月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
267 3

热门文章

最新文章