Building a Near-Real-Time Data Warehouse with Flink 1.13.3 and Hudi 0.10.0-release

Summary: This article builds a near-real-time data warehouse based on Flink 1.13.3 and Hudi 0.10.0-release.

This article builds a near-real-time data warehouse with Flink SQL and Hudi: once Flink ingests the data from Kafka, everything is stored in Hudi, including all intermediate results as well as the final results. The article 《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践 (qq.com)》 describes a real-time data warehouse built on Flink SQL and Kafka; this article uses it as its foundation.

You may want to refer to that article alongside this one as you work through the steps below.

Final result: (screenshot of the final query output, not reproduced here)

Background

This article uses an e-commerce business as an example to walk through the data processing flow of a near-real-time data warehouse.

Components and Configuration

Flink 1.13.3

flink cdc 2.0.2

hudi 0.10.0 (the latest release as of 2021-12-08; see https://github.com/apache/hudi/releases/tag/release-0.10.0)

hadoop 3.2.0

zeppelin 0.10.0

mysql 5.7 (with binlog enabled)

kafka 2.5.0

Thanks to Zeppelin's convenience, all jobs in this article are submitted through Zeppelin. If you have not used Zeppelin before, see: https://lrting-top.blog.csdn.net/article/details/120681666. Of course, if you would rather not use Zeppelin, submitting with the Flink SQL Client works just as well.

In this experiment, Flink checkpointing is enabled with an interval of 60s.
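
Zeppelin applies whatever checkpoint interval is configured on its Flink interpreter. If you submit with the Flink SQL Client instead, the same setting can be applied per session; a minimal sketch using the standard Flink option key:

SET 'execution.checkpointing.interval' = '60s';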

Before starting the tasks below, make sure you have:

  • Deployed Flink 1.13.3, correctly built the Hudi jar and placed it in Flink's lib directory, and placed the Flink CDC connector jar in the lib directory as well.
  • Deployed and started Zeppelin 0.10.0, with FLINK_HOME and HADOOP_CLASSPATH set on Zeppelin's Flink interpreter.
  • Started Hadoop, MySQL, and Kafka.

Processing Flow

Creating the MySQL Tables and Loading the Initial Data


After downloading the table-creation script (realtime_table.sql), log in to MySQL, create the database realtime_dw_demo_1, switch to it, and initialize it:

mysql -u root -p

create database realtime_dw_demo_1;

use realtime_dw_demo_1;

source realtime_table.sql
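
To verify the initialization, a quick sanity check in the same MySQL session (table names as created by the script):

show tables;

select count(*) from order_info;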

Syncing the MySQL Table Data to Kafka

Flink CDC is used to sync the MySQL data into Kafka. The relevant SQL statements follow.

Reading the MySQL source tables

%flink.ssql

drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

-- mysql source tables

CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);


CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);


CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);


CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);


CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);


CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);


CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);


CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` string  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);

DDL for the Kafka sink tables

%flink.ssql

drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED

)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_trademark'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.date_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_year'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL ,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_status_log'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.payment_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` varchar(1)  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.user_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

INSERT statements that load the MySQL binlog data into the corresponding Kafka topics

%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;
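
Before building on these topics, it is worth verifying that the changelog actually reaches Kafka. The sketch below is a throwaway check, not part of the pipeline; the table name order_info_check and the group id smokeTestGroup are illustrative only. It reads my5.order_info back from the beginning:

%flink.ssql(type=update)

CREATE TEMPORARY TABLE order_info_check (
  `id` bigint,
  `order_status` string,
  `create_time` timestamp(3)
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'smokeTestGroup'
);

select * from order_info_check;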

Loading the Dimension Tables into Hudi

Write the two region dimension tables, my5.base_province and my5.base_region, into Hudi COW tables:

%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;

%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;
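
Note that every Hudi table in this article is created with 'read.streaming.enabled' = 'true', so the same table definition serves both as a write target and as an unbounded changelog source for the joins below. A quick way to confirm that the two dimension tables landed (a sketch; in streaming mode the result keeps refreshing):

%flink.ssql(type=update)
select * from base_region_hudi;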

Create the dim_province table from the two dimension tables above

%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
    'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql

insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br 
     JOIN base_province_hudi bp ON br.id = bp.region_id
;
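
The join above runs continuously. Once it has caught up, dim_province_hudi can be inspected like any other table, for example by counting provinces per region (a sketch):

%flink.ssql(type=update)
select region_name, count(*) as province_count from dim_province_hudi group by region_name;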

Write the product category dimension tables my5.base_category1, my5.base_category2, and my5.base_category3 into Hudi COW tables:

%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;

%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;

%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;

Loading the SKU table into Hudi

%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;

With the steps above, the base data of the product dimension tables has been synced into Hudi. As with dim_province, we now use these tables to create the dim_sku_info view:

%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
  sku_info_topic_hudi si 
  JOIN base_category3_hudi c3 ON si.category3_id = c3.id
  JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
  JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;
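
Unlike the Hudi tables above, dim_sku_info is only a view: nothing extra is persisted, and the three-way join is expanded wherever the view is referenced. A quick look at its output (a sketch):

%flink.ssql(type=update)
select id, sku_name, c1_name, c2_name, c3_name from dim_sku_info;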

DWD Layer Processing

With the steps above, all of the dimension tables we need are in place. Next we process the raw ODS data into a DWD-layer detail wide table.

%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` int  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_time  TIMESTAMP(3),
  primary key (detail_id) not enforced
 ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

%flink.ssql

insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info_topic
    WHERE order_status = '2' -- paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail_topic
    ) od 
    ON oi.id = od.order_id;
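
Since dwd_paid_order_detail_hudi is a MERGE_ON_READ table with 'read.streaming.enabled' = 'true', the paid-order details can be watched as they arrive (a sketch):

%flink.ssql(type=update)
select order_id, sku_name, order_price, sku_num, pay_time from dwd_paid_order_detail_hudi;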

ADS Layer

With the steps above, we have created the dwd_paid_order_detail wide table and stored it in Hudi. Next we JOIN this wide table with the dimension tables to obtain the ADS application-layer data.

ads_province_index_hudi

%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- Create the ADS-layer table in Hudi with DDL
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- ---------------------------------

CREATE TABLE ads_province_index_hudi(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- temporary order aggregation table
-- ---------------------------------

CREATE TABLE tmp_province_index_hudi(
    province_id INT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE,
    primary key(province_id) not enforced
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'

);

%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- load the temporary order aggregation table
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
      province_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));

%flink.ssql

INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
  JOIN dim_province_hudi as dp 
  ON dp.province_id = pc.province_id;

View the data in the ADS-layer table ads_province_index_hudi:
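
For example (a sketch; with streaming read enabled the result updates as new orders are paid):

%flink.ssql(type=update)
select * from ads_province_index_hudi;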

ads_sku_index_hudi

%flink.ssql

-- ---------------------------------
-- Create the ADS-layer table in Hudi with DDL
-- Metrics: 1. daily order count per SKU
--          2. daily order amount per SKU
--          3. daily quantity sold per SKU
-- ---------------------------------


drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- per-SKU metric aggregation
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
    sku_id BIGINT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,
    pay_date DATE,
    PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

%flink.ssql

-- ---------------------------------
-- tmp_sku_index
-- data loading
-- ---------------------------------

INSERT INTO tmp_sku_index_hudi
SELECT
      sku_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      sum(sku_num) order_sku_num,
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));

%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc 
  JOIN dim_sku_info ds
  ON ds.id = sc.sku_id;

%flink.ssql
select * from ads_sku_index_hudi;
