如何正确的做增量加工

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 回到十多年前,增量加工这个方法并不是一种需要特别需要提出的方法,因为关系数据库的存储与计算性能十分有限(即便是MPP数据库平台也不是全都是做全量加工),增量加工是最普遍的方式。本文讲述了如何在MaxCompute上用与关系数据库的不同的方式做增量数据的加工。

1.增量加工

回到十多年前,增量加工这个方法并不是一种需要特别需要提出的方法,因为关系数据库的存储与计算性能十分有限(即便是MPP数据库平台也不是全都是做全量加工),增量加工是最普遍的方式。

数据库系统是支持事务的,ACID(原子性、一致性、隔离性、持久性)四大特性可以完美的支持在一个数据表上同时做更新、删除、插入操作。数据库系统的数据存储是到每一个4K或者8K这种大小的数据块上的,详细的统计信息与索引结构都允许我们高效来做增量数据处理。

1.1.问题简述

在当前的MaxCompute这种分布式文件系统上,这些操作都变得不容易了。我们的数据块已经是64MB,不是KB这个量级。我们也没有索引这种加速从一千万数据中找到五十行数据的结构。

那么我们怎么在MaxCompute做增量加工呢?说实话,不太好做。因为没有索引结构,我们每一次的处理都是全量数据检索。如果还是跟之前在关系数据库一样频繁的提交,不但无法体现增量加工的性能与资源优势,反而成为了劣势。(如果我们还想使用关系数据库支持的delete、update这些特性,可以看下MaxCompute公共云近期上线的新特性“Transactional表”。)

那么我们要不要做呢?总结一句话:能做的地方还是可以做一下,但是不要勉强,不要大规模的去做,毕竟做增量加工不容易。

2.解决方案

增量加工的前提是我们获取到了增量数据,相比全量数据增量数据是一个更小的集合,然后我们希望利用这个小增量集合来完成数据加工的过程而不是使用全量,这样就可以更快速、更节约的完成整个数据加工过程。

但是增量加工在MaxCompute总结为两个场景:

场景一,全量加工所需资源无法满足时效性要求,性能急需优化;

场景二,增量加工逻辑简单,相比全量加工性能优势明显;

2.1.加工原则

然后我们需要确立一些使用增量加工的原则,突破或者不遵守这些原则都是不合理或者不正确的。

一、增量表(增量状态[U\D\I\K],数据更新时间);

二、2张增量表不能直接关联,必须要有至少一张表是全量;

三、增量加工产出的结果表,还需要记录增量状态和数据更新时间;

四、多个表关联情况下,需要取多个表的增量标识,只要某一个表的关联行是增量就使用该表增量标识;

五、只有主表或则INNER JOIN的表的INSERTDELETE状态可以传递到下一层,其他表的增量状态都是UPDATE

2.2.MERGE逻辑

增量集成到MaxCompute平台的数据落地后,需要做一次MERGE才会产生ODS层的全量数据。所以,MERGE逻辑是最简单和经典的增量加工逻辑。最简单的MERGE逻辑如下:

INSERT OVERWRITE TABLE table_old PARTITION(ds='${ds}')

SELECT `(ds)?+.+`

FROM table_old a --全量表

LEFT ANTI JOIN

table_new b --增量表

ON a.pk = b.pk

AND b.ds = '${ds}'

WHERE a.ds = '${lastdate}'

UNION ALL

SELECT b.*

FROM table_new b

WHERE b.ds = '${ds}' 

-- AND b.operation not in('D')

;

 

这个逻辑使用了一个JOIN加上一个UNION实现了一个MERGE逻辑,把增量合并成一份全量。这里有一个选项【-- AND b.operation not in('D')】,是否要把物理删除从当前全量表中删除,可以根据实际业务需求选择。

2.3.

业务计算逻辑

MERGE逻辑是最简单的一个涉及到增量的逻辑,但是实际业务计算逻辑要比这个场景更加复杂一些。

2.3.1.2张增量表的处理

我们在MERGE里面虽然也是2张表,但是其实这是一张表的增量与全量。如果是2张增量表,那么该如何处理呢。基于两张增量表无法关联的原则,我们必须引入全量表。

