如何通过Flink将Iceberg表数据写入到阿里云OSS

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 本文详细介绍了如何通过Flink将数据导入到iceberg+oss数据湖中,这里我们不需要依赖任何的定制化工具,只需要通过开源的产品就能实现哦。

版本要求

我在以下两个环境下都验证过flink的读写流程:

环境一
hadoop == 3.1.0
hive == 3.1.2
apache flink == 1.12.1

环境二
hadoop == 2.9.2
hive == 2.3.6
flink == 1.12.1

依赖环境

验证hadoop包读取oss文件

第一步,在 $HADOOP_HOME/etc/hadoop/hadoop-env.sh 中,设置如下。这样整个hadoop classpath都会自动加载aliyun oss的相关依赖。

export HADOOP_OPTIONAL_TOOLS="hadoop-aliyun"

如果使用hadoop==2.9.2版本,则需要执行如下命令:

echo "CLASSPATH=\${CLASSPATH}:\${TOOL_PATH}" >> $HADOOP_HOME/libexec/hadoop-config.sh

第二步,建议在 $HADOOP_HOME/etc/hadoop/core-site.xml 中新增如下配置,并通过hdfs的命令行工具来验证hadoop包确实能读取oss://...这种类型的文件。


fs.oss.endpoint=${YOUR_OSS_ENDPOINT}
fs.oss.accessKeyId=${YOU_ACCESS_KEY_ID}
fs.oss.accessKeySecret=${YOU_SECRET_KEY}
fs.AbstractFileSystem.oss.impl=org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

第三步,通过hdfs命令行来验证的确可以读取oss的文件:

./bin/hdfs dfs -ls oss://vvr-daily/iceberg/warehouse
Found 1 items
drwxrwxrwx   -          0 2021-02-10 14:31 oss://vvr-daily/iceberg/warehouse/oss_db.db

启动hive metastore服务

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --hiveconf hive.root.logger=console \
           --hiveconf hive.log.level=INFO \
           --service metastore

对于hive2.3.9,可以采用如下命令来启动hive:

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-2.9.2
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/hive --service metastore \
    --hiveconf hive.root.logger=INFO,console

启动apache flink的本地集群

第一步:编译apache iceberg,获取到flink-runtime-xx.jar文件

git clone --single-branch --branch aliyun-oss git@github.com:openinx/incubator-iceberg.git iceberg

cd iceberg/
./gradlew build -x test

# The generated flink-runtime-xx.jar is here:
ls -altr flink-runtime/build/libs/iceberg-flink-runtime-*.jar | grep -v javadoc | grep -v sources | grep -v tests

第二步:把编译获得的flink-runtime-xx.jar拷贝到 $FLINK_HOME/lib 目录下,以便让 flink 进程能正常加载到 flink-runtime-xx.jar

cp flink-runtime/build/libs/iceberg-flink-runtime-7ea20a0.jar $FLINK_HOME/lib/

第三步:启动flink本地集群

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# switch to $FLINK_HOME directory
./bin/start-cluster.sh

SQL客户端验证

打开flink的sql-client.sh

export HADOOP_HOME=/Users/openinx/software/hadoop-binary/hadoop-3.1.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded shell

创建hive catalog

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint'='${YOUR_OSS_ENDPOINT}',
  'oss.access.key.id'='${YOU_OSS_ACCESS_KEY_ID}',
  'oss.access.key.secret'='${YOUR_OSS_ACCESS_SECRET}',
  'warehouse'='oss://vvr-daily/iceberg/warehouse'
);

切换到catalog和db并建表

USE CATALOG hive_catalog;

CREATE DATABASE oss_db;

USE oss_db;

CREATE TABLE sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

插入数据并查询

INSERT INTO sample VALUES (1, 'a'), (2, 'b'), (3, 'c');

SELECT * FROM sample;

验证oss对象存储文件

