模式匹配在SQL中应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 模式匹配在Flink中经常听到,即CEP。CEP在DataStream API中应用已经非常成熟了,在近两年FlinkSQl中也逐渐应用起来,离线场景中如何应用模式匹配是本文主要研究的方向

很多场景中都会应用到模式匹配如:用户异常行为实时监测、银行卡异地监控、下单未支付等等;FlinkSQL中使用MATCH_RECOGNIZE子句进行复杂事件处理,我们先看个FlinkSQL中如何识别

  1. 在FlinkSQL client中创建一个测试表Ticket 其schema 如下
Ticket
     |-- symbol: String                           # 股票的代号
     |-- price: Long                              # 股票的价格
     |-- tax: Long                                # 股票应纳税额
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # 更改这些值的时间点
Flink SQL>CREATETABLE Ticket (>             symbol string,>             price int,>             tax int,>             rowtime TIMESTAMP(3),>             WATERMARK FOR rowtime AS rowtime --模式匹配必须要有水位线>) WITH (>'connector'='filesystem',>'format'='csv',>'path'='file:///mnt/ps/SAS/BigData/file/ticket.csv'>);[INFO] Execute statement succeed.
Flink SQL>
  1. 为了简化,我们只考虑单个股票 ACME 的传入数据。其中的行是连续追加的。查询数据如下
symbol                          price                            tax                        rowtime
ACME                             1212021-09-0109:00:00.000ACME                             1722021-09-0109:00:01.000ACME                             1912021-09-0109:00:02.000ACME                             2132021-09-0109:00:03.000ACME                             2522021-09-0109:00:04.000ACME                             1812021-09-0109:00:05.000ACME                             1512021-09-0109:00:06.000ACME                             1422021-09-0109:00:07.000ACME                             2422021-09-0109:00:08.000ACME                             2522021-09-0109:00:09.000ACME                             1912021-09-0109:00:10.000
  1. 现在的任务是找出一个单一股票价格不断下降的时期
SELECT*FROM Ticket
    MATCH_RECOGNIZE (--只能用于追加表        PARTITION BY symbol --按symbol分组,相同数据会在一个节点进行计算ORDERBY rowtime  --同一组下按事件时间进行排序        MEASURES  --定义输出            START_ROW.rowtimeAS start_tstamp,            LAST(PRICE_DOWN.rowtime)AS bottom_tstamp,            LAST(PRICE_UP.rowtime)AS end_tstamp
        ONE ROW PER MATCH --匹配成功输出一条        AFTER MATCH SKIP TO LAST PRICE_UP --从匹配成功的事件序列中最后一个对应价格上升的事件开始匹配下一次        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)--定义3个事件:开始行 价格下降 价格回升(+号代表一个或多个数据)        DEFINE  --定义事件的具体含义            PRICE_DOWN AS--上一条价格下降事件的价格为空并且下降事件的价格小于开始行的价格或者下降事件的价格小于上一条的价格(LAST(PRICE_DOWN.price,1)ISNULLAND PRICE_DOWN.price< START_ROW.price)OR                    PRICE_DOWN.price< LAST(PRICE_DOWN.price,1),            PRICE_UP AS--价格回升事件的价格大于上一条价格下降价格                PRICE_UP.price> LAST(PRICE_DOWN.price,1)) MR;--结果如下 从2021-09-01 09:00:04.000开始下降,直到2021-09-01 09:00:08.00回涨symbol      start_tstamp                  bottom_tstamp                     end_tstamp
ACME        2021-09-0109:00:04.0002021-09-0109:00:07.0002021-09-0109:00:08.000
  • 离线SQL如何去实现呢
  1. 先生成示例数据源
