模式匹配在SQL中应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 模式匹配在Flink中经常听到,即CEP。CEP在DataStream API中应用已经非常成熟了,在近两年FlinkSQl中也逐渐应用起来,离线场景中如何应用模式匹配是本文主要研究的方向

很多场景中都会应用到模式匹配如:用户异常行为实时监测、银行卡异地监控、下单未支付等等;FlinkSQL中使用MATCH_RECOGNIZE子句进行复杂事件处理,我们先看个FlinkSQL中如何识别

  1. 在FlinkSQL client中创建一个测试表Ticket 其schema 如下
Ticket
     |-- symbol: String                           # 股票的代号
     |-- price: Long                              # 股票的价格
     |-- tax: Long                                # 股票应纳税额
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # 更改这些值的时间点
Flink SQL>CREATETABLE Ticket (>             symbol string,>             price int,>             tax int,>             rowtime TIMESTAMP(3),>             WATERMARK FOR rowtime AS rowtime --模式匹配必须要有水位线>) WITH (>'connector'='filesystem',>'format'='csv',>'path'='file:///mnt/ps/SAS/BigData/file/ticket.csv'>);[INFO] Execute statement succeed.
Flink SQL>
  1. 为了简化,我们只考虑单个股票 ACME 的传入数据。其中的行是连续追加的。查询数据如下
symbol                          price                            tax                        rowtime
ACME                             1212021-09-0109:00:00.000ACME                             1722021-09-0109:00:01.000ACME                             1912021-09-0109:00:02.000ACME                             2132021-09-0109:00:03.000ACME                             2522021-09-0109:00:04.000ACME                             1812021-09-0109:00:05.000ACME                             1512021-09-0109:00:06.000ACME                             1422021-09-0109:00:07.000ACME                             2422021-09-0109:00:08.000ACME                             2522021-09-0109:00:09.000ACME                             1912021-09-0109:00:10.000
  1. 现在的任务是找出一个单一股票价格不断下降的时期
SELECT*FROM Ticket
    MATCH_RECOGNIZE (--只能用于追加表        PARTITION BY symbol --按symbol分组,相同数据会在一个节点进行计算ORDERBY rowtime  --同一组下按事件时间进行排序        MEASURES  --定义输出            START_ROW.rowtimeAS start_tstamp,            LAST(PRICE_DOWN.rowtime)AS bottom_tstamp,            LAST(PRICE_UP.rowtime)AS end_tstamp
        ONE ROW PER MATCH --匹配成功输出一条        AFTER MATCH SKIP TO LAST PRICE_UP --从匹配成功的事件序列中最后一个对应价格上升的事件开始匹配下一次        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)--定义3个事件:开始行 价格下降 价格回升(+号代表一个或多个数据)        DEFINE  --定义事件的具体含义            PRICE_DOWN AS--上一条价格下降事件的价格为空并且下降事件的价格小于开始行的价格或者下降事件的价格小于上一条的价格(LAST(PRICE_DOWN.price,1)ISNULLAND PRICE_DOWN.price< START_ROW.price)OR                    PRICE_DOWN.price< LAST(PRICE_DOWN.price,1),            PRICE_UP AS--价格回升事件的价格大于上一条价格下降价格                PRICE_UP.price> LAST(PRICE_DOWN.price,1)) MR;--结果如下 从2021-09-01 09:00:04.000开始下降,直到2021-09-01 09:00:08.00回涨symbol      start_tstamp                  bottom_tstamp                     end_tstamp
ACME        2021-09-0109:00:04.0002021-09-0109:00:07.0002021-09-0109:00:08.000
  • 离线SQL如何去实现呢
  1. 先生成示例数据源
