Flink CEP - Flink的复杂事件处理

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 1 Flink CEP 是什么FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分

1 Flink CEP 是什么


FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分

2 Flink CEP 特点


55.png

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个由简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件


3 Flink CEP 应用场景


风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。

策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。

运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。


4 Flink CEP原理


Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。


23.png

take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。

ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。

proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。

模式API (Pattern API)


模式API可以让你定义想从输入流中抽取的复杂模式序列


个体模式:组成复杂规则的每一个单独的模式定义,就是“个体模式”


A 量词


  • 在FlinkCEP中,你可以通过这些方法指定循环模式:
  • pattern.oneOrMore() 指定期望一个给定事件出现一次或者多次的模式
  • pattern.times(#fTimes) 指定期望一个给定事件出现特定次数的模式,例如出现4次A
  • pattern.times(#fromTimes, #toTimes) 指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次A
  • pattern.greedy() 方法让循环模式变成贪心的
  • pattern.optional() 方法让所有的模式变成可选的,不管是否是循环模式
  • // 期望出现4次start.times(4)// 期望出现0或者4次
  • start.times(4).optional()// 期望出现2、3或者4次 -
  • start.times(2, 4)// 期望出现2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).greedy()// 期望出现0、2、3或者4次
  • start.times(2, 4).optional()// 期望出现0、2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).optional().greedy()// 期望出现1到多次
  • start.oneOrMore()// 期望出现1到多次,并且尽可能的重复次数多
  • start.oneOrMore().greedy()// 期望出现0到多次
  • start.oneOrMore().optional()// 期望出现0到多次,并且尽可能的重复次数
  • start.oneOrMore().optional().greedy()// 期望出现2到多次
  • start.timesOrMore(2)// 期望出现2到多次,并且尽可能的重复次数
  • start.timesOrMore(2).greedy()// 期望出现0、2或多次
  • start.timesOrMore(2).optional()// 期望出现0、2或多次,并且尽可能的重复次数多
  • start.timesOrMore(2).optional().greedy()B 条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过

  • 对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过


  • pattern.where() 增加新的条件,多个条件应该同时满足 pattern.where(event => … /*一些判断条件 */)


  • pattern.or() 增加新的条件,多个条件满足任意一个都可以


  • pattern.where(event =>

     … /* 一些判断条件 /) .or(event => … / 替代条件 */)


  • pattern.until()

    为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有      事件被接受进入模式了。如"(a+ until b)"

    (一个或者更多的"a"直到"b")


  • pattern.oneOrMore().until(event => … /* 替代条件

     */)以上这些可以是可迭代的


  • subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式


  • pattern.subtype(classOf[SubEvent]) oneOrMore()
  • 指定模式期望匹配到的事件至少出现一次。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。提示:推荐使用until()或者within()来清理状态。


  • pattern.oneOrMore() timesOrMore(#times)
  • 指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。


  • pattern.timesOrMore(2) optional()
  • 指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。


  • pattern.oneOrMore().optional() greedy()
  • 指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。


  • pattern.oneOrMore().greedy()迭代条件 :
  • 这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。


  • pattern.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx)
  • => { lazy val sum = ctx.getEventsForPattern(“middle”).map(_.getPrice).sum
  • value.getName.startsWith(“foo”) && sum + value.getPrice < 5.0 } )组合模式:很多个体模式组合起来,就形成了整个的模式序列


  • FlinkCEP支持事件之间如下形式的连续策略


33.png

// 严格连续
val strict: Pattern[Event, _] = start.next("event").where(...)
// 松散连续
val relaxed: Pattern[Event, _] = start.followedBy("event").where(...)
// 不确定的松散连续
val nonDetermin: Pattern[Event, _] = start.followedByAny("event").where(...)
// 严格连续的NOT模式
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// 松散连续的NOT模式
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。举例来说,模式"a b",给定事件序列"a",“c”,“b1”,“b2”,会产生如下的结果:


"a"和"b"之间严格连续:{} (没有匹配),"a"之后的"c"导致"a"被丢弃。


"a"和"b"之间松散连续:{a b1},松散连续会”跳过不匹配的事件直到匹配上的事件”。


"a"和"b"之间不确定的松散连续:{a b1}, {a b2},这是最常见的情况。


也可以为模式定义一个有效时间约束。例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.

next.within(Time.seconds(10))循环模式中的连续性

对于循环模式(例如oneOrMore()和times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。

consecutive

与oneOrMore()和times()一起使用, 在匹配的事件之间施加严格的连续性, 也就是说,任何不匹配的事件都会终止匹配(和next()一样)。如果不使用它,那么就是松散连续(和followedBy()一样)

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
 .oneOrMore()
 .consecutive()
  .followedBy("end1")
  .where(_.getName().equals("b"))

输入:C D A1 A2 A3 D A4 B,会产生下面的输出:如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}allowCombinations


与oneOrMore()和times()一起使用, 在匹配的事件中间施加不确定松散连续性(和followedByAny()一样)。如果不使用,就是松散连续(和followedBy()一样)

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
 .oneOrMore()
 .allowCombinations() 
 .followedBy("end1").where(_.getName().equals("b"))

输入:C D A1 A2 A3 D A4 B,会产生如下的输出:如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}5 匹配后跳过策略