with tb1 as(select        symbol,        price,        tax,        rowtime
fromvalues('ACME',12,1,'2021-09-01 09:00:00'),('ACME',17,2,'2021-09-01 09:00:01'),('ACME',19,1,'2021-09-01 09:00:02'),('ACME',21,3,'2021-09-01 09:00:03'),('ACME',25,2,'2021-09-01 09:00:04'),('ACME',18,1,'2021-09-01 09:00:05'),('ACME',15,1,'2021-09-01 09:00:06'),('ACME',14,2,'2021-09-01 09:00:07'),('ACME',24,2,'2021-09-01 09:00:08'),('ACME',25,2,'2021-09-01 09:00:09'),('ACME',19,1,'2021-09-01 09:00:10')               t(symbol,price,tax,rowtime))
  1. 通过计算上一条和下一条的股票价格差判断是否连续下降
tb2 as(select        symbol,        price,        tax,        rowtime,        price-lag(price,1,price) over(partition by symbol orderby rowtime) lag_price_diff,        lead(price,1,price) over(partition by symbol orderby rowtime)-price lead_price_diff,        lead(rowtime,1,rowtime) over(partition by symbol orderby rowtime)as lead_rowtime
from tb1
)--结果展示如下symbol  price tax rowtime lag_price_diff  lead_price_diff
ACME    1212021-09-0109:00:0005ACME    1722021-09-0109:00:0152ACME    1912021-09-0109:00:0222ACME    2132021-09-0109:00:0324ACME    2522021-09-0109:00:044-7ACME    1812021-09-0109:00:05-7-3ACME    1512021-09-0109:00:06-3-1ACME    1422021-09-0109:00:07-110ACME    2422021-09-0109:00:08101ACME    2522021-09-0109:00:091-6ACME    1912021-09-0109:00:10-60
  1. 差值为负值即价格下降,根据此进行划分标签
tb3 as(select        symbol,        price,        tax,        rowtime,        lag_price_diff,        lead_price_diff,        lead_rowtime,        sum(if(lag_price_diff>0,1,0)) over(partition by symbol orderby rowtime) flag
from tb2
where lag_price_diff<0or lead_price_diff <0)--结果如下symbol  price tax rowtime lag_price_diff  lead_price_diff lead_rowtime  flag
ACME  2522021-09-0109:00:044-72021-09-0109:00:051ACME  1812021-09-0109:00:05-7-32021-09-0109:00:061ACME  1512021-09-0109:00:06-3-12021-09-0109:00:071ACME  1422021-09-0109:00:07-1102021-09-0109:00:081ACME  2522021-09-0109:00:091-62021-09-0109:00:102ACME  1912021-09-0109:00:10-602021-09-0109:00:102
  1. 标签flag为2的不符合连续下降,只有一条差值位置,不是连续下降,需要进行过滤
tb4 as(select        symbol,        price,        tax,        rowtime,        lead_rowtime,        flag,        sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
from tb3
)--结果如下symbol  price tax rowtime lead_rowtime  flag  ct
ACME  2522021-09-0109:00:042021-09-0109:00:0513ACME  1812021-09-0109:00:052021-09-0109:00:0613ACME  1512021-09-0109:00:062021-09-0109:00:0713ACME  1422021-09-0109:00:072021-09-0109:00:0813ACME  2522021-09-0109:00:092021-09-0109:00:1021ACME  1912021-09-0109:00:102021-09-0109:00:1021
  1. 根据真实情况即连续的定义对数据进行过滤,统计结果
select    symbol,--flag,    min(rowtime) start_tstamp,    max(rowtime) bottom_tstamp,    max(lead_rowtime) end_tstamp
from tb4
where ct >1groupby symbol,flag;--结果如下symbol  start_tstamp        bottom_tstamp       end_tstamp
ACME    2021-09-0109:00:042021-09-0109:00:072021-09-0109:00:08
  • 下面我们在看一个电商中的场景,用户浏览商品后会进行下单,下单后有可能会进行支付,我们需要分析某日某商品进行浏览、收藏、下单、支付的用户
  1. 先生成简单的示例数据
