X-Pack Spark 访问OSS

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 简介 对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。

简介

对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。
本文主要介绍通过Spark操作OSS数据的常见方式,代码以Scala为例。本文的代码可以通过“数据工作台”提交。

前置条件

  1. OSS已经创建bucket,假设名称为:test_spark
  2. 已创建具备读写OSS bucket:test_spark权限的用户。假设用户名为test_oss,访问OSS的AccessKeyID和AccessKeySecret分别为:accessId,accessKey。
  3. OSS的路径格式为:oss://${AccessKeyID}:${AccessKeySecret}@${bucketName}.${endPoint}/${ossKeyPath}。例如:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv

使用Spark读写OSS文件样例

假设有如下内容的文本数据已经存在OSS中,路径为:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv ,内容为:

101, name_101, 0.52
102, name_102, 0.78
103, name_103, 0.76
104, name_104, 0.78
105, name_105, 0.02
106, name_106, 0.29
107, name_107, 0.63
108, name_108, 0.20
109, name_109, 0.07
110, name_110, 0.33

通过Spark读取文件,常用两种方法

一、 使用DataFrame 读取,实例代码如下:

val conf = new SparkConf().setAppName("spark sql test")
val sparkSession = SparkSession
      .builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
//读取test.csv并生产DataFrame
val fileDF = sparkSession.read.csv(ossCsvPath)
//打印fileDF内容
fileDF.show()
//也可以把fileDF 注册是Spark表
fileDF.createOrReplaceTempView(“test_table")
sparkSession.sql("select * from test_table").show()    

二、 创建Spark Sql表指向test.csv,实例代码如下:

val sql =
      s"""create table test_table(
         |      id          int,
         |      name        string,
         |      value       float
         |      ) row format delimited fields terminated by ','
         |      location 'oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/'
         |      """.stripMargin
//创建spark 表
sparkSession.sql(sql)
//查询表数据
sparkSession.sql("select * from test_table").show()

通过Spark写文件,常用DataFrame写文件。

示例代码如下:

val conf = new SparkConf().setAppName("spark sql test")
val sparkSession = SparkSession
      .builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
//读取test.csv并生产DataFrame
val fileDF = sparkSession.read.csv(ossCsvPath)
//打印fileDF内容
fileDF.show()
val writeOssParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/parquet-table/"
//写parquet格式文件
fileDF.write.parquet(writeOssParquetPath)
val writeCsvParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/csv-table/"
//写csv格式文件
fileDF.write.csv(writeCsvParquetPath)

小结

本文给出Spark操作OSS数据的基本用法,更多用法会陆续推出。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
相关文章
|
2月前
|
分布式计算 DataWorks 数据处理
MaxCompute操作报错合集之UDF访问OSS,配置白名单后出现报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
域名解析 Serverless API
函数计算产品使用问题之如何配置自定义域名访问OSS中的内容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
3月前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之是否支持创建OSS外部表为分区表,并访问OSS上以分区方式存储的数据
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3月前
|
存储 运维 Serverless
Serverless 应用引擎产品使用合集之如何访问相同地域的OSS
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
3月前
|
机器学习/深度学习 人工智能 分布式计算
人工智能平台PAI操作报错合集之在ODPS的xxx_dev项目空间调用easyrec训练,需要访问yyy项目空间的OSS,出现报错,是什么导致的
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
28天前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
2月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
存储 运维 安全
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
99 2