开源湖仓一体平台(二):Arctic(上篇)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 开源湖仓一体平台(二):Arctic(上篇)

Arctic Introduction
Catalogs
Table Format
self-optimizing
Table WaterMark

Deployment

Configurations

CDC Ingestion

spark Ingestion

Flink Ingestion

Mpp Ingestion



简介


Arctic 是一个开放式架构下的湖仓管理系统,在开放的数据湖格式之上,Arctic 提供更多面向流和更新场景的优化,以及一套可插拔的数据自优化机制和管理服务。基于 Arctic 可以帮助各类数据平台,工具和产品快速搭建开箱即用,流批统一的湖仓。

Arctic 开放式架构如下所示:

目前 Arctic 是搭建在 Iceberg format 之上的湖仓管理系统,得益于 Apache Iceberg 繁荣的生态,可以使用 Flink、Spark、Trino、Impala 等多种引擎在私有化场景和各个公有云平台上使用 Arctic,Arctic 的核心组件有:

  • AMS — Arctic Management Service,负责提供湖仓管理功能,调度自优化任务,AMS 可以同时管理 Hive 表和新型表格式,像 HMS 一样具备元数据存储和管理的功能,可以和 HMS 或其他 Metastore 协同使用,AMS 配套的 dashboard 可执行管理操作,查看 metrics,对计算资源和数据资源的伸缩做出决策。

Arctic 为流和更新的场景提供更多可插拔的组件:

  • Optimizers — 数据自优化的执行引擎插件,为流式湖仓数据异步地执行合并、排序、去重,layout 优化,optimizer 被设计为湖仓中的后台执行器,执行过程像虚拟机中的垃圾回收器一样对用户透明。
  • LogStore — Arctic 内置的 mixed streaming format 可以在表上配置 LogStore,利用 Kafka、Pulsar 等消息队列为实时数据加工提供毫秒到秒级的 SLA,LogStore 本身也会作为一种表格式使用。
  • Kyuubi — Arctic 提供的 SQL 工具可以对接 Kyuubi 实现 SQLGateway 功能

Multiple formats

AMS 可以管理不同 table format 的表,类似于 MySQL/ClickHouse 可以选择不同存储引擎,Arctic 通过使用不同的 table format 满足多样的用户需求,目前 Arctic 支持两种 table format:

  • Iceberg format — aka. native Iceberg format,使用 Iceberg 社区原生的 table format,具备所有 Iceberg 功能和特性
  • Mixed streaming format — Arctic 在 Hive 和 Iceberg 之上构建的 format,可以用 LogStore 加速数据处理,在 CDC 摄取,流式更新,fresh OLAP 上有自己的优化,mixed format 根据兼容性可分为:
    Mixed Hive format — schema、partition、types 与 Hive 完全兼容,支持 Hive 表原地升级和原生读写
  • Mixed Iceberg format — schema、partition、types 与 Iceberg 完全兼容,但是在 stream upsert, CDC, Merge on read 实现上与 mixed Hive format 保持一致。

  • 有什么不同?
  • 如果你是 Iceberg 用户,或在调研 Hudi/Iceberg/Delta 这类新型数据湖表格式,希望在生产场景中应用它,可以尝试用 Arctic 来管理原生 Iceberg 表,在 Iceberg 之上,Arctic 提供了:

  • Self-optimizing — 对用户无感的异步数据自优化功能,可以同时作为离线数仓和实时数仓使用
  • Extensible scheduling — 目前支持本地,yarn 调度 optimizer,可以通过 API 扩展到不同的调度平台,支持按需伸缩
  • More managment features
                可视化的 catalog 和 表管理
                轻量化的 SQL 终端,可以 对接 Kyuubi 提供 SQLGateway 功能
                丰富的表与数据优化任务的 metrics


  • 如果你更加关注数据加工延迟和实时场景下的 MPP 性能,或者作为 Hive format 用户,希望在自建的 data platform 或 DataOps 生产范式中快速落地湖仓架构,可以尝试 Arctic 提供的 Mixed streaming format, 除了上文提到数据自优化和管理特性外,Mixed streaming format 主要有以下的不同:
  • Defining keys — 支持在 Mixed format 表上定义主键,支持 Flink 和 Spark 读写数据的主键约束,未来计划支持更多类型的 key
  • Format compatible — 支持 Hive 或 Iceberg 的原生读写,即通过 Hive/Iceberg connector 读写 Mixed format 表
  • Better data pipeline SLA — 利用 LogStore 提升提升流式处理的时效性,从分钟级提升到毫秒与秒级
  • Concurrent conflicts resolving — Arctic 会自动解决引擎并发写入,用户优化任务和自优化任务之间的数据冲突

 