with tb1 as(select        user_id,        shop_id,        user_behav,        op_time,        substr(op_time,1,10) dt
fromvalues('1001','A1','浏览','2021-09-01 17:03:01'),('1001','A1','收藏','2021-09-01 17:04:12'),('1001','A2','浏览','2021-09-01 17:02:02'),('1001','A2','收藏','2021-09-01 17:03:42'),('1001','A2','下单','2021-09-01 17:06:25'),('1002','A1','浏览','2021-09-01 17:00:32'),('1002','A1','收藏','2021-09-01 17:03:12'),('1002','A1','浏览','2021-09-01 17:03:45'),('1002','A1','下单','2021-09-01 17:05:41'),('1002','A1','支付','2021-09-01 17:06:26'),('1003','A1','浏览','2021-09-01 17:08:13'),('1003','A1','浏览','2021-09-01 17:09:14')               t(user_id,shop_id,user_behav,op_time))
  1. 我们只看A1店铺的数据,使用collect_list或者wm_concat(Maxcomputer内置函数,Hive中是concat_wm)进行汇总用户的行为
tb2 as(select        dt,        user_id,-- collect_list(user_behav)        wm_concat(",",user_behav) behavs
from tb1
where shop_id ='A1'groupby dt,user_id
)--展示结果如下dt  user_id behavs
2021-09-011001  浏览,收藏
2021-09-011002  浏览,收藏,浏览,下单,支付
2021-09-011003  浏览,浏览
  1. 匹配规则使用like,这里需要下单之后的行为为支付
select    dt,    user_id,    behavs
from tb2
where behavs like'%浏览%收藏%下单_支付%';--结果如下dt  user_id behavs
2021-09-011002  浏览,收藏,浏览,下单,支付

使用离线SQL分析分析匹配,主要是按维度把所有行为路径进行汇总拼接,然后使用字符匹配或者复杂的使用正则匹配,实际业务分析过程中,如有类似需求,可以参考上述方式。如有更好的方式,欢迎探讨。

拜了个拜

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL Java 数据库
JSP毕业设计宣传网站系统myeclipse开发sql数据库BS模式java编程网页结构
JSP 毕业设计宣传网站系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。 二、功能介绍
33 0
|
6月前
|
SQL 前端开发 Java
JSP毕业设计选题系统统myeclipse开发sql数据库BS模式java编程mvc结构
JSP 毕业设计选题系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。
28 0
|
6月前
|
SQL Java BI
JSP超市销售管理统myeclipse开发sql数据库BS模式java编程网页结构
JSP 超市销售管理系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发,开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为SQLSERVER2008,使用java语言开发
25 0
|
4天前
|
SQL 人工智能 自然语言处理
NL2SQL进阶系列(2):DAIL-SQL、DB-GPT开源应用实践详解Text2SQL
NL2SQL进阶系列(2):DAIL-SQL、DB-GPT开源应用实践详解Text2SQL
NL2SQL进阶系列(2):DAIL-SQL、DB-GPT开源应用实践详解Text2SQL
|
1月前
|
存储 SQL
物料清单应用输入模板的SQL存储过程设计
物料清单应用输入模板的SQL存储过程设计
|
3月前
|
SQL Oracle 关系型数据库
Oracle PL/SQL基础知识及应用案例
Oracle PL/SQL基础知识及应用案例
33 0
|
4月前
|
SQL 关系型数据库 C语言
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
43 0
|
4月前
|
SQL 关系型数据库 数据库
PostgreSQL【应用 02】扩展SQL之C语言函数(编写、编译、载入)实例分享
PostgreSQL【应用 02】扩展SQL之C语言函数(编写、编译、载入)实例分享
47 0
|
4月前
|
存储 SQL 数据库
数据库第十二次作业 存储过程(PL/SQL语句集)的应用
数据库第十二次作业 存储过程(PL/SQL语句集)的应用
31 0
|
4月前
|
SQL 消息中间件 缓存
Flink SQL中使用DEBUG模式来输出详细的日志信息,
Flink SQL中使用DEBUG模式来输出详细的日志信息,
132 0