基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时数仓Hologres,5000CU*H 100GB 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 本文基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

本文基于Flink SQL与hudi构建准实时数仓,在Flink从kafka接入数据之后,即将所有数据存于hudi中,包括所有中间处理数据以及最终数据。文章《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践 (qq.com)》描述了基于Flink SQL与kafka构建的实时数仓,本文以上述文章为基础。

在完成本文实践的同时可以同步参考上述文章。

最终结果:

背景介绍

本文电商业务为例,展示准实时数仓的数据处理流程。

组件与配置说明

Flink 1.13.3

flink cdc 2.0.2

hudi 0.10.0 (2021.12.08最新发布版本,地址:https://github.com/apache/hudi/releases/tag/release-0.10.0)

hadoop 3.2.0

zeppelin 0.10.0

mysql 5.7(开启binlog)

kafka 2.5.0

由于zeppelin的便捷性,本文全部基于zeppelin进行任务提交,如果您还不会用zeppelin,那么您可以参考:https://lrting-top.blog.csdn.net/article/details/120681666。当然,如果您不想用zeppelin,用Flink SQL Client提交也是完全没有问题的。

本实验Flink开启checkpoint,设置为60s。

在完成以下任务之前,请确保您已经

  • 部署好Flink 1.13.3,并将hudi对应的Jar包已经正确打包并且放置到Flink的lib目录下,将flink cdc对应的jar包放置到lib目录下。
  • 部署并启动zeppelin 0.10.0,在zeppelin的Flink interpreter上指定了FLINK_HOME以及HADOOP_CLASSPATH
  • 同时还有启动hadoop、mysql、kafka

处理流程

MySQL建表与原始数据载入


下载了上述建表语句之后,进入mysql,新建realtime_dw_demo_1,进入数据库 realtime_dw_demo_1 ,初始化数据库

mysql -u root -p

create database realtime_dw_demo_1;

use database realtime_dw_demo_1;

source realtime_table.sql

将mysql表数据同步到kafka

使用flink cdc将mysql数据同步到kafka中,以下为相关sql语句:

读取mysql源表数据

%flink.ssql

drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

---mysql table

CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);


CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);


CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);


CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);


CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);


CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);


CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);


CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` string  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);

kafka sink表建表语句

%flink.ssql

drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED

)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_trademark'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.date_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_year'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL ,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_status_log'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.payment_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` varchar(1)  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.user_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

insert语句,将mysql binlog数据导入kafka对应的topic

%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;

将维表数据导入hudi

将my5.base_province和my1.base_region两张区域维表信息写入hudi COW表中

%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;

使用上述两张维表创建dim_province表

%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
    'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br 
     JOIN base_province_hudi bp ON br.id= bp.region_id
;

将商品维表my5.base_category1和my5.base_category2两张商品维表信息写入hudi COW表

%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;

将商品表导入hudi

%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;

基于上述步骤,我们把商品维表的基础数据同步到hudi中,同样我们使用商品维表创建dim_sku_info视图

%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
  sku_info_topic_hudi si 
  JOIN base_category3_hudi c3 ON si.category3_id = c3.id
  JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
  JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。

%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` int  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time TIMESTAMP(3),
  pay_time  TIMESTAMP(3),
  primary key (detail_id) not enforced
 ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
    'scan.startup.mode' = 'earliest-offset',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info_topic
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail_topic
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了hudi中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

ads_province_index_hudi

%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------

CREATE TABLE ads_province_index_hudi(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------

CREATE TABLE tmp_province_index_hudi(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    primary key(province_id) not enforced
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'

);

%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql

INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
  JOIN dim_province_hudi as dp 
  ON dp.province_id = pc.province_id;

查看ADS层的ads_province_index_hudi表数据:

ads_sku_index_hudi

%flink.ssql

-- ---------------------------------
-- 使用 DDL创建hudi中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------


drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------

INSERT INTO tmp_sku_index_hudi
SELECT
      sku_id,
      count(distinct order_id) order_count, -- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      sum(sku_num) order_sku_num,
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc 
  JOIN dim_sku_info ds
  ON ds.id = sc.sku_id
%flink.ssql
select * from ads_sku_index_hudi;

相关文章
|
9天前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
318 2
|
9天前
|
SQL 存储 分布式计算
Hologres+Paimon构建一体化实时湖仓
Hologres 3.0全新升级,面向未来的一体化实时湖仓。它支持多种Table Format,提供湖仓存储、多模式计算、分析服务和Data+AI一体的能力。Hologres与Paimon结合,实现统一元数据管理、极速查询性能、增量消费及ETL功能。Dynamic Table支持流式、增量和全量三种刷新模式,满足不同业务需求,实现一份数据、一份SQL、一份计算的多模式刷新。该架构适用于高时效性要求的场景,也可用于成本敏感的数据共享场景。
|
28天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
488 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
2月前
|
消息中间件 人工智能 监控
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
本文由喜马拉雅直播业务与仓库建设负责人王琛撰写,介绍了喜马拉雅直播业务的数据仓库架构迭代升级。文章重点分享了基于 Flink + Paimon + StarRocks 实现实时湖仓的架构及其成效,通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警,大幅提升了运营效率与决策质量,并为未来的业务扩展和 AI 项目打下坚实基础。
239 5
Paimon x StarRocks 助力喜马拉雅直播实时湖仓构建
|
3月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
209 8
Flink实时湖仓,为汽车行业数字化加速!
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
79 1
|
2月前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
61 2
|
3月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
202 1

热门文章

最新文章

下一篇
开通oss服务