1.我们需要利用2张表的当日增量与全量,也就是说有4张表参与计算。

2.如果不想让全量直接关联,那么就需要先找到两个增量表的主键的并集。然后从两个表的全量中拆出这个并集的集合,再去关联。

逻辑如下:

-- ta_add ta表的增量表

-- ta_all ta表的全量表

-- tb_add tb表的增量表

-- tb_all tb表的全量表

-- 注意这个场景使用了mapjoin,增量表的数据量是有限制的

with tx_add as(

 select distinct pk from(

 select pk from ta_add where ds = '${ds}'

 union all

 select pk from tb_add where ds = '${ds}')t)

,ta_add2(

 select/*+mapjoin(t2)*/ t1.*

   from ta_all t1 join tx_add t2 on t1.pk=t2.pk

  where t1.ds = '${ds}'

)

,tb_add2(

 select/*+mapjoin(t2)*/ t1.*

   from tb_all t1 join tx_add t2 on t1.pk=t2.pk

  where t1.ds = '${ds}'

)

insert overwrite table tc_add partition (ds='${ds}')

select *

from ta_add2 t1 join tb_add2 t2 on t1.pk=t2.pk

;

 

这个逻辑利用了增量表比较小,可以利用了MAPJOIN的特性,可以快速的产出两个可以关联的并集再去关联。因为避免了大表的重分布,所以,可以大幅提升运行效率,降低资源消耗。(在这里增量的意义是表真的很大,如果全量是两张百万级的表,建议测试一下性能,可能直接关联更简单效率更高。所以,在MaxCompute做增量加工计算很多场景是没必要的。)

2.3.2.2张以上增量表的处理

我们一般说的增量加工的表还是指业务表,而不是代码表、参数表这种小表。这种万级的小表,增量与全量关联计算的性能差距可以忽略。百万级这种量级的表,增量计算也是意义不大的。我们看下上一小节那段冗长的逻辑,其实原本只需要2行就可以,现在已经变得如此的复杂。2张以上的表,如果使用同一PK关联,2张以上表的这个逻辑还是可以沿用的。如果有多个不同的关联PK,这个问题就从一维搞成了二维,除非实在不得已,不建议再去搞增量加工了。

我在这个优化工作的过程中遇到的场景,就是远远大于2张以上的表的增量加工,并且关联的PK也是多个。原来开发者选取了主表作为增量表,其他的表都是全量表的计算逻辑。因为这是一个分钟级的任务,原来的开发者应该还是希望从性能的角度做一些高效的设计。

但是在这个场景,第一主表并不是太大(百万级),相反左关联的表有上千万的。所以,我并未看到这个增量加工带来巨大性能提升的意义。第二主表的增量是通过一个指定的时间区间来识别近一个时间片段的,这种在集成的源是源系统的时候是可行的。但是这里刚好是一个不稳定的备库的备库,所以,使用固定时间区间可能会因为数据延迟导致未被识别为增量。

索性,我就直接改为全量加工了,这样就没问题了。但是这样就无法识别出哪些数据是加工都的增量了,这就涉及到下面要提到的增量推送的问题。

2.4.增量推送逻辑

有两种思路可以获取需要推送的增量,一种是从原始增量开始就一直保留增量标志字段,另一种是从最终结果中利用T和T+1两个全量比对出增量。在上面提到的场景,我们就遇到了第一个场景,我们需要在加工环节保持增量识别标志,并对这个字段在关联后的结果进行计算。

2.4.1.增量标志计算

增量标志要用来计算,一定是可以计算的。在上一节我提到系统有延迟业务的数据时间是不可靠的了,那如何来判断增量呢?我们的集成任务其实很难去从数据库的备库获取可靠的时间戳,但是我们本次集成的增量数据一定是一个确定的增量集合,所以,这个ETL_DATE(一般是我们dataworks的bizdate或者yyyymmddhhmiss)就是我们在MaxCompute批量做加工可用的增量时间戳。数据库同步工具识别出来的数据的变化状态有增、删、改、主键更新(I、D、U、K)四种,我们是可以直接利用的。

所以,我们在这里使用的逻辑如下:

select ...

