开发者社区> 阿里云实时计算Flink> 正文

Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析

简介: 通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。
+关注继续查看

什么是retraction(撤回)

通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理
方式。
首先来看下流场景下的一个词频统计列子。

image.png

没有retract会导致最终结果不正确↑:

image.png
retract发挥的作用

下面再分享两个双十一期间retract保证数据正确性的业务case:

case1: 菜鸟物流订单统计

同一个订单的商品在运输过程中,因为各种原因,物流公司是有可能从A变成B的。为了统计物流公司承担的订单数目,菜鸟团队使用blink计算的retraction机制进行变key汇总操作。

-- TT source_table 数据如下:
order_id      tms_company
0001           中通
0002           中通
0003           圆通

-- SQL代码
create view dwd_table as 
select
    order_id,
    StringLast(tms_company)
from source_table
group by order_id;

create view dws_table as 
select 
    tms_company,
    count(distinct order_id) as order_cnt
from dwd_table 
group by tms_company


此时结果为:
tms_company  order_cnt
中通          2
圆通          1

-----------------------
之后又来了一条新数据 0001的订单 配送公司改成 圆通了。这时,第一层group by的会先向下游发送一条 (0001,中通)的撤回消息,第二层group by节点收到撤回消息后,会将这个节点 中通对应的 value减少1,并更新到结果表中;然后第一层的分桶统计逻辑向下游正常发送(0001,圆通)的正向消息,更新了圆通物流对应的订单数目,达到了最初的汇总目的。

order_id      tms_company
0001           中通
0002           中通
0003           圆通
0001           圆通

写入ADS结果会是(满足需求)
tms_company  order_cnt
中通          1
圆通          2

case2: 天猫双十一购物车加购统计:

双11爆款清单与知名综艺IP“火星情报局”跨界合作,汪涵、撒贝宁、陶晶莹等大咖主持加盟,杭州、长沙两地联播,成功打造为“双11子IP”与“双11购物风向标”,树立电商内容综艺化、娱乐化创新典范,为长线模式探索打下基础。

首次深度联动线下场景,在银泰门店落地爆款清单超级大屏,商场人流截停率28%,用户互动时间占营业时间的40%。

选品模式创新,打造最全维度爆款清单:TOP2000性价比爆款+TOP100小黑盒推荐(新品清单)+TOP200买手天团推荐(人群/场景/地域 清单)

核心业务指标

  • 加购金额
  • 加购件数
  • 加购UV

业务计算逻辑

  • 来自TT的数据要进行去重;
  • 以投放场景和购物车维度进行分组,获取每个分组的最后一条(最新)数据;
  • 以投放场景和小时为维度进行分组,统计 加购金额,加购件数和加购UV 业务指标;

业务BlinkSQL代码

--Blink SQL
--********************************************************************--
--Comment: 天猫双11官方爆款清单统计计算
--********************************************************************--
CREATE TABLE dwd_mkt_membercart_ri(
    cart_id      BIGINT, -- '购物车id',
    sku_id       BIGINT, -- '存放商品的skuId,无sku时,为0',
    item_id      BIGINT, -- '外部id:商品id或者skuid',
    quantity     BIGINT, -- '购买数量',
    user_id      BIGINT, -- '用户id',
    status       BIGINT, -- '状态1:正常-1:删除',
    gmt_create   VARCHAR, -- '属性创建时间',
    gmt_modified VARCHAR, -- '属性修改时间',
    biz_id VARCHAR, -- 投放场景,
    start_time VARCHAR, -- 投放开始时间
    end_time VARCHAR, -- 投放结束时间
    activity_price_time VARCHAR, -- 活动开始时间
    price VARCHAR, -- 商品价格
    dbsync_operation BIGINT -- 时间自动用于排序
) 
WITH 
(
    type='tt'
    -- 其他信息省略
);

--groub by 方式重复,防止TT重发
CREATE VIEW distinct_dwd_mkt_membercart_ri AS 
SELECT
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation
FROM
    dwd_mkt_membercart_ri
GROUP BY    
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation;

-- 每个投放和购物车数据的最后一条
CREATE VIEW tmp_dwd_mkt_membercart_ri AS 
SELECT 
    biz_id as biz_id,
    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,
    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id, 
    LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,
    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,
    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,
    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,
    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,
    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,
    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,
    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,
    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,
    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operation