Catalogs

multi-catalog介绍:

Catalog 是一个包含了数据库,表,视图, 索引,用户和 UDF 等一系列信息的元数据空间,catalog 可以被简单理解为 table 和 database 的上一级 namespace。在实践中,一般将 catalog 指向特定类型的数据源实例或集群,在 Flink、Spark 和 Trino 中,可以通过 multi-catalog 功能来支持跨数据源的 SQL,如:

SELECT c.ID, c.NAME, c.AGE, o.AMOUNT
FROM MYSQL.ONLINE.CUSTOMERS c JOIN HIVE.OFFLINE.ORDERS o
ON (c.ID = o.CUSTOMER_ID)

过去数据湖围绕 HMS 来管理元数据,遗憾的是 HMS 不支持 multi-catalog,导致引擎在数据湖上的功能存在一定限制,比如有些用户希望用 spark 通过指定 catalog 名称在不同 Hive 集群之间做联邦计算,需要用户在上层重构一套 Hive catalog plugin。其次,数据湖格式正在从 Hive 单极走向 Iceberg,Delta 以及 Hudi 多家争鸣的格局,新型的数据湖 format 对公有云更加友好,也会促进数据湖上云的进程,在这些背景下,需要一套面向 multi-catalog 的管理系统帮助用户治理不同环境,不同 format 的数据湖。

用户可以在 Arctic 中为不同环境,不同集群以及不同的 table format 创建 catalog,再利用 Flink、Spark、Trino 的 multi-catalog 功能实现多集群、多格式的联邦计算。同时,配置在 catalog 中的属性可以被所有表和用户共享,避免了重复设置。Arctic 通过 multi-catalog 的设计,为数据平台提供元数据中心的支持。

AMS 和 HMS 协同使用时,相当于使用 HMS 作为 AMS 的存储底座,结合 native Iceberg format,用户可以在不引入任何 Arctic 依赖的情况下,使用 AMS 的 multi-catalog 管理功能。

使用方式:
在 Arctic v0.4 之后,引入了 catalog 管理功能,所有表的创建都在 catalog 下完成,用户可以在 catalogs 模块中创建,编辑和删除 catalog,创建 catalog 时需要配置 metastore,table format 以及环境配置信息。


用户可以在 catalog properties 中配置给所有表配置参数,也可以在建表时指定 table properteis,在各个引擎执行时配置执行参数,这些参数的覆盖规则为:引擎优先于表,优先于 catalog。

在实践中,推荐按照下面的方式创建 catalog:

  • 如果希望和 HMS 协同使用,Metastore 选择 Hive,format 根据需求选择 Mixed Hive 或 Iceberg
  • 如果希望使用 Arctic 提供的 Mixed Iceberg format,Metastore 选择 Arctic

目前 Arctic catalog 创建时只能选择一种 table format,这主要考虑到引擎在使用 catalog 时会解析成特定的数据源,一对一的形式是符合直觉的,另一方面,直接使用 HMS 时可以突破这个限制,比如 Iceberg 社区提供的 SessionCatalog 实现,未来 Arctic 会考虑为用户提供更加灵活的管理方式。

创建catalog相关参数:

Catalog 和 Table format 的映射关系如下所示:

 

Table Format

 

Table format (aka. format)最早由 Iceberg 提出,table format 可以描述为:

  • 定义了表和文件的关系,任何引擎都可以根据 table format 查询和检索数据文件
  • Iceberg / Delta / Hudi 这类新型 format 进一步定义了表与快照,快照与文件的关系,表上所有写操作会产生新快照,所有表的读操作都面向快照,快照为数据湖带来了 MVCC、ACID 以及 Transaction 的能力。

此外,Iceberg 这类新型 table format 还提供了 schema evolve、hidden partiton、data skip 等众多高级特性,Hudi、Delta 在具体功能上可能有所差异,但我们看到在过去两年的迭代中,table format 的标准随着三个开源项目的功能趋同在逐步确立。

