Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(一)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇

一、基本概念

1.1 CEP是什么

哈喽各位!这个是Flink1.13最后一篇了,接下来会给各位小伙伴们分享一些关于数据治理以及数仓方面的内容了!敬请期待!!!!好了,进入正题了哈!!!!

所谓CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而Flink CEP,就是Flink实现的一个用于复杂事件处理的库(library)。复杂事件处理具体的过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:

(1)定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的复杂事件进行处理,得到结果进行输出

640.png

所以,CEP是针对流处理而言的,分析的是低延迟、频繁产生的事件流。它的主要目的,就是在无界流中检测出特定的数据组合,让我们有机会掌握数据中重要的高阶特征。

1.2模式(Pattern)

CEP的第一步所定义的匹配规则,我们可以把它叫作“模式”(Pattern)。模式的定义主要就是两部分内容:

每个简单事件的特征
简单事件之间的组合关系

事件的组合关系,可以定义严格的近邻关系,也就是两个事件之前不能有任何其他事件;也可以定义宽松的近邻关系,即只要前后顺序正确即可,中间可以有其他事件。另外,还可以反向定义,也就是“谁后面不能跟着谁”。

CEP做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时(timeout)。

Flink CEP为我们提供了丰富的API,可以实现上面关于模式的所有功能,这套API就叫作“模式API”(Pattern API)。

1.3 应用场景

CEP的应用场景非常丰富。很多大数据框架,如Spark、Samza、Beam等都提供了不同的CEP解决方案,但没有专门的库(library)。而Flink提供了专门的CEP库用于复杂事件处理,可以说是目前CEP的最佳解决方案。

风险控制

设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败,就可以向用户发送通知信息,或是进行报警提示。

用户画像

利用CEP可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有特定行为习惯的一些用户,做出相应的用户画像。

运维监控

对于企业服务的运维管理,可以利用CEP灵活配置多指标、多依赖来实现更复杂的监控模式。

二、快速上手

2.1 需要引入的依赖

想要在代码中使用Flink CEP,需要在项目的pom文件中添加相关依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

如果想要在Flink集群中提交运行CEP作业,应该将依赖的jar包放在/lib目录下。

2.2 一个简单实例

接下来我们考虑一个具体的需求:检测用户行为,如果连续三次登录失败,就输出报警信息。很显然,这是一个复杂事件的检测处理,我们可以使用Flink CEP来实现。我们首先定义数据的类型。这里的用户行为不再是之前的访问事件Event了,所以应该单独定义一个登录事件POJO类。具体实现如下:

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;
    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }
    public LoginEvent() {}
    @Override
    public String toString() {
        return "LoginEvent{" +
            "userId='" + userId + '\'' +
            ", ipAddress='" + ipAddress + '\'' +
            ", eventType='" + eventType + '\'' +
            ", timestamp=" + timestamp +
            '}';
    }
}

具体代码实现如下:

public class LoginFailDetect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 获取登录事件流,并提取时间戳、生成水位线
        KeyedStream<LoginEvent, String> stream = env
            .fromElements(
                new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forMonotonousTimestamps()
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<LoginEvent>() {
                        @Override
                        public long extractTimestamp(LoginEvent loginEvent, long l) {
                            return loginEvent.timestamp;
                        }
                    }
                )
            )
            .keyBy(r -> r.userId);
        // 1. 定义Pattern,连续的三个登录失败事件
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
            .<LoginEvent>begin("first")    // 以第一个登录失败事件开始
            .where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return loginEvent.eventType.equals("fail");
                }
            })
            .next("second")    // 接着是第二个登录失败事件
            .where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return loginEvent.eventType.equals("fail");
                }
            })
            .next("third")     // 接着是第三个登录失败事件
            .where(new SimpleCondition<LoginEvent>() {
                @Override
                public boolean filter(LoginEvent loginEvent) throws Exception {
                    return loginEvent.eventType.equals("fail");
                }
            });
        // 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
        // 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
        patternStream
            .select(new PatternSelectFunction<LoginEvent, String>() {
                @Override
                public String select(Map<String, List<LoginEvent>> map) throws Exception {
                    LoginEvent first = map.get("first").get(0);
                    LoginEvent second = map.get("second").get(0);
                    LoginEvent third = map.get("third").get(0);
                    return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
                }
            })
            .print("warning");
        env.execute();
    }
}

三、模式API(Pattern API)

Flink CEP的核心是复杂事件的模式匹配。Flink CEP库中提供了Pattern类,基于它可以调用一系列方法来定义匹配模式,这就是所谓的模式API(Pattern API)。

3.1 个体模式

模式(Pattern)其实就是将一组简单事件组合成复杂事件的“匹配规则”。由于流中事件的匹配是有先后顺序的,因此一个匹配规则就可以表达成先后发生的一个个简单事件,按顺序串联组合在一起。这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern)。

