海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
-- 在paimon-dwd层创建宽表
CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` (
`reference_no` varchar(50) NOT NULL COMMENT '委托单号主键',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`consignor_date` DATE COMMENT '统计日期',
`consignor_code` varchar(50) COMMENT '客户编号',
`consignor_name` varchar(160) COMMENT '客户名称',
`sales_code` varchar(32) NOT NULL COMMENT '销售编号',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL  COMMENT '目的港代码',
`is_delete` int not NULL COMMENT '是否作废',
`order_status` varchar(8) NOT NULL COMMENT '订单状态',
`count_order` BIGINT not NULL COMMENT '订单数',
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`,`reference_no`,`bondex_shy_flag`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
-- 每个 partition 下设置 2 个 bucket
'bucket' = '2',
'changelog-producer' = 'full-compaction',
'snapshot.time-retained' = '2h',
'changelog-producer.compaction-interval' = '2m'
);
-- 设置作业名,将ods层的相关业务表合并写入到dwd层
SET 'pipeline.name' = 'dwd_business_order';
INSERT INTO
dwd.`dwd_business_order`
SELECT
o.doccode,
......,
YEAR (o.docdate) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_shy_jh_doc_hdworkdochd o
INNER JOIN ods.ods_shy_base_enterprise en ON o.businessguid = en.entguid
LEFT JOIN dim.dim_hhl_user_code sales ON o.salesguid = sales.USER_GUID
LEFT JOIN dim.dim_hhl_user_code op ON o.bookingguid = op.USER_GUID
UNION ALL
SELECT
business_no,
......,
YEAR ( gmt_create ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_bondexsea_doc_order
UNION ALL
SELECT
  HBLIndex,
 ......,
  YEAR ( CreateOPDate ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
  ods.`ods_airsea_airfreight_orderhawb`
;

flink ui可以看到ods数据经过paimon实时join清洗到表dwd_business_order/



3. 将dwd层数据轻度聚合到dwm层中,将相关数据写入

dwm.`dwm_business_order_count`表中,该表数据会根据主键对聚合字段做sum,sum_orderCount字段就是聚合结果,物理删除的数据sum时paimon会自动处理。

-- 创建dwm层轻度汇总表,根据日期、销售、操作、业务类别、客户、起运港、目的港汇总单量
CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` (
`l_year` BIGINT NOT NULL COMMENT '统计年',
`l_month` BIGINT NOT NULL COMMENT '统计月',
`l_date` DATE NOT NULL  COMMENT '统计日期',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`customer_code` varchar(50) NOT NULL COMMENT '客户编号',
`sales_code` varchar(50) NOT NULL COMMENT '销售编号',
`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
`customer_name` varchar(200) NOT NULL COMMENT '客户名称',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`bondex_shy_flag`,`is_server_item`,`customer_code`,`sales_code`,`delivery_center_op_id`,`pol_code`,`pot_code`,`port_of_dest_code`) NOT ENFORCED
) WITH (
'changelog-producer' = 'full-compaction',
  'changelog-producer.compaction-interval' = '2m',
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'fields.sales_name.ignore-retract'='true',
  'fields.customer_name.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dwm_business_order_count';
INSERT INTO
dwm.`dwm_business_order_count`
SELECT
YEAR(o.`consignor_date`) AS `l_year`
,MONTH(o.`consignor_date`) AS `l_month`
......,
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
dwd.`dwd_business_order` o
;

Flink UI效果如下dwd_business_orders数据聚合写到dwm_business_order_count:



4. 将dwm层数据聚合到dws层中,dws层是做了更小维度的汇总

-- 创建根据操作人、业务类型聚合当天的单量
CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` (
  `l_year` BIGINT NOT NULL COMMENT '统计年',
  `l_month` BIGINT NOT NULL COMMENT '统计月',
  `l_date` DATE NOT NULL  COMMENT '统计日期',
  `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
  `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
  `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
  `sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
  `create_date` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`delivery_center_op_id`) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
  'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dws_business_order_count_op';
INSERT INTO
  dws.`dws_business_order_count_op`
SELECT
  o.`l_year`
  ,o.`l_month`
  ,o.`l_date`
  ,o.`order_type_name`
  ,o.`delivery_center_op_id`
  ,o.`delivery_center_op_name`
  ,o.`sum_orderCount`
  ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
  dwm.`dwm_business_order_count` o
;

Flink UI效果如下 dws_business_order_count_op数据写到dws_business_order_count_op:

总体数据流转示例

源表:

paimon-ods:

paimon-dwd:

paimon-dwm:

paimon-dws:

特别提醒sqlserver数据库抽取时如果源表数据量过大全量抽取会锁表,建议在业务允许的情况下采用增量抽取。对于全量抽取sqlserver可以采用中转的方式sqlserver全量数据导入到mysql,从mysql再到paimon-ods,后面再通过sqlserever做增量抽取。


04

问题排查分析


1. 聚合数据计算不准


sqlserver cdc 采集数据到 paimon 表

说明:

dwd 表:

'changelog-producer' = 'input'

ads 表:

'merge-engine' = 'aggregation',   -- 使用 aggregation 聚合计算 sum

'fields.sum_amount.aggregate-function' = 'sum'


ADS 层聚合表采用 agg sum 会出现 dwd 数据流不产生 update_before,产生错误数据流 update_after  比如上游源表 update 10 到 30  dwd 层数据会变更为 30,ads 聚合层数据也会变更为 30,但是现在变为了 append 数据变成了 10+30=40 的错误数据。


解决办法:

By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.

By specifying changelog-producer.compaction-interval table property (default value 30min), users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs.


这样能解决上述问题。但是随之而来出现了新的问题。默认 changelog-producer.compaction-interval 是 30min,意味着 上游的改动到 ads 查询要间隔 30min,生产过程中发现将压缩间隔时间改成 1min 或者 2 分钟的情况下,又会出现上述 ADS 层聚合数据不准的情况。


'changelog-producer.compaction-interval' = '2m'


需要在写入 Flink Table Store 时需要配置 table.exec.sink.upsert-materialize= none,避免产生 Upsert 流,以保证 Flink Table Store 中能够保存完整的 changelog,为后续的流读操作做准备。


set 'table.exec.sink.upsert-materialize' = 'none'


2. 相同 sequence.field 导致 dwd 明细宽表无法收到 update 数据更新

mysql cdc 采集数据到 paimon 表


说明:

在 MySQL 源端执行 update

数据修改成功后,dwd_orders 表数据能同步成功


但是查看 dwd_enriched_orders 表数据无法同步,启动流模式查看数据,发现没有数据流向



解决:

排查发现是由于配置了参数 'sequence.field' = 'o_orderdate'(使用 o_orderdate 生成 sequence id,相同主键合并时选择 sequence id 更大的记录)导致的,因为在修改价格的时候 o_orderdate 字段时间不变, 继而'sequence.field' 是相同的,导致顺序不确定,所以 ROW1 和 ROW2,它们的 o_orderdate 是一样的,所以在更新时会随机选择,所有将该参数去掉即可,去掉后正常按照输入的先后顺序,自动生成一个 sequence number,不会影响同步结果。


3. Aggregate function 'last_non_null_value' does not support retraction


报错:Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'.


可以在官方文档找到解释:

Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction.


可以理解为:除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配'fields.${field_name}.ignore-retract'='true' 忽略,解决这个报错

WITH (

'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum

'fields.sum_orderCount.aggregate-function' = 'sum',

'fields.create_date.ignore-retract'='true'  #create_date 字段

);


4. paimon任务中断失败


任务异常中断 导致pod挂掉


查看loki日志显示akka.pattern.AskTimeoutException: Ask timed out on


java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n"


初步判断应该是由于以上2个原因导致触发了akka的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。


我们这边先看如何进行参数修改:


key

default

describe

akka.ask.timeout

10s

Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).

web.timeout

 

600000

 

Timeout for asynchronous operations by the web monitor in milliseconds.


在conf/flink-conf.yaml最后增加下面参数

akka.ask.timeout: 100s

web.timeout:1000000

然后在streampark手动刷新下flink-conf.yaml验证参数是否同步成功。


5. snapshot no such file or director


发现cp出现失败情况

对应时间点日志显示Snapshot丢失,任务显示为running状态,但是源表mysql数据无法写入paimon ods表

定位cp失败原因为:计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP

无法读取Snapshot原因为:flink集群资源不够,Writer和Committer产生竞争,Full-Compaction时读到了已过期部分的不完整的Snapshot,目前官方针对这个问题已经修复


https://github.com/apache/incubator-paimon/pull/1308



而解决cp失败的解决办法增加并行度,增加deploymenttaskmanager slot和jobmanager cpu


-D kubernetes.jobmanager.cpu=0.8

-D kubernetes.jobmanager.cpu.limit-factor=1

-D taskmanager.numberOfTaskSlots=8

-D jobmanager.adaptive-batch-scheduler.default-source-parallelism=2



在复杂的实时任务中,可以通过修改动态参数的方式,增加资源。


05

未来规划


  • 自建的数据平台bondata正在集成paimon的元数据信息、数据指标体系、血缘、一键pipline等功能,形成海程邦达的数据资产,并将在此基础上展开一站式数据治理
  • 后面将基于trino Catalog接入Doris,实现真正的离线数据和实时数据的one service
  • 采用doris+paimon的架构方案继续推进集团内部流批一体数仓建设的步伐



在这里要感谢之信老师在使用paimon过程中的大力支持,在学习使用过程中遇到的问题,都能在第一时间给到解惑并得到解决,我们后面也会积极参与社区的交流和建设,让paimon能为更多开发者和企业提供流批一体的数据湖解决方案。







作者简介 PROFILE

王新

数据架构师 专注于大数据流批一体架构在企业的落地实现。

周文轩

数据开发工程师 对数据分析和流计算的实时开发充满热情。


大数据流动:专注于大数据、数据治理、人工智能相关知识分享

作者独孤风,港口工人转行成为国企大数据负责人,不断自学考研考证充实自己。

提供大数据,数据治理,人工智能相关技术实践与理论学习交流群。

大数据流动,学习永不止步。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
18天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
2月前
|
安全 网络协议 应用服务中间件
AJP Connector:深入解析及在Apache HTTP Server中的应用
【9月更文挑战第6天】在Java Web应用开发中,Tomcat作为广泛使用的Servlet容器,经常与Apache HTTP Server结合使用,以提供高效、稳定的Web服务。而AJP Connector(Apache JServ Protocol Connector)作为连接Tomcat和Apache HTTP Server的重要桥梁,扮演着至关重要的角色
74 2
|
17天前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
99 61
|
17天前
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
84 3
|
1月前
|
存储 分布式计算 druid
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
57 1
大数据-149 Apache Druid 基本介绍 技术特点 应用场景
|
1月前
|
存储 数据挖掘 数据处理
Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析
【10月更文挑战第8天】随着数据湖技术的发展,越来越多企业开始利用这一技术优化数据处理。Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析。本文分享了巴别时代在构建基于 Paimon 的 Streaming Lakehouse 的探索和实践经验,包括示例代码和实际应用中的优势与挑战。
63 1
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
2月前
|
Apache
多应用模式下,忽略项目的入口文件,重写Apache规则
本文介绍了在多应用模式下,如何通过编辑Apache的.htaccess文件来重写URL规则,从而实现忽略项目入口文件index.php进行访问的方法。
|
Java 应用服务中间件 Apache
Apache 与tomcat实现分布式应用部署
一:原理 tomcat是一个web应用服务器,能够解析静态文件和动态文件(如:html、jsp、servlet等);apache是一个web server,能够解析静态文件。Tomcat作为一个独立的web服务器是可以使用的,但是它对静态文件的解析能力不如apache,所以就产生现在的web应用的分布式部署,apache+tomcat。 两者之间的通信通过workers配置(由tomc
2161 0
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
576 13
Apache Flink 2.0-preview released

推荐镜像

更多