复杂事件处理(CEP)语句

简介:

MATCH_RECOGNIZE用于从输入流中识别符合指定规则的事件,并按照指定的方式输出。

语法

 
  
  1. SELECT [ ALL | DISTINCT ]
  2. { * | projectItem [, projectItem ]* }
  3. FROM tableExpression
  4. [MATCH_RECOGNIZE (
  5. [PARTITION BY {partitionItem [, partitionItem]*}]
  6. [ORDER BY {orderItem [, orderItem]*}]
  7. [MEASURES {measureItem AS col [, measureItem AS col]*}]
  8. [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
  9. [AFTER MATCH SKIP]
  10. PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
  11. DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS patternDefinationExpression]*}
  12. )];

参数说明

  • PARTITION BY 指定分区的列,可选项。
  • ORDER BY 可以指定多列,但是必须以event time列或者process time列作为排序的首列,可选项。
  • MEASURES 定义如何根据匹配成功的输入事件构造输出事件。
  • ONE ROW PER MATCH 对于每一次成功的匹配,只会产生一个输出事件。
  • ONE ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • ALL ROW PER MATCH 对于每一次成功的匹配,对应于每一个输入事件,都会产生一个输出事件。
  • ALL ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]为可选项,默认为ONE ROW PER MATCH
  • AFTER MATCH SKIP TO NEXT ROW 匹配成功之后,从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP PAST LAST ROW 匹配成功之后,从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP TO FIRST patternItem 匹配成功之后,从匹配成功的事件序列中第一个对应于patternItem的事件开始下一次匹配。
  • AFTER MATCH SKIP TO LAST patternItem 匹配成功之后,从匹配成功的事件序列中最后一个对应于patternItem的事件开始下一次匹配。
  • PATTERN 定义待识别的事件序列需要满足的规则,需要定义在()中,由一系列自定义的patternVariable构成。

    说明:

    • patternVariable之间若以空格间隔,表示符合这两种patternVariable的事件中间不存在其他事件。
    • patternVariable之间若以->间隔,表示符合这两种patternVariable的事件之间可以存在其它事件。

Quantifier

quantifier用于指定符合patternVariable定义的事件的出现次数。

参数 参数意义
* 0次或多次
+ 1次或多次
? 0次或1次
{n} n次
{n,} 大于等于n次
{n, m} 大于等于n次,小于等于m次
{,m} 小于等于m次

默认为贪婪匹配。比如对于pattern: A -> B+,输入:a b1, b2, b3,输出为:a b1, a b1 b2, a b1 b2 b3。可以在quantifier符号后面加来表示非贪婪匹配。

  • *?
  • +?
  • {n}?
  • {n,}?
  • {n, m}?
  • {,m}?

此时对于上面例子中的pattern及输入,产生的输出为:a b1, a b2, a b1 b2, a b3, a b2 b3, a b1 b2 b3

注意:

  • WITHIN 定义符合规则的事件序列的最大时间跨度。
  • 静态窗口
    格式:INTERVAL ‘string’ timeUnit [ TO timeUnit ]
    示例:INTERVAL ‘10’ SECOND, INTERVAL ‘45’ DAY, INTERVAL ‘10:20’ MINUTE TO SECOND, INTERVAL ‘10:20.10’ MINUTE TO SECOND, INTERVAL ‘10:20’ HOUR TO MINUTE, INTERVAL ‘1-5’ YEAR TO MONTH
  • 动态窗口
    格式: INTERVAL intervalExpression
    示例: INTERVAL A.windowTime + 10,其中A为pattern定义中第一个patternVariable。
    在intervalExpression的定义中,可以使用pattern定义中出现过的patternVariable。当前只能使用第一个patternVariable。intervalExpression中可以使用UDF,intervalExpression的结果必须为long,单位为millisecond, 表示窗口的大小。
  • DEFINE 定义在PATTERN中出现的patternVariable的具体含义,若某个patternVariable在DEFINE中没有定义,则认为对于每一个事件,该patternVariable都成立。

在MEASURES和DEFINE语句中,可以使用如下函数。

函数 函数意义
Row Pattern Column References 形式为: patternVariable.col。表示访问patternVariable所对应的事件的指定的列。
PREV 只能用在DEFINE语句中,一般与Row Pattern Column References合用。用于访问指定的pattern所对应的事件之前偏移指定的offset所对应的事件的指定的列。
示例:对于DOWN AS DOWN.price < PREV(DOWN.price),PREV(A.price)表示当前事件的前一个事件的price列的值。注意,DOWN.price等价于PREV(DOWN.price, 0)。 PREV(DOWN.price)等价于PREV(DOWN.price, 1)。
FIRST、LAST 一般与Row Pattern Column References合用,用于访问指定的pattern所对应的事件序列中的指定偏移位置的事件。
示例:FIRST(A.price, 3)表示pattern A所对应的事件序列中的第3个事件。LAST(A.price, 3)表示pattern A所对应的事件序列中的倒数第3个事件。

输出列:

函数 输出列
ONE ROW PER MATCH 包括 partition by中指定的列及measures中定义的列。 对于partition by中已经指定的列,在measures中无需重复定义。
ONE ROW PER MATCH WITH TIMEOUT ROWS 除匹配成功的时候产生输出外,超时的时候也会产生输出,超时时间由PATTERN语句中的WITHIN语句定义。

注意:

  1. 定义pattern的时候,最好也定义WITHIN,否则可能会造成state越来越大。
  2. order by中定义的首列必须为event time列或者process time列。