对用户,Arctic 的设计目标是开箱即用的湖仓系统,而在系统内部,Arctic 的设计理念是将不同 table format 作为数据湖的 storage engine 来使用,这种设计模式多见于 MySQL、ClickHouse 这样的开源系统。Arctic 选择了最早提出 table format 概念的 Iceberg 作为基础,在不魔改社区代码的前提下,为用户提供了一套可以兼容 Hive format,并且在流和更新场景下更加优化的 Mixed format。Iceberg format 和 Mixed format 各有优势,用户可以根据需求灵活选择,并且都能享受到 Arctic 开箱即用的体验。

Iceberg format

Arctic v0.4 之后开始支持管理原生 Iceberg 表,Iceberg format 具有充分的向上和向下兼容特性,一般情况下,用户不用担心引擎客户端所用的 Iceberg 版本与 Arctic 依赖的 Iceberg 版本的兼容性。

Arctic 同时支持 Iceberg format v1 和 v2,Iceberg v2 中引入了 row-delete 特性,支持 flink 摄取有主键的数据,可以减少更新场景下的写放大:

与 mixed streaming format 不同,Iceberg format 中的主键只适用于 flink,spark 下不保障主键约束


Mixed streaming format

Mixed streaming format 相比 Iceberg format 提供了更多的特性:

  • 更强的主键约束,对 Spark 也同样适用
  • 通过 auto-bucket 机制,为实时数仓提供生产可用的 OLAP 性能
  • 可以通过配置 LogStore,将 data pipeline 的延迟从分钟提升到毫秒/秒
  • Hive 或 Iceberg 格式兼容,支持 Hive 秒级原地升级,兼容 Iceberg 各项原生功能
  • 事务冲突解决机制,让相同主键的并发写入变得可能

Mixed streaming format 的设计初衷是基于数据湖为大数据平台提供流批一体的存储层,以及离线和实时统一的数据仓库,在这个目标驱动下,Arctic 将 mixed format 设计为三级结构,每级结构命名为不同的 TableStore:

  • BaseStore — 存储表的存量数据,通常由批计算或 optimizing 过程产生,作为 ReadStore 对读更加友好
  • ChangeStore — 存储表的流和变更数据,通常由流计算实时写入,也可用于下游的 CDC 消费,作为 WriteStore 对写更加友好
  • LogStore — 作为 ChangeStore 的 cache 层来加速流处理,Arctic 会管理 LogStore 和 ChangeStore 的一致性

Mixed format 中 TableStore 的设计理念类似数据库中的聚簇索引,每个 TableStore 可以使用不同 table format。Mixed format 通过 BaseStore 和 ChangeStore 之间的 merge-on-read 来提供高新鲜度的 OLAP,为了提供高性能的 merge-on-read,BaseStore 和 ChangeStore 采用了完全一致的 partition 和 layout,且都支持 auto-bucket。

Auto-bucket 功能帮助 self-optimizing 过程将 BaseStore 的文件大小控制在 target-size 上下,在尽可能维持 base file size 同时,通过 bucket 的分裂和合并来实现数据量的动态伸缩。Auto-bucket 将一个 partition 下的数据按主键哈希的方式分割成一个个主键不相交的集合,极大降低了 optimizing 过程和 merge-on-read 时需要 scan 的数据量,提升了性能

静态数据情况下 Mixed Iceberg 和 Iceberg 的查询性能几乎相同,但是随着 TPCC 的进行,CDC 数据的增多,不带 Self-Optimizing 的 Mixed Iceberg 和 Iceberg 的性能都会线性增长, 而带有 Self-Optimizing 的表都能稳定在一个合理的范围内
Hudi 的性能因为写入任务自带 Self-Optimizing,所以查询性能也能很好的收敛,总体是优于不带 Self-Optimizing 的 Mix Iceberg,弱于带 Self-Optimizing 的 Mixed Iceberg 和 Native Iceberg。
mixed streaming format 在使用上存在的限制有:

mixed streaming format 在使用上存在的限制有:

  • Compatibility limited — 在 Hive 和 Iceberg 的兼容写的场景下,可能出现主键唯一性破坏或冲突解决失效
  • Primary key constraint — 在主键不包含分区键的情况下,如果流数据中没有更新前像,需要使用 normalized 算子或其他方式还原数据前像,才能保障主键唯一
  • Engines integrated — 目前支持 Flink 和 Spark 读写,支持 Trino 和 Impala 查询数据

Mixed Iceberg format

