AnalyticDB PostgreSQL构建一站式实时数仓实践

简介: 本文介绍通过 AnalyticDB PostgreSQL 版基于实时物化视图,构建流批一体的一站式实时数仓解决方案,实现一套系统、一份数据、一次写入,即可在数仓内完成实时数据源头导入到实时分析全流程。

1. 背景

随着信息技术的发展和互联网的普及,业务的时效性越来越高,在数据分析领域,对数据的时效性要求也越来越高。更多的业务开始从传统的基于批处理的离线模式,转向基于流式处理的实时模式。基于流处理引擎+大数据产品构建实时数据解决方案有以下劣势:

1)离线和实时数仓两套架构,代码难以复用,开发和运维成本高;

2)对复杂的分析业务支持有限,如和大的离线维度表关联、多层嵌套、历史数据的回溯等支持度弱;

3)Debug成本高,需要排查组件多,同时需要对各种新概念如窗口函数深度理解才能找到根因;

AnalyticDB PostgreSQL 版基于实时物化视图构建流批一体的一站式实时数仓解决方案,实现一套系统、一份数据、一次写入,即可在数仓内完成实时数据源头导入到实时分析全流程。


2. 技术方案

实时数仓的构建链路通常包含实时数据写入、实时处理、实时分析(消费)三个步骤;在数据量比较小、业务较简单的情况下(如作下累计订单数的统计)时这个流程会比较简单,单纯采用流处理引擎即可完成。但当数据规模较大、数据格式不规范、计算逻辑复杂,同时下游对中间表的表依赖大时,简单的实时分析链路无法满足业务需求,需要借鉴数据仓库分层架构设计即ODS、DWD、DWS、ADS。AnalyticDB PostgreSQL 版基于实时物化视图可以完美地融合离线数仓的分层架构,让用户能够专注于业务设计和应用,数据实时流转链路由产品本身来保障。


2.1 数据架构

AnalyticDB PostgreSQL 版实时数仓的架构如下:

1)实时写入:高性能,低延迟,丰富的数据源支持。写入立即可见,完备的事务支持;

2)实时处理:基于实时物化视图,流式增量的实时对数据进行实时ETL处理。相较于普通(非实时)物化视图,实时物化视图无需手动调用刷新命令,即可实现数据更新时自动同步刷新物化视图。当基表发生变化时,构建在基表上的实时物化视图将会自动更新。还可以在实时物化视图上再创建实时物化视图,当基表发生变化时,相关级联的实时物化视图也会自动更新。基于此特性可以方便地构建分析数据的实时ETL处理链路;

3)实时分析:基于SIMD指令集向量化执行引擎、CBO的查询优化器、列式的存储引擎,实现高性能实时分析。

(AnalyticDB PostgreSQL 版实时数仓架构)


典型的业务流程为:多源端数据(实时、离线)可以同时高性能实时地写入实时数据仓库,在仓内按照事先通过SQL开发的实时物化视图进行实时增量实时处理和加工,完成如字段清洗转换、JOIN加维、多维度GROUP BY预聚合、多链路UNION ALL汇总等数据处理动作。最后,支持直接在实时物化视图上进行实时分析如实时Ad-hoc查询,分析报表,实时看板,实时大盘等业务。


2.2 核心优势

相比于传统方案,基于AnalyticDB PostgreSQL 版的一站式实时数仓方案有5大优势:


2.2.1 架构领先、成本最优

AnalyticDB PostgreSQL 版支持对接RDS业务数据库实时日志构建一站式实时数仓,相比采用流处理引擎+大数据产品成本上更优,同时依赖的组件更小在稳定性和运维性上更优。一站式实时数仓的开发和数据流转都在仓内完成,无需多套系统间反复流转。整体来说:

1)成本:仅一份数据存储,仅一套系统部署,仅一次写入开销,整体资源成本最优;

2)性能:没有复杂的链路流转,资源开销低,并且数据延迟低;

3)开发:通常一套SQL开发即可,无需多系统适配联调等;

4)运维:只需要维护一套系统;

5)维护:数据异常排查便利,数据订正容易。


2.2.2 支持流批一体

AnalyticDB PostgreSQL 版除了支持RDS业务数据库日志外,还支持丰富的数据源写入方式,可以高效完成入仓之后进行融合处理和融合查询:

1)通过DTS对接RDS Binlog日志,实现业务数据准时地同步到实时数仓中;

