海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(中)

本文涉及的产品
对象存储 OSS,20GB 3个月
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用

在streampark添加flink conf




构建 flink1.16.0 基础镜像从 dockerhub拉取对应版本的镜像

#拉取镜像
docker pull flink:1.16.0-scala_2.12-java8
#打上 tag
docker tagflink:1.16.0-scala_2.12-java8  registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
#push 到公司仓库
docker pushregistry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8

创建 Dockerfile 文件 & target 目录将 flink-oss-fs-hadoop JAR包放置在该目录 Shaded Hadoop OSS file system jar包下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/

.

├── Dockerfile

└── target

 └── flink-oss-fs-hadoop-1.16.0.jar

touch Dockerfile
mkdir target
#vim Dockerfile
FROM registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8
RUN mkdir /opt/flink/plugins/oss-fs-hadoop
COPY target/flink-oss-fs-hadoop-1.16.0.jar  /opt/flink/plugins/oss-fs-hadoop

#build 基础镜像

docker build -t flink-table-store:v1.16.0 .
docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0
docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0

#准备 paimon jar 包

可以在 Apache Repository下载对应版本,需要注意的是要和 flink 大版本保持一致


使用 streampark 平台提交 paimon 任务


前提条件:

Kubernetes 客户端连接配置

Kubernetes RBAC 配置

容器镜像仓库配置 (案例中使用的是阿里云的容器镜像服务个人免费版)

创建挂载 checkpoint/savepoint 的 pvc 资源


Kubernetes 客户端连接配置:

将k8s master节点~/.kube/config配置直接拷贝到streampark服务器的目录,之后在streampark服务器执行以下命令显示k8s集群running代表权限和网络验证成功。

kubectl cluster-info


Kubernetes RBAC 配置

创建streamx命名空间

kubectl create ns streamx


使用default账户创建clusterrolebinding 资源

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=streamx:default


容器镜像仓库配置


案例中使用阿里云容器镜像服务ACR,也可以使用自建镜像服务harbor代替。

创建命名空间streampark(安全设置需要设置为私有)



在streampark配置镜像仓库,任务构建镜像会推送到镜像仓库


创建k8s secret密钥用来拉取ACR中的镜像 streamparksecret为密钥名称 自定义

kubectl create secret docker-registry streamparksecret --docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com --docker-username=xxxxxx --docker-password=xxxxxx -n streamx


创建挂载 checkpoint/savepoint 的 pvc 资源


基于阿里云的对象存储OSS做K8S的持久化


OSS CSI 插件:

可以使用 OSS CSI 插件来帮助简化存储管理。您可以使用 csi 配置创建 pv,并且 pvc、pod 像往常一样定义,yaml 文件参考:https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip


配置要求:

- 创建具有所需 RBAC 权限的服务帐户

参考:https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md 

kubectl -f rbac.yaml

- 部署OSS CSI 插件

kubectl -f oss-plugin.yaml

- 创建CP&SP的PV


kubectl -f checkpoints_pv.yaml kubectl -f savepoints_pv.yaml

- 创建CP&SP的PVC


kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml

配置好依赖环境,接下来我们就开始使用paimon进行流式数仓的分层开发。


案例:

统计海运空运实时委托单量


任务提交:

初始化paimon catalog配置


SET 'execution.runtime-mode' = 'streaming';
set 'table.exec.sink.upsert-materialize' = 'none'; 
SET 'sql-client.execution.result-mode' = 'tableau';
-- 创建并使用 FTS Catalog 底层存储方案采用阿里云oss
CREATE CATALOG `table_store` WITH (
'type' = 'paimon',
'warehouse' = 'oss://xxxxx/xxxxx' #自定义oss存储路径
);
USE CATALOG `table_store`;

一个任务同时抽取postgres、mysql、sqlserver三种数据库的表数据写入到paimon




Development Mode:Flink SQL

Execution Mode :kubernetes application

Flink Version :flink-1.16.0-scala-2.12

Kubernetes Namespace :streamx

Kubernetes ClusterId :(任务名自定义即可)

Flink Base Docker Image : registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0    #上传到阿里云镜像仓库的基础镜像

Rest-Service Exposed Type:NodePort

paimon基础依赖包:

paimon-flink-1.16-0.4-20230424.001927-40.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

flinkcdc依赖包下载地址:

https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0


pod template

apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
  - name: flink-main-container
    volumeMounts:
      - name: flink-checkpoints-csi-pvc
        mountPath: /opt/flink/checkpoints
      - name: flink-savepoints-csi-pvc
        mountPath: /opt/flink/savepoints
volumes:
  - name: flink-checkpoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-checkpoints-csi-pvc
  - name: flink-savepoints-csi-pvc
    persistentVolumeClaim:
      claimName: flink-savepoints-csi-pvc
imagePullSecrets:      
- name: streamparksecret

flink sql

1. 构建源表和paimon中ods表的关系,这里就是源表和目标表一对一映射