Mixed Iceberg format 的 BaseStore 和 ChangeStore 都使用 Iceberg format,在 schema、types 和 partition 用法上与 Iceberg 保持一致,在具备 Mixed streaming format 功能特性的同时,可以使用原生 Iceberg connector 读写 BaseStore 和 ChangeStore,从而具备 Iceberg format 的所有功能特性,下面以 Spark 为例,介绍如何用 Iceberg connector 操作 Quick demo 创建的 Mixed Iceberg format 表,我们使用下面的命令打开一个 Spark SQL 客户端:

spark-sql --packages org.apache.Iceberg:Iceberg-spark-runtime-3.2_2.12:0.14.0\
    --conf spark.sql.extensions=org.apache.Iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.local=org.apache.Iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/tmp/Arctic/warehouse

之后即可使用如下命令读取、写入这些 Arctic 创建管理的 Iceberg 表:

-- 切换到 Iceberg catalog 下
use local;
-- 查看所有的 Iceberg 表
show tables;
-- 查看 BaseStore
select * from local.test_db.test_table.base;
-- 查看 ChangeStore
select * from local.test_db.test_table.change;
-- 写入 BaseStore
insert into local.test_db.test_table.base value(10, 'tony', timestamp('2022-07-03 12:10:30'));

Arctic 的 Minor optimizing 功能一般可以保障 Iceberg BaseStore 的数据新鲜度维持在分钟级

Mixed Hive format

Mixed Hive format 使用 Hive 表作为 BaseStore,Iceberg 表作为 ChangeStore,Mixed Hive format 支持:

  • schema、partition、types 与 Hive format 保持一致
  • 使用 Hive connector 将 Mixed Hive format 表当成 Hive 表来读写
  • 可以将 Hive 表原地升级为 Mixed Hive format 表,升级过程没有数据重写和迁移,秒级响应
  • 具有 Mixed streaming format 所有功能特性

Mixed Hive format 结构如下所示:

在 BaseStore 中,Hive location 下的文件也会被 Iceberg manifest 索引,不会产生两种 format 的数据冗余,Mixed Hive format 融合了 Iceberg 的快照、ACID 以及 MVCC 特性,对 Hive 的使用方式也做出了极大的兼容,为过去围绕 Hive format 搭建的数据平台、流程以及产品提供了灵活的选型和扩展方案。

Hive location 下的数据新鲜度通过 Full optimizing 来保障,因此 Hive 原生读的时效性相比 Mixed Iceberg table 有较大差距,推荐使用 Mixed Hive format 的 Merge-on-read 读取分钟级新鲜度数据


Self-optimizing

简介:

LakeHouse 具有开放和低耦合的特性,数据和文件交给用户通过各类引擎维护,这样的架构在 T+1 的场景下看起来还好,但随着越来越多的人关注如何将 Lakehouse 应用于流式数仓,实时分析场景,问题变得难以调和,比如:

  • 流式写入带来海量的小文件
  • CDC ingestion 和流式更新产生过量的 delta 数据
  • 应用新型数据湖格式会带来孤儿文件和过期快照

上述任何问题都会给数据分析的性能和成本带来严重影响,为此 Arctic 引入 self-optimizing 机制,目标是将基于新型 table format 打造像数据库,传统数仓一样开箱即用的流式湖仓服务,Self-optimizing 包含但不限于文件合并,去重,排序,孤儿文件和过期快照的清理。

Self-optimizing 的架构与工作机制如下图所示:

Optimizer 是 self-optimizing 的执行组件,是由 AMS 管理的常驻进程,AMS 会负责发现和计划湖仓表的自优化任务,并实时调度给 optimizer 分布式执行,最后由 AMS 负责提交优化结果,Arctic 通过 Optimizer Group 对 Optimizers 实现物理隔离。

Arctic 的 self-optimizing 的核心特性有:

  • 自动、异步与透明 — 后台持续检测文件变化,异步分布式执行优化任务,对用户透明无感
  • 资源隔离和共享 —允许资源在表级隔离和共享,以及设置资源配额
  • 灵活可扩展的部署方式 — 执行节点支持多种部署方式,便捷的扩缩容

Self-optimizing mechanism

在数据写入过程中,可能会产生写放大和读放大两类情况:

  • 读放大 — 由于写入过程中产生过量的小文件,或 delete 文件与 insert 文件产生了过多的映射(如果你是 Iceberg v2 format 用户,对这个问题可能不陌生),如果 optimizing 的调度频率跟不上小文件产生的速度,会严重拖慢文件读取性能
  • 写放大 — 频繁地调度 optimizing 会让存量数据被频繁合并和重写,造成 CPU/IO/Memoery 的资源竞争和浪费,拖慢 optimizing 的速度,也会进一步引发读放大