2)支持实时数据源如消息队列Kafka、RacketMQ等;

3)支持和实时流处理引擎对接,实现数据消费;

4)支持通过数据同步或读外表的形式将数据写入到 AnalyticDB PostgreSQL 版中;


基于实时物化视图构建的实时处理链路,仅需要开发一份SQL,即可同时支持流式和批式数据处理。在实时物化视图首次创建时,是基于批处理的方式执行SQL,完成全量结果集的计算,之后在实时写入的过程中增量地流式维护SQL的结果集。并且可以支持针对只有部分数据需要实时更新,其余部分数据的变化只需要定时更新的场景,可以设置参与流式JOIN的部分表不参与实时更新,然后在合适的时候,通过Refresh的批方式,批量刷新定时更新的结果集。

(流批一体优势)


2.2.3 支持复杂批处理任务

实时数仓建设过程中的一大难点就是将原有的复杂批处理任务,转化为实时处理任务,通常来说批处理可以较为轻松地支持复杂的SQL语法,尤其是多重嵌套等复杂SQL,而流处理对SQL的语法的限制较多,AnalyticDB PostgreSQL版基于传统数仓对复杂SQL查询支持的优势,相比流计算引擎可以在复杂批处理任务转化为实时处理任务时有更小的改造成本,甚至实践中很多客户的大部分批处理任务可以直接实时化。


2.2.4 支持无限窗口

数据库引擎,通常都是面向磁盘存储设计的,相比于基于内存设计的流计算引擎,可以更好的支持超大表的实时JOIN,尤其是多大表复杂的实时JOIN。基于AnalyticDB PostgreSQL版的实时物化视图,可以支持任意历史数据的回溯,不受窗口限制。对于历史数据的订正和回溯,实时物化视图非常便捷,只需要对原始数据做更新即可自动反映到实时链路中。


2.2.5 简易透明查询改写

传统的数仓分层,需要业务SQL显式指定访问预处理的结果集。而在一站式实时数仓内,实时分析和实时处理是合并在同一套系统内,可以相互打通和感知,轻松支持透明的查询改写。实时分析的业务可以在固定访问SQL不用变更的情况下,通过搭建和撤销实时处理链路,对实时分析进行加速和取消加速。依托数据库强大的优化器,不仅可以自动优选最优加速方案,还可以方便地进行冷热链路的切换和维护。


下图就是一个典型的透明查询改写的例子,业务SQL只需要固定访问基表进行聚合操作。优化器会自动优选通过实时物化视图定义的实时预聚合链路,方便地匹配最优groupby结果集,并且可以进行冷热链路切换,无需修改代码。

(透明查询改写)


3. 典型场景实践


3.1 数据集加载

AnalyticDB PostgreSQL 版已内置标准TPCH数据集,对于新建和已存在的实例,可以通过如下方式自动加载一份TPC-H数据集,加载完成后数据集位于adb_sampledata_realtime数据库中。


3.1.1 新建实例

新实例在创建时可以选择加载“样本数据集”


3.1.2 已有实例

对于已经存在的实例,可以通过控制台加载数据集


3.2 场景实践1-实时大宽表

主要的业务场景包含维表join、实时宽表、数据汇总等,如下图所示:

(实时大宽表)


以TPC-H标准数据集提供的数据为例子,customer表存储了客户信息,supplier表存储了供应商信息,nation表存储了国家信息,region表存储了亚洲、欧洲、美洲等洲际区域信息,orders表存储了订单信息,lineitem表存储了详单信息(一个订单可以存在多条详单),part表存储了商品(商品为零件)的详细信息。在进行实时处理时,会分成几步进行逐步拉宽操作:

1)customer表和supllier表需要和nation以及region表进行JOIN,补充上客户及供应商的国家、洲际信息。

2)lineitem表和customer表通过orders中间表JOIN,并按照国家进行过滤(这里仅是为了演示效果,实际业务场景中,往往不同国家的lineitem数据源可能就是分渠道流转的)生成分国家所有订单的详细信息(包含订单的客户信息的宽表)

3)lineitem_china和lineitem_us表union all成包含所有数据的lineitem_wide宽表。


-- 供应商信息维表,实时补全地区和国家名称
DROP MATERIALIZED VIEW if exists supplier_info;
CREATE INCREMENTAL MATERIALIZED VIEW supplier_info AS 
SELECT 
  s.*, 
  r.r_name s_rname, 
  n.n_name s_nname
