海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS SQL Server,基础系列 2核4GB
实时计算 Flink 版,1000CU*H 3个月
简介: 海程邦达基于Apache Paimon+Streampark实现 Streaming warehouse的实战应用
-- 在paimon-dwd层创建宽表
CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` (
`reference_no` varchar(50) NOT NULL COMMENT '委托单号主键',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`consignor_date` DATE COMMENT '统计日期',
`consignor_code` varchar(50) COMMENT '客户编号',
`consignor_name` varchar(160) COMMENT '客户名称',
`sales_code` varchar(32) NOT NULL COMMENT '销售编号',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL  COMMENT '目的港代码',
`is_delete` int not NULL COMMENT '是否作废',
`order_status` varchar(8) NOT NULL COMMENT '订单状态',
`count_order` BIGINT not NULL COMMENT '订单数',
`o_year` BIGINT NOT NULL COMMENT '分区字段',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`o_year`,`reference_no`,`bondex_shy_flag`) NOT ENFORCED
) PARTITIONED BY (`o_year`)
WITH (
-- 每个 partition 下设置 2 个 bucket
'bucket' = '2',
'changelog-producer' = 'full-compaction',
'snapshot.time-retained' = '2h',
'changelog-producer.compaction-interval' = '2m'
);
-- 设置作业名,将ods层的相关业务表合并写入到dwd层
SET 'pipeline.name' = 'dwd_business_order';
INSERT INTO
dwd.`dwd_business_order`
SELECT
o.doccode,
......,
YEAR (o.docdate) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_shy_jh_doc_hdworkdochd o
INNER JOIN ods.ods_shy_base_enterprise en ON o.businessguid = en.entguid
LEFT JOIN dim.dim_hhl_user_code sales ON o.salesguid = sales.USER_GUID
LEFT JOIN dim.dim_hhl_user_code op ON o.bookingguid = op.USER_GUID
UNION ALL
SELECT
business_no,
......,
YEAR ( gmt_create ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
ods.ods_bondexsea_doc_order
UNION ALL
SELECT
  HBLIndex,
 ......,
  YEAR ( CreateOPDate ) AS o_year
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM
  ods.`ods_airsea_airfreight_orderhawb`
;

flink ui可以看到ods数据经过paimon实时join清洗到表dwd_business_order/



3. 将dwd层数据轻度聚合到dwm层中,将相关数据写入

dwm.`dwm_business_order_count`表中,该表数据会根据主键对聚合字段做sum,sum_orderCount字段就是聚合结果,物理删除的数据sum时paimon会自动处理。

-- 创建dwm层轻度汇总表,根据日期、销售、操作、业务类别、客户、起运港、目的港汇总单量
CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` (
`l_year` BIGINT NOT NULL COMMENT '统计年',
`l_month` BIGINT NOT NULL COMMENT '统计月',
`l_date` DATE NOT NULL  COMMENT '统计日期',
`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
`order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
`is_server_item` int NOT NULL COMMENT '是否已经关联订单',
`customer_code` varchar(50) NOT NULL COMMENT '客户编号',
`sales_code` varchar(50) NOT NULL COMMENT '销售编号',
`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
`pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
`pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
`customer_name` varchar(200) NOT NULL COMMENT '客户名称',
`sales_name` varchar(200) NOT NULL COMMENT '销售名称',
`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
`sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
`create_date` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`bondex_shy_flag`,`is_server_item`,`customer_code`,`sales_code`,`delivery_center_op_id`,`pol_code`,`pot_code`,`port_of_dest_code`) NOT ENFORCED
) WITH (
'changelog-producer' = 'full-compaction',
  'changelog-producer.compaction-interval' = '2m',
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'fields.sales_name.ignore-retract'='true',
  'fields.customer_name.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dwm_business_order_count';
INSERT INTO
dwm.`dwm_business_order_count`
SELECT
YEAR(o.`consignor_date`) AS `l_year`
,MONTH(o.`consignor_date`) AS `l_month`
......,
,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
dwd.`dwd_business_order` o
;

Flink UI效果如下dwd_business_orders数据聚合写到dwm_business_order_count:



4. 将dwm层数据聚合到dws层中,dws层是做了更小维度的汇总

-- 创建根据操作人、业务类型聚合当天的单量
CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` (
  `l_year` BIGINT NOT NULL COMMENT '统计年',
  `l_month` BIGINT NOT NULL COMMENT '统计月',
  `l_date` DATE NOT NULL  COMMENT '统计日期',
  `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
  `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
  `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
  `sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
  `create_date` timestamp NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`delivery_center_op_id`) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum
  'fields.sum_orderCount.aggregate-function' = 'sum',
  'fields.create_date.ignore-retract'='true',
  'snapshot.time-retained' = '2h',
  'fields.delivery_center_op_name.ignore-retract'='true'
);
-- 设置作业名
SET 'pipeline.name' = 'dws_business_order_count_op';
INSERT INTO
  dws.`dws_business_order_count_op`