为了缓解读放大需要频繁执行 optimizing,但是频繁 optimizing 会导致写放大,Self-optimizing 的设计需要在读放大和写放大之间提供最佳的 trade off,Arctic 的 self-optimizing 借鉴了 java 虚拟机分代垃圾回收算法,将文件按照大小分为 Fragment 和 Segment,将 Fragment 和 Segement 上执行的不同 self-optimizing 过程分为 minor 和 major 两种类型,为此 Arctic v0.4 引入了两个参数来定义 Fragment 和 Segment:


-- self-optimizing 的文件目标大小self-optimizing.target-size = 128;-- self-optimizing 处理的 fragment 文件阈值self-optimizing.fragment-ratio = 8

self-optimizing.target-size 定义了 major optimizing 的目标输出大小,默认 128m,self-optimizing.fragment-ratio 定义了 fragment 文件阈值在 target-size 中的占比,8 代表着 target-size 的 1/8,对应 128m 的 target-size 默认 fragement 阈值为 16m,小于 16m 是 fragment 文件,大于 16m 是 segment 文件,如下图所示:

Minor optimizing 的目标是缓解读放大问题,这里涉及两项工作:

  • 将 fragment 文件尽可能快地合并为 segment 文件,当小文件积累时, minor optimizing 会较为频繁地执行
  • 将写友好(WriteStore)的文件格式转换为读友好(ReadStore)的文件格式,对 Mixed format 而言是 ChangeStore 向 BaseStore 的转换,对 Iceberg format 则是 eq-delete 文件向 pos-delete 的转换

在 Minor optimizing 执行多次之后,表空间内会存在较多的 Segment 文件,虽然 Segement 文件的读取效率很多情况下能够满足对性能的要求,但是:

  • 各个 Segment 文件上可能积累了总量可观的 delete 数据
  • Segment 之间可能存在很多在主键上的重复数据

这时候影响读取性能不再是小文件和文件格式引发的读放大问题,而是在存在过量垃圾数据,需要在 merge-on-read 时被合并处理,所以 Arctic 在这里引入了 major optimizing 通过 segment 文件合并来清理垃圾数据,从而将垃圾数据量控制在一个对读友好的比例,一般情况下,minor optimizing 已经做过多轮去重,major optimizing 不会频繁调度执行,从而避免了写放大问题。另外,Full optimizing 会将 target space 内所有文件合并成一个文件,是 major optimizing 的一种特殊情况:

Major optimizing 和 minor optimizing 的设计参考了垃圾回收算法的分代设计,两种 optimizing 的执行逻辑是一致的,都会执行文件合并,数据去重,WriteStore 格式向 ReadStore 格式的转换,Minor、major 和 full optimizing 的输入输出关系如下表所示:


Self-optimizing quota
如果你使用的是不可更新的表,如日志,传感器数据,并且已经习惯于 Iceberg 提供的 optimize 指令,可以考虑通过下面的配置关闭表上的 self-optimizing 功能:


self-optimizing.enabled = false;

如果表配置了主键,支持 CDC 摄取和流式更新,比如数据库同步表,或者按照维度聚合过的表,建议开启 self-optimizing 功能。单张表的 self-optimizing 资源用量通过在表上配置 quota 参数来管理:



-- self-optimizing 能够使用的最大 CPU 数量,可以取小数self-optimizing.quota = 1;

Quota 定义了单张表可以使用的最大 CPU 用量,但 self-optimizing 实际是分布式执行,真实的资源用量是按实际执行时间动态管理的过程,在 optimizing 管理界面,可以通过 quota occupy 这个指标查看单张表的动态 quota 占用,从设计目标看,quota occupy 指标应当动态趋于 100%。

在平台中可能出现超售和超买两种情况:

  • 超买 — 若所有 optimizer 配置超过所有表配置的 quota 总和,quota occupy 可能动态趋于 100% 以上
  • 超卖 — 若所有 optimizer 配置低于所有配置表的 quota 总和,quota occupy 应当动态趋于 100% 以下

 

Table WaterMark

 

Data fressness
数据的新鲜度代表了时效性,在很多论述中,新鲜度是数据质量的重要指标之一,在传统的离线数仓中,更高的成本意味着更好的性能,成本和性能之间是典型的双元悖论,而在高新鲜度的流式数仓中,海量的小文件和更新数据会带来性能下降,新鲜度越高,对性能的影响越大,为了达到满足要求的性能,用户需要支出更高的成本,所以对流式湖仓而言,数据新鲜度,查询性能,成本构成了三元悖论:

Arctic 通过 AMS 的管理功能以及 Self-optimizing 机制为用户提供三元悖论的调和方案,不同于传统数仓,湖仓表会应用于各种各样的 data pipeline,AI,BI 的场景,怎样度量数据的新鲜度对于数据开发者,分析师以及管理员来说都直观重要,为此 Arctic 学习流计算采用 watermark 的概念来度量表的新鲜度。


Table WaterMark

在 Mixed streaming format 中,通过 table watermark 来度量数据的新鲜度

严格来说,Table watermark 用于描述表的写入进度,具体来说它是表上的一个类型为时间戳的属性,意为小于这个时间戳的数据都已经写入到表中,它一般用来观察表的写入进度,也可以作为下游批量计算任务的触发指标。

Mixed streaming format 通过下面的参数来配置 watermark:



'table.event-time-field ' = 'op_time',  'table.watermark-allowed-lateness-second' = '60'

上面的例子中将 op_time 设置为表的事件时间字段,在数据写入时会用写入数据的 op_time 来计算表的水位,同时为了应对写入乱序的问题,设置在计算 watermark 时允许的数据迟到时间为一分钟。不同于流计算,event_time 值小于 watermark 的数据不会被拒绝写入,但也不会影响 watermark 的推进。


可以在 AMS Dashboard 的表详情中看到表当前的水位,也可以在 Terminal 中输入下面的 SQL 来查询表的水位:


SHOW TBLPROPERTIES test_db.test_log_store ('watermark.table');

也可以通过下面的方式查询表 basestore 的 table watermark,结合 Hive 或 Iceberg 的原生读可以更加灵活:


SHOW TBLPROPERTIES test_db.test_log_store ('watermark.base')

只支持在 Mixed hive format 和 Mixed Iceberg format 配置watermark, 不支持在 Iceberg 上配置。

 

Deployment

1: 环境要求

  • Java 8, Trino 需要安装 Java11
  • Optional: MySQL 5.5 及以上 或者 MySQL 8
  • Optional: zookeeper 3.4.x 及以上
  • Optional: Hive(2.x or 3.x)
  • Optional: Hadoop(2.9.x or 3.x)

2: 下载安装包

https://github.com/NetEase/arctic/releases 找到已发行的版本,除了下载 arctic-x.y.z-bin.zip ( x.y.z 是发行版本号)外,可按需根据自己使用的引擎,下载对应各引擎不同版本的runtime包。执行 unzip arctic-x.y.z-bin.zip 解压缩后在同级目录下生成 arctic-x.y.z 目录, 进入到目录 arctic-x.y.z。

3: 源码编译

$ git clone https://github.com/NetEase/arctic.git
$ cd arctic
$ mvn clean package -DskipTests -pl '!Trino' [-Dcheckstyle.skip=true]
$ cd dist/target/
$ ls
arctic-x.y.z-bin.zip (目标版本包)
dist-x.y.z-tests.jar
dist-x.y.z.jar
archive-tmp/
maven-archiver/
$ cd ../../flink/v1.12/flink-runtime/target
$ ls 
arctic-flink-runtime-1.12-x.y.z-tests.jar
arctic-flink-runtime-1.12-x.y.z.jar (Flink 1.12 目标flink runtime 包)
original-arctic-flink-runtime-1.12-x.y.z.jar
maven-archiver/
或者从  dist/target 切换到 spark runtime 包
$ spark/v3.1/spark-runtime/target
$ ls
arctic-spark-3.1-runtime-0.4.0.jar (spark v3.1 目标flink runtime 包)
arctic-spark-3.1-runtime-0.4.0-tests.jar
arctic-spark-3.1-runtime-0.4.0-sources.jar
original-arctic-spark-3.1-runtime-0.4.0.jar

如果需要同时编译 Trino 模块,需要先本地安装 jdk11,并且在用户的 ${user.home}/.m2/ 目录下配置 toolchains.xml,然后执行 mvn package -P toolchain 进行整个项目的编译即可。

<?xml version="1.0" encoding="UTF-8"?>
<toolchains>
    <toolchain>
        <type>jdk</type>
        <provides>
            <version>11</version>
            <vendor>sun</vendor>
        </provides>
        <configuration>
            <jdkHome>${yourJdk11Home}</jdkHome>
        </configuration>
    </toolchain>
