海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(下)

本文涉及的产品
云数据库 RDS SQL Server,独享型 2核4GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
-- 在paimon-dwd层创建宽表
CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` (
`reference_no` varchar(50) NOT NULL COMMENT '委托单号主键',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`consignor_date` DATE COMMENT '统计日期',
`consignor_code` varchar(50) COMMENT '客户编号',
`consignor_name` varchar(160) COMMENT '客户名称',
`sales_code` varchar(32) NOT NULL COMMENT '销售编号',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL  COMMENT '目的港代码',
`is_delete` int not NULL COMMENT '是否作废',
`order_status` varchar(8) NOT NULL COMMENT '订单状态',
`count_order` BIGINT not NULL COMMENT '订单数',
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`,`reference_no`,`bondex_shy_flag`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
-- 每个 partition 下设置 2 个 bucket
'bucket' = '2',
'changelog-producer' = 'full-compaction',
'snapshot.time-retained' = '2h',
'changelog-producer.compaction-interval' = '2m'
);
-- 设置作业名,将ods层的相关业务表合并写入到dwd层
SET 'pipeline.name' = 'dwd_business_order';
INSERT INTO
dwd.`dwd_business_order`
SELECT
o.doccode,
......,
YEAR (o.docdate) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_shy_jh_doc_hdworkdochd o
INNER JOIN ods.ods_shy_base_enterprise en ON o.businessguid = en.entguid
LEFT JOIN dim.dim_hhl_user_code sales ON o.salesguid = sales.USER_GUID
LEFT JOIN dim.dim_hhl_user_code op ON o.bookingguid = op.USER_GUID
UNION ALL
SELECT
business_no,
......,
YEAR ( gmt_create ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_bondexsea_doc_order
UNION ALL
SELECT
  HBLIndex,
 ......,
  YEAR ( CreateOPDate ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
  ods.`ods_airsea_airfreight_orderhawb`
;

flink ui可以看到ods数据经过paimon实时join清洗到表dwd_business_order/



3. 将dwd层数据轻度聚合到dwm层中,将相关数据写入

dwm.`dwm_business_order_count`表中,该表数据会根据主键对聚合字段做sum,sum_orderCount字段就是聚合结果,物理删除的数据sum时paimon会自动处理。

-- 创建dwm层轻度汇总表,根据日期、销售、操作、业务类别、客户、起运港、目的港汇总单量
CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` (
`l_year` BIGINT NOT NULL COMMENT '统计年',
`l_month` BIGINT NOT NULL COMMENT '统计月',
`l_date` DATE NOT NULL  COMMENT '统计日期',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`customer_code` varchar(50) NOT NULL COMMENT '客户编号',
`sales_code` varchar(50) NOT NULL COMMENT '销售编号',
`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
`customer_name` varchar(200) NOT NULL COMMENT '客户名称',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`bondex_shy_flag`,`is_server_item`,`customer_code`,`sales_code`,`delivery_center_op_id`,`pol_code`,`pot_code`,`port_of_dest_code`) NOT ENFORCED
) WITH (
'changelog-producer' = 'full-compaction',
  'changelog-producer.compaction-interval' = '2m',
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'fields.sales_name.ignore-retract'='true',
  'fields.customer_name.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dwm_business_order_count';
INSERT INTO
dwm.`dwm_business_order_count`
SELECT
YEAR(o.`consignor_date`) AS `l_year`
,MONTH(o.`consignor_date`) AS `l_month`
......,
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
dwd.`dwd_business_order` o
;

Flink UI效果如下dwd_business_orders数据聚合写到dwm_business_order_count:



4. 将dwm层数据聚合到dws层中,dws层是做了更小维度的汇总

-- 创建根据操作人、业务类型聚合当天的单量
CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` (
  `l_year` BIGINT NOT NULL COMMENT '统计年',
  `l_month` BIGINT NOT NULL COMMENT '统计月',
  `l_date` DATE NOT NULL  COMMENT '统计日期',
  `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
  `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
  `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
  `sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
  `create_date` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`delivery_center_op_id`) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
  'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dws_business_order_count_op';
INSERT INTO
  dws.`dws_business_order_count_op`
SELECT
  o.`l_year`
  ,o.`l_month`
  ,o.`l_date`
  ,o.`order_type_name`
  ,o.`delivery_center_op_id`
  ,o.`delivery_center_op_name`
  ,o.`sum_orderCount`
  ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
  dwm.`dwm_business_order_count` o
;

Flink UI效果如下 dws_business_order_count_op数据写到dws_business_order_count_op:

总体数据流转示例

源表:

paimon-ods:

paimon-dwd:

paimon-dwm:

paimon-dws:

特别提醒sqlserver数据库抽取时如果源表数据量过大全量抽取会锁表,建议在业务允许的情况下采用增量抽取。对于全量抽取sqlserver可以采用中转的方式sqlserver全量数据导入到mysql,从mysql再到paimon-ods,后面再通过sqlserever做增量抽取。


04

问题排查分析


1. 聚合数据计算不准


sqlserver cdc 采集数据到 paimon 表

说明:

dwd 表:

'changelog-producer' = 'input'

ads 表:

'merge-engine' = 'aggregation',   -- 使用 aggregation 聚合计算 sum

'fields.sum_amount.aggregate-function' = 'sum'


ADS 层聚合表采用 agg sum 会出现 dwd 数据流不产生 update_before,产生错误数据流 update_after  比如上游源表 update 10 到 30  dwd 层数据会变更为 30,ads 聚合层数据也会变更为 30,但是现在变为了 append 数据变成了 10+30=40 的错误数据。


解决办法:

By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.

By specifying changelog-producer.compaction-interval table property (default value 30min), users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs.


这样能解决上述问题。但是随之而来出现了新的问题。默认 changelog-producer.compaction-interval 是 30min,意味着 上游的改动到 ads 查询要间隔 30min,生产过程中发现将压缩间隔时间改成 1min 或者 2 分钟的情况下,又会出现上述 ADS 层聚合数据不准的情况。


'changelog-producer.compaction-interval' = '2m'


需要在写入 Flink Table Store 时需要配置 table.exec.sink.upsert-materialize= none,避免产生 Upsert 流,以保证 Flink Table Store 中能够保存完整的 changelog,为后续的流读操作做准备。


set 'table.exec.sink.upsert-materialize' = 'none'


2. 相同 sequence.field 导致 dwd 明细宽表无法收到 update 数据更新

mysql cdc 采集数据到 paimon 表


说明:

在 MySQL 源端执行 update

数据修改成功后,dwd_orders 表数据能同步成功


但是查看 dwd_enriched_orders 表数据无法同步,启动流模式查看数据,发现没有数据流向



解决:

排查发现是由于配置了参数 'sequence.field' = 'o_orderdate'(使用 o_orderdate 生成 sequence id,相同主键合并时选择 sequence id 更大的记录)导致的,因为在修改价格的时候 o_orderdate 字段时间不变, 继而'sequence.field' 是相同的,导致顺序不确定,所以 ROW1 和 ROW2,它们的 o_orderdate 是一样的,所以在更新时会随机选择,所有将该参数去掉即可,去掉后正常按照输入的先后顺序,自动生成一个 sequence number,不会影响同步结果。


3. Aggregate function 'last_non_null_value' does not support retraction


报错:Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'.


可以在官方文档找到解释:

Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction.


可以理解为:除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配'fields.${field_name}.ignore-retract'='true' 忽略,解决这个报错

WITH (

'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum

'fields.sum_orderCount.aggregate-function' = 'sum',

'fields.create_date.ignore-retract'='true'  #create_date 字段

);


4. paimon任务中断失败


任务异常中断 导致pod挂掉


查看loki日志显示akka.pattern.AskTimeoutException: Ask timed out on


java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n"


初步判断应该是由于以上2个原因导致触发了akka的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。


我们这边先看如何进行参数修改:


key

default

describe

akka.ask.timeout

10s

Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).

web.timeout

 

600000

 

Timeout for asynchronous operations by the web monitor in milliseconds.


在conf/flink-conf.yaml最后增加下面参数

akka.ask.timeout: 100s

web.timeout:1000000

然后在streampark手动刷新下flink-conf.yaml验证参数是否同步成功。


5. snapshot no such file or director


发现cp出现失败情况

对应时间点日志显示Snapshot丢失,任务显示为running状态,但是源表mysql数据无法写入paimon ods表

定位cp失败原因为:计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP

无法读取Snapshot原因为:flink集群资源不够,Writer和Committer产生竞争,Full-Compaction时读到了已过期部分的不完整的Snapshot,目前官方针对这个问题已经修复


https://github.com/apache/incubator-paimon/pull/1308



而解决cp失败的解决办法增加并行度,增加deploymenttaskmanager slot和jobmanager cpu


-D kubernetes.jobmanager.cpu=0.8

-D kubernetes.jobmanager.cpu.limit-factor=1

-D taskmanager.numberOfTaskSlots=8

-D jobmanager.adaptive-batch-scheduler.default-source-parallelism=2



在复杂的实时任务中,可以通过修改动态参数的方式,增加资源。


05

未来规划


  • 自建的数据平台bondata正在集成paimon的元数据信息、数据指标体系、血缘、一键pipline等功能,形成海程邦达的数据资产,并将在此基础上展开一站式数据治理
  • 后面将基于trino Catalog接入Doris,实现真正的离线数据和实时数据的one service
  • 采用doris+paimon的架构方案继续推进集团内部流批一体数仓建设的步伐



在这里要感谢之信老师在使用paimon过程中的大力支持,在学习使用过程中遇到的问题,都能在第一时间给到解惑并得到解决,我们后面也会积极参与社区的交流和建设,让paimon能为更多开发者和企业提供流批一体的数据湖解决方案。







作者简介 PROFILE

王新

数据架构师 专注于大数据流批一体架构在企业的落地实现。

周文轩

数据开发工程师 对数据分析和流计算的实时开发充满热情。


大数据流动:专注于大数据、数据治理、人工智能相关知识分享

作者独孤风,港口工人转行成为国企大数据负责人,不断自学考研考证充实自己。

提供大数据,数据治理,人工智能相关技术实践与理论学习交流群。

大数据流动,学习永不止步。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
18天前
|
运维 Linux Apache
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
Linux Apache服务详解——Apache虚拟目录与禁止显示目录列表实战
32 2
|
18天前
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
27 0
|
18天前
|
SQL Java 数据库连接
apache DbUtils 组件核心原理与应用
DbUtils 的设计思想是简化 JDBC 编程,通过封装 JDBC 操作,减少样板代码,提高开发效率。它通过 QueryRunner、ResultSetHandler 和 RowProcessor 的协同工作,实现了对 JDBC 资源的精细化管理,同时避免了资源泄漏的风险。DbUtils 的使用不涉及复杂的配置和ORM映射,适合需要快速、轻量级数据库操作的场景。
|
18天前
|
存储 分布式计算 Apache
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
Apache Paimon 在构建实时数据湖与流批处理技术领域取得了重大突破,数据湖步入实时新篇章!
2371 6
官宣|Apache Paimon 毕业成为顶级项目,数据湖步入实时新篇章!
|
18天前
|
Java 数据处理 调度
更高效准确的数据库内部任务调度实践,阿里云数据库SelectDB 内核 Apache Doris 内置 Job Scheduler 的实现与应用
Apache Doris 2.1 引入了内置的 Job Scheduler,旨在解决依赖外部调度系统的问题,提供秒级精确的定时任务管理。
|
18天前
|
运维 Linux Apache
LAMP架构调优(九)——Apache Rewrite功能实战
LAMP架构调优(九)——Apache Rewrite功能实战
17 1
|
18天前
|
监控 API Apache
实战!配置DataDog监控Apache Hudi应用指标
实战!配置DataDog监控Apache Hudi应用指标
27 0
|
18天前
|
消息中间件 JSON Kafka
实战 | Apache Hudi回调功能简介及使用示例
实战 | Apache Hudi回调功能简介及使用示例
19 0
|
18天前
|
存储 机器学习/深度学习 分布式计算
Apache Hudi在Hopsworks机器学习的应用
Apache Hudi在Hopsworks机器学习的应用
41 0
|
10天前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多