如何通过Flink将Iceberg表数据写入到阿里云OSS

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
对象存储 OSS,标准 - 本地冗余存储 20GB 3个月
对象存储 OSS,内容安全 1000 次 1年
简介: 本文详细介绍了如何通过Flink将数据导入到iceberg+oss数据湖中,这里我们不需要依赖任何的定制化工具,只需要通过开源的产品就能实现哦。

版本要求

我在以下两个环境下都验证过flink的读写流程:

环境一
hadoop == 3.1.0
hive == 3.1.2
apache flink == 1.12.1

环境二
hadoop == 2.9.2
hive == 2.3.6
flink == 1.12.1

依赖环境

验证hadoop包读取oss文件

第一步,在 $HADOOP_HOME/etc/hadoop/hadoop-env.sh 中,设置如下。这样整个hadoop classpath都会自动加载aliyun oss的相关依赖。

export HADOOP_OPTIONAL_TOOLS="hadoop-aliyun"

如果使用hadoop==2.9.2版本,则需要执行如下命令:

echo "CLASSPATH=\${CLASSPATH}:\${TOOL_PATH}" >> $HADOOP_HOME/libexec/hadoop-config.sh

第二步,建议在 $HADOOP_HOME/etc/hadoop/core-site.xml 中新增如下配置,并通过hdfs的命令行工具来验证hadoop包确实能读取oss://...这种类型的文件。


fs.oss.endpoint=${YOUR_OSS_ENDPOINT}
fs.oss.accessKeyId=${YOU_ACCESS_KEY_ID}
fs.oss.accessKeySecret=${YOU_SECRET_KEY}
fs.AbstractFileSystem.oss.impl=org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

第三步,通过hdfs命令行来验证的确可以读取oss的文件:

./bin/hdfs dfs -ls oss://vvr-daily/iceberg/warehouse
Found 1 items
drwxrwxrwx   -          0 2021-02-10 14:31 oss://vvr-daily/iceberg/warehouse/oss_db.db

启动hive metastore服务

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --hiveconf hive.root.logger=console \
           --hiveconf hive.log.level=INFO \
           --service metastore

对于hive2.3.9,可以采用如下命令来启动hive:

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-2.9.2
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --service metastore \
    --hiveconf hive.root.logger=INFO,console

启动apache flink的本地集群

第一步:编译apache iceberg,获取到flink-runtime-xx.jar文件

git clone --single-branch --branch aliyun-oss git@github.com:openinx/incubator-iceberg.git iceberg

cd iceberg/
./gradlew build -x test

# The generated flink-runtime-xx.jar is here:
ls -altr flink-runtime/build/libs/iceberg-flink-runtime-*.jar | grep -v javadoc | grep -v sources | grep -v tests

第二步:把编译获得的flink-runtime-xx.jar拷贝到 $FLINK_HOME/lib 目录下,以便让 flink 进程能正常加载到 flink-runtime-xx.jar

cp flink-runtime/build/libs/iceberg-flink-runtime-7ea20a0.jar $FLINK_HOME/lib/

第三步:启动flink本地集群

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# switch to $FLINK_HOME directory
./bin/start-cluster.sh

SQL客户端验证

打开flink的sql-client.sh

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded shell

创建hive catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'='${YOUR_OSS_ENDPOINT}',
  'oss.access.key.id'='${YOU_OSS_ACCESS_KEY_ID}',
  'oss.access.key.secret'='${YOUR_OSS_ACCESS_SECRET}',
  'warehouse'='oss://vvr-daily/iceberg/warehouse'
);

切换到catalog和db并建表

USE CATALOG hive_catalog;

CREATE DATABASE oss_db;

USE oss_db;

CREATE TABLE sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

插入数据并查询

INSERT INTO sample VALUES (1, 'a'), (2, 'b'), (3, 'c');

SELECT * FROM sample;

验证oss对象存储文件