SELECT
  o.`l_year`
  ,o.`l_month`
  ,o.`l_date`
  ,o.`order_type_name`
  ,o.`delivery_center_op_id`
  ,o.`delivery_center_op_name`
  ,o.`sum_orderCount`
  ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM
  dwm.`dwm_business_order_count` o
;

Flink UI效果如下 dws_business_order_count_op数据写到dws_business_order_count_op:

总体数据流转示例

源表:

paimon-ods:

paimon-dwd:

paimon-dwm:

paimon-dws:

特别提醒sqlserver数据库抽取时如果源表数据量过大全量抽取会锁表,建议在业务允许的情况下采用增量抽取。对于全量抽取sqlserver可以采用中转的方式sqlserver全量数据导入到mysql,从mysql再到paimon-ods,后面再通过sqlserever做增量抽取。


04

问题排查分析


1. 聚合数据计算不准


sqlserver cdc 采集数据到 paimon 表

说明:

dwd 表:

'changelog-producer' = 'input'

ads 表:

'merge-engine' = 'aggregation',   -- 使用 aggregation 聚合计算 sum

'fields.sum_amount.aggregate-function' = 'sum'


ADS 层聚合表采用 agg sum 会出现 dwd 数据流不产生 update_before,产生错误数据流 update_after  比如上游源表 update 10 到 30  dwd 层数据会变更为 30,ads 聚合层数据也会变更为 30,但是现在变为了 append 数据变成了 10+30=40 的错误数据。


解决办法:

By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.

By specifying changelog-producer.compaction-interval table property (default value 30min), users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs.


这样能解决上述问题。但是随之而来出现了新的问题。默认 changelog-producer.compaction-interval 是 30min,意味着 上游的改动到 ads 查询要间隔 30min,生产过程中发现将压缩间隔时间改成 1min 或者 2 分钟的情况下,又会出现上述 ADS 层聚合数据不准的情况。


'changelog-producer.compaction-interval' = '2m'


需要在写入 Flink Table Store 时需要配置 table.exec.sink.upsert-materialize= none,避免产生 Upsert 流,以保证 Flink Table Store 中能够保存完整的 changelog,为后续的流读操作做准备。


set 'table.exec.sink.upsert-materialize' = 'none'


2. 相同 sequence.field 导致 dwd 明细宽表无法收到 update 数据更新

mysql cdc 采集数据到 paimon 表


说明:

在 MySQL 源端执行 update

数据修改成功后,dwd_orders 表数据能同步成功


但是查看 dwd_enriched_orders 表数据无法同步,启动流模式查看数据,发现没有数据流向



解决:

排查发现是由于配置了参数 'sequence.field' = 'o_orderdate'(使用 o_orderdate 生成 sequence id,相同主键合并时选择 sequence id 更大的记录)导致的,因为在修改价格的时候 o_orderdate 字段时间不变, 继而'sequence.field' 是相同的,导致顺序不确定,所以 ROW1 和 ROW2,它们的 o_orderdate 是一样的,所以在更新时会随机选择,所有将该参数去掉即可,去掉后正常按照输入的先后顺序,自动生成一个 sequence number,不会影响同步结果。


3. Aggregate function 'last_non_null_value' does not support retraction


报错:Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'.


可以在官方文档找到解释:

Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction.


可以理解为:除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配'fields.${field_name}.ignore-retract'='true' 忽略,解决这个报错

WITH (

'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum

'fields.sum_orderCount.aggregate-function' = 'sum',

'fields.create_date.ignore-retract'='true'  #create_date 字段

);


4. paimon任务中断失败


任务异常中断 导致pod挂掉


查看loki日志显示akka.pattern.AskTimeoutException: Ask timed out on


java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n"


初步判断应该是由于以上2个原因导致触发了akka的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。


我们这边先看如何进行参数修改:


key

default

describe

akka.ask.timeout

10s

Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).

web.timeout

 

600000

 

Timeout for asynchronous operations by the web monitor in milliseconds.


在conf/flink-conf.yaml最后增加下面参数

akka.ask.timeout: 100s

web.timeout:1000000

然后在streampark手动刷新下flink-conf.yaml验证参数是否同步成功。