对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。有五种跳过策略,如下:


NO_SKIP: 每个成功的匹配都会被输出。

SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。

SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。

SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。

SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

val skipStrategy = AfterMatchSkipStrategy.skipToFirst("patternName").throwExceptionOnMiss()Pattern.begin("patternName", skipStrategy)

例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:66.png

5 模式的检测


在指定了要寻找的模式后,该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式,需要创建一个PatternStream。给定一个输入流input,一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator

object Cep {
 def main(args: Array[String]): Unit = { 
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) 
val loginDS: DataStream[LoginInfo] = env.fromElements("") .map(data => { 
val datas: Array[String] = data.split(",") LoginInfo(datas(0), datas(1), datas(2).toInt,datas(3).toLong) } ) 
// 创建规则
 val pattern: Pattern[LoginInfo, LoginInfo] = Pattern.begin[LoginInfo]("start") 
// 增加新的条件,多个条件应该同时满足 
.where(_.age > 1) .where(_.age < 12)
// 应用规则:将规则应用到数据流中 
val loginPS: PatternStream[LoginInfo] = CEP.pattern(loginDS,pattern) 
// 获取规则的结果 
val ds: DataStream[String] = loginPS.select( 
map => {
 map.toString }) ds.print() env.execute() 
 }
}
case class LoginInfo( id:String, name:String, age:Int, loginTime:Long
 )

6 从模式中选取


创建 PatternStream 之后,就可以应用 select 或者 flatselect 方法,从检测到的事件序列中提取事件了


select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它


select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所有接收到的事件的 Iterable 类型

val loginPS: PatternStream[LoginInfo] = CEP.pattern(loginDS,pattern) 
val outputTag: OutputTag[String] = OutputTag[String]("side-output") 
val result: SingleOutputStreamOperator[CompleLogin] =loginPS.select(outputTag){ (pattern: Map[String, Iterable[LoginInfo]], timestamp: Long, out: Collector[TimeoutLogin]) => out.collect(TimeoutLogin()) } { (pattern: mutable.Map[String, Iterable[LoginInfo]], out: Collector[CompleLogin]) => out.collect(CompleLogin()) } 
val timeoutResult: DataStream[TimeoutLogin] =result.getSideOutput(outputTag)
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
1月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
35 3
|
2月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
39 1
|
3月前
|
SQL Java 流计算
Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP
【1月更文挑战第19天】【1月更文挑战第95篇】Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP
42 2
|
3月前
|
监控 Java 流计算
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
Flink中的事件时间和处理时间有什么区别?为什么事件时间在流计算中很重要?
30 0
|
4月前
|
缓存 关系型数据库 MySQL
这个错误提示表明Flink CDC在解析MySQL的二进制日志时,找不到对应表的TableMap事件。
这个错误提示表明Flink CDC在解析MySQL的二进制日志时,找不到对应表的TableMap事件。
145 2
|
4月前
|
数据库 流计算
Flink CDC的工作原理是通过监听数据库的变更事件
Flink CDC的工作原理是通过监听数据库的变更事件
56 1
|
7月前
|
SQL Java API
Flink CEP在实时风控场景的落地与优化
Flink CEP在实时风控场景的落地与优化
|
8月前
|
运维 监控 数据处理
Flink的正则表达式--CEP规则引擎
Flink的正则表达式--CEP规则引擎
|
SQL 消息中间件 JSON
Flink CEP 新特性进展与在实时风控场景的落地
本次分享将会介绍 Flink 社区在 1.16 中对 Flink CEP 所做的增强与优化。
Flink CEP 新特性进展与在实时风控场景的落地