数据湖实操讲解【OSS 访问加速】第七讲:Flink 高效 sink 写入 OSS

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: 数据湖 JindoFS+OSS 实操干货 36讲 每周二16点准时直播!扫文章底部二维码入钉群,线上准时观看~ Github链接: https://github.com/aliyun/alibabacloud-jindofs

本期导读 :【OSS 访问加速】第七讲


主题:Flink 高效 sink 写入 OSS


讲师:重湖,阿里巴巴计算平台事业部 EMR 高级工程师


内容框架:

  • 背景介绍
  • 功能介绍
  • 如何配置
  • 如何使用


直播回放链接:(7/8讲)

https://developer.aliyun.com/live/246851



一、背景介绍


Apache Flink 简介

Apache Flink 是新一代大数据计算引擎的代表,以分布式流计算为核心,同时支持批处理。特点:

  • 低延时:Flink 流式计算可以做到亚秒甚至毫秒级延时,相比之下 Spark 流计算很难达到秒级
  • 高吞吐:Flink 以分布式快照算法实现容错,对吞吐量的影响很小
  • 高容错:基于分布式快照算法,Flink 实现了低代价、高效的容错表现,以及 Exactly_Once 语义保证

image.png


JindoFS Flink Connector 产生背景


阿里云对象存储 Object Storage Service(OSS):

  • 海量:无限容量,弹性伸缩
  • 安全:12个9的数据安全性,多种加密方式
  • 低成本:远低于云磁盘,且有多种存储方式、生命周期管理等节约成本
  • 高可靠:服务可用性 99.9%
  • 已服务于海量用户


Flink 应用广泛:

  • 流计算领域业内主要解决方案
  • Apache 基金会最活跃项目之一
  • 未来:流批一体、在线分析


Flink 使用痛点:

  • 开源 ApacheFlink 尚不支持直接写入 OSS
  • Hadoop OSS SDK 写入性能不一定满足需求


JindoFS Flink Connector 介绍


整体架构:

两阶段 Checkpoint (检查点) 机制:

  • 第一阶段 MPU (MultiPartUpload,分片上传) 写入 OSS
  • 第二阶段 MPU 提交


Recoverable Writer 可恢复性写入:

  • 临时文件以普通文件格式上传 OSS
  • Sink 节点状态快照


image.png

写入 OSS vs.  写入 亚马逊S3:

  • Native 实现:数据写入以 C++ 代码实现,相比 Java 更高效
  • 高速读写:多线程读写临时文件,对大于1MB的文件优势尤其明显
  • 数据缓存:读写 OSS 实现本地缓存,加速外部访问


OSS 访问加速,JindoFS 提供新支持


image.png


二、如何配置


如何配置 JindoFS Flink Connector

环境要求:

  • 集群上有开源版本 Flink 软件,版本不低于1.10.1


SDK 配置:

下载所需 SDK 文件:


将两个 jar 放置于集群 Flink 目录下 lib 文件夹:

  • Flink 根目录通常可由 $FLINK_HOME 环境变量获取
  • 集群所有节点均需配置


Java SPI:自动加载资源,无需额外配置

⭐文档链接(Github):

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/flink/jindofs_sdk_on_flink_for_oss.md


在程序中使用 JindoFS Flink Connector

确保集群能够访问 OSS Bucket


使用合适的路径,流式写入OSS Bucket

  • 写入 OSS 须使用 oss:// 前缀路径,类似于:

oss://<user-bucket>/<user-defined-sink-dir>


更多优化!用 JindoFS SDK 加速 OSS 访问,参考

⭐Github:

https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindofs_sdk_vs_hadoop_sdk.md



在程序中使用 JindoFS Flink Connector:Java

在程序中开启 Flink Checkpoint

  • 前提:使用可重发的数据源,如 Kafka


  • 通过 StreamExecutionEnvironment 对象打开 Checkpoint(示例):

建立:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

打开:

env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);


示例程序

  • 下文中,outputStream 是一个预先形成的 DataStream 对象,若需写入 OSS,则可以这样添加 sink:
String outputPath = "oss://<user-bucket>/<user-defined-sink-dir>";
StreamingFileSink<String> sink= StreamingFileSink.forRowFormat(
        new Path(outputPath),
        new SimpleStringEncoder<String>("UTF-8")
).build();
outputStream.addSink(sink);
  • 上述程序指定将 outputStream 中的String 内容写入 OSS 路径 oss:///,最后还需用 env.execute() 语句执行 Flink 作业,env 是已建立的 StreamExecutionEnvironment 对象
  • 最后,将 Java 作业打包为 jar 文件,并用 flink run 在集群提交即可



在程序中使用 JindoFS Flink Connector:Pyflink

