Dataphin实时研发实践—电商场景下的实时数据大屏构建

简介: 实时数据大屏是实时计算的重要应用场景之一,广泛应用在电商业务中,用于实时监控和分析电商平台的运营情况。通过大屏展示实时的销售额、订单量、用户活跃度、商品热度等数据指标,帮助业务人员随时了解业务的实时状态,快速发现问题和机会。同时,通过数据可视化和趋势分析,大屏也提供了决策支持和优化运营的功能,帮助业务人员做出及时的决策和调整策略,优化电商业务的运营效果。下面以电商业务为背景,介绍如何构建经典实时数仓,实现实时数据从业务库到ODS层、DWD层、DWS层全链路流转,基于Dataphin和Quick BI实现实时数据大屏。

1.背景

实时数据大屏是实时计算的重要应用场景之一,广泛应用在电商业务中,用于实时监控和分析电商平台的运营情况。通过大屏展示实时的销售额、订单量、用户活跃度、商品热度等数据指标,帮助业务人员随时了解业务的实时状态,快速发现问题和机会。同时,通过数据可视化和趋势分析,大屏也提供了决策支持和优化运营的功能,帮助业务人员做出及时的决策和调整策略,优化电商业务的运营效果。
下面以电商业务为背景,介绍如何构建经典实时数仓,实现实时数据从业务库到ODS层、DWD层、DWS层全链路流转,基于Dataphin和Quick BI实现实时数据大屏。

2.实时数仓架构设计:

image.png

  1. Flink:
    作为流式计算引擎,用于实时处理和计算实时数据。
    在实时数据大屏中,Flink可以从Kafka中读取实时的流数据,进行实时的数据处理、分析和计算。可以进行数据过滤、转换、聚合等操作,生成实时的统计指标和计算结果,为实时数据大屏提供数据支持。
  2. MySQL:
    作为关系型数据库,用于存储和管理电商业务的核心数据,如用户信息、订单信息、商品信息等。
    在实时数据大屏中,MySQL可以作为数据存储的一部分,存储一些关键数据(如维度数据、关键结果数据等),以供后续的查询和分析。
  3. Hive:
    作为数据仓库,用于存储和管理大规模数据集。
    在实时数据大屏中,Hive可以作为离线数据存储的一部分,存储历史数据、批处理数据等。可以进行数据清洗、转换和聚合等操作,用于离线分析和报表展示。
  4. Kafka:
    作为分布式消息队列,用于实时数据的高吞吐量、低延迟的传输和存储。
    在实时数据大屏中,Kafka扮演了数据转发和传输的角色。电商平台的各类实时数据,如用户行为、订单信息等,可以通过Kafka进行实时的消息传递和存储。

3.具体实现过程:

3.1数据准备

3.1.1数据源

新建MySQL、Kafka数据源:
image.png

3.1.2元表

元表是通过数据管理的跨存储类型表,开发过程中所用到的输入表、输出表、维表可以通过创建元表进行创建和管理,元表具有以下优势:
安全可靠:通过元表可以有效避免直接编写原生Flink DDL语句导致的敏感信息透出问题。
提升效率和体验:通过一次建表,可多次引用。无需重复编写DDL语句,无需进行繁杂的输入、输出、维表映射。简化开发,提升效率和体验。
资产血缘:通过元表可以维护上下游的资产血缘信息。

新建Kafka、MySQL元表:
image.png
image.png

3.2实时数仓构建

新建Flink SQL任务,可以选择引用示例代码,使用cdc实时数据同步入湖/入仓示例代码,通过简单的配置和选择即可自动生成可运行的Flink SQL
image.png
image.png
image.png

3.2.1ODS

Flink CDC做实时数据采集,MySQL中的原始业务数据(订单信息表)实时同步到中kafka做流式数仓ODS层

CREATE TEMPORARY TABLE `ods_kafka_sink` (
  `product_id` BIGINT,
  `order_id` BIGINT NOT NULL,
  `user_id` VARCHAR,
  `price` DOUBLE,
  `order_time` TIMESTAMP(3),
  PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
  'key.format' = 'json',
  'topic' = 'ods_poc',
  'value.format' = 'json',
  'properties.group.id' = 'group_id_order_wide_persist',
  'properties.enable.idempotence' = 'false'
);

INSERT INTO ods_kafka_sink
SELECT
    product_id,
    order_id,
    user_id,
    price,
    order_time
FROM ods_order;

数据持久化到Hive(可使用实时集成或离线集成),参考 ods_order_cdc_persist

3.2.2DWD

订单信息表关联用户信息表,关联商品信息表,将ODS层数据打宽,进入DWD明细层

CREATE TEMPORARY TABLE `ods_kafka_source` (
  `product_id` BIGINT,
  `order_id` BIGINT,
  `user_id` VARCHAR,
  `price` DOUBLE,
  `order_time` TIMESTAMP(3),
  `user_name` VARCHAR,
  `user_gender` VARCHAR,
  `user_age` BIGINT,
  `product_name` VARCHAR,
  `store_id` BIGINT,
  `store_name` VARCHAR,
  `proctime` as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
  'topic' = 'ods_poc',
  'value.format' = 'json',
  'properties.group.id' = 'group_id_order_wide'
);

CREATE TEMPORARY TABLE `dwd_kafka` (
  `product_id` BIGINT,
  `order_id` BIGINT,
  `user_id` VARCHAR,
  `price` DOUBLE,
  `order_time` TIMESTAMP(3),
  `user_name` VARCHAR,
  `user_gender` VARCHAR,
  `user_age` BIGINT,
  `product_name` VARCHAR,
  `store_id` BIGINT,
  `store_name` VARCHAR
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
  'topic' = 'dwd_poc',
  'value.format' = 'json',
  'properties.group.id' = 'group_id_order_wide_persist',
  'properties.enable.idempotence' = 'false'
);