</toolchains>

4: 配置 AMS

如果想要在正式场景使用AMS,建议参考以下配置步骤, 修改 {ARCTIC_HOME}/conf/config.yaml
配置服务地址

  • arctic.ams.server-host.prefix 配置选择你服务绑定的 IP 地址或者网段前缀, 目的是在 HA 模式下,用户可以在多台主机上使用相同的配置文件;如果用户只部署单节点,该配置也可以直接指定完整的 IP 地址。
  • AMS 本身对外提供 http 服务和 thrift 服务,需要配置这两个服务监听的端口。Http 服务默认端口1630, Thrift 服务默认端口1260
ams:
  arctic.ams.server-host.prefix: "127." #To facilitate batch deployment can config server host prefix.Must be enclosed in double quotes
  arctic.ams.thrift.port: 1260   # ams thrift服务访问的端口
  arctic.ams.http.port: 1630    # ams dashboard 访问的端口

配置元数据库

用户可以使用 MySQL 作为系统库使用,默认为 Derby,首先在 MySQL 中初始化系统库:

$ mysql -h{mysql主机IP} -P{mysql端口} -u{username} -p
Enter password: #输入密码
'Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 41592724
Server version: 5.7.20-v3-log Source distribution
Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
mysql> create database arctic;
Query OK, 1 row affected (0.01 sec)
mysql> use arctic;
Database changed
mysql> source {ARCTIC_HOME}/conf/mysql/x.y.z-init.sql

ams 下添加 MySQL 配置:

ams:
  arctic.ams.mybatis.ConnectionURL: jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&useSSL=false
  arctic.ams.mybatis.ConnectionDriverClassName: com.mysql.jdbc.Driver
  arctic.ams.mybatis.ConnectionUserName: {user}
  arctic.ams.mybatis.ConnectionPassword: {password}
  arctic.ams.database.type: mysql

配置高可用

ams:
  #HA config
  arctic.ams.ha.enable: true     #开启 ha
  arctic.ams.cluster.name: default  # 区分同一套 zookeeper 上绑定多套 AMS
  arctic.ams.zookeeper.server: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  # zookeeper server地址

配置Optimizer

Self-optimizing 需要配置 optimizer 资源,包含 Containers 配置和 Optimizer group 配置。以配置 Flink 类型的 Optimizer 为例,配置如下:

containers:
  - name: flinkContainer
    type: flink
    properties:
      FLINK_HOME: /opt/flink/        #flink install home
      HADOOP_CONF_DIR: /etc/hadoop/conf/       #hadoop config dir
      HADOOP_USER_NAME: hadoop       #hadoop user submit on yarn
      JVM_ARGS: -Djava.security.krb5.conf=/opt/krb5.conf       #flink launch jvm args, like kerberos config when ues kerberos
      FLINK_CONF_DIR: /etc/hadoop/conf/        #flink config dir
optimize_group:
  - name: flinkOp
    # container name, should be in the names of containers  
    container: flinkContainer
    properties:
      taskmanager.memory: 2048
      jobmanager.memory: 1024

一个完整的配置样例:

ams:
  arctic.ams.server-host.prefix: "127." #To facilitate batch deployment can config server host prefix.Must be enclosed in double quotes
  arctic.ams.thrift.port: 1260   # ams thrift服务访问的端口
  arctic.ams.http.port: 1630    # ams dashboard 访问的端口
  arctic.ams.optimize.check.thread.pool-size: 10
  arctic.ams.optimize.commit.thread.pool-size: 10
  arctic.ams.expire.thread.pool-size: 10
  arctic.ams.orphan.clean.thread.pool-size: 10
  arctic.ams.file.sync.thread.pool-size: 10
  # derby config.sh 
  # arctic.ams.mybatis.ConnectionDriverClassName: org.apache.derby.jdbc.EmbeddedDriver
  # arctic.ams.mybatis.ConnectionURL: jdbc:derby:/tmp/arctic/derby;create=true
  # arctic.ams.database.type: derby
  # mysql config
  arctic.ams.mybatis.ConnectionURL: jdbc:mysql://{host}:{port}/{database}?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&useSSL=false
  arctic.ams.mybatis.ConnectionDriverClassName: com.mysql.jdbc.Driver
  arctic.ams.mybatis.ConnectionUserName: {user}
  arctic.ams.mybatis.ConnectionPassword: {password}
  arctic.ams.database.type: mysql
  #HA config
  arctic.ams.ha.enable: true     #开启ha
  arctic.ams.cluster.name: default  # 区分同一套zookeeper上绑定多套AMS
  arctic.ams.zookeeper.server: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
  # Kyuubi config
  arctic.ams.terminal.backend: kyuubi
  arctic.ams.terminal.kyuubi.jdbc.url: jdbc:hive2://127.0.0.1:10009/