与Java 示例类似,在 Pyflink 中使用 JindoFS Flink Connector 与写入 HDFS 等其他介质方式相同,只需:

  • 将写入路径写作合适的 OSS 路径
  • 注意打开 Checkpoint 功能


例如,下列 Python 程序定义了一张位于 OSS 的表:

sink_dest = "oss://<user-bucket>/<user-defined-sink-dir>"
sink_ddl = f""" 
        CREATE TABLE mySink (
                uid INT,
                pid INT
        ) PARTITIONED BY (
                pid
        ) WITH (
                'connector' = 'filesystem',
                'fpath' = '{sink_dest}',
                'format' = 'csv',
                'sink.rolling-policy.file-size' = '2MB',
                'sink.partition-commit.policy.kind' = 'success-file'
        )
"""


然后将其添加到 StreamTableEnvironmentt_env 中即可:t_env.sql_update(sink_ddl)


在程序中使用 JindoFS Flink Connector:更多配置

用户通过 flink run 提交 java 或 pyflink 程序时,可以额外自定义一些参数,格式:

     flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

目前支持“熵注入”及“分片上传并行度”两项配置


熵注入(entropyinjection):

  • 功能:将写入路径的一段特定字符串匹配出来,用一段随机的字符串进行替换
  • 效果:削弱所谓 “片区” (sharding) 效应,提高写入效率
  • 配置参数:

 oss.entropy.key=<user-defined-key>

 oss.entropy.length=<user-defined-length>


分片上传并行度

  • 配置参数:oss.upload.max.concurrent.uploads
  • 默认值:当前可用的处理器数量



直接观看第四课(7/8讲)视频回放,获取实例讲解~

https://developer.aliyun.com/live/246851





Github链接:

https://github.com/aliyun/alibabacloud-jindofs


不错过每次直播信息、探讨更多数据湖 JindoFS+OSS 相关技术问题,欢迎扫码加入钉钉交流群!


image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
打赏
0
0
0
0
1393
分享
相关文章
AI 场景深度优化!K8s 集群 OSSFS 2.0 存储卷全面升级,高效访问 OSS 数据
阿里云对象存储OSS是一款海量、安全、低成本、高可靠的云存储服务,是用户在云上存储的高性价比选择…
StrmVol存储卷:如何解锁K8s对象存储海量小文件访问性能新高度?
如何提升海量文件的数据读取速率,对于AI训练集管理、量化回测、时序日志分析等场景尤为重要。阿里云容器服务(ACK))支持StrmVol类型存储卷,基于底层虚拟块设备及内核态文件系统,显著降低海量小文件访问延迟。
StrmVol 存储卷:解锁 K8s 对象存储海量小文件访问性能新高度
本文介绍了阿里云容器服务(ACK)支持的StrmVol存储卷方案,旨在解决Kubernetes环境中海量小文件访问性能瓶颈问题。通过虚拟块设备与内核态文件系统(如EROFS)结合,StrmVol显著降低了小文件访问延迟,适用于AI训练集加载、时序日志分析等场景。其核心优化包括内存预取加速、减少I/O等待、内核态直接读取避免用户态切换开销,以及轻量索引快速初始化。示例中展示了基于Argo Workflows的工作流任务,模拟分布式图像数据集加载,测试结果显示平均处理时间为21秒。StrmVol适合只读场景且OSS端数据无需频繁更新的情况,详细使用方法可参考官方文档。
549 144
AI 场景深度优化!K8s 集群 OSSFS 2.0 存储卷全面升级,高效访问 OSS 数据
OSSFS 2.0通过轻量化协议设计、协程化技术及FUSE3低级API重构,实现大文件顺序读写与小文件高并发加载的显著提升,在实际测试中表现出高达数十倍的吞吐量增长。适用于机器学习训练、推理等对高带宽低延迟要求严苛的场景,同时支持静态和动态挂载方式,方便用户在ACK集群中部署使用。
258 35
对象存储OSS-Python设置代理访问请求
通过 Python SDK 配置 nginx 代理地址请求阿里云 OSS 存储桶服务。示例代码展示了如何使用 RAM 账号进行身份验证,并通过代理下载指定对象到本地文件。
308 15
AI助理直击要害,从繁复中提炼精华——使用CDN加速访问OSS存储的图片
本案例介绍如何利用AI助理快速实现OSS存储的图片接入CDN,以加速图片访问。通过AI助理提炼关键操作步骤,避免在复杂文档中寻找解决方案。主要步骤包括开通CDN、添加加速域名、配置CNAME等。实测显示,接入CDN后图片加载时间显著缩短,验证了加速效果。此方法大幅提高了操作效率,降低了学习成本。
5677 16
缺乏中间证书导致通过HTTPS协议访问OSS异常
【10月更文挑战第4天】缺乏中间证书导致通过HTTPS协议访问OSS异常
506 4
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
133 1
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问