osscmd ls oss://vvr-daily/iceberg/warehouse/oss_db.db
LastModifiedTime                   Size(B)  StorageClass   ETAG                                  ObjectName
2021-02-10 14:31:51 +0800 CST            0      Standard   D41D8CD98F00B204E9800998ECF8427E      oss://vvr-daily/iceberg/warehouse/oss_db.db/
2021-02-10 15:25:15 +0800 CST          740      Standard   890693B69C7E842C29BAD95DE0904BDA      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-0de7dc8b-4c52-4505-8f2c-2d04b249c0d4-00001.parquet
2021-02-10 15:27:32 +0800 CST          746      Standard   EE6A0560F4CA31DC9081D02C51C43EBE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-4a44477b-c052-425a-b3ab-96afe40b4339-00001.parquet
2021-02-10 15:23:49 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-536c4e37-d3ce-4c7e-ba25-748214afd539-00001.parquet
2021-02-10 15:24:39 +0800 CST          637      Standard   BD2975A01ED70D124FE578CED21C7B64      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-60c99eda-598d-490f-8ec7-5db853ae5c48-00001.parquet
2021-02-10 15:26:13 +0800 CST          740      Standard   6FEA8E20BDE0028298CF461507AAB554      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-69eb8407-6e56-4476-aaf8-e532c02c7aab-00001.parquet
2021-02-10 15:26:51 +0800 CST          740      Standard   6793E685BD94AB8C2C0BE5BE5D785990      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-895b9620-07a2-4b8d-a91d-63698ddf06e0-00001.parquet
2021-02-10 15:28:18 +0800 CST          746      Standard   5365D890196B3F2B8847826A4E387601      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-8acc716c-2fa9-4f50-beb0-c97a0570d50d-00001.parquet
2021-02-10 15:27:13 +0800 CST          740      Standard   BECCDE882F3AD9A47EF769F2B858E080      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-c477130d-a7bb-4708-ab0f-f83b3afee46c-00001.parquet
2021-02-10 15:25:31 +0800 CST          740      Standard   6EFEF4A682B93A236D50335B811ED593      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-dd6f19da-14d5-43f2-9a36-0ea550958b63-00001.parquet
2021-02-10 16:09:02 +0800 CST          641      Standard   9072CDFDB5DF6BD7844B861F04B2A8ED      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/data/00000-0-e58a43ec-cad2-47fe-aafc-73f2c684f314-00001.parquet
2021-02-10 15:09:26 +0800 CST          837      Standard   61777084CF896703012F45AB9B8498AE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00000-0e19d8da-def7-40db-8648-9e952b668f9e.metadata.json
2021-02-10 15:23:50 +0800 CST         1840      Standard   60DDB6336B2D39C31419334A109770C9      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00001-8ef10750-d77f-4313-a3b2-1ec648259759.metadata.json
2021-02-10 15:24:39 +0800 CST         2873      Standard   F3AAFD6870CFF7EE9A7E804CC1D5523F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00002-45af1b78-a263-41ce-a85e-cefc816421da.metadata.json
2021-02-10 15:25:15 +0800 CST         3910      Standard   449FB92DA945EFB51B5D1411DF276969      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00003-913d52e3-057f-4553-b7dd-be082b1d16d0.metadata.json
2021-02-10 15:25:31 +0800 CST         4947      Standard   AFB0EE4FD23518AD29AF2F12F226C824      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00004-3667f934-dd1b-433e-836b-4fc143068b74.metadata.json
2021-02-10 15:26:13 +0800 CST         5985      Standard   39FB15CE22E41F98D4368BF8853250A4      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00005-7b993212-acb0-43bf-b4af-27d0d5d61286.metadata.json
2021-02-10 15:26:51 +0800 CST         7024      Standard   2C7E61FBB012F03B0B64B7C90DFE5221      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00006-a78d66e9-f1de-46a7-95f1-78e14c0d6d58.metadata.json
2021-02-10 15:27:13 +0800 CST         8063      Standard   C6DAB40D00D01A25D257FF7F2FF76396      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00007-90d271a5-17e1-499a-a4ee-aa97eb23ac04.metadata.json
2021-02-10 15:27:32 +0800 CST         9099      Standard   2CA05CAA64BD32DF3C549F91D0CDC1A1      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00008-e7dcdee9-77a9-4d26-bf72-f1c0be8e0561.metadata.json
2021-02-10 15:28:18 +0800 CST        10136      Standard   A7760B70118B92A7CCD9A86A0AD8BCE0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00009-93979359-82b4-49b2-8826-068c02f026f7.metadata.json
2021-02-10 16:09:03 +0800 CST        11176      Standard   AE2DEDB89B07E831B21F7B12500909D2      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/00010-b83b6ac4-1c2d-4f94-adae-fe757a33faf7.metadata.json
2021-02-10 15:26:51 +0800 CST         5764      Standard   55C5FC5AE6B1F9F45B5F09AC70341612      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/025e0373-6f2e-476c-95aa-e12b71310cf4-m0.avro
2021-02-10 15:27:13 +0800 CST         5766      Standard   F9DF118D3D72C3F2AAF05BF0ED48B45B      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/2c76a3aa-c985-422d-9625-47e6f1c3740e-m0.avro
2021-02-10 15:23:49 +0800 CST         5762      Standard   0C8A3D04CDBEB3BB4B254D8468D73B9F      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/355363b5-2ac0-453f-8c54-86fb1a557982-m0.avro
2021-02-10 15:27:32 +0800 CST         5767      Standard   CD1B2BE3969B3CF06AF3F775D0F11B61      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/39d3080d-ac67-4d4d-9af7-2460fb5ba4e8-m0.avro
2021-02-10 15:25:15 +0800 CST         5765      Standard   0D09FDE94CD86622B29BACDD924E6255      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/4532147f-5f1f-4c8b-ab67-e4705cd48129-m0.avro
2021-02-10 15:25:31 +0800 CST         5767      Standard   74703571585B7838C17C571CBBC7B096      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/97c932d9-5897-4b90-99cd-271c082996a1-m0.avro
2021-02-10 15:26:13 +0800 CST         5765      Standard   826DC704DDD543B90E2D8747F0EF65EC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a2df9213-ed90-4db9-9633-9374bf0fa968-m0.avro
2021-02-10 15:28:18 +0800 CST         5766      Standard   0A3B3016D4B4D7F6023E2F15B1EFD5DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/a7575e5b-0140-4fc9-8c99-de748e3f826f-m0.avro
2021-02-10 16:09:03 +0800 CST         5767      Standard   996317E4064D84E79C18824F05E2FA41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/e38151ba-2ae6-42a1-ab80-24bbe2029030-m0.avro
2021-02-10 15:24:39 +0800 CST         5761      Standard   46150675F7719C36DEE2B0552E1105F6      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/f85df6c5-40ab-4356-b6e6-62146620112a-m0.avro
2021-02-10 15:26:51 +0800 CST         3766      Standard   A5FB104A15FEFC394D26553EBD3181AC      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-1475725340177940004-1-025e0373-6f2e-476c-95aa-e12b71310cf4.avro
2021-02-10 15:27:13 +0800 CST         3811      Standard   7D9DD5B31EB196539B77E379D004B6DD      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-2325464499159025089-1-2c76a3aa-c985-422d-9625-47e6f1c3740e.avro
2021-02-10 15:24:39 +0800 CST         3576      Standard   928F49332EDFDCC8E13EB42DD436DB41      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-238979745127271847-1-f85df6c5-40ab-4356-b6e6-62146620112a.avro
2021-02-10 15:28:18 +0800 CST         3891      Standard   26CEF933ACF3719B0FCB1FC870FC24EF      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-281762876800111005-1-a7575e5b-0140-4fc9-8c99-de748e3f826f.avro
2021-02-10 15:26:13 +0800 CST         3717      Standard   F526908F6EC7D386C0CCE5F3D35D07F0      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3217692098732956502-1-a2df9213-ed90-4db9-9633-9374bf0fa968.avro
2021-02-10 16:09:03 +0800 CST         3935      Standard   FF23CB475C8E1663B97D36FE7F39DF7A      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-3585784897630481352-1-e38151ba-2ae6-42a1-ab80-24bbe2029030.avro
2021-02-10 15:25:15 +0800 CST         3627      Standard   F311DFF00D62096F5E58CE112984FB08      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4375397182832385716-1-4532147f-5f1f-4c8b-ab67-e4705cd48129.avro
2021-02-10 15:23:49 +0800 CST         3509      Standard   3976ACB4842DC17A7269EDB43FF04EDE      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-4925086930231607072-1-355363b5-2ac0-453f-8c54-86fb1a557982.avro
2021-02-10 15:27:32 +0800 CST         3852      Standard   D3DC3DECFA7257AC1833FEB9764C3579      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-666850814092055918-1-39d3080d-ac67-4d4d-9af7-2460fb5ba4e8.avro
2021-02-10 15:25:31 +0800 CST         3674      Standard   38F04E8D14DCD3402995AB724C3B7FB8      oss://vvr-daily/iceberg/warehouse/oss_db.db/sample/metadata/snap-7178282328530815760-1-97c932d9-5897-4b90-99cd-271c082996a1.avro
Object Number is: 42
相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
相关文章
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
836 7
阿里云实时计算Flink在多行业的应用和实践
|
3月前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
24天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
3月前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
279 3
|
3月前
|
存储 Java 关系型数据库
实时计算 Flink版产品使用问题之以jar包方式同步数据是否需要定义存储oss的位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 运维 安全
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
202 2
|
4月前
|
存储 API 开发工具
阿里云OSS
【7月更文挑战第19天】阿里云OSS
193 1
下一篇
无影云桌面