如何通过Flink将Iceberg表数据写入到阿里云OSS

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 本文详细介绍了如何通过Flink将数据导入到iceberg+oss数据湖中,这里我们不需要依赖任何的定制化工具,只需要通过开源的产品就能实现哦。

版本要求

我在以下两个环境下都验证过flink的读写流程:

环境一
hadoop == 3.1.0
hive == 3.1.2
apache flink == 1.12.1

环境二
hadoop == 2.9.2
hive == 2.3.6
flink == 1.12.1

依赖环境

验证hadoop包读取oss文件

第一步,在 $HADOOP_HOME/etc/hadoop/hadoop-env.sh 中,设置如下。这样整个hadoop classpath都会自动加载aliyun oss的相关依赖。

export HADOOP_OPTIONAL_TOOLS="hadoop-aliyun"

如果使用hadoop==2.9.2版本,则需要执行如下命令:

echo "CLASSPATH=\${CLASSPATH}:\${TOOL_PATH}" >> $HADOOP_HOME/libexec/hadoop-config.sh

第二步,建议在 $HADOOP_HOME/etc/hadoop/core-site.xml 中新增如下配置,并通过hdfs的命令行工具来验证hadoop包确实能读取oss://...这种类型的文件。


fs.oss.endpoint=${YOUR_OSS_ENDPOINT}
fs.oss.accessKeyId=${YOU_ACCESS_KEY_ID}
fs.oss.accessKeySecret=${YOU_SECRET_KEY}
fs.AbstractFileSystem.oss.impl=org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

第三步,通过hdfs命令行来验证的确可以读取oss的文件:

./bin/hdfs dfs -ls oss://vvr-daily/iceberg/warehouse
Found 1 items
drwxrwxrwx   -          0 2021-02-10 14:31 oss://vvr-daily/iceberg/warehouse/oss_db.db

启动hive metastore服务

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --hiveconf hive.root.logger=console \
           --hiveconf hive.log.level=INFO \
           --service metastore

对于hive2.3.9,可以采用如下命令来启动hive:

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-2.9.2
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --service metastore \
    --hiveconf hive.root.logger=INFO,console

启动apache flink的本地集群

第一步:编译apache iceberg,获取到flink-runtime-xx.jar文件

git clone --single-branch --branch aliyun-oss git@github.com:openinx/incubator-iceberg.git iceberg

cd iceberg/
./gradlew build -x test

# The generated flink-runtime-xx.jar is here:
ls -altr flink-runtime/build/libs/iceberg-flink-runtime-*.jar | grep -v javadoc | grep -v sources | grep -v tests

第二步:把编译获得的flink-runtime-xx.jar拷贝到 $FLINK_HOME/lib 目录下,以便让 flink 进程能正常加载到 flink-runtime-xx.jar

cp flink-runtime/build/libs/iceberg-flink-runtime-7ea20a0.jar $FLINK_HOME/lib/

第三步:启动flink本地集群

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# switch to $FLINK_HOME directory
./bin/start-cluster.sh

SQL客户端验证

打开flink的sql-client.sh

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded shell

创建hive catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'='${YOUR_OSS_ENDPOINT}',
  'oss.access.key.id'='${YOU_OSS_ACCESS_KEY_ID}',
  'oss.access.key.secret'='${YOUR_OSS_ACCESS_SECRET}',
  'warehouse'='oss://vvr-daily/iceberg/warehouse'
);

切换到catalog和db并建表

USE CATALOG hive_catalog;

CREATE DATABASE oss_db;

USE oss_db;

CREATE TABLE sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

插入数据并查询

INSERT INTO sample VALUES (1, 'a'), (2, 'b'), (3, 'c');

SELECT * FROM sample;

验证oss对象存储文件