with tb1 as(select        symbol,        price,        tax,        rowtime
fromvalues('ACME',12,1,'2021-09-01 09:00:00'),('ACME',17,2,'2021-09-01 09:00:01'),('ACME',19,1,'2021-09-01 09:00:02'),('ACME',21,3,'2021-09-01 09:00:03'),('ACME',25,2,'2021-09-01 09:00:04'),('ACME',18,1,'2021-09-01 09:00:05'),('ACME',15,1,'2021-09-01 09:00:06'),('ACME',14,2,'2021-09-01 09:00:07'),('ACME',24,2,'2021-09-01 09:00:08'),('ACME',25,2,'2021-09-01 09:00:09'),('ACME',19,1,'2021-09-01 09:00:10')               t(symbol,price,tax,rowtime))
  1. 通过计算上一条和下一条的股票价格差判断是否连续下降
tb2 as(select        symbol,        price,        tax,        rowtime,        price-lag(price,1,price) over(partition by symbol orderby rowtime) lag_price_diff,        lead(price,1,price) over(partition by symbol orderby rowtime)-price lead_price_diff,        lead(rowtime,1,rowtime) over(partition by symbol orderby rowtime)as lead_rowtime
from tb1
)--结果展示如下symbol  price tax rowtime lag_price_diff  lead_price_diff
ACME    1212021-09-0109:00:0005ACME    1722021-09-0109:00:0152ACME    1912021-09-0109:00:0222ACME    2132021-09-0109:00:0324ACME    2522021-09-0109:00:044-7ACME    1812021-09-0109:00:05-7-3ACME    1512021-09-0109:00:06-3-1ACME    1422021-09-0109:00:07-110ACME    2422021-09-0109:00:08101ACME    2522021-09-0109:00:091-6ACME    1912021-09-0109:00:10-60
  1. 差值为负值即价格下降,根据此进行划分标签
tb3 as(select        symbol,        price,        tax,        rowtime,        lag_price_diff,        lead_price_diff,        lead_rowtime,        sum(if(lag_price_diff>0,1,0)) over(partition by symbol orderby rowtime) flag
from tb2
where lag_price_diff<0or lead_price_diff <0)--结果如下symbol  price tax rowtime lag_price_diff  lead_price_diff lead_rowtime  flag
ACME  2522021-09-0109:00:044-72021-09-0109:00:051ACME  1812021-09-0109:00:05-7-32021-09-0109:00:061ACME  1512021-09-0109:00:06-3-12021-09-0109:00:071ACME  1422021-09-0109:00:07-1102021-09-0109:00:081ACME  2522021-09-0109:00:091-62021-09-0109:00:102ACME  1912021-09-0109:00:10-602021-09-0109:00:102
  1. 标签flag为2的不符合连续下降,只有一条差值位置,不是连续下降,需要进行过滤
tb4 as(select        symbol,        price,        tax,        rowtime,        lead_rowtime,        flag,        sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
from tb3
)--结果如下symbol  price tax rowtime lead_rowtime  flag  ct
ACME  2522021-09-0109:00:042021-09-0109:00:0513ACME  1812021-09-0109:00:052021-09-0109:00:0613ACME  1512021-09-0109:00:062021-09-0109:00:0713ACME  1422021-09-0109:00:072021-09-0109:00:0813ACME  2522021-09-0109:00:092021-09-0109:00:1021ACME  1912021-09-0109:00:102021-09-0109:00:1021
  1. 根据真实情况即连续的定义对数据进行过滤,统计结果
select    symbol,--flag,    min(rowtime) start_tstamp,    max(rowtime) bottom_tstamp,    max(lead_rowtime) end_tstamp
from tb4
where ct >1groupby symbol,flag;--结果如下symbol  start_tstamp        bottom_tstamp       end_tstamp
ACME    2021-09-0109:00:042021-09-0109:00:072021-09-0109:00:08
  • 下面我们在看一个电商中的场景,用户浏览商品后会进行下单,下单后有可能会进行支付,我们需要分析某日某商品进行浏览、收藏、下单、支付的用户
  1. 先生成简单的示例数据
