开发者社区> 付空> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

【阿里内部应用】利用blink CEP实现流计算中的超时统计问题

简介: 案例与解决方案汇总页:阿里云实时计算产品案例&解决方案汇总 一. 背景介绍 如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:实时流的数据源结构如下: 物流订单号 支付时间 仓接单时间 仓出库时间 LP1 2018-08-01 08:00 ...
+关注继续查看

案例与解决方案汇总页:
阿里云实时计算产品案例&解决方案汇总

一. 背景介绍

如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:
实时流的数据源结构如下:

物流订单号 支付时间 仓接单时间 仓出库时间
LP1 2018-08-01 08:00
LP1 2018-08-01 08:00 2018-08-01 09:00
LP2 2018-08-01 09:10
LP2 2018-08-01 09:10 2018-08-01 09:50
LP2 2018-08-01 09:10 2018-08-01 09:50 ​2018-08-01 12:00

我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出,其中LP1仓接单时间是2018-08-01 09:00,但一直到2018-08-01 12:00点之前,一直都没有出库,LP1满足仓接单超2H未出库的行为。

该场景的难点就在于:订单未出库。而对于TT中的源消息流,订单未出库,TT就不会下发新的消息,不下发新的消息,blink就无法被触发计算。而针对上述的场景,对于LP1,我们需要在仓接单时间是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已经仓接单但超2H未出库了。

二. 解决方案

本文主要是利用blink CEP来实现上述场景,具体实现步骤如下所述。
第一步:在source DDL中定义event_timestamp,并定义sink,如下:

----定义source
create table sourcett_dwd_ri
(
     lg_order_code                  varchar comment '物流订单号'
    ,ded_pay_time                   varchar comment '支付时间'
    ,store_code                     varchar comment '仓库编码'
    ,store_name                     varchar comment '仓库名称'
    ,wms_create_time                varchar comment '仓接单时间'
    ,wms_consign_create_time        varchar comment '仓出库时间'
    ,evtstamp as case when coalesce(wms_create_time, '') <> ''
                      then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')
                      else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')
                 end   --构造event_timestamp,如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp
    ,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000)  --设置延迟10秒处理
)
with
(
     type='tt'
    ,topic='dwd_ri'
    ,accessKey='xxxxxx'
    ,accessId='xxxxxx'
    ,lengthCheck='PAD'
    ,nullValues='\\N|'
);


----定义sink
create table sink_hybrid_blink_cep
(
     ded_pay_date                   varchar comment '支付日期'
    ,store_code                     varchar comment '仓库编码'
    ,store_name                     varchar comment '仓库名称'
    ,wms_create_ord_cnt             bigint  comment '仓接单量'
    ,wms_confirm_ord_cnt            bigint  comment '仓出库量'
    ,wmsin_nowmsout_2h_ord_cnt      bigint  comment '仓接单超2小时未出库单量'
    ,wmsin_nowmsout_6h_ord_cnt      bigint  comment '仓接单超6小时未出库单量'    
    ,sub_partition                  bigint  comment '二级分区(支付日期)'
    ,PRIMARY KEY (ded_pay_date, store_code, sub_partition)
)
with
(
     type='PetaData'
    ,url = 'xxxxxx'
    ,tableName='blink_cep'
    ,userName='xxxxxx'
    ,password='xxxxxx'
    ,bufferSize='30000'
    ,batchSize='3000'
    ,batchWriteTimeoutMs='15000'
);

第二步:根据blink CEP的标准语义进行改写,如下:

create view blink_cep_v1
as
select   '仓接单-仓出库超时' as timeout_type
        ,lg_order_code
        ,wms_create_time as start_time
        ,wms_consign_create_time as end_time
from     source_dwd_csn_whc_lgt_fl_ord_ri
MATCH_RECOGNIZE
(
         PARTITION BY lg_order_code
         ORDER BY     evtstamp
         MEASURES
                      e1.wms_create_time         as wms_create_time
                     ,e2.wms_consign_create_time as wms_consign_create_time
         ONE ROW PER MATCH WITH TIMEOUT ROWS  --重要,必须设置延迟也下发
         AFTER MATCH SKIP TO NEXT ROW
         PATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUR
         EMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)
         DEFINE
             e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null
            ,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where    wms_create_time is not null      --重要,可以大大减少进入CEP的消息量
and      wms_consign_create_time is null  --重要,可以大大减少进入CEP的消息量
;

第三步:根据blink的执行机制,我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联,来触发blink对超时消息进行聚合操作,如下:

create view blink_cep_v2
as
select   a.lg_order_code                       as lg_order_code
        ,last_value(a.store_code             ) as store_code
        ,last_value(a.store_name             ) as store_name
        ,last_value(a.ded_pay_time           ) as ded_pay_time
        ,last_value(a.wms_create_time        ) as wms_create_time
        ,last_value(a.real_wms_confirm_time  ) as real_wms_confirm_time
        ,last_value(case when coalesce(a.wms_create_time, '') <> ''
                         and  coalesce(a.real_wms_confirm_time, '') = '' 
                         and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 7200
                         then 'Y' else 'N' end) as flag_01
        ,last_value(case when coalesce(a.wms_create_time, '') <> ''
                         and  coalesce(a.real_wms_confirm_time, '') = '' 
                         and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 21600
                         then 'Y' else 'N' end) as flag_02