1. 基本形式

每一个登录失败事件的选取规则,就都是一个个体模式。比如:

.<LoginEvent>begin("first")    // 以第一个登录失败事件开始
    .where(new SimpleCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent loginEvent) throws Exception {
            return loginEvent.eventType.equals("fail");
        }
    })
或者:
.next("second")    // 接着是第二个登录失败事件
    .where(new SimpleCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent loginEvent) throws Exception {
            return loginEvent.eventType.equals("fail");
        }
    })

这些都是个体模式。个体模式一般都会匹配接收一个事件。每个个体模式都以一个“连接词”开始定义的,比如begin、next等等,这是Pattern对象的一个方法(begin是Pattern类的静态方法),返回的还是一个Pattern。这些“连接词”方法有一个String类型参数,这就是当前个体模式唯一的名字,比如这里的“first”、“second”。在之后检测到匹配事件时,就会以这个名字来指代匹配事件。

2. 量词(Quantifiers)

个体模式后面可以跟一个“量词”,用来指定循环的次数。从这个角度分类,个体模式可以包括“单例(singleton)模式”和“循环(looping)模式”。默认情况下,个体模式是单例模式,匹配接收一个事件;当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。在Flink CEP中,可以使用不同的方法指定循环模式,主要有:

.oneOrMore()
匹配事件出现一次或多次,假设a是一个个体模式,a.oneOrMore()表示可以匹配1个或多个a的事件组合。我们有时会用a+来简单表示。
.times(times)
匹配事件发生特定次数(times),例如a.times(3)表示aaa;
.times(fromTimes,toTimes)
指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配aa,aaa和aaaa。
.greedy()
只能用在循环模式后,使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如a.times(2, 4).greedy(),如果出现了连续4个a,那么会直接把aaaa检测出来进行处理,其他任意2个a是不算匹配事件的。
.optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
对于一个个体模式pattern来说,后面所有可以添加的量词如下:
// 匹配事件出现4次
pattern.times(4);
// 匹配事件出现4次,或者不出现
pattern.times(4).optional();
// 匹配事件出现2, 3 或者4次
pattern.times(2, 4);
// 匹配事件出现2, 3 或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现2, 3, 4次,或者不出现
pattern.times(2, 4).optional();
// 匹配事件出现2, 3, 4次,或者不出现;并且尽可能多地匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现1次或多次
pattern.oneOrMore();
// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();
// 匹配事件出现1次或多次,或者不出现
pattern.oneOrMore().optional();
// 匹配事件出现1次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现2次或多次
pattern.timesOrMore(2);
// 匹配事件出现2次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();
// 匹配事件出现2次或多次,或者不出现
pattern.timesOrMore(2).optional()
// 匹配事件出现2次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();

3. 条件(Conditions)

对于每个个体模式,匹配事件的核心在于定义匹配条件,也就是选取事件的规则。Flink CEP会按照这个规则对流中的事件进行筛选,判断是否接受当前的事件。对于条件的定义,主要是通过调用Pattern对象的.where()方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用Pattern对象的.subtype()方法来限定匹配事件的子类型。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
机器学习/深度学习 运维 监控
实时异常检测实战:Flink+PAI 算法模型服务化架构设计
本文深入探讨了基于 Apache Flink 与阿里云 PAI 构建的实时异常检测系统。内容涵盖技术演进、架构设计、核心模块实现及金融、工业等多领域实战案例,解析流处理、模型服务化、状态管理等关键技术,并提供性能优化与高可用方案,助力企业打造高效智能的实时异常检测平台。
346 1
|
6月前
|
SQL 运维 Java
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
本文介绍了对Flink实时计算编译任务的Koupleless架构改造。为解决进程模型带来的响应慢、资源消耗大等问题,团队将进程模型改为线程模型,并借助Koupleless的类加载隔离能力实现版本和包的隔离。通过动态装配Plugin及其Classpath,以及Biz运行时仅对依赖Plugin可见的设计,大幅优化了编译任务的性能。结果表明,新架构使编译耗时降低50%,吞吐量提升5倍以上。
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
|
8月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1230 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
7月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
232 5
|
7月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
188 4
|
13天前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
3月前
|
缓存 Cloud Native Java
Java 面试微服务架构与云原生技术实操内容及核心考点梳理 Java 面试
本内容涵盖Java面试核心技术实操,包括微服务架构(Spring Cloud Alibaba)、响应式编程(WebFlux)、容器化(Docker+K8s)、函数式编程、多级缓存、分库分表、链路追踪(Skywalking)等大厂高频考点,助你系统提升面试能力。
156 0
|
10月前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
11月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
267 3

热门文章

最新文章