复杂事件处理(CEP)语句

简介:

MATCH_RECOGNIZE用于从输入流中识别符合指定规则的事件,并按照指定的方式输出。

语法

 
  
  1. SELECT [ ALL | DISTINCT ]
  2. { * | projectItem [, projectItem ]* }
  3. FROM tableExpression
  4. [MATCH_RECOGNIZE (
  5. [PARTITION BY {partitionItem [, partitionItem]*}]
  6. [ORDER BY {orderItem [, orderItem]*}]
  7. [MEASURES {measureItem AS col [, measureItem AS col]*}]
  8. [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
  9. [AFTER MATCH SKIP]
  10. PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
  11. DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS patternDefinationExpression]*}
  12. )];

参数说明

  • PARTITION BY 指定分区的列,可选项。
  • ORDER BY 可以指定多列,但是必须以event time列或者process time列作为排序的首列,可选项。
  • MEASURES 定义如何根据匹配成功的输入事件构造输出事件。
  • ONE ROW PER MATCH 对于每一次成功的匹配,只会产生一个输出事件。
  • ONE ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • ALL ROW PER MATCH 对于每一次成功的匹配,对应于每一个输入事件,都会产生一个输出事件。
  • ALL ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]为可选项,默认为ONE ROW PER MATCH
  • AFTER MATCH SKIP TO NEXT ROW 匹配成功之后,从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP PAST LAST ROW 匹配成功之后,从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP TO FIRST patternItem 匹配成功之后,从匹配成功的事件序列中第一个对应于patternItem的事件开始下一次匹配。
  • AFTER MATCH SKIP TO LAST patternItem 匹配成功之后,从匹配成功的事件序列中最后一个对应于patternItem的事件开始下一次匹配。
  • PATTERN 定义待识别的事件序列需要满足的规则,需要定义在()中,由一系列自定义的patternVariable构成。

    说明:

    • patternVariable之间若以空格间隔,表示符合这两种patternVariable的事件中间不存在其他事件。
    • patternVariable之间若以->间隔,表示符合这两种patternVariable的事件之间可以存在其它事件。

Quantifier

quantifier用于指定符合patternVariable定义的事件的出现次数。

参数 参数意义
* 0次或多次
+ 1次或多次
? 0次或1次
{n} n次
{n,} 大于等于n次
{n, m} 大于等于n次,小于等于m次
{,m} 小于等于m次

默认为贪婪匹配。比如对于pattern: A -> B+,输入:a b1, b2, b3,输出为:a b1, a b1 b2, a b1 b2 b3。可以在quantifier符号后面加来表示非贪婪匹配。

  • *?
  • +?
  • {n}?
  • {n,}?
  • {n, m}?
  • {,m}?

此时对于上面例子中的pattern及输入,产生的输出为:a b1, a b2, a b1 b2, a b3, a b2 b3, a b1 b2 b3

注意:

  • WITHIN 定义符合规则的事件序列的最大时间跨度。
  • 静态窗口
    格式:INTERVAL ‘string’ timeUnit [ TO timeUnit ]
    示例:INTERVAL ‘10’ SECOND, INTERVAL ‘45’ DAY, INTERVAL ‘10:20’ MINUTE TO SECOND, INTERVAL ‘10:20.10’ MINUTE TO SECOND, INTERVAL ‘10:20’ HOUR TO MINUTE, INTERVAL ‘1-5’ YEAR TO MONTH
  • 动态窗口
    格式: INTERVAL intervalExpression
    示例: INTERVAL A.windowTime + 10,其中A为pattern定义中第一个patternVariable。
    在intervalExpression的定义中,可以使用pattern定义中出现过的patternVariable。当前只能使用第一个patternVariable。intervalExpression中可以使用UDF,intervalExpression的结果必须为long,单位为millisecond, 表示窗口的大小。
  • DEFINE 定义在PATTERN中出现的patternVariable的具体含义,若某个patternVariable在DEFINE中没有定义,则认为对于每一个事件,该patternVariable都成立。

在MEASURES和DEFINE语句中,可以使用如下函数。

函数 函数意义
Row Pattern Column References 形式为: patternVariable.col。表示访问patternVariable所对应的事件的指定的列。
PREV 只能用在DEFINE语句中,一般与Row Pattern Column References合用。用于访问指定的pattern所对应的事件之前偏移指定的offset所对应的事件的指定的列。
示例:对于DOWN AS DOWN.price < PREV(DOWN.price),PREV(A.price)表示当前事件的前一个事件的price列的值。注意,DOWN.price等价于PREV(DOWN.price, 0)。 PREV(DOWN.price)等价于PREV(DOWN.price, 1)。
FIRST、LAST 一般与Row Pattern Column References合用,用于访问指定的pattern所对应的事件序列中的指定偏移位置的事件。
示例:FIRST(A.price, 3)表示pattern A所对应的事件序列中的第3个事件。LAST(A.price, 3)表示pattern A所对应的事件序列中的倒数第3个事件。

输出列:

函数 输出列
ONE ROW PER MATCH 包括 partition by中指定的列及measures中定义的列。 对于partition by中已经指定的列,在measures中无需重复定义。
ONE ROW PER MATCH WITH TIMEOUT ROWS 除匹配成功的时候产生输出外,超时的时候也会产生输出,超时时间由PATTERN语句中的WITHIN语句定义。

注意:

  1. 定义pattern的时候,最好也定义WITHIN,否则可能会造成state越来越大。
  2. order by中定义的首列必须为event time列或者process time列。

