海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(中)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用

在streampark添加flink conf




构建 flink1.16.0 基础镜像从 dockerhub拉取对应版本的镜像

#拉取镜像
docker pull flink:1.16.0-scala_2.12-java8
#打上 tag
docker tagflink:1.16.0-scala_2.12-java8  registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
#push 到公司仓库
docker pushregistry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8

创建 Dockerfile 文件 & target 目录将 flink-oss-fs-hadoop JAR包放置在该目录 Shaded Hadoop OSS file system jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/

.

├── Dockerfile

└── target

 └── flink-oss-fs-hadoop-1.16.0.jar

touch Dockerfile
mkdir target
#vim Dockerfile
FROM registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
RUN mkdir /opt/flink/plugins/oss-fs-hadoop
COPY target/flink-oss-fs-hadoop-1.16.0.jar  /opt/flink/plugins/oss-fs-hadoop

#build 基础镜像

docker build -t flink-table-store:v1.16.0 .
docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0

#准备 paimon jar 包

可以在 Apache Repository下载对应版本,需要注意的是要和 flink 大版本保持一致


使用 streampark 平台提交 paimon 任务


前提条件:

Kubernetes 客户端连接配置

Kubernetes RBAC 配置

容器镜像仓库配置 (案例中使用的是阿里云的容器镜像服务个人免费版)

创建挂载 checkpoint/savepoint 的 pvc 资源


Kubernetes 客户端连接配置:

将k8s master节点~/.kube/config配置直接拷贝到streampark服务器的目录,之后在streampark服务器执行以下命令显示k8s集群running代表权限和网络验证成功。

kubectl cluster-info


Kubernetes RBAC 配置

创建streamx命名空间

kubectl create ns streamx


使用default账户创建clusterrolebinding 资源

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=streamx:default


容器镜像仓库配置


案例中使用阿里云容器镜像服务ACR,也可以使用自建镜像服务harbor代替。

创建命名空间streampark(安全设置需要设置为私有)



在streampark配置镜像仓库,任务构建镜像会推送到镜像仓库


创建k8s secret密钥用来拉取ACR中的镜像 streamparksecret为密钥名称 自定义

kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx


创建挂载 checkpoint/savepoint 的 pvc 资源


基于阿里云的对象存储OSS做K8S的持久化


OSS CSI 插件:

可以使用 OSS CSI 插件来帮助简化存储管理。您可以使用 csi 配置创建 pv,并且 pvc、pod 像往常一样定义,yaml 文件参考:https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip


配置要求:

- 创建具有所需 RBAC 权限的服务帐户

参考:https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md 

kubectl -f rbac.yaml

- 部署OSS CSI 插件

kubectl -f oss-plugin.yaml

- 创建CP&SP的PV


kubectl -f checkpoints_pv.yaml kubectl -f savepoints_pv.yaml

- 创建CP&SP的PVC


kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml

配置好依赖环境,接下来我们就开始使用paimon进行流式数仓的分层开发。


案例:

统计海运空运实时委托单量


任务提交:

初始化paimon catalog配置


SET 'execution.runtime-mode' = 'streaming';
set 'table.exec.sink.upsert-materialize' = 'none'; 
SET 'sql-client.execution.result-mode' = 'tableau';
-- 创建并使用 FTS Catalog 底层存储方案采用阿里云oss
CREATE CATALOG `table_store` WITH (
'type' = 'paimon',
'warehouse' = 'oss://xxxxx/xxxxx' #自定义oss存储路径
);
USE CATALOG `table_store`;

一个任务同时抽取postgres、mysql、sqlserver三种数据库的表数据写入到paimon




Development Mode:Flink SQL

Execution Mode :kubernetes application

Flink Version :flink-1.16.0-scala-2.12

Kubernetes Namespace :streamx

Kubernetes ClusterId :(任务名自定义即可)

Flink Base Docker Image : registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0    #上传到阿里云镜像仓库的基础镜像

Rest-Service Exposed Type:NodePort

paimon基础依赖包:

paimon-flink-1.16-0.4-20230424.001927-40.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

flinkcdc依赖包下载地址:

https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0


pod template

apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
  - name: flink-main-container
    volumeMounts:
      - name: flink-checkpoints-csi-pvc
        mountPath: /opt/flink/checkpoints
      - name: flink-savepoints-csi-pvc
        mountPath: /opt/flink/savepoints
volumes:
  - name: flink-checkpoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-checkpoints-csi-pvc
  - name: flink-savepoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-savepoints-csi-pvc
imagePullSecrets:      
- name: streamparksecret

flink sql

1. 构建源表和paimon中ods表的关系,这里就是源表和目标表一对一映射

