Flink+Paimon在阿里云大数据云原生运维数仓的实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。

1. 背景

随着大数据产品云原生化的推进,云原生集群的规模和数量都在增加,云原生集群的运维难度也在不断增加,云原生集群的资源审计、资源拓扑、资源趋势的需要就比较迫切。云原生集群的资源审计主要是 node 资源、pod 资源,如当前集群的 node 数量以及Pod 数量;资源拓扑主要构建用户->实例->pod->node->集群的关联关系,例如一个公有云用户,提供给运维团队可以通过 uid 查询到用户实例以及实例所在的节点/集群信息。资源趋势是整个团队所管理的集群资源使用趋势,pod 数量趋势、node 数量趋势、以及用户使用资源趋势。

为了提升大数据产品云原生集群的运维能力,我们构建一个实时云原生运维数仓,业务能力提供资源审计、资源拓扑、资源趋势等能力。在数据性能方面最终数据需要具备实时性(分钟级)和一致性。

目前 Flink 已经是实时计算的标准解决方案,在这个基础上阿里云开源大数据团队推出 Paimon 为低成本的数据湖解决方案。本文我们使用 Flink+Paimon 为 流式数仓解决方案应用到云原生运维数仓中,提供实时(分钟级)的数据服务,提升云原生集群的运维能力。

2. 总体设计

2.1 方案选型

如 1 中背景所述,因为数据的实时性需求,所以我们不考虑离线数仓解决方案,直接考虑实时数仓解决方案。目前阿里云实时数仓解决方案中,比较推荐使用的是 Flink+Hologres 和 Flink+Paimon 两种解决方案。其中 Flink+Hologres 已经比较成熟,本文中我们不详细描述,其主要缺点是 Hologres 存储成本比较高。最终我们选择使用 Flink+Paimon 的方案,优势在于存储成本低,实时性高。

2.2 最终方案

数据采集采用 exporter operator 实现 kubernet 负载数据到 sls,为了适配阿里云多 region 网络隔离的问题,我们在数据出 region 之前先同步到 sls 中,再直接同步到 sls 的跨域网络同步到 paimon ods 表中,同步的过程需要处理数据排序问题。flink 作业通过运维数仓本身有比较高的实时性的要求,我们在数据处理过程中采用 Flink+Paimon 的解决方案,paimon 支持元数据注册到 maxcompute 中,可以直接使用 dataworks /flink 查询数据;最终数据报表采用 FBI 进行展示,资源拓扑数据写入到 CMDB 中给运维系统使用。

3. 落地实践

3.1 数据采集

我们研发了 exporter-operator 工具,嵌入 Kubernetes 集群中的哨兵,实时监听 Kubernetes api server 中 workload 数据变化。作为集群生态系统的 addon 组件,exporter-operator 以高效的响应机制驻留内部。

通过配置 Informer 中的自定义资源(CR),exporter-operator 灵活启用了多个 workload-informer 实例,针对各类工作负载实施监听,赋予系统卓越的灵活性,使其能够灵活监控多种工作负载信息。

exporter-operator 不断捕获并即时传输这些关键的工作负载数据,无缝对接阿里云日志服务(SLS)或 Apache Kafka 流处理平台,实现实时记录与深度分析集群运行状态,为优化集群效能提供强有力的数据支持。

3.2 数据清洗

为了数据分析可以使用 dataworks 进行分析,数据清洗前需要按照文档Paimon外部表进行配置。配置完成后创建 Paimon 表的元数据会直接同步到 dataworks 中,就可以直接进行数据分析。

3.2.1 ods 层

ods 层数据直接把 sls/kafka 中的数据同步到 paimon ods 表,保存有所有同步到的负载数据。

CREATE TABLE IF NOT EXISTS `abm-exporter-paimon`.`abm_exporter_db`.`ods_realtime_exporter_lakehouse`
 (
     `sls_time`  bigint
     ,`cluster`  varchar
     ,`content`  varchar
 ) PARTITIONED BY (cluster) with( 'orc.write.batch-size' = '128','file.format' = 'avro','bucket' = '8')
insert into `abm-exporter-paimon`.`abm_exporter_db`.`ods_realtime_exporter_lakehouse` /*+ OPTIONS('orc.write-buffer-size' = '128','file.format' = 'avro') */
select
    `__timestamp__` as sls_time
    ,`__topic__` as cluster
    ,`content`
from source_k8s_meta;

注意:

paimon 默认文件格式是 orc,orc 是列存,所以需要先在内存里攒批然后写出去。默认一批是 1024 行,目前写入数据一行几 mb ,1024 行的 buffer 就几 G 了,堆内存会不够。所以选择使用行存:设置 'file.format' = 'avro',用 avro 格式;另外也可以设置 'orc.write.batch-size' = '128',把批的大小改成 128 来解决。

3.2.2 dwd 层