osscmd ls oss://vvr-daily/iceberg/warehouse/oss_db.db
LastModifiedTime                   Size(B)  StorageClass   ETAG                                  ObjectName
2021-02-10 14:31:51 +0800 CST            0      Standard   D41D8CD98F00B204E9800998ECF8427E      oss://vvr-daily/iceberg/warehouse/oss_db.db/
2021-02-10 15:25:15 +0800 CST          740      Standard   890693B69C7E842C29BAD95DE0904BDA      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-0de7dc8b-4c52-4505-8f2c-2d04b249c0d4-00001.parquet
2021-02-10 15:27:32 +0800 CST          746      Standard   EE6A0560F4CA31DC9081D02C51C43EBE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-4a44477b-c052-425a-b3ab-96afe40b4339-00001.parquet
2021-02-10 15:23:49 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-536c4e37-d3ce-4c7e-ba25-748214afd539-00001.parquet
2021-02-10 15:24:39 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-60c99eda-598d-490f-8ec7-5db853ae5c48-00001.parquet
2021-02-10 15:26:13 +0800 CST          740      Standard   6FEA8E20BDE0028298CF461507AAB554      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-69eb8407-6e56-4476-aaf8-e532c02c7aab-00001.parquet
2021-02-10 15:26:51 +0800 CST          740      Standard   6793E685BD94AB8C2C0BE5BE5D785990      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-895b9620-07a2-4b8d-a91d-63698ddf06e0-00001.parquet
2021-02-10 15:28:18 +0800 CST          746      Standard   5365D890196B3F2B8847826A4E387601      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-8acc716c-2fa9-4f50-beb0-c97a0570d50d-00001.parquet
2021-02-10 15:27:13 +0800 CST          740      Standard   BECCDE882F3AD9A47EF769F2B858E080      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-c477130d-a7bb-4708-ab0f-f83b3afee46c-00001.parquet
2021-02-10 15:25:31 +0800 CST          740      Standard   6EFEF4A682B93A236D50335B811ED593      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-dd6f19da-14d5-43f2-9a36-0ea550958b63-00001.parquet
2021-02-10 16:09:02 +0800 CST          641      Standard   9072CDFDB5DF6BD7844B861F04B2A8ED      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-e58a43ec-cad2-47fe-aafc-73f2c684f314-00001.parquet
2021-02-10 15:09:26 +0800 CST          837      Standard   61777084CF896703012F45AB9B8498AE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00000-0e19d8da-def7-40db-8648-9e952b668f9e.metadata.json
2021-02-10 15:23:50 +0800 CST         1840      Standard   60DDB6336B2D39C31419334A109770C9      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00001-8ef10750-d77f-4313-a3b2-1ec648259759.metadata.json
2021-02-10 15:24:39 +0800 CST         2873      Standard   F3AAFD6870CFF7EE9A7E804CC1D5523F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00002-45af1b78-a263-41ce-a85e-cefc816421da.metadata.json
2021-02-10 15:25:15 +0800 CST         3910      Standard   449FB92DA945EFB51B5D1411DF276969      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00003-913d52e3-057f-4553-b7dd-be082b1d16d0.metadata.json
2021-02-10 15:25:31 +0800 CST         4947      Standard   AFB0EE4FD23518AD29AF2F12F226C824      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00004-3667f934-dd1b-433e-836b-4fc143068b74.metadata.json
2021-02-10 15:26:13 +0800 CST         5985      Standard   39FB15CE22E41F98D4368BF8853250A4      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00005-7b993212-acb0-43bf-b4af-27d0d5d61286.metadata.json
2021-02-10 15:26:51 +0800 CST         7024      Standard   2C7E61FBB012F03B0B64B7C90DFE5221      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00006-a78d66e9-f1de-46a7-95f1-78e14c0d6d58.metadata.json
2021-02-10 15:27:13 +0800 CST         8063      Standard   C6DAB40D00D01A25D257FF7F2FF76396      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00007-90d271a5-17e1-499a-a4ee-aa97eb23ac04.metadata.json
2021-02-10 15:27:32 +0800 CST         9099      Standard   2CA05CAA64BD32DF3C549F91D0CDC1A1      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00008-e7dcdee9-77a9-4d26-bf72-f1c0be8e0561.metadata.json
2021-02-10 15:28:18 +0800 CST        10136      Standard   A7760B70118B92A7CCD9A86A0AD8BCE0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00009-93979359-82b4-49b2-8826-068c02f026f7.metadata.json
2021-02-10 16:09:03 +0800 CST        11176      Standard   AE2DEDB89B07E831B21F7B12500909D2      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00010-b83b6ac4-1c2d-4f94-adae-fe757a33faf7.metadata.json
2021-02-10 15:26:51 +0800 CST         5764      Standard   55C5FC5AE6B1F9F45B5F09AC70341612      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/025e0373-6f2e-476c-95aa-e12b71310cf4-m0.avro
2021-02-10 15:27:13 +0800 CST         5766      Standard   F9DF118D3D72C3F2AAF05BF0ED48B45B      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/2c76a3aa-c985-422d-9625-47e6f1c3740e-m0.avro
2021-02-10 15:23:49 +0800 CST         5762      Standard   0C8A3D04CDBEB3BB4B254D8468D73B9F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/355363b5-2ac0-453f-8c54-86fb1a557982-m0.avro
2021-02-10 15:27:32 +0800 CST         5767      Standard   CD1B2BE3969B3CF06AF3F775D0F11B61      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/39d3080d-ac67-4d4d-9af7-2460fb5ba4e8-m0.avro
2021-02-10 15:25:15 +0800 CST         5765      Standard   0D09FDE94CD86622B29BACDD924E6255      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/4532147f-5f1f-4c8b-ab67-e4705cd48129-m0.avro
2021-02-10 15:25:31 +0800 CST         5767      Standard   74703571585B7838C17C571CBBC7B096      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/97c932d9-5897-4b90-99cd-271c082996a1-m0.avro
2021-02-10 15:26:13 +0800 CST         5765      Standard   826DC704DDD543B90E2D8747F0EF65EC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a2df9213-ed90-4db9-9633-9374bf0fa968-m0.avro
2021-02-10 15:28:18 +0800 CST         5766      Standard   0A3B3016D4B4D7F6023E2F15B1EFD5DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a7575e5b-0140-4fc9-8c99-de748e3f826f-m0.avro
2021-02-10 16:09:03 +0800 CST         5767      Standard   996317E4064D84E79C18824F05E2FA41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/e38151ba-2ae6-42a1-ab80-24bbe2029030-m0.avro
2021-02-10 15:24:39 +0800 CST         5761      Standard   46150675F7719C36DEE2B0552E1105F6      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/f85df6c5-40ab-4356-b6e6-62146620112a-m0.avro
2021-02-10 15:26:51 +0800 CST         3766      Standard   A5FB104A15FEFC394D26553EBD3181AC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-1475725340177940004-1-025e0373-6f2e-476c-95aa-e12b71310cf4.avro
2021-02-10 15:27:13 +0800 CST         3811      Standard   7D9DD5B31EB196539B77E379D004B6DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-2325464499159025089-1-2c76a3aa-c985-422d-9625-47e6f1c3740e.avro
2021-02-10 15:24:39 +0800 CST         3576      Standard   928F49332EDFDCC8E13EB42DD436DB41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-238979745127271847-1-f85df6c5-40ab-4356-b6e6-62146620112a.avro
2021-02-10 15:28:18 +0800 CST         3891      Standard   26CEF933ACF3719B0FCB1FC870FC24EF      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-281762876800111005-1-a7575e5b-0140-4fc9-8c99-de748e3f826f.avro
2021-02-10 15:26:13 +0800 CST         3717      Standard   F526908F6EC7D386C0CCE5F3D35D07F0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3217692098732956502-1-a2df9213-ed90-4db9-9633-9374bf0fa968.avro
2021-02-10 16:09:03 +0800 CST         3935      Standard   FF23CB475C8E1663B97D36FE7F39DF7A      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3585784897630481352-1-e38151ba-2ae6-42a1-ab80-24bbe2029030.avro
2021-02-10 15:25:15 +0800 CST         3627      Standard   F311DFF00D62096F5E58CE112984FB08      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4375397182832385716-1-4532147f-5f1f-4c8b-ab67-e4705cd48129.avro
2021-02-10 15:23:49 +0800 CST         3509      Standard   3976ACB4842DC17A7269EDB43FF04EDE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4925086930231607072-1-355363b5-2ac0-453f-8c54-86fb1a557982.avro
2021-02-10 15:27:32 +0800 CST         3852      Standard   D3DC3DECFA7257AC1833FEB9764C3579      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-666850814092055918-1-39d3080d-ac67-4d4d-9af7-2460fb5ba4e8.avro
2021-02-10 15:25:31 +0800 CST         3674      Standard   38F04E8D14DCD3402995AB724C3B7FB8      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-7178282328530815760-1-97c932d9-5897-4b90-99cd-271c082996a1.avro
Object Number is: 42
相关实践学习
通义万相文本绘图与人像美化
本解决方案展示了如何利用自研的通义万相AIGC技术在Web服务中实现先进的图像生成。
相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
345 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
2月前
|
存储 人工智能 Cloud Native
阿里云渠道商:OSS与传统存储系统的差异在哪里?
本文对比传统存储与云原生对象存储OSS的架构差异,涵盖性能、成本、扩展性等方面。OSS凭借高持久性、弹性扩容及与云服务深度集成,成为大数据与AI时代的优选方案。
|
2月前
|
存储 运维 分布式计算
零售数据湖的进化之路:滔搏从Lambda架构到阿里云Flink+Paimon统一架构的实战实践
在数字化浪潮席卷全球的今天,传统零售企业面临着前所未有的技术挑战和转型压力。本文整理自 Flink Forward Asia 2025 城市巡回上海站,滔搏技术负责人分享了滔搏从传统 Lambda 架构向阿里云实时计算 Flink 版+Paimon 统一架构转型的完整实战历程。这不仅是一次技术架构的重大升级,更是中国零售企业拥抱实时数据湖仓一体化的典型案例。
164 0
|
4月前
|
存储 运维 安全
阿里云国际站OSS与自建存储的区别
阿里云国际站对象存储OSS提供海量、安全、低成本的云存储解决方案。相比自建存储,OSS具备易用性强、稳定性高、安全性好、成本更低等优势,支持无限扩展、自动冗余、多层防护及丰富增值服务,助力企业高效管理数据。
|
4月前
|
存储 域名解析 前端开发
震惊!不买服务器,还可以用阿里云国际站 OSS 轻松搭建静态网站
在数字化时代,利用阿里云国际站OSS可低成本搭建静态网站。本文详解OSS优势及步骤:创建Bucket、上传文件、配置首页与404页面、绑定域名等,助你快速上线个人或小型业务网站,操作简单,成本低廉,适合初学者与中小企业。
|
7月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1189 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
9月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
161 1
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
456 16