示例

 
  
  1. SELECT *
  2. FROM Ticker MATCH_RECOGNIZE (
  3. PARTITION BY symbol
  4. ORDER BY tstamp
  5. MEASURES STRT.tstamp AS start_tstamp,
  6. LAST(DOWN.tstamp) AS bottom_tstamp,
  7. LAST(UP.tstamp) AS end_tstamp
  8. ONE ROW PER MATCH
  9. AFTER MATCH SKIP TO NEXT ROW
  10. PATTERN (STRT DOWN+ UP+) WITHIN INTERVAL '10' SECOND
  11. DEFINE
  12. DOWN AS DOWN.price < PREV(DOWN.price),
  13. UP AS UP.price > PREV(UP.price)
  14. ) MR
  15. ORDER BY MR.symbol, MR.start_tstamp;

测试数据

timestamp(TIMESTAMP) card_id(VARCHAR) location(VARCHAR) action(VARCHAR)
2018-04-13 12:00:00 1 WW Tom
2018-04-13 12:05:00 1 WW1 Tom
2018-04-13 12:10:00 1 WW2 Tom
2018-04-13 12:20:00 1 WW Tom

测试案例

 
   
  1. CREATE TABLE datahub_stream (
  2. `timestamp` TIMESTAMP,
  3. card_id VARCHAR,
  4. location VARCHAR,
  5. `action` VARCHAR,
  6. WATERMARK wf FOR `timestamp` AS withOffset(`timestamp`, 1000)
  7. ) WITH (
  8. type = 'datahub'
  9. ...
  10. );
  11. CREATE TABLE rds_out (
  12. start_timestamp TIMESTAMP,
  13. end_timestamp TIMESTAMP,
  14. card_id VARCHAR,
  15. event VARCHAR
  16. ) WITH (
  17. type= 'rds'
  18. ...
  19. );
  20. --案例描述
  21. -- 当相同的card_id在十分钟内,从两个不同的location发生刷卡现象,就会触发报警机制,以便于监测信用卡盗刷等现象
  22. -- 定义计算逻辑
  23. insert into rds_out
  24. select
  25. `start_timestamp`,
  26. `end_timestamp`,
  27. card_id, `event`
  28. from datahub_stream
  29. MATCH_RECOGNIZE (
  30. PARTITION BY card_id -- card_id分区,将相同卡号的数据分到同一个计算节点上。
  31. ORDER BY `timestamp` -- 在窗口内,对事件时间进行排序。
  32. MEASURES --定义如何根据匹配成功的输入事件构造输出事件。
  33. e2.`action` as `event`,
  34. e1.`timestamp` as `start_timestamp`, --第一次的事件时间为start_timestamp
  35. LAST(e2.`timestamp`) as `end_timestamp`--最新的事件时间为end_timestamp
  36. ONE ROW PER MATCH --匹配成功输出一条。
  37. AFTER MATCH SKIP TO NEXT ROW--匹配后跳转到下一行。
  38. PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE -- 定义两个事件,e1e2
  39. DEFINE --定义在PATTERN中出现的patternVariable的具体含义。
  40. e1 as e1.action = 'Tom', --事件一的action标记为Tom
  41. e2 as e2.action = 'Tom' and e2.location <> e1.location --事件二的action标记为Tom,且事件一和事件二的location不一致。
  42. );

测试结果

start_timestamp(TIMESTAMP) end_timestamp(TIMESTAMP) card_id(VARCHAR) event(VARCHAR)
2018-04-13 20:00:00.0 2018-04-13 20:05:00.0 1 Tom
2018-04-13 20:05:00.0 2018-04-13 20:10:00.0 1 Tom
本文转自实时计算—— 复杂事件处理(CEP)语句
相关文章
|
3月前
|
存储 前端开发 JavaScript
简单实现一个事件触发器
简单实现一个事件触发器
32 0
|
2月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
38 3
|
12月前
|
传感器 算法 数据挖掘
「事件架构」ESP和CEP有什么区别?
「事件架构」ESP和CEP有什么区别?
|
存储 传感器 JSON
传递事件流
批处理领域,作业的输入和输出是文件(也许在分布式文件系统)。流处理领域中的等价物是啥呢?
38 0
事件驱动(Event driven)——函数的异步调用方式总结
事件驱动(Event driven)——函数的异步调用方式总结自制脑图, 函数的异步调用方式有利于拉平负载,提高任务的成功率,但也带来了一系列挑战。我们结合用户的实际场景,可总结为下述几类:
120 0
事件驱动(Event driven)——函数的异步调用方式总结
|
Serverless
事件驱动(Event driven)——函数计算异步调用
事件驱动(Event driven)——函数计算异步调用自制脑图
70 0
事件驱动(Event driven)——函数计算异步调用
|
SQL 机器学习/深度学习 运维
(1)Flink CEP复杂事件处理引擎介绍
复杂事件处理(CEP)既是把不同的数据看做不同的事件,并且通过分析事件之间的关系建立起一套事件关系序列库。利用过滤,聚合,关联性,依赖,层次等技术,最终实现由简单关系产生高级事件关系。 复杂事件主要应用场景:主要用于信用卡欺诈检测、用户风险检测、设备故障检测、攻击行为分析等领域。 Flink CEP能够利用的场景较多,在实际业务场景中也有了广泛的使用案例与经验积累。比如
(1)Flink CEP复杂事件处理引擎介绍
|
监控 调度
ETL(二):表达式组件的使用(三)
ETL(二):表达式组件的使用(三)
ETL(二):表达式组件的使用(三)
|
数据采集 数据库
ETL(二):表达式组件的使用(二)
ETL(二):表达式组件的使用(二·)
ETL(二):表达式组件的使用(二)
ETL(二):表达式组件的使用(一)
ETL(二):表达式组件的使用(一)
ETL(二):表达式组件的使用(一)