dwd 层的功能有三个部分,1.将数据按照 primary key 聚合 ,确保资源数据的唯一性 2.将数据进行排序,确保 delete 的数据在最后发生 3.集群维度进行维表 join,丰富集群维度字段。

  CREATE TABLE IF NOT EXISTS `abm-exporter-paimon`.`abm_exporter_db`.`dwd_realtime_exporter_lakehouse`
 (
   type                 varchar
   ,obj                 varchar
   ,k8s_group           varchar
   ,version             varchar
   ,kind                varchar
   ,namespace           varchar
   ,name                varchar
   ,resourceversion     varchar
   ,region_id           varchar NOT NULL
   ,cluster             varchar NOT NULL
   ,watchname           varchar NOT NULL
   ,watchsetuptimestamp bigint NOT NULL
   ,uid                 varchar PRIMARY KEY NOT ENFORCED
   ,collect_time        bigint NOT NULL
   ,sls_time            bigint
   ,cluster_id          varchar
   ,region              varchar
   ,zone                varchar
 )
 with (
   'merge-engine' = 'partial-update'
   ,'changelog-producer' = 'lookup'
,'partial-update.ignore-delete'='true'
 );
set 'table.exec.sink.upsert-materialize' = 'NONE';
insert into `abm-exporter-paimon`.`abm_exporter_db`.`dwd_realtime_exporter_lakehouse`  /*+ OPTIONS('orc.write-buffer-size' = '128','file.format' = 'avro') */
select
  json_value(`content`, '$.type')             as `type`
  ,json_value(`content`, '$.obj')              as `obj`
  ,json_value(`content`, '$.group')            as `k8s_group`
  ,json_value(`content`, '$.version')          as `version`
  ,json_value(`content`, '$.kind')             as `kind`
  ,json_value(`content`, '$.namespace')        as `namespace`
  ,json_value(`content`, '$.name')             as `name`
  ,json_value(`content`, '$.resourceversion')  as `resourceversion`
  ,'cn-zhangjiakou'                               as `region_id`
  ,`cluster`                                as `cluster`
  ,json_value(`content`, '$.watchname')        as `watchname`
  ,cast(json_value(`content`, '$.watchsetuptimestamp') as bigint)    as `watchsetuptimestamp`
  ,json_value(`content`, '$.uid')              as `uid`
  ,cast(UNIX_TIMESTAMP() as bigint)                      as `collect_time`
  ,`sls_time`                            as `sls_time`
  ,NULL
  ,NULL
  ,NULL
from (
  SELECT
    *
  FROM (
    SELECT
      *
      ,ROW_NUMBER()
      OVER (
        PARTITION BY `cluster`, json_value(`content`, '$.uid')
        ORDER BY cast(json_value(`content`, '$.resourceversion') as bigint) DESC,cast(json_value(`content`, '$.__time_milli__') as bigint) DESC
      ) as rowNum
    FROM `abm-exporter-paimon`.`abm_exporter_db`.`ods_realtime_exporter_lakehouse`
  )
  WHERE rowNum = 1
);

3.2.3 ads 层

ads 层进行数据聚合,flink 处理逻辑只负责将数据写入到 Paimon 中,使用 Paimon 的merge-engine='aggregation'进行字段的聚合。这样计算逻辑不在 flink 中,不需要在 flink state 中进行,大量减少 state 的资源消耗。因为 Paimon 的计算是在 flink checkpoint 结束后触发,所以减少 flink checkpoint 的间隔时间可以提高数据的实时性。

CREATE TABLE IF NOT EXISTS `abm-exporter-paimon`.`abm_exporter_db`.`ads_realtime_exporter_lakehouse`
(
    kind          varchar
    ,cluster      varchar NOT NULL
    ,workload_num bigint
    ,PRIMARY KEY (kind, cluster) NOT ENFORCED
)
with (
    'merge-engine' = 'aggregation'
    ,'changelog-producer' = 'lookup'
    ,'fields.workload_num.aggregate-function' = 'sum'
)
;
insert into `abm-exporter-paimon`.`abm_exporter_db`.`ads_realtime_exporter_lakehouse`
select
    kind
    ,cluster
    ,case
        when type = 'ADDED' then 1
        when type = 'DELETED' then -1
        ELSE  0
    end as workload_num
from `abm-exporter-paimon`.`abm_exporter_db`.`dwd_realtime_exporter_lakehouse`;

3.3 数据分析

如果只需要查询一次的情况可以使用 dataworks 进行数据查询,当然也可以直接使用 flink 进行数据查询,当前 VVP 里面已经开发了查询的功能。如果需要生成报表就使用fbi 等工具进行数据分析,也可以使用 flink 将相关的数据清洗到 cmdb 系统中去。

use bigdata_sre;
SET odps.table.scan-options.odps.external.sub.disable.hyper=true;
SET odps.sql.common.table.planner.ext.hive.bridge = true;
set odps.sql.submit.ddltask.via.common.table=true;
SET odps.sql.hive.compatible = false;
set odps.compiler.verify=true;
set odps.isolation.session.enable=true;
-- select * from ods_realtime_exporter_lakehouse limit 100;
-- select * from dwd_realtime_exporter_lakehouse where kind='Node' limit 100;
select * from ads_realtime_exporter_lakehouse order by workload_num desc limit 100;