-- postgre数据库 示例
CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` (
`doccode` varchar(50) not null COMMENT '主键',
`businessmodel` varchar(450) COMMENT '业务模式',
`businesstype` varchar(450)  COMMENT '业务性质',
`transporttype` varchar(50) COMMENT '运输类型',
......
`bookingguid` varchar(50) COMMENT '操作编号',
PRIMARY KEY (`doccode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',,
'table-name' = 'doc_hdworkdochd',
'debezium.slot.name' = 'hdworkdochdslotname03'
);
CREATE TEMPORARY TABLE `shy_base_enterprise` (
`entguid` varchar(50) not null COMMENT '主键',
`entorgcode` varchar(450) COMMENT '客户编号',
`entnature` varchar(450)  COMMENT '客户类型',
`entfullname` varchar(50) COMMENT '客户名称',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',
'table-name' = 'base_enterprise',
'debezium.snapshot.mode'='never', -- 增量同步(全量+增量忽略该属性)
'debezium.slot.name' = 'base_enterprise_slotname03'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` (
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_doc_hdworkdochd';
INSERT INTO
ods.`ods_shy_jh_doc_hdworkdochd`
SELECT
*
,YEAR(`docdate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_doc_hdworkdochd` where `docdate` is not null and `docdate` > '2023-01-01';
SET 'pipeline.name' = 'ods_shy_base_enterprise';
INSERT INTO
ods.`ods_shy_base_enterprise`
SELECT
*
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_base_enterprise` where entorgcode is not null and entorgcode <> '';
-- mysql数据库 示例
CREATE TEMPORARY TABLE `doc_order` (
`id` BIGINT NOT NULL COMMENT '主键',
`order_no` varchar(50) NOT NULL COMMENT '订单号',
`business_no` varchar(50) COMMENT 'OMS服务号',
......
`is_deleted` int COMMENT '是否作废',
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '库名',
'table-name' = 'doc_order'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `id`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_bondexsea_doc_order';
INSERT INTO
ods.`ods_bondexsea_doc_order`
SELECT
*
,YEAR(`gmt_create`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `doc_order` where gmt_create > '2023-01-01';
-- sqlserver数据库 示例
CREATE TEMPORARY TABLE `OrderHAWB` (
`HBLIndex` varchar(50) NOT NULL COMMENT '主键',
`CustomerNo` varchar(50) COMMENT '客户编号',
......
`CreateOPDate` timestamp COMMENT '制单日期',
PRIMARY KEY (`HBLIndex`) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dbo',
-- 'debezium.snapshot.mode' = 'initial' -- 全量增量都抽取
'scan.startup.mode' = 'latest-offset',-- 只抽取增量数据
'table-name' = 'OrderHAWB'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb';
INSERT INTO
ods.`ods_airsea_airfreight_orderhawb`
SELECT
RTRIM(`HBLIndex`) as `HBLIndex`
......
,`CreateOPDate`
,YEAR(`CreateOPDate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `OrderHAWB` where CreateOPDate > '2023-01-01';

业务表数据实时写入paimon ods表效果如下:

2. 将ods层表的数据打宽写入dwd层中,这里其实就是将ods层相关业务表合并写入dwd中,这里主要是处理count_order字段的值,因为源表中的数据存在逻辑删除和物理删除所以通过count函数统计会有问题,所以我们这里采用sum聚合来计算单量,每个reference_no对应的count_order是1,如果逻辑作废通过sql将它处理成0,物理删除paimon会自动处理。


dim维表我们这边直接拿的doris里面处理好的维表来使用,维表更新频率低,所以没有在paimon里面进行二次开发。


相关实践学习
通过workbench远程登录ECS,快速搭建Docker环境
本教程指导用户体验通过workbench远程登录ECS,完成搭建Docker环境的快速搭建,并使用Docker部署一个Nginx服务。
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
1月前
|
运维 Linux Apache
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
40 2
|
1月前
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
30 0
|
3天前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
25 6
|
13天前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
1月前
|
SQL Java 数据库连接
apache DbUtils 组件核心原理与应用
DbUtils 的设计思想是简化 JDBC 编程,通过封装 JDBC 操作,减少样板代码,提高开发效率。它通过 QueryRunner、ResultSetHandler 和 RowProcessor 的协同工作,实现了对 JDBC 资源的精细化管理,同时避免了资源泄漏的风险。DbUtils 的使用不涉及复杂的配置和ORM映射,适合需要快速、轻量级数据库操作的场景。
|
1月前
|
存储 分布式计算 Apache
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
Apache Paimon 在构建实时数据湖与流批处理技术领域取得了重大突破,数据湖步入实时新篇章!
2452 6
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
|
1月前
|
Java 数据处理 调度
更高效准确的数据库内部任务调度实践,阿里云数据库SelectDB 内核 Apache Doris 内置 Job Scheduler 的实现与应用
Apache Doris 2.1 引入了内置的 Job Scheduler,旨在解决依赖外部调度系统的问题,提供秒级精确的定时任务管理。
|
1月前
|
运维 Linux Apache
LAMP架构调优(九)——Apache Rewrite功能实战
LAMP架构调优(九)——Apache Rewrite功能实战
20 1
|
1月前
|
监控 API Apache
实战!配置DataDog监控Apache Hudi应用指标
实战!配置DataDog监控Apache Hudi应用指标
28 0
|
Java 应用服务中间件 Apache
Apache 与tomcat实现分布式应用部署
一:原理 tomcat是一个web应用服务器,能够解析静态文件和动态文件(如:html、jsp、servlet等);apache是一个web server,能够解析静态文件。Tomcat作为一个独立的web服务器是可以使用的,但是它对静态文件的解析能力不如apache,所以就产生现在的web应用的分布式部署,apache+tomcat。 两者之间的通信通过workers配置(由tomc
2130 0

推荐镜像

更多