INSERT INTO dwd_kafka
SELECT
    o.product_id,
    o.order_id,
    o.user_id,
    o.price,
    o.order_time,
    u.user_name,
    u.user_gender,
    u.user_age,
    p.product_name,
    p.store_id,
    p.store_name
FROM ods_kafka_source o
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS u
ON 
o.user_id = u.user_id
LEFT JOIN product_dim FOR SYSTEM_TIME AS OF o.proctime AS p
ON 
o.product_id = p.product_id; 

数据持久化到Hive(可使用实时集成或离线集成),参考 dwd_order_wide_persist

计算每个商家的每天的成交额,并将结果实时同步到MySQL

CREATE TEMPORARY TABLE `dwd_kafka` (
`product_id`    BIGINT,
`order_id`    BIGINT,
`user_id`    VARCHAR,
`price`    DOUBLE,
`order_time`    TIMESTAMP(3),
`user_name`    VARCHAR,
`user_gender`    VARCHAR,
`user_age`    BIGINT,
`product_name`    VARCHAR,
`store_id`    BIGINT,
`store_name`    VARCHAR
) WITH
('connector'='kafka'
,'properties.bootstrap.servers'= '<your properties.bootstrap.servers>'
,'topic'='dwd_poc'
,'value.format'='json'
,'properties.group.id'='group_id_dws_store_sale_day'
);

CREATE TEMPORARY TABLE dws_store_sale ( 
    `store_id`    BIGINT    comment '',
    `store_name` VARCHAR    comment '',
    `order_day` VARCHAR    comment '',
    `sale`    DOUBLE    comment ''
) WITH ( 
'connector'='print'
);


-- dwd层根据order_id去重,防止中间层dwd重跑产生重复数据
CREATE TEMPORARY VIEW dwd_kafka_distinct AS
SELECT
    *
FROM (
    SELECT *,
    ROW_NUMBER() OVER (
        PARTITION BY order_id ORDER BY PROCTIME()
        ) AS row_num
    FROM dwd_kafka
)
WHERE row_num <= 1;

-- 根据store_id进行聚合计算
INSERT INTO dws_store_sale_mysql
SELECT
    store_id,
    MAX(store_name) as store_name,
    DATE_FORMAT(order_time, 'yyyyMMdd') as order_day,
    SUM(price) as sale
FROM dwd_kafka_distinct
GROUP BY
    store_id,
    DATE_FORMAT(order_time, 'yyyyMMdd');    

店铺排行榜,计算平台每日前100销量的店铺,并将结果实时同步到MySQL

CREATE TEMPORARY TABLE `dwd_kafka` (
`product_id`    BIGINT,
`order_id`    BIGINT,
`user_id`    VARCHAR,
`price`    DOUBLE,
`order_time`    TIMESTAMP(3),
`user_name`    VARCHAR,
`user_gender`    VARCHAR,
`user_age`    BIGINT,
`product_name`    VARCHAR,
`store_id`    BIGINT,
`store_name`    VARCHAR
) WITH
('connector'='kafka'
,'properties.bootstrap.servers'= '<your properties.bootstrap.servers>'
,'topic'='dwd_poc'
,'value.format'='json'
,'properties.group.id'='group_id_dws_store_count_top_n'
);

CREATE TEMPORARY TABLE dws_store_count_top_n ( 
    `row_num`    BIGINT    comment '',
    `store_id`    BIGINT    comment '',
    `store_name` VARCHAR    comment '',
    `order_day` VARCHAR    comment '',
    `order_id_count`    BIGINT    comment ''
) WITH ( 
'connector'='print'
);


-- dwd层根据order_id去重,防止中间层dwd重跑产生重复数据
CREATE TEMPORARY VIEW dwd_kafka_distinct AS
SELECT
    *
FROM (
    SELECT *,
    ROW_NUMBER() OVER (
        PARTITION BY order_id ORDER BY PROCTIME()
        ) AS row_num
    FROM dwd_kafka
)
WHERE row_num <= 1;


-- 根据下单时间计算每日不同店铺的商品下单量
CREATE TEMPORARY VIEW day_count AS
SELECT 
    store_id,
    MAX(store_name) as store_name,
    COUNT(order_id) AS order_id_count,
    DATE_FORMAT(order_time, 'yyyyMMdd') as order_day
FROM dwd_kafka_distinct
GROUP BY 
    store_id,
    DATE_FORMAT(order_time, 'yyyyMMdd');


INSERT INTO dws_store_count_top_n_mysql
SELECT 
    row_num,
    store_id,
    store_name,
    order_day,
    order_id_count
FROM (
    SELECT *,
    ROW_NUMBER() OVER (
        PARTITION BY order_day ORDER BY order_id_count DESC
        ) AS row_num -- 按商品的成交量降序排序
    FROM day_count
)
WHERE row_num <= 100;
    

3.3实时数据大屏搭建

模拟业务订单数据产出,启动 gen_order 任务,每秒生成一条订单数据,每次运行将构建2000条数据
image.png
Quick BI 构建数据集,数据集中能查看到对应的表和数据
image.png
image.png
Quick BI 搭建数据大屏,数据大屏中查看店铺实时销售量排行榜,且能够看到排行前10名店铺的实时销售量
E6D8547C-3B90-4599-993B-D157C3FEBB7D.gif

作者介绍
目录