with tb1 as(select        user_id,        shop_id,        user_behav,        op_time,        substr(op_time,1,10) dt
fromvalues('1001','A1','浏览','2021-09-01 17:03:01'),('1001','A1','收藏','2021-09-01 17:04:12'),('1001','A2','浏览','2021-09-01 17:02:02'),('1001','A2','收藏','2021-09-01 17:03:42'),('1001','A2','下单','2021-09-01 17:06:25'),('1002','A1','浏览','2021-09-01 17:00:32'),('1002','A1','收藏','2021-09-01 17:03:12'),('1002','A1','浏览','2021-09-01 17:03:45'),('1002','A1','下单','2021-09-01 17:05:41'),('1002','A1','支付','2021-09-01 17:06:26'),('1003','A1','浏览','2021-09-01 17:08:13'),('1003','A1','浏览','2021-09-01 17:09:14')               t(user_id,shop_id,user_behav,op_time))
  1. 我们只看A1店铺的数据,使用collect_list或者wm_concat(Maxcomputer内置函数,Hive中是concat_wm)进行汇总用户的行为
tb2 as(select        dt,        user_id,-- collect_list(user_behav)        wm_concat(",",user_behav) behavs
from tb1
where shop_id ='A1'groupby dt,user_id
)--展示结果如下dt  user_id behavs
2021-09-011001  浏览,收藏
2021-09-011002  浏览,收藏,浏览,下单,支付
2021-09-011003  浏览,浏览
  1. 匹配规则使用like,这里需要下单之后的行为为支付
select    dt,    user_id,    behavs
from tb2
where behavs like'%浏览%收藏%下单_支付%';--结果如下dt  user_id behavs
2021-09-011002  浏览,收藏,浏览,下单,支付

使用离线SQL分析分析匹配,主要是按维度把所有行为路径进行汇总拼接,然后使用字符匹配或者复杂的使用正则匹配,实际业务分析过程中,如有类似需求,可以参考上述方式。如有更好的方式,欢迎探讨。

拜了个拜

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
SQL 存储 测试技术
SQL在构建系统中的应用:关键步骤与技巧
在构建基于数据库的应用系统时,SQL(Structured Query Language)作为与数据库交互的核心语言,扮演着至关重要的角色
|
2月前
|
SQL 数据库
如何应用SQL约束条件?
【10月更文挑战第28天】如何应用SQL约束条件?
80 11
|
3月前
|
SQL Oracle 关系型数据库
SQL语言的主要标准及其应用技巧
SQL(Structured Query Language)是数据库领域的标准语言,广泛应用于各种数据库管理系统(DBMS)中,如MySQL、Oracle、SQL Server等
|
2月前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
32 0
|
3月前
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
3月前
|
SQL 存储 数据库
SQL在构建系统中的应用:关键要素与编写技巧
在构建基于数据库的系统时,SQL(Structured Query Language)扮演着至关重要的角色
|
3月前
|
SQL 关系型数据库 数据库
SQL数据库:核心原理与应用实践
随着信息技术的飞速发展,数据库管理系统已成为各类组织和企业中不可或缺的核心组件。在众多数据库管理系统中,SQL(结构化查询语言)数据库以其强大的数据管理能力和灵活性,广泛应用于各类业务场景。本文将深入探讨SQL数据库的基本原理、核心特性以及实际应用。一、SQL数据库概述SQL数据库是一种关系型数据库
122 5
|
3月前
|
SQL 存储 Oracle
Oracle数据库SQL语句详解与应用指南
在数字化时代,数据库已成为各类企业和组织不可或缺的核心组件。Oracle数据库作为业界领先的数据库管理系统之一,广泛应用于各种业务场景。掌握Oracle数据库的SQL语句是数据库管理员、开发人员及运维人员的基本技能。本文将详细介绍Oracle数据库SQL语句的基本概念、语法、应用及最佳实践。一、Or
93 3
|
3月前
|
SQL
sql树型应用总结
sql树型应用总结
35 2
|
3月前
|
SQL 数据库 索引
内连接(INNER JOIN)在SQL中的简单应用与技巧
在SQL查询中,内连接(INNER JOIN)是一种基本且常用的连接类型,用于从两个或多个表中检索匹配的记录