FROM distinct_dwd_mkt_membercart_ri
WHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')
GROUP BY 
cart_id,biz_id;

--存储小时维度的计算结果
CREATE TABLE result_mkt_membercart_ri_eh(
    id VARCHAR, 
    data_time VARCHAR,  
    all_preheating_cart_cnt BIGINT, -- 预热期间的 加购件数
    all_preheating_cart_alipay BIGINT,-- 预热期间的 加购金额
    eh_all_preheating_cart_uv BIGINT,-- 预热期间的 加购UV
    all_cart_cnt BIGINT, -- 投放期间的 加购件数
    all_cart_alipay BIGINT, -- 投放期间的 加购金额
    eh_all_cart_uv BIGINT, -- 投放期间的 加购UV
    primary key(id,data_time)
) WITH (
    type = 'custom',
     -- 其他信息省略
    timeDiv='hour'
) ;
--统计小时维度的 xx xx xx 业务指标
INSERT INTO result_mkt_membercart_ri_eh 
SELECT 
    biz_id,
    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time, 
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,
    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,
    `sum`(quantity) as all_cart_cnt,
    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,
    `count`(distinct user_id) eh_all_cart_uv
FROM tmp_dwd_mkt_membercart_ri 
WHERE 
   status>0 
GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

上面case2天猫业务场景里面的加购金额统计来说,当每个投放场景的购物车的数据发生变化时候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE发生变化,最外层的sum统计【INSERT INTO result_mkt_membercart_ri_eh 】就要将前一条的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】进行求和统计,这样blink就需要有一种机制将VALUE-1进行撤回,利用【VALUE-2】进行计算,这种机制我们称为retract。

retract 实现原理参考

https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw/edit#heading=h.cjkoun4w44l4

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Apache Flink CDC 批流融合技术原理分析
以 Flink SQL 案例来介绍 Flink CDC 2.0 的使用,并解读 CDC 中的核心设计。
821 0
使用Spark SQL进行流式机器学习计算(上)
今天来和大家简单说一下如何使用Spark SQL进行流式数据的机器学习处理
1848 0
从 0 到 1 通过 Flink + Tablestore 进行大数据处理与分析
阿里云实时计算Flink版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。
465 0
9.28直播预告|AnalyticDB for PostgreSQL功能发布 - 外表联邦分析&列存引擎增强
本次分享主要介绍云原生数据仓库ADB PG公共云近期发布的两项重要功能,外表联邦分析和列存引擎增强的技术解析,和最佳使用实践,欢迎大家观看直播。
1288 0
《Kafka Stream》调研:一种轻量级流计算模式
流计算,已经有Storm、Spark,Samza,包括最近新起的Flink,Kafka为什么再自己做一套流计算呢?Kafka Stream 与这些框架比有什么优势?Samza、Consumer Group已经包装了Kafka轻量级的消费功能,难道不够吗? 花了一些时间阅读[docs](http
24299 0
表格存储 SQL 功能快速上手
# 功能介绍 表格存储(Tablestore)是阿里云自研的多模型结构化数据存储,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持 PB 级存储、千万 TPS 以及毫秒级延迟的服务能力。使用表格存储你可以方便的存储和查询你的海量数据。​ 表格存储正式发布了 SQL 功能,满足用户业务平滑迁移到表格存储并可以继续通过 SQL 方式访问表格存储,表格存储
642 0
新功能初探 | RDS MySQL 8.0 支持 DML 语句 returning
MySQL 对于 statement 执行结果报文通常分为两类 Resultset 和 OK/ERR,针对 DML 语句则返回OK/ERR 报文,其中包括几个影响记录,扫描记录等属性。
7011 0
一文让你秒懂批量计算的功能特性
用户使用工具(如 SDK,命令行工具等)向 BatchCompute 提交作业,BatchCompute 使用用户指定的镜像(如:ubuntu)启动虚拟机(VM), 在虚拟机中运行用户程序, 运行完成后释放虚拟机(VM)。
567 0
637
文章
18
问答
来源圈子
更多
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
+ 订阅
相关文档: 实时计算(流计算)
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载