from
        (select   lg_order_code                       as lg_order_code
                 ,last_value(store_code             ) as store_code
                 ,last_value(store_name             ) as store_name
                 ,last_value(ded_pay_time           ) as ded_pay_time
                 ,last_value(wms_create_time        ) as wms_create_time
                 ,last_value(wms_consign_create_time) as real_wms_confirm_time
         from     sourcett_dwd_ri
         group by lg_order_code
         ) a
left outer join
        (select   lg_order_code
                 ,count(*) as cnt
         from     blink_cep_v1
         group by lg_order_code
         ) b
on       a.lg_order_code = b.lg_order_code
group by a.lg_order_code
;


insert into sink_hybrid_blink_cep
select   regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date
        ,a.store_code
        ,max(a.store_name)        as store_name
        ,count(case when coalesce(a.wms_create_time, '') <> '' then a.lg_order_code end) as wmsin_ord_cnt
        ,count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wmsout_ord_cnt
        ,count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt
        ,count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt
        ,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), '-', '') as bigint) as sub_partition
from     blink_cep_v2 as t1
where    coalesce(lg_cancel_time, '') = ''
and      coalesce(ded_pay_time, '') <> ''
group by regexp_replace(substring(ded_pay_time, 1, 10), '-', '')
        ,a.store_code
;

三. 问题拓展

  1. blink CEP的参数比较多,要完全看懂,着实需要一些时间,但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?
    风控案例测试数据如下:
刷卡时间 银行卡ID 刷卡地点
2018-04-13 12:00:00 1 WW
2018-04-13 12:05:00 1 WW1
2018-04-13 12:10:00 1 WW2
2018-04-13 12:20:00 1 WW

我们认为,当一张银行卡在10min之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。

  1. blink CEP是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的offset越小越好,但offset设置比较小,就直接可能导致很多eventtime<watermark-offset的消息,直接被丢弃,准确性很难保证。比如,在CP回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是10点,但回传时间是15点),如果以实操时间作为eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入CEP,进而无法触发CEP后续的计算,在使用CEP的过程中,应该注意这一点。

四. 作者简介

花名:缘桥,来自菜鸟-CTO-数据部-仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。

image

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
主流实时流处理计算框架Flink初体验。
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架。
46 0
Flink实时计算pv、uv的几种方法
我的Git地址实时统计pv、uv是再常见不过的大数据统计需求了,前面出过一篇SparkStreaming实时统计pv,uv的案例,这里用Flink实时计算pv,uv。我们需要统计不同数据类型每天的pv,uv情况。因此计算pv、uv的几种方法。
316 0
实时计算 Flink 版应用场景与产品介绍
本文由阿里巴巴高级产品专家陈守元老师分享,详细讲解实时计算 Flink 的具体业务场景并分享实时计算 Flink 的相关应用案例。
6610 0
【行业应用】阿里云实时计算 Flink 版 IoT 行业解决方案
物联网拉近分散的资讯,统整物与物的数位信息,主要应用领域包括以下方面:运输和物流领域、健康医疗领域、智慧环境(家庭、办公、工厂)领域、个人和社会领域等,具有十分广阔的市场应用前景。物联网将智能感知、识别技术、网络通信与普适计算等技术融合起来,被认为是继计算机、互联网、智能手机之后世界信息产业发展的下一个风口。
6103 0
【行业应用】阿里云实时计算 Flink 版游戏行业解决方案
游戏作为新兴崛起的娱乐产业,当下发展得如火如荼,其吸金能力和趣味性也吸引更多企业与人才投入其中。游戏行业公司主要分为发行和制作两类,游戏的类型可细分为手游、页游和端游三种,随着移动端设备的更新发展以及 5G 时代的降临,手游将迎来绝佳的发展时机。
2674 0
【行业应用】阿里云实时计算 Flink 版金融行业解决方案
基于实时计算 Flink 版的解决方案可帮助金融机构从容应对上述挑战,通过 Flink 构建实时数仓、实时反欺诈系统,助力金融机构快速构建实时风控体系。
9582 0
实时计算Flink——应用场景
实时计算 Flink使用Flink SQL,主打流式数据分析场景。目前在如下领域有使用场景。 实时ETL 集成流计算现有的诸多数据通道和SQL灵活的加工能力,对流式数据进行实时清洗、归并、结构化处理。
7307 0
新华网与阿里合资创立云计算公司
本文讲的是新华网与阿里合资创立云计算公司【IT168 评论】11日晚,新华网发布公告称,新华网股份有限公司、新华新媒文化传播有限公司、中国经济信息社、杭州阿里创业投资有限公司以及杭州数问云投资合伙企业(有限合伙)拟签署《合资经营协议》并共同设立新华智云科技有限公司,合资公司的注册资本为49019.6079万元。
2049 0
新华网与阿里合资创立云计算公司
本文讲的是新华网与阿里合资创立云计算公司【IT168 评论】11日晚,新华网发布公告称,新华网股份有限公司、新华新媒文化传播有限公司、中国经济信息社、杭州阿里创业投资有限公司以及杭州数问云投资合伙企业(有限合伙)拟签署《合资经营协议》并共同设立新华智云科技有限公司,合资公司的注册资本为49019.6079万元。
1292 0
+关注
付空
阿里云实时计算产品经理付空
22
文章
0
问答
来源圈子
更多
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
+ 订阅
相关文档: 实时计算(流计算)
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载