基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
简介: 本文基于Flink1.13.3与hudi 0.10.0-release构建准实时数仓

本文基于Flink SQL与hudi构建准实时数仓,在Flink从kafka接入数据之后,即将所有数据存于hudi中,包括所有中间处理数据以及最终数据。文章《实时数仓|基于Flink1.11的SQL构建实时数仓探索实践 (qq.com)》描述了基于Flink SQL与kafka构建的实时数仓,本文以上述文章为基础。

在完成本文实践的同时可以同步参考上述文章。

最终结果:

背景介绍

本文电商业务为例,展示准实时数仓的数据处理流程。

组件与配置说明

Flink 1.13.3

flink cdc 2.0.2

hudi 0.10.0 (2021.12.08最新发布版本,地址:https://github.com/apache/hudi/releases/tag/release-0.10.0)

hadoop 3.2.0

zeppelin 0.10.0

mysql 5.7(开启binlog)

kafka 2.5.0

由于zeppelin的便捷性,本文全部基于zeppelin进行任务提交,如果您还不会用zeppelin,那么您可以参考:https://lrting-top.blog.csdn.net/article/details/120681666。当然,如果您不想用zeppelin,用Flink SQL Client提交也是完全没有问题的。

本实验Flink开启checkpoint,设置为60s。

在完成以下任务之前,请确保您已经

  • 部署好Flink 1.13.3,并将hudi对应的Jar包已经正确打包并且放置到Flink的lib目录下,将flink cdc对应的jar包放置到lib目录下。
  • 部署并启动zeppelin 0.10.0,在zeppelin的Flink interpreter上指定了FLINK_HOME以及HADOOP_CLASSPATH
  • 同时还有启动hadoop、mysql、kafka

处理流程

MySQL建表与原始数据载入

首先从以下地址获取mysql建表语句以及模拟数据:

https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-hudi-realtime/realtime_table.zip

下载了上述建表语句之后,进入mysql,新建realtime_dw_demo_1,进入数据库 realtime_dw_demo_1 ,初始化数据库

mysql -u root -p

create database realtime_dw_demo_1;

use database realtime_dw_demo_1;

source realtime_table.sql

将mysql表数据同步到kafka

使用flink cdc将mysql数据同步到kafka中,以下为相关sql语句:

读取mysql源表数据

%flink.ssql

drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

---mysql table

CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);


CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);


CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);


CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);


CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);


CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);


CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);


CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` string  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);

kafka sink表建表语句

%flink.ssql

drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '二级分类名称',
  `category1_id` bigint  NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '三级分类名称',
  `category2_id` bigint  NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string  NULL COMMENT '品牌id',
  `tm_name` string  NULL COMMENT '品牌名称',
  PRIMARY KEY (`tm_id`) NOT ENFORCED

)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_trademark'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.date_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_year'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` string  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL ,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_status_log'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `out_trade_no` string  NULL COMMENT '对外业务编号',
  `order_id` string  NULL COMMENT '订单编号',
  `user_id` string  NULL COMMENT '用户编号',
  `alipay_trade_no` string  NULL COMMENT '支付宝交易流水编号',
  `total_amount` decimal(16,2)  NULL COMMENT '支付金额',
  `subject` string  NULL COMMENT '交易内容',
  `payment_type` string  NULL COMMENT '支付方式',
  `payment_time` string  NULL COMMENT '支付时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.payment_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `login_name` string  NULL COMMENT '用户名称',
  `nick_name` string  NULL COMMENT '用户昵称',
  `passwd` string  NULL COMMENT '用户密码',
  `name` string  NULL COMMENT '用户姓名',
  `phone_num` string  NULL COMMENT '手机号',
  `email` string  NULL COMMENT '邮箱',
  `head_img` string  NULL COMMENT '头像',
  `user_level` string  NULL COMMENT '用户级别',
  `birthday` date  NULL COMMENT '用户生日',
  `gender` varchar(1)  NULL COMMENT '性别 M男,F女',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.user_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

insert语句,将mysql binlog数据导入kafka对应的topic

%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;

将维表数据导入hudi

将my5.base_province和my1.base_region两张区域维表信息写入hudi COW表中

%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT '省名称',
  `region_id` int  NULL COMMENT '大区id',
  `area_code` string  NULL COMMENT '行政区位码',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT '大区id',
  `region_name` string  NULL COMMENT '大区名称',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;

使用上述两张维表创建dim_province表

%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
    'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br 
     JOIN base_province_hudi bp ON br.id= bp.region_id
;

将商品维表my5.base_category1和my5.base_category2两张商品维表信息写入hudi COW表