4. 总结

目前 Flink+Paimon 实时数据湖的方案已经比较成熟,使用成本不高,相关的生态也比较完善,在低成本和低延迟的要求下,这个方案还是非常好的选择。如果对成本没太高的要求,Flink+Hologres 在延迟方面会有些优势。

参考文件:

  1. 《Flink+Paimon构建流式数据湖仓》

    https://ata.atatech.org/articles/11000268231?spm=ata.25287382.0.0.6be65f6948opbG

  2. 《Paimon外部表》

    https://help.aliyun.com/zh/maxcompute/user-guide/paimon-external-table

  3. 《当流计算邂逅数据湖:Paimon 的前生今世》

    https://xie.infoq.cn/article/63890f6dc8afcbfaac312444f


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

retouch_2024070417440476.jpg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
14天前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
544 7
阿里云实时计算Flink在多行业的应用和实践
|
9天前
|
运维 算法 数据可视化
【2021 高校大数据挑战赛-智能运维中的异常检测与趋势预测】2 方案设计与实现-Python
文章详细介绍了参加2021高校大数据挑战赛中智能运维异常检测与趋势预测任务的方案设计与Python实现,包括问题一的异常点和异常周期检测、问题二的异常预测多变量分类问题,以及问题三的多变量KPI指标预测问题的算法过程描述和代码实现。
26 0
|
3天前
|
消息中间件 大数据 Kafka
"Apache Flink:重塑大数据实时处理新纪元,卓越性能与灵活性的实时数据流处理王者"
【8月更文挑战第10天】Apache Flink以卓越性能和高度灵活性在大数据实时处理领域崭露头角。它打破批处理与流处理的传统界限,采用统一模型处理有界和无界数据流,提升了开发效率和系统灵活性。Flink支持毫秒级低延迟处理,通过时间窗口、状态管理和自动并行化等关键技术确保高性能与可靠性。示例代码展示了如何使用Flink从Kafka读取实时数据并进行处理,简明扼要地呈现了Flink的强大能力。随着技术进步,Flink将在更多场景中提供高效可靠的解决方案,持续引领大数据实时处理的发展趋势。
18 7
|
11天前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
166 3
|
9天前
|
机器学习/深度学习 运维 算法
【2021 高校大数据挑战赛-智能运维中的异常检测与趋势预测】1 赛后总结与分析
对2021高校大数据挑战赛中智能运维异常检测与趋势预测赛题的赛后总结与分析,涉及赛题解析、不足与改进,并提供了异常检测、异常预测和趋势预测的方法和模型选择的讨论。
29 0
【2021 高校大数据挑战赛-智能运维中的异常检测与趋势预测】1 赛后总结与分析
|
8天前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
26 0
|
12天前
|
弹性计算 运维 搜索推荐
阿里云建站方案参考:云服务器、速成美站、企业官网区别及选择参考
随着数字化转型的浪潮不断推进,越来越多的企业和公司开始将业务迁移到云端,而搭建一个专业、高效的企业官网成为了上云的第一步。企业官网不仅是展示公司形象、产品和服务的重要窗口,更是与客户沟通、传递价值的关键渠道。随着阿里云服务器和建站产品的知名度越来越高,越来越多的用户选择阿里云的产品来搭建自己的官网。本文将深入探讨在阿里云平台上,如何选择最适合自己的建站方案:云服务器建站、云·速成美站还是云·企业官网。
阿里云建站方案参考:云服务器、速成美站、企业官网区别及选择参考
|
6天前
|
编解码 分布式计算 Linux
最新阿里云服务器、轻量应用服务器、GPU云服务器活动价格参考
阿里云服务器产品包含云服务器、轻量应用服务器、GPU云服务器等,本文汇总了这些云服务器当下最新的实时活动价格情况,包含经济型e实例云服务器价格、通用算力型u1实例云服务器价格、第七代云服务器价格、轻量应用服务器最新价格、GPU云服务器价格,以供大家参考。
最新阿里云服务器、轻量应用服务器、GPU云服务器活动价格参考
|
13天前
|
域名解析 弹性计算 数据挖掘
阿里云特惠云服务器82元、298元、99元和199元性能及选择参考
目前在阿里云的活动中有几款价格非常实惠的云服务器,现在购买2核2G轻量应用服务器最低只要82元1年,云服务器最低只要99元1年。购买2核4G轻量应用服务器最低只要298元1年,云服务器最低只要199元1年。本文为大家解析2024年阿里云这几款特惠云服务器的性能及选择参考。
阿里云特惠云服务器82元、298元、99元和199元性能及选择参考
|
1天前
|
运维 安全 网络安全
运维笔记:基于阿里云跨地域服务器通信
运维笔记:基于阿里云跨地域服务器通信
9 1

相关产品

  • 实时计算 Flink版
  • 下一篇
    云函数