5. snapshot no such file or director


发现cp出现失败情况

对应时间点日志显示Snapshot丢失,任务显示为running状态,但是源表mysql数据无法写入paimon ods表

定位cp失败原因为:计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP

无法读取Snapshot原因为:flink集群资源不够,Writer和Committer产生竞争,Full-Compaction时读到了已过期部分的不完整的Snapshot,目前官方针对这个问题已经修复


https://github.com/apache/incubator-paimon/pull/1308



而解决cp失败的解决办法增加并行度,增加deploymenttaskmanager slot和jobmanager cpu


-D kubernetes.jobmanager.cpu=0.8

-D kubernetes.jobmanager.cpu.limit-factor=1

-D taskmanager.numberOfTaskSlots=8

-D jobmanager.adaptive-batch-scheduler.default-source-parallelism=2



在复杂的实时任务中,可以通过修改动态参数的方式,增加资源。


05

未来规划


  • 自建的数据平台bondata正在集成paimon的元数据信息、数据指标体系、血缘、一键pipline等功能,形成海程邦达的数据资产,并将在此基础上展开一站式数据治理
  • 后面将基于trino Catalog接入Doris,实现真正的离线数据和实时数据的one service
  • 采用doris+paimon的架构方案继续推进集团内部流批一体数仓建设的步伐



在这里要感谢之信老师在使用paimon过程中的大力支持,在学习使用过程中遇到的问题,都能在第一时间给到解惑并得到解决,我们后面也会积极参与社区的交流和建设,让paimon能为更多开发者和企业提供流批一体的数据湖解决方案。







作者简介 PROFILE

王新

数据架构师 专注于大数据流批一体架构在企业的落地实现。

周文轩

数据开发工程师 对数据分析和流计算的实时开发充满热情。


大数据流动:专注于大数据、数据治理、人工智能相关知识分享

作者独孤风,港口工人转行成为国企大数据负责人,不断自学考研考证充实自己。

提供大数据,数据治理,人工智能相关技术实践与理论学习交流群。

大数据流动,学习永不止步。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
SQL 关系型数据库 MySQL
282 0
|
3月前
|
存储 分布式计算 Apache
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
小米通过将 Apache Doris(数据库)与 Apache Paimon(数据湖)深度融合,不仅解决了数据湖分析的性能瓶颈,更实现了 “1+1>2” 的协同效应。在这些实践下,小米在湖仓数据分析场景下获得了可观的业务收益。
779 9
湖仓一体:小米集团基于 Apache Doris + Apache Paimon 实现 6 倍性能飞跃
|
5月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
8月前
|
Java 网络安全 Apache
SshClient应用指南:使用org.apache.sshd库在服务器中执行命令。
总结起来,Apache SSHD库是一个强大的工具,甚至可以用于创建你自己的SSH Server。当你需要在服务器中执行命令时,这无疑是非常有用的。希望这个指南能对你有所帮助,并祝你在使用Apache SSHD库中有一个愉快的旅程!
524 29
|
8月前
|
SQL 分布式计算 流计算
官宣|Apache Paimon 1.0 发布公告
官宣|Apache Paimon 1.0 发布公告
555 8
|
8月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
小米基于 Apache Paimon 的流式湖仓实践
223 0
小米基于 Apache Paimon 的流式湖仓实践
|
9月前
|
存储 分布式数据库 Apache
小米基于 Apache Paimon 的流式湖仓实践
本文整理自Flink Forward Asia 2024流式湖仓专场分享,由计算平台软件研发工程师钟宇江主讲。内容涵盖三部分:1)背景介绍,分析当前实时湖仓架构(如Flink + Talos + Iceberg)的痛点,包括高成本、复杂性和存储冗余;2)基于Paimon构建近实时数据湖仓,介绍其LSM存储结构及应用场景,如Partial-Update和Streaming Upsert,显著降低计算和存储成本,简化架构;3)未来展望,探讨Paimon在流计算中的进一步应用及自动化维护服务的建设。
529 0
小米基于 Apache Paimon 的流式湖仓实践
|
12月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
450 1
|
分布式计算 大数据 Apache
Apache Spark & Paimon Meetup · 北京站,助力 LakeHouse 架构生产落地
2024年11月15日13:30北京市朝阳区阿里中心-望京A座-05F,阿里云 EMR 技术团队联合 Apache Paimon 社区举办 Apache Spark & Paimon meetup,助力企业 LakeHouse 架构生产落地”线下 meetup,欢迎报名参加!
398 59
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
309 61

推荐镜像

更多
下一篇
oss云网关配置