,casewhen a.etl_partition ='${ds}'then a.etl_partition

     when b.etl_partition ='${ds}'then b.etl_partition

     ...

     else a.etl_partition endas etl_date

,casewhen a.etl_partition ='${ds}'then a.operation

     when b.etl_partition ='${ds}'then'U'

     ...

     else a.operation endas operation

from tablea a

left join tableb on a.pk=b.pk

...

where ...;

 

所以这种方式是可以把增量状态保持下去的,但是因为这个计算后的结果其实一次次的叠加后,可能就不知道对不对了。所以,在具体的业务场景还要具体的去看。

2.4.2.全字段比对

全字段比对是一种暴力的计算方法,不需要增量加工,我也可以计算出增量。并且这种计算结果还是真实可靠的,相对于一个经过多层计算后的业务结果表来说,更是如此。

全字段比对逻辑如下:

一、T+1日表比T日表多的记录,INSERT状态;

二、T日表比T +1日表多的记录,DELETE状态;

三、T+1日表比T日表,关联后相同主键的非主键字段值不一致的,UPDATE状态;

这个比对十分消耗计算资源,尤其是一些最细业务粒度的交易表、事件表。但是对一些用户表这种表来说,问题倒是不大。比对逻辑如下:

-- I

select a.*,'I'as operation

 from      table1 a

 left join table1 b on a.pk = b.pk and b.ds='${lastdate}'

 where a.ds='${ds}'

  and b.pk isnull

;

-- D

select a.*,'D'as operation

 from      table1 a

 left join table1 b on a.pk = b.pk and b.ds='${ds}'

 where a.ds='${lastdate}'

  and b.pk isnull

;

-- U

select a.*,'U'as operation

 from table1 a

 join table1 b on a.pk = b.pk and b.ds='${ds}'

 where a.ds='${lastdate}'

  and(coalesce(b.col,'')<>coalesce(b.col,'') -- 字符

   orcoalesce(b.col,0)<>coalesce(b.col,0)   -- 数值

   orcoalesce(b.col,'0001-01-01')<>coalesce(b.col,'0001-01-01'))  -- 日期

;

 

全字段比对看起来其实并不优美,实在是有点粗暴。当然你也许会有更容易识别增量的方式,可以多试试,这将是你保底的方法。

3.实践总结

通过上面的内容,我们对增量加工的方法有了一定了解。希望我文中提到的方法能帮助大家在日后在项目中正确的使用增量加工的方法,并通过这个方法在部分场景获得显著的性能改进。另外我还是要提到一点,就是增量加工逻辑比全量加工更加复杂,并且还会遇到更为复杂的异常排查、补数据等维护等问题。大家在实际项目中,一定要权衡好利弊,再定夺方案。

MaxCompute 二维码拼图.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
业务增量数据入仓以及增全量合并工作
增量数据入库与增量全合并工作
71 1
|
6月前
|
消息中间件 SQL Kafka
离线数仓(四)【数仓数据同步策略】(1)
离线数仓(四)【数仓数据同步策略】
|
6月前
|
消息中间件 JSON 分布式计算
离线数仓(四)【数仓数据同步策略】(3)
离线数仓(四)【数仓数据同步策略】
|
6月前
|
canal 关系型数据库 MySQL
离线数仓(四)【数仓数据同步策略】(2)
离线数仓(四)【数仓数据同步策略】
|
6月前
|
消息中间件 JSON Java
离线数仓(四)【数仓数据同步策略】(4)
离线数仓(四)【数仓数据同步策略】
|
存储 监控 应用服务中间件
日志服务之数据清洗与入湖
本教程介绍如何使用日志服务接入NGINX模拟数据,通过数据加工对数据进行清洗并归档至OSS中进行存储。
|
消息中间件 canal SQL
4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(一)
4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(一)
|
SQL 消息中间件 JSON
4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(二)
4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(二)
|
存储 数据采集 移动开发
日志服务之数据清洗与入湖-1
日志服务之数据清洗与入湖-1
125 0
日志服务之数据清洗与入湖-1
|
存储 数据采集 移动开发
日志服务之数据清洗与入湖-3
日志服务之数据清洗与入湖-3
137 0
日志服务之数据清洗与入湖-3