1.背景
实时数据大屏是实时计算的重要应用场景之一,广泛应用在电商业务中,用于实时监控和分析电商平台的运营情况。通过大屏展示实时的销售额、订单量、用户活跃度、商品热度等数据指标,帮助业务人员随时了解业务的实时状态,快速发现问题和机会。同时,通过数据可视化和趋势分析,大屏也提供了决策支持和优化运营的功能,帮助业务人员做出及时的决策和调整策略,优化电商业务的运营效果。
下面以电商业务为背景,介绍如何构建经典实时数仓,实现实时数据从业务库到ODS层、DWD层、DWS层全链路流转,基于Dataphin和Quick BI实现实时数据大屏。
2.实时数仓架构设计:
- Flink:
作为流式计算引擎,用于实时处理和计算实时数据。
在实时数据大屏中,Flink可以从Kafka中读取实时的流数据,进行实时的数据处理、分析和计算。可以进行数据过滤、转换、聚合等操作,生成实时的统计指标和计算结果,为实时数据大屏提供数据支持。 - MySQL:
作为关系型数据库,用于存储和管理电商业务的核心数据,如用户信息、订单信息、商品信息等。
在实时数据大屏中,MySQL可以作为数据存储的一部分,存储一些关键数据(如维度数据、关键结果数据等),以供后续的查询和分析。 - Hive:
作为数据仓库,用于存储和管理大规模数据集。
在实时数据大屏中,Hive可以作为离线数据存储的一部分,存储历史数据、批处理数据等。可以进行数据清洗、转换和聚合等操作,用于离线分析和报表展示。 - Kafka:
作为分布式消息队列,用于实时数据的高吞吐量、低延迟的传输和存储。
在实时数据大屏中,Kafka扮演了数据转发和传输的角色。电商平台的各类实时数据,如用户行为、订单信息等,可以通过Kafka进行实时的消息传递和存储。
3.具体实现过程:
3.1数据准备
3.1.1数据源
新建MySQL、Kafka数据源:
3.1.2元表
元表是通过数据管理的跨存储类型表,开发过程中所用到的输入表、输出表、维表可以通过创建元表进行创建和管理,元表具有以下优势:
安全可靠:通过元表可以有效避免直接编写原生Flink DDL语句导致的敏感信息透出问题。
提升效率和体验:通过一次建表,可多次引用。无需重复编写DDL语句,无需进行繁杂的输入、输出、维表映射。简化开发,提升效率和体验。
资产血缘:通过元表可以维护上下游的资产血缘信息。
新建Kafka、MySQL元表:
3.2实时数仓构建
新建Flink SQL任务,可以选择引用示例代码,使用cdc实时数据同步入湖/入仓示例代码,通过简单的配置和选择即可自动生成可运行的Flink SQL
3.2.1ODS
Flink CDC做实时数据采集,MySQL中的原始业务数据(订单信息表)实时同步到中kafka做流式数仓ODS层
CREATE TEMPORARY TABLE `ods_kafka_sink` (
`product_id` BIGINT,
`order_id` BIGINT NOT NULL,
`user_id` VARCHAR,
`price` DOUBLE,
`order_time` TIMESTAMP(3),
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
'key.format' = 'json',
'topic' = 'ods_poc',
'value.format' = 'json',
'properties.group.id' = 'group_id_order_wide_persist',
'properties.enable.idempotence' = 'false'
);
INSERT INTO ods_kafka_sink
SELECT
product_id,
order_id,
user_id,
price,
order_time
FROM ods_order;
数据持久化到Hive(可使用实时集成或离线集成),参考 ods_order_cdc_persist
3.2.2DWD
订单信息表关联用户信息表,关联商品信息表,将ODS层数据打宽,进入DWD明细层
CREATE TEMPORARY TABLE `ods_kafka_source` (
`product_id` BIGINT,
`order_id` BIGINT,
`user_id` VARCHAR,
`price` DOUBLE,
`order_time` TIMESTAMP(3),
`user_name` VARCHAR,
`user_gender` VARCHAR,
`user_age` BIGINT,
`product_name` VARCHAR,
`store_id` BIGINT,
`store_name` VARCHAR,
`proctime` as PROCTIME()
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
'topic' = 'ods_poc',
'value.format' = 'json',
'properties.group.id' = 'group_id_order_wide'
);
CREATE TEMPORARY TABLE `dwd_kafka` (
`product_id` BIGINT,
`order_id` BIGINT,
`user_id` VARCHAR,
`price` DOUBLE,
`order_time` TIMESTAMP(3),
`user_name` VARCHAR,
`user_gender` VARCHAR,
`user_age` BIGINT,
`product_name` VARCHAR,
`store_id` BIGINT,
`store_name` VARCHAR
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<your properties.bootstrap.servers>',
'topic' = 'dwd_poc',
'value.format' = 'json',
'properties.group.id' = 'group_id_order_wide_persist',
'properties.enable.idempotence' = 'false'
);
INSERT INTO dwd_kafka
SELECT
o.product_id,
o.order_id,
o.user_id,
o.price,
o.order_time,
u.user_name,
u.user_gender,
u.user_age,
p.product_name,
p.store_id,
p.store_name
FROM ods_kafka_source o
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS u
ON
o.user_id = u.user_id
LEFT JOIN product_dim FOR SYSTEM_TIME AS OF o.proctime AS p
ON
o.product_id = p.product_id;
数据持久化到Hive(可使用实时集成或离线集成),参考 dwd_order_wide_persist
计算每个商家的每天的成交额,并将结果实时同步到MySQL
CREATE TEMPORARY TABLE `dwd_kafka` (
`product_id` BIGINT,
`order_id` BIGINT,
`user_id` VARCHAR,
`price` DOUBLE,
`order_time` TIMESTAMP(3),
`user_name` VARCHAR,
`user_gender` VARCHAR,
`user_age` BIGINT,
`product_name` VARCHAR,
`store_id` BIGINT,
`store_name` VARCHAR
) WITH
('connector'='kafka'
,'properties.bootstrap.servers'= '<your properties.bootstrap.servers>'
,'topic'='dwd_poc'
,'value.format'='json'
,'properties.group.id'='group_id_dws_store_sale_day'
);
CREATE TEMPORARY TABLE dws_store_sale (
`store_id` BIGINT comment '',
`store_name` VARCHAR comment '',
`order_day` VARCHAR comment '',
`sale` DOUBLE comment ''
) WITH (
'connector'='print'
);
-- dwd层根据order_id去重,防止中间层dwd重跑产生重复数据
CREATE TEMPORARY VIEW dwd_kafka_distinct AS
SELECT
*
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id ORDER BY PROCTIME()
) AS row_num
FROM dwd_kafka
)
WHERE row_num <= 1;
-- 根据store_id进行聚合计算
INSERT INTO dws_store_sale_mysql
SELECT
store_id,
MAX(store_name) as store_name,
DATE_FORMAT(order_time, 'yyyyMMdd') as order_day,
SUM(price) as sale
FROM dwd_kafka_distinct
GROUP BY
store_id,
DATE_FORMAT(order_time, 'yyyyMMdd');
店铺排行榜,计算平台每日前100销量的店铺,并将结果实时同步到MySQL
CREATE TEMPORARY TABLE `dwd_kafka` (
`product_id` BIGINT,
`order_id` BIGINT,
`user_id` VARCHAR,
`price` DOUBLE,
`order_time` TIMESTAMP(3),
`user_name` VARCHAR,
`user_gender` VARCHAR,
`user_age` BIGINT,
`product_name` VARCHAR,
`store_id` BIGINT,
`store_name` VARCHAR
) WITH
('connector'='kafka'
,'properties.bootstrap.servers'= '<your properties.bootstrap.servers>'
,'topic'='dwd_poc'
,'value.format'='json'
,'properties.group.id'='group_id_dws_store_count_top_n'
);
CREATE TEMPORARY TABLE dws_store_count_top_n (
`row_num` BIGINT comment '',
`store_id` BIGINT comment '',
`store_name` VARCHAR comment '',
`order_day` VARCHAR comment '',
`order_id_count` BIGINT comment ''
) WITH (
'connector'='print'
);
-- dwd层根据order_id去重,防止中间层dwd重跑产生重复数据
CREATE TEMPORARY VIEW dwd_kafka_distinct AS
SELECT
*
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id ORDER BY PROCTIME()
) AS row_num
FROM dwd_kafka
)
WHERE row_num <= 1;
-- 根据下单时间计算每日不同店铺的商品下单量
CREATE TEMPORARY VIEW day_count AS
SELECT
store_id,
MAX(store_name) as store_name,
COUNT(order_id) AS order_id_count,
DATE_FORMAT(order_time, 'yyyyMMdd') as order_day
FROM dwd_kafka_distinct
GROUP BY
store_id,
DATE_FORMAT(order_time, 'yyyyMMdd');
INSERT INTO dws_store_count_top_n_mysql
SELECT
row_num,
store_id,
store_name,
order_day,
order_id_count
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_day ORDER BY order_id_count DESC
) AS row_num -- 按商品的成交量降序排序
FROM day_count
)
WHERE row_num <= 100;
3.3实时数据大屏搭建
模拟业务订单数据产出,启动 gen_order 任务,每秒生成一条订单数据,每次运行将构建2000条数据
Quick BI 构建数据集,数据集中能查看到对应的表和数据
Quick BI 搭建数据大屏,数据大屏中查看店铺实时销售量排行榜,且能够看到排行前10名店铺的实时销售量