50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下 (三)

本文涉及的产品
数据管理 DMS,安全协同 3个实例 3个月
推荐场景:
学生管理系统数据库
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 数仓建设教程(理论+实战) 下

当提交任务之后:观察Flink WEB UI

image.png


查看ADS层的ads_province_index表数据:


image.png

  • ads_sku_index


首先在MySQL中创建对应的ADS目标表:ads_sku_index


CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);


向MySQL的ADS层目标装载数据:


-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;


当提交任务之后:观察Flink WEB UI:


image.png


查看ADS层的ads_sku_index表数据


image.png

5. FineBI展示

image.png


七、数据治理



数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,包括资产治理、数据质量监控、数据指标体系的建设等。


其实数据治理的范围很⼴,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在DAMA 数据管理知识体系指南中,数据治理位于数据管理“车轮图”的正中央,是数据架构、数据建模、数据存储、数据安全、数据质量、元数据管理、主数据管理等10大数据管理领域的总纲,为各项数据管理活动提供总体指导策略。

image.png



1. 数据治理之道是什么


1. 数据治理需要体系建设


为发挥数据价值需要满足三个要素:合理的平台架构、完善的治理服务、体系化的运营手段。


根据企业的规模、所属行业、数据量等情况选择合适的平台架构;治理服务需要贯穿数据全生命周期,保证数据在采集、加工、共享、存储、应用整个过程中的完整性、准确性、一致性和实效性;运营手段则应当包括规范的优化、组织的优化、平台的优化以及流程的优化等等方面。


2. 数据治理需要夯实基础


数据治理需要循序渐进,但在建设初期至少需要关注三个方面:数据规范、数据质量、数据安全。规范化的模型管理是保障数据可以被治理的前提条件,高质量的数据是数据可用的前提条件,数据的安全管控是数据可以共享交换的前提条件。


3. 数据治理需要IT赋能


数据治理不是一堆规范文档的堆砌,而是需要将治理过程中所产生的的规范、流程、标准落地到IT平台上,在数据生产过程中通过“以终为始”前向的方式进行数据治理,避免事后稽核带来各种被动和运维成本的增加。


4. 数据治理需要聚焦数据


数据治理的本质是管理数据,因此需要加强元数据管理和主数据管理,从源头治理数据,补齐数据的相关属性和信息,比如:元数据、质量、安全、业务逻辑、血缘等,通过元数据驱动的方式管理数据生产、加工和使用。


5. 数据治理需要建管一体化


数据模型血缘与任务调度的一致性是建管一体化的关键,有助于解决数据管理与数据生产口径不一致的问题,避免出现两张皮的低效管理模式。


2. 浅谈数据治理方式


如上面所说,数据治理的范围非常广,其中最重要的是数据质量治理,而数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理,评价维度包括完整性、规范性、一致性、准确性、唯一性、关联性等。


在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。


质量检测可参考以下维度:


维度 衡量标准
完整性 业务指定必须的数据是否缺失,不允许为空字符或者空值等。例如,数据源是否完整、维度取值是否完整、数据取值是否完整等
时效性 当需要使用时,数据能否反映当前事实。即数据必须及时,能够满足系统对数据时间的要求。例如处理(获取、整理、清洗、加载等)的及时性
唯一性 在指定的数据集中数据值是否唯一
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其他数据项之间的依赖关系
正确性 数据内容和定义是否一致
精确性 数据精度是否达到业务规则要求的位数
技术有效性 数据项是否按已定义的格式标准组织
业务有效性 数据项是否符合已定义的
可信度 根据客户调查或客户主动提供获得
可用性 数据可用的时间和数据需要被访问时间的比例
可访问性 数据是否便于自动化读取


下面是根据美团的技术文章总结的几点具体治理方式:


1. 规范治理


规范是数仓建设的保障。为了避免出现指标重复建设和数据质量差的情况,统一按照最详细、可落地的方法进行规范建设。


(1) 词根


词根是维度和指标管理的基础,划分为普通词根与专有词根,提高词根的易用性和关联性。


  • 普通词根:描述事物的最小单元体,如:交易-trade。
  • 专有词根:具备约定成俗或行业专属的描述体,如:美元-USD。
相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2月前
|
存储 JSON 大数据
大数据离线数仓---金融审批数仓
大数据离线数仓---金融审批数仓
133 1
|
5月前
|
SQL 分布式计算 数据可视化
滴滴出行大数据数仓实战
滴滴出行大数据数仓实战
124 0
滴滴出行大数据数仓实战
|
5月前
|
关系型数据库 MySQL BI
用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓实战分享
本文从用友畅捷通公司介绍及业务背景;数据仓库技术选型、实际案例及未来规划等方面,分享了用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓的实战经验。
607 0
用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓实战分享
|
7月前
|
SQL 存储 分布式计算
数仓 Hive HA 介绍与实战操作
数仓 Hive HA 介绍与实战操作
|
2月前
|
存储 SQL HIVE
金融审批数仓(离线)--DWD层、ADS层
金融审批数仓(离线)--DWD层、ADS层
|
3月前
|
SQL 分布式计算 数据库
离线数仓--大数据技术之DolphinScheduler
离线数仓--大数据技术之DolphinScheduler
155 2
|
4月前
|
数据挖掘 数据库
离线数仓6.0--- 数据仓库 ER模型-范式理论,维度模型、维度建模理论之事实表、维度建模理论之维度表
离线数仓6.0--- 数据仓库 ER模型-范式理论,维度模型、维度建模理论之事实表、维度建模理论之维度表
130 0
|
4月前
|
存储 分布式计算 关系型数据库
|
7月前
|
SQL 消息中间件 存储
从理论到实践,实时湖仓功能架构设计与落地实战
实时湖仓是「实时计算」和「数据湖」的一种结合应用场景,并不是具体指一个产品模块。本文主要介绍了平台通过相关功能的设计,让数据开发可以更简单更直观地了解 Flink Catalog、数据湖、流批一体等概念,并在实际业务场景中更方便地去落地实践。
124 0
|
7月前
|
大数据 数据挖掘 数据处理
直播预约丨《实时湖仓实践五讲》第二讲:实时湖仓功能架构设计与落地实战
《实时湖仓实践五讲》是袋鼠云打造的系列直播活动,将围绕实时湖仓的建设趋势和通用问题,邀请奋战于企业数字化一线的核心产品&技术专家,结合实践案例分析,和听众共同探讨实时湖仓领域的前沿技术。 《实时湖仓实践五讲》第二讲——《实时湖仓功能架构设计与落地实战》将于10月11日 15:00-16:00开播。 快快预约直播吧~
35 0