# extension properties for like system
extension_properties:
#test.properties: test
containers:
  # arctic optimizer container config.sh
  - name: localContainer
    type: local
    properties:
      hadoop_home: /opt/hadoop
      # java_home: /opt/java
  - name: flinkContainer
    type: flink
    properties:
      FLINK_HOME: /opt/flink/        #flink install home
      HADOOP_CONF_DIR: /etc/hadoop/conf/       #hadoop config dir
      HADOOP_USER_NAME: hadoop       #hadoop user submit on yarn
      JVM_ARGS: -Djava.security.krb5.conf=/opt/krb5.conf       #flink launch jvm args, like kerberos config when ues kerberos
      FLINK_CONF_DIR: /etc/hadoop/conf/        #flink config dir
  - name: externalContainer
    type: external
    properties:
optimize_group:
  - name: default
    # container name, should equal with the name that containers config.sh
    container: localContainer
    properties:
      # unit MB
      memory: 1024
  - name: flinkOp
    container: flinkContainer
    properties:
      taskmanager.memory: 1024
      jobmanager.memory: 1024
  - name: externalOp
    container: external
    properties:

5:启动AMS

进入到目录 arctic-x.y.z , 执行 bin/ams.sh start 启动 AMS。

$ cd arctic-x.y.z
$ bin/ams.sh start

然后通过浏览器访问 http://localhost:1630 可以看到登录界面,则代表启动成功,登录的默认用户名和密码都是 admin。

 

Configurations

1: Self-optimizing 配置

Self-optimizing 配置对 Iceberg format, Mixed streaming format 都会生效。

2: 数据清理配置

数据清理配置对 Iceberg format, Mixed streaming format 都会生效。

3: Mixed streaming format

如果使用 native Iceberg format 表,请参阅 Iceberg configurations,以下参数对 Mixed streaming format 有效

相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4月前
|
存储 SQL 分布式计算
开源大数据比对平台设计与实践—dataCompare
开源大数据比对平台设计与实践—dataCompare
70 0
|
4月前
|
SQL 存储 大数据
某互联网大厂亿级大数据服务平台的建设和实践
某互联网大厂亿级大数据服务平台的建设和实践
68 0
|
4月前
|
SQL 大数据 关系型数据库
开源大数据比对平台(dataCompare)新版本发布
开源大数据比对平台(dataCompare)新版本发布
74 0
|
4月前
|
SQL 存储 分布式计算
从0到1介绍一下开源大数据比对平台dataCompare
从0到1介绍一下开源大数据比对平台dataCompare
124 0
|
2月前
|
监控 物联网 大数据
智慧工地管理平台系统源码基于物联网、云计算、大数据等技术
智慧工地平台APP通过对施工过程人机料法环的全面感知、互联互通、智能协同,提高施工现场的生产效率、管理水平和决策能力,实现施工管理的数字化、智能化、精益化。
57 0
|
3月前
|
SQL 消息中间件 分布式计算
开源湖仓一体平台(一):LakeSoul
开源湖仓一体平台(一):LakeSoul
|
3月前
|
存储 人工智能 运维
轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践
本文从轻喜到家的历史技术架构与痛点问题、架构升级需求与 OLAP 选型过程、最新技术架构及落地场景应用等方面,详细介绍了轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践经验。
904 0
轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践
|
4月前
|
SQL 存储 大数据
从0到1介绍一下开源大数据服务平台dataService
从0到1介绍一下开源大数据服务平台dataService
116 1
|
4月前
|
存储 SQL 数据挖掘
某工商信息商业查询平台基于阿里云数据库 SelectDB 版内核 Apache Doris 的湖仓一体建设实践
从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构实践,保证了数据的准确性和实时性、高效处理和分析了大规模数据,推动信息服务行业发展创新!
某工商信息商业查询平台基于阿里云数据库 SelectDB 版内核 Apache Doris 的湖仓一体建设实践
|
2月前
|
分布式计算 DataWorks IDE
MaxCompute数据问题之忽略脏数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
47 0