摘要
本文主要介绍作为供应链物流服务商海程邦达在数字化转型过程中采用 Paimon 实现流式数仓的落地方案。我们提供一个适用于 k8s 环境并且易于上手的生产操作手册,旨在帮助读者快速掌握 Paimon 的使用方法。
- 公司业务情况介绍
- 大数据技术痛点以及选型
- 生产实践
- 问题排查分析
- 未来规划
01
公司业务情况介绍
海程邦达集团一直专注于供应链物流领域,通过打造优秀的国际化物流平台,为客户提供端到端一站式智慧型供应链物流服务。集团现有员工 2000 余人,年营业额逾 120 亿人民币,网络遍及全球 200 余个港口,在海内外有超 80 家分、子公司,助力中国企业与世界互联互通。
业务背景:
随着公司规模的不断扩大和业务复杂性的增加,为了更好地实现资源优化和流程改进,公司运营与流程管理部需要实时监控公司的业务运转情况,以确保业务流程的稳定性和高效性。
公司运营与流程管理部负责监督公司各类业务流程的执行,包括海运、空运、铁运各个大区和事业部的订单量,大客户的订单量,航线订单量,关务、仓储、陆运各个操作站点的委托量,公司当天各个大区和事业部实际收入和支出情况等。通过对这些流程的监控和分析,公司能够识别出潜在的问题和瓶颈,提出改进措施和建议,以优化公司运营效率。
数仓批量处理架构:
实时数仓架构情况:
当前系统要求直接从生产系统收集实时数据,但存在多个数据源需要进行关联查询,而帆软报表在处理多个数据源时展示不够友好,且无法再次聚合多个数据源。定时查询生产系统会给生产系统数据库带来压力,影响生产系统的稳定运行。因此,我们需要引入一个可以通过Flink CDC技术实现流式处理的数仓,以解决实时数据处理的问题。这个数仓需要能够从多个数据源收集实时数据并在此基础上实现复杂的关联SQL查询、机器学习等操作,并且可以避免不定时查询生产系统,从而减轻生产系统的压力,保障生产系统的稳定运行。
02
大数据技术痛点以及选型
海程邦达大数据团队建立以来一直以高效运维工具或平台来实现对人员的高效配置,优化重复劳动,手工作业。
在离线批数处理已能够支持集团基础驾驶舱和管理报表的情况下,集团运管部门提出了业务要实时统计订单数量,操作单量的需求,财务部门有现金流实时展示的需求,在这样的背景下,基于大数据的流批一体方案势在必行。
虽然大数据部门已经使用了Apache Doris来实现湖仓一体的存储和计算,此前已在Doris社区发表湖仓一体建设的文章,但是有些问题有待解决,流式数据存储无法复用、中间层数据不可查、做不到实时聚合计算问题。
按照架构演进时间排序,近几年通用的架构解决方案如下:
hadoop架构:
传统数仓和互联网数仓的分界点,在互联网早期的时候,大家对于数据分析的要求也不高,主要是做实时性不高的报表、支撑决策,对应的离线数据分析方案就产生了。
优点:数据类型支持丰富,支持海量运算,机器配置要求低,时效性低,容错
缺点:不支持实时;运维复杂;查询优化器不如MPP,响应慢
选型依据:不支持实时;运维复杂,不符合人员精简配置原则;性能差
lambda架构:
Lambda 架构是由 Storm 的作者 Nathan Marz 提出的一个实时大数据处理框架。Marz 在 Twitter 工作期间开发了著名的实时大数据处理框架 Storm,Lambda 架构是其根据多年进行分布式大数据系统的经验总结提炼而成。
数据流处理分为 ServingLayer、SpeedLayer、BatchLayer三层:
在Batch层主要是对离线数据进行处理,最后提供view服务给到业务;
在Speed层主要是对实时增量数据进行处理,最后提供view服务给到业务;
在Serving层主要是响应用户的请求,实现离线和增量数据的聚合计算,并最终提供服务;
优点是:离线和实时分开计算,使用两套框架,架构稳定
缺点是:离线和实时数据很难保持一致性,运维人员需要维护两套框架三层架构,开发人员需要写三套代码
选型依据:数据一致性不可控;运维、开发工作量大,不符合人员精简配置的原则;
kappa架构:
kappa架构只用一套数据流处理架构来解决离线和实时数据,用实时流来解决所有问题,旨在提供快速可靠的查询访问结果。它非常适合各种数据处理工作负载,包括连续数据管道、实时数据处理、机器学习模型和实时数据分析、物联网系统以及许多其他具有单一技术堆栈的用例。
它通常使用流处理引擎实现,例如Apache Flink、Apache Storm、Apache Kinesis、 Apache Kafka,旨在处理大量数据流并提供快速可靠的查询访问结果。
优点是:单数据流处理框架
缺点是:虽然它的架构相对lamabda架构简单,但是流式处理框架的设置和维护相对复杂,不具备真正意义上的离线数据处理能力;流平台中存储大数据成本高昂
选型依据:离线数据处理能力需要保留,控制成本
Iceberg
为此我们也调研了Iceberg,它的快照功能一定程度上能够实现流批一体,但是它的问题是基于kafka做的实时表中间层不可查或者无法复用已经存在的表,对kafka有强依赖,需要利用kafka将中间结果写到iceberg表,增加了系统的复杂度和可维护性。
选型依据:无kafka实时架构已落地,中间数据无法实现可查可复用
流式数仓(kappa架构的延续)
海程邦达大数据团队自FTS0.3.0版本开始参与流式数仓建设,旨在进一步降低数据处理框架的复杂度和人员的精简配置,前期的宗旨是既然是趋势就要参与进来,过程中不断学习精进,向最前沿的技术靠拢,团队一致认为有坑就踩坑,摸着石头也要过河,好在经过几个版本的迭代,在社区的高效配合下,最开始出现的问题也慢慢得以解决
流式数仓架构如下:
延续了kappa架构的特点,一套流处理架构,好处在与,底层paimon的技术支撑使得数据在全链路可查,数仓分层架构得以复用,同时兼顾了离线和实时的处理能力,减少存储和计算的浪费
03
生产实践
本方案采用 Flink Application On K8s 集群,Flink CDC 实时摄取业务系统关系型数据库数据,通过 streampark 任务平台提交 Flink+Paimon Streaming Data Warehouse 任务, 最后采用 Trino 引擎接入 Finereport 提供服务和开发人员的查询。paimon底层存储支持 S3 协议,因为公司大数据服务依赖于阿里云所以使用对象存储OSS作为数据文件系统。
形成一个全链路实时流动、可查、分层可复用的Pipline
架构图:
主要采用组件版本如下:
flink-1.16.0-scala-2.12
paimon-flink-1.16-0.4-20230424.001927-40.jar
apache-streampark_2.12-2.0.0
kubernetes v1.18.3
环境构建
下载 flink-1.16.0-scala-2.12.tar.gz 可以在 flink官网下载对应版本的安装包到streampark服务器
#解压 tar zxvf flink-1.16.0-scala-2.12.tar.gz #修改 flink-conf 配置文件并启动集群 vim flink-1.16.0-scala-2.12/conf/flink-conf.yaml 文件,按如下配置修改 jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.bind-host: localhost jobmanager.memory.process.size: 4096m taskmanager.bind-host: localhost taskmanager.host: localhost taskmanager.memory.process.size: 4096m taskmanager.numberOfTaskSlots: 4 parallelism.default: 4 akka.ask.timeout: 100s web.timeout: 1000000 #checkpoints&&savepoints state.checkpoints.dir: file:///opt/flink/checkpoints state.savepoints.dir: file:///opt/flink/savepoints execution.checkpointing.interval: 2min #当作业手动取消/暂停时,将会保留作业的 Checkpoint 状态信息 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION state.backend: rocksdb 已完成的 cp 保存个数 state.checkpoints.num-retained: 2000 state.backend.incremental: true execution.checkpointing.checkpoints-after-tasks-finish.enabled: true #OSS fs.oss.endpoint: oss-cn-zhangjiakou-internal.aliyuncs.com fs.oss.accessKeyId: xxxxxxxxxxxxxxxxxxxxxxx fs.oss.accessKeySecret: xxxxxxxxxxxxxxxxxxxxxxx fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem jobmanager.execution.failover-strategy: region rest.port: 8081 rest.address: localhost
建议可以在本地添加 FLINK_HOME 方便在上 k8s 之前本地排查问题使用
vim /etc/profile
#FLINK export FLINK_HOME=/data/src/flink-1.16.0-scala-2.12 export PATH=$PATH:$FLINK_HOME/bin source /etc/profile