示例

 
  
  1. SELECT *
  2. FROM Ticker MATCH_RECOGNIZE (
  3. PARTITION BY symbol
  4. ORDER BY tstamp
  5. MEASURES STRT.tstamp AS start_tstamp,
  6. LAST(DOWN.tstamp) AS bottom_tstamp,
  7. LAST(UP.tstamp) AS end_tstamp
  8. ONE ROW PER MATCH
  9. AFTER MATCH SKIP TO NEXT ROW
  10. PATTERN (STRT DOWN+ UP+) WITHIN INTERVAL '10' SECOND
  11. DEFINE
  12. DOWN AS DOWN.price < PREV(DOWN.price),
  13. UP AS UP.price > PREV(UP.price)
  14. ) MR
  15. ORDER BY MR.symbol, MR.start_tstamp;

测试数据

timestamp(TIMESTAMP) card_id(VARCHAR) location(VARCHAR) action(VARCHAR)
2018-04-13 12:00:00 1 WW Tom
2018-04-13 12:05:00 1 WW1 Tom
2018-04-13 12:10:00 1 WW2 Tom
2018-04-13 12:20:00 1 WW Tom

测试案例

 
   
  1. CREATE TABLE datahub_stream (
  2. `timestamp` TIMESTAMP,
  3. card_id VARCHAR,
  4. location VARCHAR,
  5. `action` VARCHAR,
  6. WATERMARK wf FOR `timestamp` AS withOffset(`timestamp`, 1000)
  7. ) WITH (
  8. type = 'datahub'
  9. ...
  10. );
  11. CREATE TABLE rds_out (
  12. start_timestamp TIMESTAMP,
  13. end_timestamp TIMESTAMP,
  14. card_id VARCHAR,
  15. event VARCHAR
  16. ) WITH (
  17. type= 'rds'
  18. ...
  19. );
  20. --案例描述
  21. -- 当相同的card_id在十分钟内,从两个不同的location发生刷卡现象,就会触发报警机制,以便于监测信用卡盗刷等现象
  22. -- 定义计算逻辑
  23. insert into rds_out
  24. select
  25. `start_timestamp`,
  26. `end_timestamp`,
  27. card_id, `event`
  28. from datahub_stream
  29. MATCH_RECOGNIZE (
  30. PARTITION BY card_id -- card_id分区,将相同卡号的数据分到同一个计算节点上。
  31. ORDER BY `timestamp` -- 在窗口内,对事件时间进行排序。
  32. MEASURES --定义如何根据匹配成功的输入事件构造输出事件。
  33. e2.`action` as `event`,
  34. e1.`timestamp` as `start_timestamp`, --第一次的事件时间为start_timestamp
  35. LAST(e2.`timestamp`) as `end_timestamp`--最新的事件时间为end_timestamp
  36. ONE ROW PER MATCH --匹配成功输出一条。
  37. AFTER MATCH SKIP TO NEXT ROW--匹配后跳转到下一行。
  38. PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE -- 定义两个事件,e1e2
  39. DEFINE --定义在PATTERN中出现的patternVariable的具体含义。
  40. e1 as e1.action = 'Tom', --事件一的action标记为Tom
  41. e2 as e2.action = 'Tom' and e2.location <> e1.location --事件二的action标记为Tom,且事件一和事件二的location不一致。
  42. );

测试结果

start_timestamp(TIMESTAMP) end_timestamp(TIMESTAMP) card_id(VARCHAR) event(VARCHAR)
2018-04-13 20:00:00.0 2018-04-13 20:05:00.0 1 Tom
2018-04-13 20:05:00.0 2018-04-13 20:10:00.0 1 Tom
本文转自实时计算—— 复杂事件处理(CEP)语句
相关文章
|
SQL 存储 分布式计算
spark执行sql的原理是什么
spark执行sql的原理是什么
346 1
|
消息中间件 网络协议 算法
流量回放工具之 goreplay 核心源码分析
【6月更文挑战第3天】流量回放工具之 goreplay 核心源码分析
386 3
|
存储 算法 Linux
Gzip的压缩级别
【4月更文挑战第28天】Gzip的压缩级别
737 2
|
机器学习/深度学习 前端开发 数据可视化
数据分析web可视化神器---streamlit框架,无需懂前端也能搭建出精美的web网站页面
数据分析web可视化神器---streamlit框架,无需懂前端也能搭建出精美的web网站页面
1224 0
|
API 索引
Elasticsearch Index Shard Allocation 索引分片分配策略
Elasticsearch Index Shard Allocation 索引分片分配策略
434 1
|
PyTorch 算法框架/工具 Python
PyTorch中的forward的理解
PyTorch中的forward的理解
513 0
|
SQL 消息中间件 关系型数据库
Flink报错问题之提交flink sql任务报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
传感器 机器学习/深度学习 运维
多变量时间序列数据集整理和对应标准化处理代码合集
# 前言 最近在做多变量时间序列异常检测相关的工作,顺带也整理了目前市面上比较常用的五个多变量时间序列异常检测数据集,测试集都有标好的label,这五个数据集应该是在这个领域最为常用benchmark的数据集,整理主要来自于很多顶会的对比实验。 本文主要介绍五个数据集的具体信息和对应的标准化处理,并给出处理的代码和最终标准化的格式。 tips:作为一个写了四五年博客的职场小白来说,准备从今天开
4592 1
|
运维 算法 大数据
基于实时计算(Flink)与高斯模型构建实时异常检测系统
案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总 1. 概述 异常检测(anomaly detection)指的是对不符合预期模式或数据集(英语:dataset)中其他项目的项目、事件或观测值的识别。
8487 0
|
存储 人工智能 并行计算
喜马拉雅基于DeepRec构建AI平台实践
快速落地大模型训练和推理能力,带来业务指标和后续算法优化空间的显著提升。喜马拉雅AI云,是面向公司人员提供的一套从数据、特征、模型到服务的全流程一站式算法工具平台。