Pattern matching comes up in many scenarios: real-time detection of abnormal user behavior, monitoring bank cards used in unusual locations, orders placed but never paid, and so on. In Flink SQL, complex event processing is done with the MATCH_RECOGNIZE clause. Let's first look at how pattern recognition works in Flink SQL.
- In the Flink SQL client, create a test table `Ticket` with the following schema:
```
Ticket
 |-- symbol: String                           # stock symbol
 |-- price: Long                              # stock price
 |-- tax: Long                                # tax due on the stock
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when these values changed
```
```sql
Flink SQL> CREATE TABLE Ticket (
>     symbol string,
>     price int,
>     tax int,
>     rowtime TIMESTAMP(3),
>     WATERMARK FOR rowtime AS rowtime  -- pattern matching requires a watermark
> ) WITH (
>     'connector' = 'filesystem',
>     'format' = 'csv',
>     'path' = 'file:///mnt/ps/SAS/BigData/file/ticket.csv'
> );
[INFO] Execute statement succeed.
```
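As an aside, the `ticket.csv` file behind the table simply holds the raw rows in column order. A sketch of its contents, assuming a headerless comma-separated layout matching the data queried below:

```
ACME,12,1,2021-09-01 09:00:00.000
ACME,17,2,2021-09-01 09:00:01.000
ACME,19,1,2021-09-01 09:00:02.000
ACME,21,3,2021-09-01 09:00:03.000
ACME,25,2,2021-09-01 09:00:04.000
ACME,18,1,2021-09-01 09:00:05.000
ACME,15,1,2021-09-01 09:00:06.000
ACME,14,2,2021-09-01 09:00:07.000
ACME,24,2,2021-09-01 09:00:08.000
ACME,25,2,2021-09-01 09:00:09.000
ACME,19,1,2021-09-01 09:00:10.000
```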
- To keep things simple, we only consider incoming data for a single stock, `ACME`, whose rows are continuously appended. Querying the table gives:
```
symbol price tax rowtime
ACME   12    1   2021-09-01 09:00:00.000
ACME   17    2   2021-09-01 09:00:01.000
ACME   19    1   2021-09-01 09:00:02.000
ACME   21    3   2021-09-01 09:00:03.000
ACME   25    2   2021-09-01 09:00:04.000
ACME   18    1   2021-09-01 09:00:05.000
ACME   15    1   2021-09-01 09:00:06.000
ACME   14    2   2021-09-01 09:00:07.000
ACME   24    2   2021-09-01 09:00:08.000
ACME   25    2   2021-09-01 09:00:09.000
ACME   19    1   2021-09-01 09:00:10.000
```
- The task now is to find periods during which the price of a single stock was continuously decreasing:
```sql
SELECT *
FROM Ticket
    MATCH_RECOGNIZE (                             -- only works on append-only tables
        PARTITION BY symbol                       -- group by symbol; rows with the same symbol are processed on the same node
        ORDER BY rowtime                          -- order by event time within each partition
        MEASURES                                  -- define the output
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH                         -- emit one row per successful match
        AFTER MATCH SKIP TO LAST PRICE_UP         -- resume the next match at the last price-rise event of the matched sequence
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)  -- three events: a start row, falling prices, a recovery (+ means one or more rows)
        DEFINE                                    -- define what each event means
            -- PRICE_DOWN: either there is no previous PRICE_DOWN and the price is below the start row's,
            -- or the price is below the previous PRICE_DOWN's
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price)
                OR PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            -- PRICE_UP: the recovery price is above the last falling price
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;
```

The result: the price starts falling at 2021-09-01 09:00:04.000 and recovers at 2021-09-01 09:00:08.000:

```
symbol start_tstamp            bottom_tstamp           end_tstamp
ACME   2021-09-01 09:00:04.000 2021-09-01 09:00:07.000 2021-09-01 09:00:08.000
```
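If matches should also be bounded in time, Flink's MATCH_RECOGNIZE accepts a WITHIN clause after PATTERN. A minimal sketch on the same Ticket table; the one-hour bound here is an assumed parameter, not part of the original task:

```sql
SELECT *
FROM Ticket
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        -- discard matches whose full sequence takes longer than one hour
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP) WITHIN INTERVAL '1' HOUR
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price)
                OR PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;
```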
- How can we implement the same thing in offline SQL?
- First, generate the sample data source:
```sql
with tb1 as (
    select symbol, price, tax, rowtime
    from values
        ('ACME', 12, 1, '2021-09-01 09:00:00'),
        ('ACME', 17, 2, '2021-09-01 09:00:01'),
        ('ACME', 19, 1, '2021-09-01 09:00:02'),
        ('ACME', 21, 3, '2021-09-01 09:00:03'),
        ('ACME', 25, 2, '2021-09-01 09:00:04'),
        ('ACME', 18, 1, '2021-09-01 09:00:05'),
        ('ACME', 15, 1, '2021-09-01 09:00:06'),
        ('ACME', 14, 2, '2021-09-01 09:00:07'),
        ('ACME', 24, 2, '2021-09-01 09:00:08'),
        ('ACME', 25, 2, '2021-09-01 09:00:09'),
        ('ACME', 19, 1, '2021-09-01 09:00:10')
        t(symbol, price, tax, rowtime)
)
```
- Compute the price difference against the previous and the next row to determine whether the price keeps falling:
```sql
tb2 as (
    select
        symbol,
        price,
        tax,
        rowtime,
        price - lag(price, 1, price) over (partition by symbol order by rowtime) lag_price_diff,
        lead(price, 1, price) over (partition by symbol order by rowtime) - price lead_price_diff,
        lead(rowtime, 1, rowtime) over (partition by symbol order by rowtime) as lead_rowtime
    from tb1
)
```

The result looks like this:

```
symbol price tax rowtime             lag_price_diff lead_price_diff
ACME   12    1   2021-09-01 09:00:00  0              5
ACME   17    2   2021-09-01 09:00:01  5              2
ACME   19    1   2021-09-01 09:00:02  2              2
ACME   21    3   2021-09-01 09:00:03  2              4
ACME   25    2   2021-09-01 09:00:04  4             -7
ACME   18    1   2021-09-01 09:00:05 -7             -3
ACME   15    1   2021-09-01 09:00:06 -3             -1
ACME   14    2   2021-09-01 09:00:07 -1             10
ACME   24    2   2021-09-01 09:00:08 10              1
ACME   25    2   2021-09-01 09:00:09  1             -6
ACME   19    1   2021-09-01 09:00:10 -6              0
```
- A negative difference means the price dropped. Based on this, assign a group label to each run of falling prices:
```sql
tb3 as (
    select
        symbol,
        price,
        tax,
        rowtime,
        lag_price_diff,
        lead_price_diff,
        lead_rowtime,
        sum(if(lag_price_diff > 0, 1, 0)) over (partition by symbol order by rowtime) flag
    from tb2
    where lag_price_diff < 0 or lead_price_diff < 0
)
```

The result:

```
symbol price tax rowtime             lag_price_diff lead_price_diff lead_rowtime        flag
ACME   25    2   2021-09-01 09:00:04  4             -7              2021-09-01 09:00:05 1
ACME   18    1   2021-09-01 09:00:05 -7             -3              2021-09-01 09:00:06 1
ACME   15    1   2021-09-01 09:00:06 -3             -1              2021-09-01 09:00:07 1
ACME   14    2   2021-09-01 09:00:07 -1             10              2021-09-01 09:00:08 1
ACME   25    2   2021-09-01 09:00:09  1             -6              2021-09-01 09:00:10 2
ACME   19    1   2021-09-01 09:00:10 -6              0              2021-09-01 09:00:10 2
```
- The flag = 2 group does not qualify as a continuous decline: it contains only a single falling step, so it needs to be filtered out. Count the falling rows within each group:
```sql
tb4 as (
    select
        symbol,
        price,
        tax,
        rowtime,
        lead_rowtime,
        flag,
        sum(if(lag_price_diff < 0, 1, 0)) over (partition by symbol, flag) ct
    from tb3
)
```

The result:

```
symbol price tax rowtime             lead_rowtime        flag ct
ACME   25    2   2021-09-01 09:00:04 2021-09-01 09:00:05 1    3
ACME   18    1   2021-09-01 09:00:05 2021-09-01 09:00:06 1    3
ACME   15    1   2021-09-01 09:00:06 2021-09-01 09:00:07 1    3
ACME   14    2   2021-09-01 09:00:07 2021-09-01 09:00:08 1    3
ACME   25    2   2021-09-01 09:00:09 2021-09-01 09:00:10 2    1
ACME   19    1   2021-09-01 09:00:10 2021-09-01 09:00:10 2    1
```
- Finally, filter the data according to the actual business definition of "continuous" and aggregate the result:
```sql
select
    symbol,
    -- flag,
    min(rowtime) start_tstamp,
    max(rowtime) bottom_tstamp,
    max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol, flag;
```

The result:

```
symbol start_tstamp        bottom_tstamp       end_tstamp
ACME   2021-09-01 09:00:04 2021-09-01 09:00:07 2021-09-01 09:00:08
```
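For convenience, here is the full offline query assembled from the CTEs above, runnable end to end:

```sql
with tb1 as (
    select symbol, price, tax, rowtime
    from values
        ('ACME', 12, 1, '2021-09-01 09:00:00'),
        ('ACME', 17, 2, '2021-09-01 09:00:01'),
        ('ACME', 19, 1, '2021-09-01 09:00:02'),
        ('ACME', 21, 3, '2021-09-01 09:00:03'),
        ('ACME', 25, 2, '2021-09-01 09:00:04'),
        ('ACME', 18, 1, '2021-09-01 09:00:05'),
        ('ACME', 15, 1, '2021-09-01 09:00:06'),
        ('ACME', 14, 2, '2021-09-01 09:00:07'),
        ('ACME', 24, 2, '2021-09-01 09:00:08'),
        ('ACME', 25, 2, '2021-09-01 09:00:09'),
        ('ACME', 19, 1, '2021-09-01 09:00:10')
        t(symbol, price, tax, rowtime)
),
tb2 as ( -- price differences against the previous and next row
    select
        symbol, price, tax, rowtime,
        price - lag(price, 1, price) over (partition by symbol order by rowtime) lag_price_diff,
        lead(price, 1, price) over (partition by symbol order by rowtime) - price lead_price_diff,
        lead(rowtime, 1, rowtime) over (partition by symbol order by rowtime) as lead_rowtime
    from tb1
),
tb3 as ( -- keep only rows adjacent to a drop, label each decline group
    select
        symbol, price, tax, rowtime, lag_price_diff, lead_price_diff, lead_rowtime,
        sum(if(lag_price_diff > 0, 1, 0)) over (partition by symbol order by rowtime) flag
    from tb2
    where lag_price_diff < 0 or lead_price_diff < 0
),
tb4 as ( -- count the falling steps inside each group
    select
        symbol, price, tax, rowtime, lead_rowtime, flag,
        sum(if(lag_price_diff < 0, 1, 0)) over (partition by symbol, flag) ct
    from tb3
)
select
    symbol,
    min(rowtime) start_tstamp,
    max(rowtime) bottom_tstamp,
    max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol, flag;
```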
- Next, let's look at an e-commerce scenario. After browsing a product, a user may place an order, and after ordering may pay. The task is to find, for a given day and product, the users who browsed, favorited, ordered, and then paid.
- First, generate some simple sample data:
```sql
with tb1 as (
    select user_id, shop_id, user_behav, op_time, substr(op_time, 1, 10) dt
    from values
        ('1001', 'A1', 'browse',   '2021-09-01 17:03:01'),
        ('1001', 'A1', 'favorite', '2021-09-01 17:04:12'),
        ('1001', 'A2', 'browse',   '2021-09-01 17:02:02'),
        ('1001', 'A2', 'favorite', '2021-09-01 17:03:42'),
        ('1001', 'A2', 'order',    '2021-09-01 17:06:25'),
        ('1002', 'A1', 'browse',   '2021-09-01 17:00:32'),
        ('1002', 'A1', 'favorite', '2021-09-01 17:03:12'),
        ('1002', 'A1', 'browse',   '2021-09-01 17:03:45'),
        ('1002', 'A1', 'order',    '2021-09-01 17:05:41'),
        ('1002', 'A1', 'pay',      '2021-09-01 17:06:26'),
        ('1003', 'A1', 'browse',   '2021-09-01 17:08:13'),
        ('1003', 'A1', 'browse',   '2021-09-01 17:09:14')
        t(user_id, shop_id, user_behav, op_time)
)
```
- We only look at shop A1's data and aggregate each user's behaviors with collect_list or wm_concat (a MaxCompute built-in; the Hive counterpart is concat_ws):
```sql
tb2 as (
    select
        dt,
        user_id,
        -- collect_list(user_behav)
        wm_concat(",", user_behav) behavs
    from tb1
    where shop_id = 'A1'
    group by dt, user_id
)
```

The result:

```
dt         user_id behavs
2021-09-01 1001    browse,favorite
2021-09-01 1002    browse,favorite,browse,order,pay
2021-09-01 1003    browse,browse
```
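One caveat: collect_list and wm_concat do not by themselves guarantee that the behaviors are concatenated in time order, which the LIKE match below relies on. A minimal Hive-flavored sketch of one common workaround, sorting within each group before aggregating; the distribute by / sort by trick is widespread practice rather than a formal guarantee in every engine:

```sql
tb2 as (
    select
        dt,
        user_id,
        concat_ws(',', collect_list(user_behav)) behavs
    from (
        -- sort rows by op_time within each (dt, user_id) group before aggregation
        select dt, user_id, user_behav, op_time
        from tb1
        where shop_id = 'A1'
        distribute by dt, user_id
        sort by op_time
    ) sorted
    group by dt, user_id
)
```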
- The matching rule uses LIKE; here we require that the behavior immediately following the order is the payment (the `_` wildcard matches exactly one character, the comma separator):
```sql
select
    dt,
    user_id,
    behavs
from tb2
where behavs like '%browse%favorite%order_pay%';
```

The result:

```
dt         user_id behavs
2021-09-01 1002    browse,favorite,browse,order,pay
```
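For rules too complex for LIKE, the filter can be swapped for a regular expression. A sketch equivalent to the rule above, using rlike (available in both Hive and MaxCompute):

```sql
select dt, user_id, behavs
from tb2
-- 'order,pay' requires payment to immediately follow the order, like 'order_pay' above
where behavs rlike 'browse.*favorite.*order,pay';
```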
Pattern analysis in offline SQL mainly comes down to concatenating all behavior paths along the chosen dimensions and then matching with string functions, or with regular expressions for more complex rules. If you run into a similar requirement in real business analysis, the approach above can serve as a reference; better approaches are welcome for discussion.
Bye for now!