-- postgre数据库 示例
CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` (
`doccode` varchar(50) not null COMMENT '主键',
`businessmodel` varchar(450) COMMENT '业务模式',
`businesstype` varchar(450)  COMMENT '业务性质',
`transporttype` varchar(50) COMMENT '运输类型',
......
`bookingguid` varchar(50) COMMENT '操作编号',
PRIMARY KEY (`doccode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',,
'table-name' = 'doc_hdworkdochd',
'debezium.slot.name' = 'hdworkdochdslotname03'
);
CREATE TEMPORARY TABLE `shy_base_enterprise` (
`entguid` varchar(50) not null COMMENT '主键',
`entorgcode` varchar(450) COMMENT '客户编号',
`entnature` varchar(450)  COMMENT '客户类型',
`entfullname` varchar(50) COMMENT '客户名称',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '数据库服务器IP地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dev',
'decoding.plugin.name' = 'wal2json',
'table-name' = 'base_enterprise',
'debezium.snapshot.mode'='never', -- 增量同步(全量+增量忽略该属性)
'debezium.slot.name' = 'base_enterprise_slotname03'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` (
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED
)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_doc_hdworkdochd';
INSERT INTO
ods.`ods_shy_jh_doc_hdworkdochd`
SELECT
*
,YEAR(`docdate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_doc_hdworkdochd` where `docdate` is not null and `docdate` > '2023-01-01';
SET 'pipeline.name' = 'ods_shy_base_enterprise';
INSERT INTO
ods.`ods_shy_base_enterprise`
SELECT
*
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
`shy_base_enterprise` where entorgcode is not null and entorgcode <> '';
-- mysql数据库 示例
CREATE TEMPORARY TABLE `doc_order` (
`id` BIGINT NOT NULL COMMENT '主键',
`order_no` varchar(50) NOT NULL COMMENT '订单号',
`business_no` varchar(50) COMMENT 'OMS服务号',
......
`is_deleted` int COMMENT '是否作废',
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '库名',
'table-name' = 'doc_order'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `id`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_bondexsea_doc_order';
INSERT INTO
ods.`ods_bondexsea_doc_order`
SELECT
*
,YEAR(`gmt_create`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `doc_order` where gmt_create > '2023-01-01';
-- sqlserver数据库 示例
CREATE TEMPORARY TABLE `OrderHAWB` (
`HBLIndex` varchar(50) NOT NULL COMMENT '主键',
`CustomerNo` varchar(50) COMMENT '客户编号',
......
`CreateOPDate` timestamp COMMENT '制单日期',
PRIMARY KEY (`HBLIndex`) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '数据库服务器地址',
'port' = '端口号',
'username' = '用户名',
'password' = '密码',
'database-name' = '数据库名',
'schema-name' = 'dbo',
-- 'debezium.snapshot.mode' = 'initial' -- 全量增量都抽取
'scan.startup.mode' = 'latest-offset',-- 只抽取增量数据
'table-name' = 'OrderHAWB'
);
-- 根据源表结构在paimon上ods层创建对应的目标表
CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` (
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
'changelog-producer.compaction-interval' = '2m'
) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS);
-- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中
SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb';
INSERT INTO
ods.`ods_airsea_airfreight_orderhawb`
SELECT
RTRIM(`HBLIndex`) as `HBLIndex`
......
,`CreateOPDate`
,YEAR(`CreateOPDate`) AS `o_year`
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM `OrderHAWB` where CreateOPDate > '2023-01-01';

业务表数据实时写入paimon ods表效果如下:

2. 将ods层表的数据打宽写入dwd层中,这里其实就是将ods层相关业务表合并写入dwd中,这里主要是处理count_order字段的值,因为源表中的数据存在逻辑删除和物理删除所以通过count函数统计会有问题,所以我们这里采用sum聚合来计算单量,每个reference_no对应的count_order是1,如果逻辑作废通过sql将它处理成0,物理删除paimon会自动处理。


dim维表我们这边直接拿的doris里面处理好的维表来使用,维表更新频率低,所以没有在paimon里面进行二次开发。


相关实践学习
通过容器镜像仓库与容器服务快速部署spring-hello应用
本教程主要讲述如何将本地Java代码程序上传并在云端以容器化的构建、传输和运行。
Kubernetes极速入门
Kubernetes(K8S)是Google在2014年发布的一个开源项目,用于自动化容器化应用程序的部署、扩展和管理。Kubernetes通常结合docker容器工作,并且整合多个运行着docker容器的主机集群。 本课程从Kubernetes的简介、功能、架构,集群的概念、工具及部署等各个方面进行了详细的讲解及展示,通过对本课程的学习,可以对Kubernetes有一个较为全面的认识,并初步掌握Kubernetes相关的安装部署及使用技巧。本课程由黑马程序员提供。 &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
17天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
99 61
|
17天前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
84 3
|
1月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
57 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
1月前
|
存储 数据挖掘 数据处理
Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析
【10月更文挑战第8天】随着数据湖技术的发展,越来越多企业开始利用这一技术优化数据处理。Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析。本文分享了巴别时代在构建基于 Paimon 的 Streaming Lakehouse 的探索和实践经验,包括示例代码和实际应用中的优势与挑战。
63 1
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
3月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
45 1
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
576 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多