Flink CEP - Flink的复杂事件处理

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 1 Flink CEP 是什么FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分

1 Flink CEP 是什么


FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分

2 Flink CEP 特点


55.png

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个由简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件


3 Flink CEP 应用场景


风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。

策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。

运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。


4 Flink CEP原理


Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。


23.png

take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。

ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。

proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。

模式API (Pattern API)


模式API可以让你定义想从输入流中抽取的复杂模式序列


个体模式:组成复杂规则的每一个单独的模式定义,就是“个体模式”


A 量词


  • 在FlinkCEP中,你可以通过这些方法指定循环模式:
  • pattern.oneOrMore() 指定期望一个给定事件出现一次或者多次的模式
  • pattern.times(#fTimes) 指定期望一个给定事件出现特定次数的模式,例如出现4次A
  • pattern.times(#fromTimes, #toTimes) 指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次A
  • pattern.greedy() 方法让循环模式变成贪心的
  • pattern.optional() 方法让所有的模式变成可选的,不管是否是循环模式
  • // 期望出现4次start.times(4)// 期望出现0或者4次
  • start.times(4).optional()// 期望出现2、3或者4次 -
  • start.times(2, 4)// 期望出现2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).greedy()// 期望出现0、2、3或者4次
  • start.times(2, 4).optional()// 期望出现0、2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).optional().greedy()// 期望出现1到多次
  • start.oneOrMore()// 期望出现1到多次,并且尽可能的重复次数多
  • start.oneOrMore().greedy()// 期望出现0到多次
  • start.oneOrMore().optional()// 期望出现0到多次,并且尽可能的重复次数
  • start.oneOrMore().optional().greedy()// 期望出现2到多次
  • start.timesOrMore(2)// 期望出现2到多次,并且尽可能的重复次数
  • start.timesOrMore(2).greedy()// 期望出现0、2或多次
  • start.timesOrMore(2).optional()// 期望出现0、2或多次,并且尽可能的重复次数多
  • start.timesOrMore(2).optional().greedy()B 条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过

  • 对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过


  • pattern.where() 增加新的条件,多个条件应该同时满足 pattern.where(event => … /*一些判断条件 */)


  • pattern.or() 增加新的条件,多个条件满足任意一个都可以


  • pattern.where(event =>

     … /* 一些判断条件 /) .or(event => … / 替代条件 */)


  • pattern.until()

    为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有      事件被接受进入模式了。如"(a+ until b)"

    (一个或者更多的"a"直到"b")


  • pattern.oneOrMore().until(event => … /* 替代条件

     */)以上这些可以是可迭代的


  • subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式


  • pattern.subtype(classOf[SubEvent]) oneOrMore()
  • 指定模式期望匹配到的事件至少出现一次。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。提示:推荐使用until()或者within()来清理状态。


  • pattern.oneOrMore() timesOrMore(#times)
  • 指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。


  • pattern.timesOrMore(2) optional()
  • 指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。


  • pattern.oneOrMore().optional() greedy()
  • 指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。


  • pattern.oneOrMore().greedy()迭代条件 :
  • 这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。


  • pattern.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx)
  • => { lazy val sum = ctx.getEventsForPattern(“middle”).map(_.getPrice).sum
  • value.getName.startsWith(“foo”) && sum + value.getPrice < 5.0 } )组合模式:很多个体模式组合起来,就形成了整个的模式序列


  • FlinkCEP支持事件之间如下形式的连续策略


33.png

// 严格连续
val strict: Pattern[Event, _] = start.next("event").where(...)
// 松散连续
val relaxed: Pattern[Event, _] = start.followedBy("event").where(...)
// 不确定的松散连续
val nonDetermin: Pattern[Event, _] = start.followedByAny("event").where(...)
// 严格连续的NOT模式
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// 松散连续的NOT模式
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。举例来说,模式"a b",给定事件序列"a",“c”,“b1”,“b2”,会产生如下的结果:


"a"和"b"之间严格连续:{} (没有匹配),"a"之后的"c"导致"a"被丢弃。


"a"和"b"之间松散连续:{a b1},松散连续会”跳过不匹配的事件直到匹配上的事件”。


"a"和"b"之间不确定的松散连续:{a b1}, {a b2},这是最常见的情况。


也可以为模式定义一个有效时间约束。例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.

next.within(Time.seconds(10))循环模式中的连续性

对于循环模式(例如oneOrMore()和times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。

consecutive

与oneOrMore()和times()一起使用, 在匹配的事件之间施加严格的连续性, 也就是说,任何不匹配的事件都会终止匹配(和next()一样)。如果不使用它,那么就是松散连续(和followedBy()一样)

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
 .oneOrMore()
 .consecutive()
  .followedBy("end1")
  .where(_.getName().equals("b"))

输入:C D A1 A2 A3 D A4 B,会产生下面的输出:如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}allowCombinations


与oneOrMore()和times()一起使用, 在匹配的事件中间施加不确定松散连续性(和followedByAny()一样)。如果不使用,就是松散连续(和followedBy()一样)

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
 .oneOrMore()
 .allowCombinations() 
 .followedBy("end1").where(_.getName().equals("b"))

输入:C D A1 A2 A3 D A4 B,会产生如下的输出:如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}5 匹配后跳过策略