FROM 
  supplier s 
  join nation n ON s.s_nationkey = n.n_nationkey 
  join region r ON n.n_regionkey = r.r_regionkey
DISTRIBUTED BY (s_suppkey);
-- 顾客信息维表,实时补全地区和国家名称
DROP MATERIALIZED VIEW if exists customer_info;
CREATE INCREMENTAL MATERIALIZED VIEW customer_info AS 
SELECT 
  c.*, 
  r.r_name c_rname, 
  n.n_name c_nname
FROM 
  customer c 
  join nation n ON c.c_nationkey = n.n_nationkey 
  join region r ON n.n_regionkey = r.r_regionkey
DISTRIBUTED BY (c_custkey);
-- 从lineitem原始数据中清洗并抽取出买家为中国地区的事实数据,同时关联“顾客信息”、“零件信息”等维表,形成中国区客户视角宽表
DROP MATERIALIZED VIEW if exists lineitem_china;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_china AS
SELECT
  l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
  l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
  l_shipdate, l_commitdate, l_receiptdate,
  CASE
    WHEN l_shipinstruct = 'NONE' THEN 0
    WHEN l_shipinstruct = 'COLLECT COD' THEN 1
    WHEN l_shipinstruct = 'DELIVER IN PERSON' THEN 2
    WHEN l_shipinstruct = 'TAKE BACK RETURN' THEN 3
  END l_shipinstruct_flag,
  CASE WHEN l_shipmode LIKE '%AIR%' THEN 0 ELSE 1 END l_shipmode_flag,
  o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
  p_partkey, p_name, p_type, p_size, p_retailprice,
  c_custkey, c_name, c_phone, c_nname, c_rname
FROM
  lineitem l
  LEFT JOIN orders o ON l.l_orderkey = o.o_orderkey
  LEFT JOIN part p ON l.l_partkey = p.p_partkey
  LEFT JOIN customer_info c ON o.o_custkey = c.c_custkey
WHERE
  c_nname = 'CHINA'
DISTRIBUTED BY (l_orderkey);
-- 从lineitem原始数据中清洗并抽取出买家为美国地区的事实数据,同时关联“顾客信息”、“零件信息”等维表,形成美国区客户视角宽表
DROP MATERIALIZED VIEW if exists lineitem_us;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_us AS
SELECT
  l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
  l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
  l_shipdate, l_commitdate, l_receiptdate,
  CASE
    WHEN l_shipinstruct = 'NONE' THEN 0
    WHEN l_shipinstruct = 'COLLECT COD' THEN 1
    WHEN l_shipinstruct = 'DELIVER IN PERSON' THEN 2
    WHEN l_shipinstruct = 'TAKE BACK RETURN' THEN 3
  END l_shipinstruct_flag,
  CASE WHEN l_shipmode LIKE '%AIR%' THEN 0 ELSE 1 END l_shipmode_flag,
  o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
  p_partkey, p_name, p_type, p_size, p_retailprice,
  c_custkey, c_name, c_phone, c_nname, c_rname
FROM
  lineitem l
  LEFT JOIN orders o ON l.l_orderkey = o.o_orderkey
  LEFT JOIN part p ON l.l_partkey = p.p_partkey
  LEFT JOIN customer_info c ON o.o_custkey = c.c_custkey
WHERE
  c_nname = 'UNITED STATES'
DISTRIBUTED BY (l_orderkey);
-- 明细数据所有维度汇总宽表,包括供应商、客户、零件信息
DROP MATERIALIZED VIEW if exists lineitem_wide;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_wide AS
SELECT
  l.*,
  s_suppkey, s_name, s_nname, s_rname
FROM
  lineitem_china l
  LEFT JOIN supplier_info s ON l.l_suppkey = s.s_suppkey
UNION ALL
SELECT
  l.*,
  s_suppkey, s_name, s_nname, s_rname
FROM
  lineitem_us l
  LEFT JOIN supplier_info s ON l.l_suppkey = s.s_suppkey
DISTRIBUTED BY (l_orderkey);


实时分析效果

在进行分析查询时,例如TPCH标准查询的Q7查询,在使用实时处理后的宽表进行查询可以提升5~10倍的查询性能:

-- 原始TPCH Q7
SELECT supp_nation,
         cust_nation,
         l_year,
         sum(volume) AS revenue