osscmd ls oss://vvr-daily/iceberg/warehouse/oss_db.db
LastModifiedTime                   Size(B)  StorageClass   ETAG                                  ObjectName
2021-02-10 14:31:51 +0800 CST            0      Standard   D41D8CD98F00B204E9800998ECF8427E      oss://vvr-daily/iceberg/warehouse/oss_db.db/
2021-02-10 15:25:15 +0800 CST          740      Standard   890693B69C7E842C29BAD95DE0904BDA      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-0de7dc8b-4c52-4505-8f2c-2d04b249c0d4-00001.parquet
2021-02-10 15:27:32 +0800 CST          746      Standard   EE6A0560F4CA31DC9081D02C51C43EBE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-4a44477b-c052-425a-b3ab-96afe40b4339-00001.parquet
2021-02-10 15:23:49 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-536c4e37-d3ce-4c7e-ba25-748214afd539-00001.parquet
2021-02-10 15:24:39 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-60c99eda-598d-490f-8ec7-5db853ae5c48-00001.parquet
2021-02-10 15:26:13 +0800 CST          740      Standard   6FEA8E20BDE0028298CF461507AAB554      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-69eb8407-6e56-4476-aaf8-e532c02c7aab-00001.parquet
2021-02-10 15:26:51 +0800 CST          740      Standard   6793E685BD94AB8C2C0BE5BE5D785990      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-895b9620-07a2-4b8d-a91d-63698ddf06e0-00001.parquet
2021-02-10 15:28:18 +0800 CST          746      Standard   5365D890196B3F2B8847826A4E387601      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-8acc716c-2fa9-4f50-beb0-c97a0570d50d-00001.parquet
2021-02-10 15:27:13 +0800 CST          740      Standard   BECCDE882F3AD9A47EF769F2B858E080      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-c477130d-a7bb-4708-ab0f-f83b3afee46c-00001.parquet
2021-02-10 15:25:31 +0800 CST          740      Standard   6EFEF4A682B93A236D50335B811ED593      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-dd6f19da-14d5-43f2-9a36-0ea550958b63-00001.parquet
2021-02-10 16:09:02 +0800 CST          641      Standard   9072CDFDB5DF6BD7844B861F04B2A8ED      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-e58a43ec-cad2-47fe-aafc-73f2c684f314-00001.parquet
2021-02-10 15:09:26 +0800 CST          837      Standard   61777084CF896703012F45AB9B8498AE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00000-0e19d8da-def7-40db-8648-9e952b668f9e.metadata.json
2021-02-10 15:23:50 +0800 CST         1840      Standard   60DDB6336B2D39C31419334A109770C9      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00001-8ef10750-d77f-4313-a3b2-1ec648259759.metadata.json
2021-02-10 15:24:39 +0800 CST         2873      Standard   F3AAFD6870CFF7EE9A7E804CC1D5523F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00002-45af1b78-a263-41ce-a85e-cefc816421da.metadata.json
2021-02-10 15:25:15 +0800 CST         3910      Standard   449FB92DA945EFB51B5D1411DF276969      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00003-913d52e3-057f-4553-b7dd-be082b1d16d0.metadata.json
2021-02-10 15:25:31 +0800 CST         4947      Standard   AFB0EE4FD23518AD29AF2F12F226C824      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00004-3667f934-dd1b-433e-836b-4fc143068b74.metadata.json
2021-02-10 15:26:13 +0800 CST         5985      Standard   39FB15CE22E41F98D4368BF8853250A4      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00005-7b993212-acb0-43bf-b4af-27d0d5d61286.metadata.json
2021-02-10 15:26:51 +0800 CST         7024      Standard   2C7E61FBB012F03B0B64B7C90DFE5221      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00006-a78d66e9-f1de-46a7-95f1-78e14c0d6d58.metadata.json
2021-02-10 15:27:13 +0800 CST         8063      Standard   C6DAB40D00D01A25D257FF7F2FF76396      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00007-90d271a5-17e1-499a-a4ee-aa97eb23ac04.metadata.json
2021-02-10 15:27:32 +0800 CST         9099      Standard   2CA05CAA64BD32DF3C549F91D0CDC1A1      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00008-e7dcdee9-77a9-4d26-bf72-f1c0be8e0561.metadata.json
2021-02-10 15:28:18 +0800 CST        10136      Standard   A7760B70118B92A7CCD9A86A0AD8BCE0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00009-93979359-82b4-49b2-8826-068c02f026f7.metadata.json
2021-02-10 16:09:03 +0800 CST        11176      Standard   AE2DEDB89B07E831B21F7B12500909D2      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00010-b83b6ac4-1c2d-4f94-adae-fe757a33faf7.metadata.json
2021-02-10 15:26:51 +0800 CST         5764      Standard   55C5FC5AE6B1F9F45B5F09AC70341612      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/025e0373-6f2e-476c-95aa-e12b71310cf4-m0.avro
2021-02-10 15:27:13 +0800 CST         5766      Standard   F9DF118D3D72C3F2AAF05BF0ED48B45B      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/2c76a3aa-c985-422d-9625-47e6f1c3740e-m0.avro
2021-02-10 15:23:49 +0800 CST         5762      Standard   0C8A3D04CDBEB3BB4B254D8468D73B9F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/355363b5-2ac0-453f-8c54-86fb1a557982-m0.avro
2021-02-10 15:27:32 +0800 CST         5767      Standard   CD1B2BE3969B3CF06AF3F775D0F11B61      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/39d3080d-ac67-4d4d-9af7-2460fb5ba4e8-m0.avro
2021-02-10 15:25:15 +0800 CST         5765      Standard   0D09FDE94CD86622B29BACDD924E6255      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/4532147f-5f1f-4c8b-ab67-e4705cd48129-m0.avro
2021-02-10 15:25:31 +0800 CST         5767      Standard   74703571585B7838C17C571CBBC7B096      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/97c932d9-5897-4b90-99cd-271c082996a1-m0.avro
2021-02-10 15:26:13 +0800 CST         5765      Standard   826DC704DDD543B90E2D8747F0EF65EC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a2df9213-ed90-4db9-9633-9374bf0fa968-m0.avro
2021-02-10 15:28:18 +0800 CST         5766      Standard   0A3B3016D4B4D7F6023E2F15B1EFD5DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a7575e5b-0140-4fc9-8c99-de748e3f826f-m0.avro
2021-02-10 16:09:03 +0800 CST         5767      Standard   996317E4064D84E79C18824F05E2FA41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/e38151ba-2ae6-42a1-ab80-24bbe2029030-m0.avro
2021-02-10 15:24:39 +0800 CST         5761      Standard   46150675F7719C36DEE2B0552E1105F6      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/f85df6c5-40ab-4356-b6e6-62146620112a-m0.avro
2021-02-10 15:26:51 +0800 CST         3766      Standard   A5FB104A15FEFC394D26553EBD3181AC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-1475725340177940004-1-025e0373-6f2e-476c-95aa-e12b71310cf4.avro
2021-02-10 15:27:13 +0800 CST         3811      Standard   7D9DD5B31EB196539B77E379D004B6DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-2325464499159025089-1-2c76a3aa-c985-422d-9625-47e6f1c3740e.avro
2021-02-10 15:24:39 +0800 CST         3576      Standard   928F49332EDFDCC8E13EB42DD436DB41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-238979745127271847-1-f85df6c5-40ab-4356-b6e6-62146620112a.avro
2021-02-10 15:28:18 +0800 CST         3891      Standard   26CEF933ACF3719B0FCB1FC870FC24EF      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-281762876800111005-1-a7575e5b-0140-4fc9-8c99-de748e3f826f.avro
2021-02-10 15:26:13 +0800 CST         3717      Standard   F526908F6EC7D386C0CCE5F3D35D07F0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3217692098732956502-1-a2df9213-ed90-4db9-9633-9374bf0fa968.avro
2021-02-10 16:09:03 +0800 CST         3935      Standard   FF23CB475C8E1663B97D36FE7F39DF7A      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3585784897630481352-1-e38151ba-2ae6-42a1-ab80-24bbe2029030.avro
2021-02-10 15:25:15 +0800 CST         3627      Standard   F311DFF00D62096F5E58CE112984FB08      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4375397182832385716-1-4532147f-5f1f-4c8b-ab67-e4705cd48129.avro
2021-02-10 15:23:49 +0800 CST         3509      Standard   3976ACB4842DC17A7269EDB43FF04EDE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4925086930231607072-1-355363b5-2ac0-453f-8c54-86fb1a557982.avro
2021-02-10 15:27:32 +0800 CST         3852      Standard   D3DC3DECFA7257AC1833FEB9764C3579      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-666850814092055918-1-39d3080d-ac67-4d4d-9af7-2460fb5ba4e8.avro
2021-02-10 15:25:31 +0800 CST         3674      Standard   38F04E8D14DCD3402995AB724C3B7FB8      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-7178282328530815760-1-97c932d9-5897-4b90-99cd-271c082996a1.avro
Object Number is: 42
相关实践学习
通义万相文本绘图与人像美化
本解决方案展示了如何利用自研的通义万相AIGC技术在Web服务中实现先进的图像生成。
相关文章
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
851 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
1348 7
阿里云实时计算Flink在多行业的应用和实践
|
6月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
112 1
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
9月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
380 16
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
436 3
|
存储 Java 关系型数据库
实时计算 Flink版产品使用问题之以jar包方式同步数据是否需要定义存储oss的位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
搜索推荐 数据处理 调度
阿里云实时计算:引领企业走向实时智能决策之路
数据整合:整合交通摄像头、GPS定位等多种数据源。 实时路况分析:分析实时路况,预测交通拥堵。 智能调度:基于分析结果进行车辆调度和路线规划。
|
SQL 运维 关系型数据库

热门文章

最新文章