50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下 (三)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
数据管理 DMS,安全协同 3个实例 3个月
推荐场景:
学生管理系统数据库
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 数仓建设教程(理论+实战) 下

当提交任务之后:观察Flink WEB UI

image.png


查看ADS层的ads_province_index表数据:


image.png

  • ads_sku_index


首先在MySQL中创建对应的ADS目标表:ads_sku_index


CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);


向MySQL的ADS层目标装载数据:


-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
;
-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info', 
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;


当提交任务之后:观察Flink WEB UI:


image.png


查看ADS层的ads_sku_index表数据


image.png

5. FineBI展示

image.png


七、数据治理



数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,包括资产治理、数据质量监控、数据指标体系的建设等。


其实数据治理的范围很⼴,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在DAMA 数据管理知识体系指南中,数据治理位于数据管理“车轮图”的正中央,是数据架构、数据建模、数据存储、数据安全、数据质量、元数据管理、主数据管理等10大数据管理领域的总纲,为各项数据管理活动提供总体指导策略。

image.png



1. 数据治理之道是什么


1. 数据治理需要体系建设


为发挥数据价值需要满足三个要素:合理的平台架构、完善的治理服务、体系化的运营手段。


根据企业的规模、所属行业、数据量等情况选择合适的平台架构;治理服务需要贯穿数据全生命周期,保证数据在采集、加工、共享、存储、应用整个过程中的完整性、准确性、一致性和实效性;运营手段则应当包括规范的优化、组织的优化、平台的优化以及流程的优化等等方面。


2. 数据治理需要夯实基础


数据治理需要循序渐进,但在建设初期至少需要关注三个方面:数据规范、数据质量、数据安全。规范化的模型管理是保障数据可以被治理的前提条件,高质量的数据是数据可用的前提条件,数据的安全管控是数据可以共享交换的前提条件。


3. 数据治理需要IT赋能


数据治理不是一堆规范文档的堆砌,而是需要将治理过程中所产生的的规范、流程、标准落地到IT平台上,在数据生产过程中通过“以终为始”前向的方式进行数据治理,避免事后稽核带来各种被动和运维成本的增加。


4. 数据治理需要聚焦数据


数据治理的本质是管理数据,因此需要加强元数据管理和主数据管理,从源头治理数据,补齐数据的相关属性和信息,比如:元数据、质量、安全、业务逻辑、血缘等,通过元数据驱动的方式管理数据生产、加工和使用。


5. 数据治理需要建管一体化


数据模型血缘与任务调度的一致性是建管一体化的关键,有助于解决数据管理与数据生产口径不一致的问题,避免出现两张皮的低效管理模式。


2. 浅谈数据治理方式


如上面所说,数据治理的范围非常广,其中最重要的是数据质量治理,而数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理,评价维度包括完整性、规范性、一致性、准确性、唯一性、关联性等。


在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。


质量检测可参考以下维度:


维度 衡量标准
完整性 业务指定必须的数据是否缺失,不允许为空字符或者空值等。例如,数据源是否完整、维度取值是否完整、数据取值是否完整等
时效性 当需要使用时,数据能否反映当前事实。即数据必须及时,能够满足系统对数据时间的要求。例如处理(获取、整理、清洗、加载等)的及时性
唯一性 在指定的数据集中数据值是否唯一
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其他数据项之间的依赖关系
正确性 数据内容和定义是否一致
精确性 数据精度是否达到业务规则要求的位数
技术有效性 数据项是否按已定义的格式标准组织
业务有效性 数据项是否符合已定义的
可信度 根据客户调查或客户主动提供获得
可用性 数据可用的时间和数据需要被访问时间的比例
可访问性 数据是否便于自动化读取


下面是根据美团的技术文章总结的几点具体治理方式:


1. 规范治理


规范是数仓建设的保障。为了避免出现指标重复建设和数据质量差的情况,统一按照最详细、可落地的方法进行规范建设。


(1) 词根


词根是维度和指标管理的基础,划分为普通词根与专有词根,提高词根的易用性和关联性。


  • 普通词根:描述事物的最小单元体,如:交易-trade。
  • 专有词根:具备约定成俗或行业专属的描述体,如:美元-USD。
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2月前
|
消息中间件 Java Kafka
实时数仓Kappa架构:从入门到实战
【11月更文挑战第24天】随着大数据技术的不断发展,企业对实时数据处理和分析的需求日益增长。实时数仓(Real-Time Data Warehouse, RTDW)应运而生,其中Kappa架构作为一种简化的数据处理架构,通过统一的流处理框架,解决了传统Lambda架构中批处理和实时处理的复杂性。本文将深入探讨Kappa架构的历史背景、业务场景、功能点、优缺点、解决的问题以及底层原理,并详细介绍如何使用Java语言快速搭建一套实时数仓。
193 4
|
7月前
|
数据采集 大数据
大数据实战项目之电商数仓(二)
大数据实战项目之电商数仓(二)
166 0
|
7月前
|
数据挖掘
离线数仓(十)【ADS 层开发】(2)
离线数仓(十)【ADS 层开发】
|
5月前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
|
6月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
7月前
|
消息中间件 分布式计算 Hadoop
大数据实战项目之电商数仓(一)
大数据实战项目之电商数仓(一)
356 0
|
7月前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
7月前
|
存储 消息中间件 Kafka
Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
7月前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】

热门文章

最新文章