FROM 
    (SELECT n1.n_name AS supp_nation,
         n2.n_name AS cust_nation,
         extract( year
    FROM l_shipdate ) AS l_year, l_extendedprice * (1 - l_discount) AS volume
    FROM supplier, lineitem, orders, customer, nation n1, nation n2
    WHERE s_suppkey = l_suppkey
            AND o_orderkey = l_orderkey
            AND c_custkey = o_custkey
            AND s_nationkey = n1.n_nationkey
            AND c_nationkey = n2.n_nationkey
            AND ( ( n1.n_name = 'CHINA'
            AND n2.n_name = 'UNITED STATES' )
            OR ( n1.n_name = 'UNITED STATES'
            AND n2.n_name = 'CHINA' ) )
            AND l_shipdate
        BETWEEN date '1995-01-01'
            AND date '1996-12-31' ) AS shipping
GROUP BY  supp_nation, cust_nation, l_year
ORDER BY  supp_nation, cust_nation, l_year;
-- 使用实时宽表进行AD-HOC查询
SELECT supp_nation,
         cust_nation,
         l_year,
         sum(volume) AS revenue
FROM 
    (SELECT s_nname AS supp_nation,
         c_nname AS cust_nation,
         extract( year
    FROM l_shipdate ) AS l_year, l_extendedprice * (1 - l_discount) AS volume
    FROM lineitem_wide
    WHERE ( ( s_nname = 'CHINA'
            AND c_nname = 'UNITED STATES' )
            OR ( s_nname = 'UNITED STATES'
            AND c_nname = 'CHINA' ) )
            AND l_shipdate
        BETWEEN date '1995-01-01'
            AND date '1996-12-31' ) AS shipping
GROUP BY  supp_nation, cust_nation, l_year
ORDER BY  supp_nation, cust_nation, l_year;


3.3 场景实践2-实时多维聚合汇总

(实时多维聚合)


以TPC-H标准数据集提供的表为例子,lineitem表存储了详单信息,supplier表存储了供应商信息,通过lineitem详单表JOIN supplier供应商信息表之后,再对订单状态进行初步的聚合汇总,产出带区域信息的汇总数据lineitem_sup_sum实时物化视图,然后基于分区域的数据可以进一步聚合汇总,产出全域的汇总数据lineitem_sum实时物化视图。


-- lineitem按供应商维度进行部分汇总统计,可以提供按供应商地域(国家、地区)维度的分析,同时使用 l_shipdate分区,用于后续计算时按时间条件过滤
DROP MATERIALIZED VIEW if exists lineitem_sup_sum;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_sup_sum AS
SELECT
  sum(l_quantity) as sum_qty, 
  sum(l_extendedprice) as sum_base_price, 
  sum(
    l_extendedprice * (1 - l_discount)
  ) as sum_disc_price, 
  sum(
    l_extendedprice * (1 - l_discount) * (1 + l_tax)
  ) as sum_charge, 
  count(*) as count_order, 
  l_returnflag, 
  l_linestatus, 
  l_shipdate, 
  l_shipmode, 
  s_rname, 
  s_nname
FROM
  lineitem 
  LEFT JOIN supplier_info s ON l_suppkey = s_suppkey 
GROUP BY
  l_returnflag, 
  l_linestatus, 
  l_shipdate, 
  l_shipmode, 
  s_rname, 
  s_nname
DISTRIBUTED BY (l_shipdate)
;
-- 基于lineitem_sup_sum,进一步汇总,提供排除供应商维度的汇总数据,数据量较小,无需再分区
DROP MATERIALIZED VIEW if exists lineitem_sum;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_sum AS
SELECT
  sum(sum_qty) as sum_qty, 
  sum(sum_base_price) as sum_base_price, 
  sum(sum_disc_price) as sum_disc_price, 
  sum(sum_charge) as sum_charge, 
  sum(count_order) as count_order, 
  l_returnflag, 
  l_linestatus, 
  l_shipdate, 
  l_shipmode
FROM
  lineitem_sup_sum
GROUP BY
  l_returnflag, 
  l_linestatus, 
  l_shipdate, 
  l_shipmode
DISTRIBUTED BY (l_shipdate);


实时分析效果

可以使用透明查询改写,来使用实时预聚合的实时处理结果集(默认开启),通过开关透明查询改写,可以发现,聚合分析的效果提升10~20倍以上:

-- AD-HOC的聚合查询(自动透明查询改写)
set enable_incremental_matview_query_rewrite to off; -- 关闭查询改写查看原始SQL效果
SELECT
  sum(l_quantity) as sum_qty, 
  sum(l_extendedprice) as sum_base_price, 
  sum(
    l_extendedprice * (1 - l_discount)
  ) as sum_disc_price, 
  sum(
    l_extendedprice * (1 - l_discount) * (1 + l_tax)
  ) as sum_charge
FROM
  lineitem 
  LEFT JOIN supplier_info s ON l_suppkey = s_suppkey
WHERE
  l_shipdate BETWEEN date '1995-01-01' AND date '1996-12-31'
GROUP BY
  l_returnflag, 
  l_linestatus,
  l_shipmode;
-- 透明查询改写使用实时预聚合的实时物化视图结果集进行计算
SELECT
  sum(l_quantity) as sum_qty, 
  sum(l_extendedprice) as sum_base_price, 
  sum(
    l_extendedprice * (1 - l_discount)
  ) as sum_disc_price, 
  sum(
    l_extendedprice * (1 - l_discount) * (1 + l_tax)
  ) as sum_charge
FROM
  lineitem 
  LEFT JOIN supplier_info s ON l_suppkey = s_suppkey
WHERE
  l_shipdate BETWEEN date '1995-01-01' AND date '1996-12-31'
GROUP BY
  l_returnflag, 
  l_linestatus,
  l_shipmode;


3.4 场景实践3-实时+离线数据结合

在实际的实时场景中,往往有部分事实大表如详单信息等需要实时反应在分析结果中,部分维表的更新频率不高,且其中部分属性字段的变化对时效性要求不是太高。出于实时更新的代价角度衡量,可以采用流批结合的方式进行。

(流式+批式结合)


以TPCH标准数据集提供的表为例子,customer表存储了顾客信息,orders表存储了订单信息,lineitem表存储了详单信息(一个订单可以存在多条详单),part表存储了商品(商品为零件)的详细信息。

我们通过lineitem表、orders表、customer表、part表的JOIN可以产出一张实时宽表lineitem_stream,用来获取包含完整订单信息。在创建实时物化视图时,我们可以通过“WITH(bypass='customer_info,part')”来指定customer_info、part两张表表的变化不实时回补历史数据,而其余表的数据更新动作都会实时反应在实时宽表的结果集中。之后通过定时执行全量的refresh动作,在lineitem_stream实时宽表中回补customer_info、part两张表的更新。


-- WITH(bypass='customer_info,part') 指定customer_info、part表的更新不回溯历史join数据,lineitem和orders新写入的数据会实时join最新的customer_info和part信息,但是不会回溯历史信息
DROP MATERIALIZED VIEW if exists lineitem_stream;
CREATE INCREMENTAL MATERIALIZED VIEW lineitem_stream WITH(bypass='customer_info,part') AS
SELECT
  l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
  l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
  l_shipdate, l_commitdate, l_receiptdate,
  CASE
    WHEN l_shipinstruct = 'NONE' THEN 0
    WHEN l_shipinstruct = 'COLLECT COD' THEN 1
    WHEN l_shipinstruct = 'DELIVER IN PERSON' THEN 2
    WHEN l_shipinstruct = 'TAKE BACK RETURN' THEN 3
  END l_shipinstruct_flag,
  CASE WHEN l_shipmode LIKE '%AIR%' THEN 0 ELSE 1 END l_shipmode_flag,
  o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
  p_partkey, p_name, p_type, p_size, p_retailprice,
  c_custkey, c_name, c_phone, c_nname, c_rname
FROM
  lineitem l
  LEFT JOIN orders o ON l.l_orderkey = o.o_orderkey
  LEFT JOIN part p ON l.l_partkey = p.p_partkey
  LEFT JOIN customer_info c ON o.o_custkey = c.c_custkey
WHERE
  c_nname = 'CHINA'
DISTRIBUTED BY (l_orderkey);
-- 定时批式回补customer_info和part数据
REFRESH MATERIALIZED VIEW lineitem_stream;


4、实时链路监控

为了在实时数据仓库构建过程中减轻Debug难度,AnalyticDB PostgreSQL 版本提供了对实时物化视图链路的白屏化监控,可以让用户观察实时链路中每一个节点的数据处理情况。

(实时链路监控)


更多信息参考:产品详情


作者介绍
目录

相关产品

  • 云原生数据仓库AnalyticDB MySQL版
  • 云原生数据仓库 AnalyticDB PostgreSQL版