对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。有五种跳过策略,如下:


NO_SKIP: 每个成功的匹配都会被输出。

SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。

SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。

SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。

SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

val skipStrategy = AfterMatchSkipStrategy.skipToFirst("patternName").throwExceptionOnMiss()Pattern.begin("patternName", skipStrategy)

例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:66.png

5 模式的检测


在指定了要寻找的模式后,该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式,需要创建一个PatternStream。给定一个输入流input,一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator

object Cep {
 def main(args: Array[String]): Unit = { 
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) 
val loginDS: DataStream[LoginInfo] = env.fromElements("") .map(data => { 
val datas: Array[String] = data.split(",") LoginInfo(datas(0), datas(1), datas(2).toInt,datas(3).toLong) } ) 
// 创建规则
 val pattern: Pattern[LoginInfo, LoginInfo] = Pattern.begin[LoginInfo]("start") 
// 增加新的条件,多个条件应该同时满足 
.where(_.age > 1) .where(_.age < 12)
// 应用规则:将规则应用到数据流中 
val loginPS: PatternStream[LoginInfo] = CEP.pattern(loginDS,pattern) 
// 获取规则的结果 
val ds: DataStream[String] = loginPS.select( 
map => {
 map.toString }) ds.print() env.execute() 
 }
}
case class LoginInfo( id:String, name:String, age:Int, loginTime:Long
 )

6 从模式中选取


创建 PatternStream 之后,就可以应用 select 或者 flatselect 方法,从检测到的事件序列中提取事件了


select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它


select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所有接收到的事件的 Iterable 类型

val loginPS: PatternStream[LoginInfo] = CEP.pattern(loginDS,pattern) 
val outputTag: OutputTag[String] = OutputTag[String]("side-output") 
val result: SingleOutputStreamOperator[CompleLogin] =loginPS.select(outputTag){ (pattern: Map[String, Iterable[LoginInfo]], timestamp: Long, out: Collector[TimeoutLogin]) => out.collect(TimeoutLogin()) } { (pattern: mutable.Map[String, Iterable[LoginInfo]], out: Collector[CompleLogin]) => out.collect(CompleLogin()) } 
val timeoutResult: DataStream[TimeoutLogin] =result.getSideOutput(outputTag)
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
65 0
|
11天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
34 9
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
41 0
|
1月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
53 0
|
3月前
|
消息中间件 存储 Kafka
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生的事件,如何消费旧事件呢?
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
44 0
|
4月前
|
运维 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在处理MySQL表新增数据记录时,没有正确触发变更事件,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之同步MySQL数据并EP(复杂事件处理)时,编译报错,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
122 1
|
5月前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
410 0

热门文章

最新文章