%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category1_id` bigint NULL COMMENT '一级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL  COMMENT '编号',
  `name` string NOT NULL COMMENT '分类名称',
  `category2_id` bigint NULL COMMENT '二级分类编号',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);

%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;

将商品表导入hudi

%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT '价格',
  `sku_name` string  NULL COMMENT 'sku名称',
  `sku_desc` string  NULL COMMENT '商品规格描述',
  `weight` decimal(10,2)  NULL COMMENT '重量',
  `tm_id` bigint  NULL COMMENT '品牌(冗余)',
  `category3_id` bigint  NULL COMMENT '三级分类id(冗余)',
  `sku_default_img` string  NULL COMMENT '默认显示图片(冗余)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;

基于上述步骤,我们把商品维表的基础数据同步到hudi中,同样我们使用商品维表创建dim_sku_info视图

%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
  sku_info_topic_hudi si 
  JOIN base_category3_hudi c3 ON si.category3_id = c3.id
  JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
  JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。

%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `order_id` bigint  NULL COMMENT '订单编号',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku名称(冗余)',
  `img_url` string  NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2)  NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` int  NULL COMMENT '购买个数',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL  COMMENT '编号',
  `consignee` string  NULL COMMENT '收货人',
  `consignee_tel` string  NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2)  NULL COMMENT '总金额',
  `order_status` string  NULL COMMENT '订单状态',
  `user_id` bigint  NULL COMMENT '用户id',
  `payment_way` string  NULL COMMENT '付款方式',
  `delivery_address` string  NULL COMMENT '送货地址',
  `order_comment` string  NULL COMMENT '订单备注',
  `out_trade_no` string  NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` string  NULL COMMENT '订单描述(第三方支付用)',
  `create_time` timestamp(3)  NULL COMMENT '创建时间',
  `operate_time` timestamp(3)  NULL COMMENT '操作时间',
  `expire_time` timestamp(3)  NULL COMMENT '失效时间',
  `tracking_no` string  NULL COMMENT '物流单编号',
  `parent_order_id` bigint  NULL COMMENT '父订单编号',
  `img_url` string  NULL COMMENT '图片路径',
  `province_id` int  NULL COMMENT '地区',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time TIMESTAMP(3),
  pay_time  TIMESTAMP(3),
  primary key (detail_id) not enforced
 ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
    'scan.startup.mode' = 'earliest-offset',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info_topic
    WHERE order_status = '2' -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail_topic
    ) od 
    ON oi.id = od.order_id;

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了hudi中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

ads_province_index_hudi

%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------

CREATE TABLE ads_province_index_hudi(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------

CREATE TABLE tmp_province_index_hudi(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    primary key(province_id) not enforced
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'

);

%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql

INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
  JOIN dim_province_hudi as dp 
  ON dp.province_id = pc.province_id;

查看ADS层的ads_province_index_hudi表数据:

ads_sku_index_hudi

%flink.ssql

-- ---------------------------------
-- 使用 DDL创建hudi中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------


drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    PRIMARY KEY (sku_id) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------

INSERT INTO tmp_sku_index_hudi
SELECT
      sku_id,
      count(distinct order_id) order_count, -- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      sum(sku_num) order_sku_num,
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id,TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'))
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc 
  JOIN dim_sku_info ds
  ON ds.id = sc.sku_id
%flink.ssql
select * from ads_sku_index_hudi;

相关文章
|
12天前
|
SQL 分布式计算 数据库
畅捷通基于Flink的实时数仓落地实践
本文整理自畅捷通总架构师、阿里云MVP专家郑芸老师在 Flink Forward Asia 2023 中闭门会上的分享。
8200 14
畅捷通基于Flink的实时数仓落地实践
|
1天前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
21天前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7278 1
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
1天前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
18天前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
180 3
|
1天前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
23天前
|
存储 数据挖掘 BI
数据仓库深度解析与实时数仓应用案例探析
随着数据量的不断增长和数据应用的广泛深入,数据治理和隐私保护将成为数据仓库建设的重要议题。企业需要建立完善的数据治理体系,确保数据的准确性、一致性和完整性;同时加强隐私保护机制建设,确保敏感数据的安全性和合规性。
128 55
|
6天前
|
消息中间件 存储 大数据
大数据-数据仓库-实时数仓架构分析
大数据-数据仓库-实时数仓架构分析
22 1
|
23天前
|
存储 消息中间件 数据挖掘
数据仓库的深度探索与实时数仓应用案例解析
大数据技术的发展,使得数据仓库能够支持大量和复杂数据类型(如文本、图像、视频、音频等)。数据湖作为一种新的数据存储架构,强调原始数据的全面保留和灵活访问,与数据仓库形成互补,共同支持企业的数据分析需求。
|
2月前
|
Cloud Native 数据管理 OLAP
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
389 2
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区