Add the Flink conf in StreamPark
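The exact conf depends on the job; a minimal sketch of the kind of entries added here, matching the checkpoint/savepoint PVC mounts configured later in this post (the values are assumptions, adjust them to your environment):

```yaml
# Illustrative flink-conf entries (assumed values); the directories match the
# PVC mount paths used in the pod template further below.
execution.checkpointing.interval: 60s
state.backend: rocksdb
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
```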
Build the Flink 1.16.0 base image: pull the corresponding version from Docker Hub.
```shell
# Pull the image
docker pull flink:1.16.0-scala_2.12-java8
# Tag the image
docker tag flink:1.16.0-scala_2.12-java8 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
# Push it to the company registry
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
```
Create a Dockerfile and a target directory, and place the flink-oss-fs-hadoop JAR in that directory. Download address for the shaded Hadoop OSS file system JAR: https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/
```text
.
├── Dockerfile
└── target
    └── flink-oss-fs-hadoop-1.16.0.jar
```
```shell
touch Dockerfile
mkdir target
```
```dockerfile
# vim Dockerfile
FROM registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
RUN mkdir /opt/flink/plugins/oss-fs-hadoop
COPY target/flink-oss-fs-hadoop-1.16.0.jar /opt/flink/plugins/oss-fs-hadoop
```
```shell
# Build the base image
docker build -t flink-table-store:v1.16.0 .
docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
```
Prepare the Paimon JAR
You can download the corresponding version from the Apache Repository; note that it must match the Flink major version.
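For example, fetching the snapshot JAR used later in this post might look like the following (the snapshot directory layout and the timestamped file name are assumptions; browse the repository index and substitute the current snapshot):

```shell
# Illustrative only: snapshot timestamps change over time, check the repository index first
wget https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/0.4-SNAPSHOT/paimon-flink-1.16-0.4-20230424.001927-40.jar
```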
Submit Paimon jobs with the StreamPark platform
Prerequisites:
Kubernetes client connection configuration
Kubernetes RBAC configuration
Container image registry configuration (this case uses the free personal edition of Alibaba Cloud Container Registry)
Create the PVC resources for mounting checkpoints/savepoints
Kubernetes client connection configuration:
Copy the ~/.kube/config file from the Kubernetes master node directly to the corresponding directory on the StreamPark server, then run the following command on the StreamPark server; if the Kubernetes cluster is shown as running, permissions and network connectivity have been verified successfully.
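The copy itself can be done in any convenient way; a minimal sketch assuming SSH access to the master node (the hostname is a placeholder):

```shell
# Copy the kubeconfig from the Kubernetes master node (placeholder hostname)
mkdir -p ~/.kube
scp root@k8s-master:~/.kube/config ~/.kube/config
```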
kubectl cluster-info
Kubernetes RBAC configuration
Create the streamx namespace
kubectl create ns streamx
Create a clusterrolebinding resource using the default service account
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=streamx:default
Container image registry configuration
This case uses Alibaba Cloud Container Registry (ACR); a self-hosted registry such as Harbor can be used instead.
Create a namespace named streampark in ACR (its security setting must be set to private).
Configure the image registry in StreamPark; the images built for jobs will be pushed to this registry.
Create a Kubernetes secret for pulling images from ACR; streamparksecret is the secret name and can be customized.
kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx
Create the PVC resources for mounting checkpoints/savepoints
Kubernetes persistence is backed by Alibaba Cloud Object Storage Service (OSS).
OSS CSI plugin:
The OSS CSI plugin can be used to simplify storage management. PVs can be created with a CSI configuration, while PVCs and pods are defined as usual. Reference YAML files: https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip
Configuration requirements:
- Create a service account with the required RBAC permissions
  Reference: https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md
kubectl apply -f rbac.yaml
- Deploy the OSS CSI plugin
kubectl apply -f oss-plugin.yaml
- Create the PVs for checkpoints & savepoints
kubectl apply -f checkpoints_pv.yaml
kubectl apply -f savepoints_pv.yaml
- Create the PVCs for checkpoints & savepoints (a minimal PV/PVC example is sketched after these steps)
kubectl apply -f checkpoints_pvc.yaml
kubectl apply -f savepoints_pvc.yaml
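The referenced zip contains the full definitions; a minimal sketch of what the checkpoints PV/PVC pair might look like, following the static-provisioning format from the OSS CSI driver docs linked above (bucket, endpoint, AccessKey values, and sizes are placeholders; the savepoints pair is analogous):

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-checkpoints-csi-pv
spec:
  capacity:
    storage: 50Gi                        # placeholder size
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  csi:
    driver: ossplugin.csi.alibabacloud.com
    volumeHandle: flink-checkpoints-csi-pv
    volumeAttributes:
      bucket: "your-bucket"                      # placeholder bucket
      url: "oss-cn-zhangjiakou.aliyuncs.com"     # placeholder endpoint
      akId: "xxxxxx"                             # placeholder AccessKey ID
      akSecret: "xxxxxx"                         # placeholder AccessKey Secret
      path: "/flink/checkpoints"                 # placeholder path inside the bucket
      otherOpts: "-o max_stat_cache_size=0 -o allow_other"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoints-csi-pvc       # claim name referenced by the pod template below
  namespace: streamx
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 50Gi
  volumeName: flink-checkpoints-csi-pv
```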
With the dependency environment in place, we can now start the layered development of the streaming data warehouse with Paimon.
Case:
Real-time statistics of sea-freight and air-freight booking order volume
Job submission:
Initialize the Paimon catalog configuration
```sql
SET 'execution.runtime-mode' = 'streaming';
SET 'table.exec.sink.upsert-materialize' = 'none';
SET 'sql-client.execution.result-mode' = 'tableau';

-- Create and use the FTS catalog; the underlying storage is Alibaba Cloud OSS
CREATE CATALOG `table_store` WITH (
    'type' = 'paimon',
    'warehouse' = 'oss://xxxxx/xxxxx' -- customize the OSS storage path
);

USE CATALOG `table_store`;
```
A single job extracts table data from three databases (PostgreSQL, MySQL, and SQL Server) at the same time and writes it into Paimon.
Development Mode: Flink SQL
Execution Mode: kubernetes application
Flink Version: flink-1.16.0-scala-2.12
Kubernetes Namespace: streamx
Kubernetes ClusterId: (a custom job name is fine)
Flink Base Docker Image: registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0 (the base image pushed to the Alibaba Cloud registry)
Rest-Service Exposed Type: NodePort
Paimon base dependency JARs:
paimon-flink-1.16-0.4-20230424.001927-40.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
Flink CDC dependency download address:
https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0
pod template
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  containers:
    - name: flink-main-container
      volumeMounts:
        - name: flink-checkpoints-csi-pvc
          mountPath: /opt/flink/checkpoints
        - name: flink-savepoints-csi-pvc
          mountPath: /opt/flink/savepoints
  volumes:
    - name: flink-checkpoints-csi-pvc
      persistentVolumeClaim:
        claimName: flink-checkpoints-csi-pvc
    - name: flink-savepoints-csi-pvc
      persistentVolumeClaim:
        claimName: flink-savepoints-csi-pvc
  imagePullSecrets:
    - name: streamparksecret
```
flink sql:
1. Build the mapping between the source tables and the ODS tables in Paimon; here each source table maps one-to-one to a target table.
```sql
-- PostgreSQL example
CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` (
    `doccode` varchar(50) not null COMMENT 'primary key',
    `businessmodel` varchar(450) COMMENT 'business model',
    `businesstype` varchar(450) COMMENT 'business type',
    `transporttype` varchar(50) COMMENT 'transport type',
    ......
    `bookingguid` varchar(50) COMMENT 'operation number',
    PRIMARY KEY (`doccode`) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'database server IP',
    'port' = 'port',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'database name',
    'schema-name' = 'dev',
    'decoding.plugin.name' = 'wal2json',
    'table-name' = 'doc_hdworkdochd',
    'debezium.slot.name' = 'hdworkdochdslotname03'
);

CREATE TEMPORARY TABLE `shy_base_enterprise` (
    `entguid` varchar(50) not null COMMENT 'primary key',
    `entorgcode` varchar(450) COMMENT 'customer number',
    `entnature` varchar(450) COMMENT 'customer type',
    `entfullname` varchar(50) COMMENT 'customer name',
    PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'database server IP',
    'port' = 'port',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'database name',
    'schema-name' = 'dev',
    'decoding.plugin.name' = 'wal2json',
    'table-name' = 'base_enterprise',
    'debezium.snapshot.mode' = 'never', -- incremental sync only (omit this property for full + incremental)
    'debezium.slot.name' = 'base_enterprise_slotname03'
);

-- Create the corresponding target tables in the Paimon ODS layer based on the source table schemas
CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` (
    `o_year` BIGINT NOT NULL COMMENT 'partition field',
    `create_date` timestamp NOT NULL COMMENT 'create time',
    PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED
) PARTITIONED BY (`o_year`) WITH (
    'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);

CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` (
    `create_date` timestamp NOT NULL COMMENT 'create time',
    PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
    'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);

-- Set the job name and run the job to write the source data into the corresponding Paimon tables in real time
SET 'pipeline.name' = 'ods_doc_hdworkdochd';

INSERT INTO ods.`ods_shy_jh_doc_hdworkdochd`
SELECT *
    ,YEAR(`docdate`) AS `o_year`
    ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `shy_doc_hdworkdochd`
where `docdate` is not null and `docdate` > '2023-01-01';

SET 'pipeline.name' = 'ods_shy_base_enterprise';

INSERT INTO ods.`ods_shy_base_enterprise`
SELECT *
    ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `shy_base_enterprise`
where entorgcode is not null and entorgcode <> '';

-- MySQL example
CREATE TEMPORARY TABLE `doc_order` (
    `id` BIGINT NOT NULL COMMENT 'primary key',
    `order_no` varchar(50) NOT NULL COMMENT 'order number',
    `business_no` varchar(50) COMMENT 'OMS service number',
    ......
    `is_deleted` int COMMENT 'whether voided',
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'database server address',
    'port' = 'port',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'database name',
    'table-name' = 'doc_order'
);

-- Create the corresponding target table in the Paimon ODS layer based on the source table schema
CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` (
    `o_year` BIGINT NOT NULL COMMENT 'partition field',
    `create_date` timestamp NOT NULL COMMENT 'create time',
    PRIMARY KEY (`o_year`, `id`) NOT ENFORCED
) PARTITIONED BY (`o_year`) WITH (
    'changelog-producer.compaction-interval' = '2m'
) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);

-- Set the job name and run the job to write the source data into the corresponding Paimon table in real time
SET 'pipeline.name' = 'ods_bondexsea_doc_order';

INSERT INTO ods.`ods_bondexsea_doc_order`
SELECT *
    ,YEAR(`gmt_create`) AS `o_year`
    ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `doc_order` where gmt_create > '2023-01-01';

-- SQL Server example
CREATE TEMPORARY TABLE `OrderHAWB` (
    `HBLIndex` varchar(50) NOT NULL COMMENT 'primary key',
    `CustomerNo` varchar(50) COMMENT 'customer number',
    ......
    `CreateOPDate` timestamp COMMENT 'document creation date',
    PRIMARY KEY (`HBLIndex`) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'database server address',
    'port' = 'port',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'database name',
    'schema-name' = 'dbo',
    -- 'debezium.snapshot.mode' = 'initial' -- capture full data plus increments
    'scan.startup.mode' = 'latest-offset', -- only capture incremental data
    'table-name' = 'OrderHAWB'
);

-- Create the corresponding target table in the Paimon ODS layer based on the source table schema
CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` (
    `o_year` BIGINT NOT NULL COMMENT 'partition field',
    `create_date` timestamp NOT NULL COMMENT 'create time',
    PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED
) PARTITIONED BY (`o_year`) WITH (
    'changelog-producer.compaction-interval' = '2m'
) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);

-- Set the job name and run the job to write the source data into the corresponding Paimon table in real time
SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb';

INSERT INTO ods.`ods_airsea_airfreight_orderhawb`
SELECT RTRIM(`HBLIndex`) as `HBLIndex`
    ......
    ,`CreateOPDate`
    ,YEAR(`CreateOPDate`) AS `o_year`
    ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `OrderHAWB` where CreateOPDate > '2023-01-01';
```
Business table data is now written into the Paimon ODS tables in real time.
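One way to spot-check that data is landing is a simple batch query against one of the ODS tables, run from a Flink SQL client session (illustrative; any of the tables created above works):

```sql
-- Spot-check: read a Paimon ODS table in batch mode
SET 'execution.runtime-mode' = 'batch';
SELECT COUNT(*) AS order_rows FROM ods.`ods_bondexsea_doc_order`;
```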
2. Widen the ODS-layer tables and write the result into the DWD layer; in essence this merges the related ODS business tables into DWD. The main work is handling the value of the count_order field: the source tables contain both logical and physical deletes, so counting rows with the count function would give wrong results, and we therefore compute the order volume with a sum aggregation instead. Each reference_no corresponds to a count_order of 1; if the record is logically voided, SQL maps it to 0, and physical deletes are handled by Paimon automatically, as sketched below.
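A minimal sketch of that count_order handling (assuming is_deleted = 1 marks a voided order; column names such as order_no and is_deleted follow the MySQL source table above, and the real DWD statement joins and widens several ODS tables):

```sql
-- Illustrative sketch: each order contributes 1 to count_order, a logically voided order contributes 0.
-- Downstream, SUM(count_order) rather than COUNT(*) gives the correct real-time order volume,
-- and physically deleted rows are retracted automatically through Paimon's changelog.
SELECT
    `order_no` AS `reference_no`,
    CASE WHEN `is_deleted` = 1 THEN 0 ELSE 1 END AS `count_order`
FROM ods.`ods_bondexsea_doc_order`;
```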
For the dim (dimension) tables, we directly use the dimension tables already prepared in Doris. They are updated infrequently, so no further development was done for them in Paimon.