-- Create the wide table in the paimon-dwd layer
CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` (
    `reference_no` varchar(50) NOT NULL COMMENT '委托单号主键',
    `bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
    `is_server_item` int NOT NULL COMMENT '是否已经关联订单',
    `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
    `consignor_date` DATE COMMENT '统计日期',
    `consignor_code` varchar(50) COMMENT '客户编号',
    `consignor_name` varchar(160) COMMENT '客户名称',
    `sales_code` varchar(32) NOT NULL COMMENT '销售编号',
    `sales_name` varchar(200) NOT NULL COMMENT '销售名称',
    `delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号',
    `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
    `pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
    `pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
    `port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
    `is_delete` int NOT NULL COMMENT '是否作废',
    `order_status` varchar(8) NOT NULL COMMENT '订单状态',
    `count_order` BIGINT NOT NULL COMMENT '订单数',
    `o_year` BIGINT NOT NULL COMMENT '分区字段',
    `create_date` timestamp NOT NULL COMMENT '创建时间',
    PRIMARY KEY (`o_year`, `reference_no`, `bondex_shy_flag`) NOT ENFORCED
) PARTITIONED BY (`o_year`) WITH (
    -- 2 buckets per partition
    'bucket' = '2',
    'changelog-producer' = 'full-compaction',
    'snapshot.time-retained' = '2h',
    'changelog-producer.compaction-interval' = '2m'
);

-- Set the job name and merge the related ODS business tables into the DWD layer
SET 'pipeline.name' = 'dwd_business_order';

INSERT INTO dwd.`dwd_business_order`
SELECT
    o.doccode,
    ......,
    YEAR(o.docdate) AS o_year,
    TO_TIMESTAMP(CONVERT_TZ(CAST(CURRENT_TIMESTAMP AS varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM ods.ods_shy_jh_doc_hdworkdochd o
INNER JOIN ods.ods_shy_base_enterprise en ON o.businessguid = en.entguid
LEFT JOIN dim.dim_hhl_user_code sales ON o.salesguid = sales.USER_GUID
LEFT JOIN dim.dim_hhl_user_code op ON o.bookingguid = op.USER_GUID
UNION ALL
SELECT
    business_no,
    ......,
    YEAR(gmt_create) AS o_year,
    TO_TIMESTAMP(CONVERT_TZ(CAST(CURRENT_TIMESTAMP AS varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM ods.ods_bondexsea_doc_order
UNION ALL
SELECT
    HBLIndex,
    ......,
    YEAR(CreateOPDate) AS o_year,
    TO_TIMESTAMP(CONVERT_TZ(CAST(CURRENT_TIMESTAMP AS varchar), 'UTC', 'Asia/Shanghai')) AS `create_date`
FROM ods.`ods_airsea_airfreight_orderhawb`;
In the Flink UI you can see the ODS data being joined and cleansed in real time by Paimon into the dwd_business_order table.
3. Lightly aggregate the DWD-layer data into the DWM layer and write it into the dwm.`dwm_business_order_count` table. This table sums the aggregate field by primary key: sum_orderCount holds the aggregated result, and Paimon automatically handles physically deleted rows during the sum.
-- Create the DWM-layer light aggregation table: order counts summarized by date, sales, operator, business type, customer, port of loading, and port of destination
CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` (
    `l_year` BIGINT NOT NULL COMMENT '统计年',
    `l_month` BIGINT NOT NULL COMMENT '统计月',
    `l_date` DATE NOT NULL COMMENT '统计日期',
    `bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分',
    `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
    `is_server_item` int NOT NULL COMMENT '是否已经关联订单',
    `customer_code` varchar(50) NOT NULL COMMENT '客户编号',
    `sales_code` varchar(50) NOT NULL COMMENT '销售编号',
    `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
    `pol_code` varchar(100) NOT NULL COMMENT '起运港代码',
    `pot_code` varchar(100) NOT NULL COMMENT '中转港代码',
    `port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码',
    `customer_name` varchar(200) NOT NULL COMMENT '客户名称',
    `sales_name` varchar(200) NOT NULL COMMENT '销售名称',
    `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
    `sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
    `create_date` timestamp NOT NULL COMMENT '创建时间',
    PRIMARY KEY (`l_year`, `l_month`, `l_date`, `order_type_name`, `bondex_shy_flag`, `is_server_item`, `customer_code`, `sales_code`, `delivery_center_op_id`, `pol_code`, `pot_code`, `port_of_dest_code`) NOT ENFORCED
) WITH (
    'changelog-producer' = 'full-compaction',
    'changelog-producer.compaction-interval' = '2m',
    'merge-engine' = 'aggregation',  -- use the aggregation merge engine to compute sum
    'fields.sum_orderCount.aggregate-function' = 'sum',
    'fields.create_date.ignore-retract' = 'true',
    'fields.sales_name.ignore-retract' = 'true',
    'fields.customer_name.ignore-retract' = 'true',
    'snapshot.time-retained' = '2h',
    'fields.delivery_center_op_name.ignore-retract' = 'true'
);

-- Set the job name
SET 'pipeline.name' = 'dwm_business_order_count';

INSERT INTO dwm.`dwm_business_order_count`
SELECT
    YEAR(o.`consignor_date`) AS `l_year`,
    MONTH(o.`consignor_date`) AS `l_month`,
    ......,
    TO_TIMESTAMP(CONVERT_TZ(CAST(CURRENT_TIMESTAMP AS varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM dwd.`dwd_business_order` o;
The Flink UI shows the dwd_business_order data being aggregated and written into dwm_business_order_count:
4. Aggregate the DWM-layer data into the DWS layer, which summarizes over a smaller set of dimensions.
-- Create the table that aggregates the day's order count by operator and business type
CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` (
    `l_year` BIGINT NOT NULL COMMENT '统计年',
    `l_month` BIGINT NOT NULL COMMENT '统计月',
    `l_date` DATE NOT NULL COMMENT '统计日期',
    `order_type_name` varchar(50) NOT NULL COMMENT '业务分类',
    `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号',
    `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称',
    `sum_orderCount` BIGINT NOT NULL COMMENT '订单数',
    `create_date` timestamp NOT NULL COMMENT '创建时间',
    PRIMARY KEY (`l_year`, `l_month`, `l_date`, `order_type_name`, `delivery_center_op_id`) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',  -- use the aggregation merge engine to compute sum
    'fields.sum_orderCount.aggregate-function' = 'sum',
    'fields.create_date.ignore-retract' = 'true',
    'snapshot.time-retained' = '2h',
    'fields.delivery_center_op_name.ignore-retract' = 'true'
);

-- Set the job name
SET 'pipeline.name' = 'dws_business_order_count_op';

INSERT INTO dws.`dws_business_order_count_op`
SELECT
    o.`l_year`,
    o.`l_month`,
    o.`l_date`,
    o.`order_type_name`,
    o.`delivery_center_op_id`,
    o.`delivery_center_op_name`,
    o.`sum_orderCount`,
    TO_TIMESTAMP(CONVERT_TZ(CAST(CURRENT_TIMESTAMP AS varchar), 'UTC', 'Asia/Shanghai')) AS create_date
FROM dwm.`dwm_business_order_count` o;
The Flink UI shows the dwm_business_order_count data being written into dws_business_order_count_op:
Overall data flow example
Source tables:
paimon-ods:
paimon-dwd:
paimon-dwm:
paimon-dws:
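To make the flow concrete, here is a minimal spot-check sketch against the tables defined above. The reference number 'REF001' and the date are made-up filter values, and switching the session to batch mode for ad-hoc queries is an assumption about how you would verify each layer:

-- Switch the session to batch mode for ad-hoc verification queries
SET 'execution.runtime-mode' = 'batch';

-- 1) one order in the DWD wide table carries count_order = 1
SELECT reference_no, count_order
FROM dwd.dwd_business_order
WHERE reference_no = 'REF001';          -- made-up reference number

-- 2) the DWM layer sums count_order for each primary-key combination
SELECT l_date, delivery_center_op_id, sum_orderCount
FROM dwm.dwm_business_order_count
WHERE l_date = DATE '2023-05-01';       -- made-up date

-- 3) the DWS layer rolls the same figure up to operator + business type
SELECT l_date, order_type_name, delivery_center_op_id, sum_orderCount
FROM dws.dws_business_order_count_op
WHERE l_date = DATE '2023-05-01';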
A special note on extracting from SQL Server: if the source table is large, a full extraction will lock the table, so use incremental extraction wherever the business allows it. For a full load, SQL Server data can be staged instead: bulk-load the full SQL Server data into MySQL, load it from MySQL into paimon-ods, and then switch to incremental extraction directly from SQL Server.
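A minimal sketch of the incremental path, assuming the Flink CDC sqlserver-cdc connector; the connection values, the source table dbo.doc_order, and the demo sink ods.ods_doc_order_demo are placeholders rather than the production objects:

-- Hypothetical SQL Server CDC source (connection values are placeholders)
CREATE TEMPORARY TABLE src_doc_order (
    business_no STRING NOT NULL,
    gmt_create  TIMESTAMP(3),
    PRIMARY KEY (business_no) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'sqlserver-host',
    'port' = '1433',
    'username' = 'flink_cdc',
    'password' = '******',
    'database-name' = 'business_db',
    'schema-name' = 'dbo',
    'table-name' = 'doc_order'
);

-- Hypothetical Paimon ODS sink with the same (shortened) schema
CREATE TABLE IF NOT EXISTS ods.ods_doc_order_demo (
    business_no STRING NOT NULL,
    gmt_create  TIMESTAMP(3),
    PRIMARY KEY (business_no) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input',
    'snapshot.time-retained' = '2h'
);

SET 'pipeline.name' = 'ods_doc_order_demo';
INSERT INTO ods.ods_doc_order_demo SELECT * FROM src_doc_order;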
04
Troubleshooting and Analysis
1. Aggregated data is computed incorrectly
SQL Server CDC ingesting data into Paimon tables
Description:
dwd table:
'changelog-producer' = 'input'
ads table:
'merge-engine' = 'aggregation',  -- use the aggregation merge engine to compute sum
'fields.sum_amount.aggregate-function' = 'sum'
With the ADS-layer aggregation table using agg sum, the dwd data stream produces no update_before messages, only update_after, which yields incorrect results. For example, when the upstream source updates a value from 10 to 30, the dwd-layer row becomes 30 and the ads aggregation should also become 30; but because the change effectively arrives as an append, the aggregate becomes 10 + 30 = 40, which is wrong.
Solution:
By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. The latency of changelog is affected by the frequency of full compactions.
By specifying changelog-producer.compaction-interval table property (default value 30min), users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs.
This solves the problem above, but a new issue follows. The default changelog-producer.compaction-interval is 30min, which means an upstream change takes up to 30 minutes to become visible to ads queries. In production we found that lowering the compaction interval to 1 or 2 minutes brought back the same ADS-layer aggregation inaccuracy.
'changelog-producer.compaction-interval' = '2m'
When writing into Flink Table Store you also need to set table.exec.sink.upsert-materialize = none to avoid generating an upsert stream, so that the complete changelog is preserved in Flink Table Store for subsequent streaming reads.
set 'table.exec.sink.upsert-materialize' = 'none'
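Putting the two fixes together, a minimal end-to-end sketch; the table names dwd_orders_demo / ads_order_amount_demo and their columns are illustrative rather than the production tables, and the sum_amount field follows the ads configuration shown above:

-- Upstream detail table: full-compaction changelog so downstream aggregation sees retractions
CREATE TABLE IF NOT EXISTS dwd.dwd_orders_demo (
    order_id  BIGINT NOT NULL,
    order_day STRING,
    amount    BIGINT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'full-compaction',
    'changelog-producer.compaction-interval' = '2m'
);

-- Downstream aggregation table summing amount per day
CREATE TABLE IF NOT EXISTS ads.ads_order_amount_demo (
    order_day  STRING NOT NULL,
    sum_amount BIGINT,
    PRIMARY KEY (order_day) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.sum_amount.aggregate-function' = 'sum'
);

-- Keep the complete changelog when writing: an update of amount from 10 to 30 then
-- arrives as -10/+30, so the aggregate lands on 30 instead of the wrong 10 + 30 = 40
SET 'table.exec.sink.upsert-materialize' = 'none';
SET 'pipeline.name' = 'ads_order_amount_demo';
INSERT INTO ads.ads_order_amount_demo
SELECT order_day, amount FROM dwd.dwd_orders_demo;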
2. Identical sequence.field values prevent the dwd detail wide table from receiving updates
MySQL CDC ingesting data into Paimon tables
Description:
An update is executed on the MySQL source.
After the change succeeds, the dwd_orders table syncs correctly,
but the dwd_enriched_orders table does not; reading it in streaming mode shows no data flowing to it.
Solution:
Investigation showed the cause was the parameter 'sequence.field' = 'o_orderdate' (use o_orderdate to generate the sequence id; when rows with the same primary key are merged, the record with the larger sequence id wins). When only the price is modified, o_orderdate does not change, so the 'sequence.field' values of ROW1 and ROW2 are identical and their order is undefined: the merge picks one of them at random. Removing the parameter fixes it; without it, Paimon automatically generates a sequence number in input order, which does not affect the sync result. See the sketch below.
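A minimal sketch of the corrected definition; the table name dwd_enriched_orders_demo and the columns o_orderkey / o_orderdate / o_totalprice are illustrative, not the production schema:

-- Corrected wide table: no 'sequence.field', so Paimon assigns sequence numbers in
-- arrival order and a price-only update (o_orderdate unchanged) still replaces the old row
CREATE TABLE IF NOT EXISTS dwd.dwd_enriched_orders_demo (
    o_orderkey   BIGINT NOT NULL,
    o_orderdate  DATE,
    o_totalprice DECIMAL(15, 2),
    PRIMARY KEY (o_orderkey) NOT ENFORCED
) WITH (
    'changelog-producer' = 'input'
);
-- Do NOT add 'sequence.field' = 'o_orderdate' back: with the same o_orderdate on both
-- versions of a key, the merge order would become undefined again.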
3. Aggregate function 'last_non_null_value' does not support retraction
Error: Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'.
The explanation can be found in the official documentation:
Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction.
In other words: apart from the SUM function, the other aggregate functions do not support retraction. To avoid errors when DELETE and UPDATE_BEFORE messages arrive, configure 'fields.${field_name}.ignore-retract'='true' on the affected fields so retractions are ignored, which resolves the error:
WITH (
    'merge-engine' = 'aggregation',  -- use the aggregation merge engine to compute sum
    'fields.sum_orderCount.aggregate-function' = 'sum',
    'fields.create_date.ignore-retract' = 'true'  -- the create_date field
);
4. Paimon job interrupted and failed
The job aborted abnormally, which caused the pod to go down.
The Loki logs show akka.pattern.AskTimeoutException: Ask timed out on
java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
The initial judgment is that the two causes above triggered Akka's timeout mechanism, so the remedies are to adjust the cluster's Akka timeout settings, split the job into smaller jobs, or increase the resource allocation.
Let's first look at how to adjust the parameters:
| key | default | description |
| --- | --- | --- |
| akka.ask.timeout | 10s | Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d). |
| web.timeout | 600000 | Timeout for asynchronous operations by the web monitor in milliseconds. |
Add the following parameters at the end of conf/flink-conf.yaml:
akka.ask.timeout: 100s
web.timeout: 1000000
Then manually refresh flink-conf.yaml in StreamPark to verify that the parameters were synchronized successfully.
5. Snapshot: no such file or directory
Checkpoints (cp) were observed to fail.
The logs at the corresponding time show that a snapshot is missing; the job still shows as RUNNING, but data from the source MySQL tables can no longer be written into the Paimon ODS tables.
The checkpoint failures were traced to heavy, CPU-intensive computation: the TaskManager threads are constantly busy in processElement and never get time to checkpoint.
The unreadable snapshot is caused by insufficient Flink cluster resources: the Writer and Committer compete with each other, and during Full-Compaction an incomplete, already-expired snapshot is read. The community has already fixed this issue:
https://github.com/apache/incubator-paimon/pull/1308
The fix for the checkpoint failures is to increase parallelism: raise the deployment's TaskManager slots and the JobManager CPU.
-D kubernetes.jobmanager.cpu=0.8
-D kubernetes.jobmanager.cpu.limit-factor=1
-D taskmanager.numberOfTaskSlots=8
-D jobmanager.adaptive-batch-scheduler.default-source-parallelism=2
For complex real-time jobs, resources can be increased by adjusting these dynamic parameters.
05
Future Plans
- Our in-house data platform bondata is integrating Paimon metadata, a data-metrics system, lineage, one-click pipelines, and related features to build up 海程邦达's data assets, and will roll out one-stop data governance on that foundation
- Next, Doris will be connected through the Trino catalog to deliver true "one service" over offline and real-time data
- With the Doris + Paimon architecture we will keep advancing the group's unified stream-batch data warehouse
Here we would like to thank 之信 for the strong support during our adoption of Paimon; the questions we ran into while learning were answered and resolved promptly. We will also take an active part in community discussion and development, so that Paimon can provide a unified stream-batch data lake solution for more developers and enterprises.
Author Profiles
王新
Data architect, focused on bringing unified stream-batch big data architectures into production in the enterprise.
周文轩
Data development engineer, passionate about data analysis and real-time stream-processing development.