Spark读取阿里云OSS指定目录下文件名写入Maxcomputer

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 针对OSS存储的视频文件,这里介绍下使用Spark如何获取出来其文件名

1.概述

最近刚好有个需求,需要将阿里云对象存储OSS里的视频文件名称拿出来,以便数仓这边输出使用。其实方法有很多Java、Python都可以使用,因本人使用阿里云DataWorks+Maxcomputer,且Maxcomputer可以直接使用ODPS Spark,所以这里采用Spark读取

2.实现

1.通过IntelliJ IDEA工具

首先先创建一个工程,这里就不详细展开怎么创建了;导入需要的依赖,本地运行时Jar可见性需注释掉provided

<properties><spark.version>2.3.0</spark.version><cupid.sdk.version>3.3.8-public</cupid.sdk.version><scala.version>2.11.8</scala.version><scala.binary.version>2.11</scala.binary.version><oss.version>3.10.2</oss.version></properties><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>cupid-sdk</artifactId><version>${cupid.sdk.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>hadoop-fs-oss</artifactId><version>${cupid.sdk.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>${oss.version}</version><scope>provided</scope></dependency>

因要外部读取OSS,这里采用Java SDK调用OSS暴露的API,写个方法返回OSS的数据

packagecn.xxx.utils;
importcom.aliyun.oss.ClientException;
importcom.aliyun.oss.OSS;
importcom.aliyun.oss.OSSClientBuilder;
importcom.aliyun.oss.OSSException;
importcom.aliyun.oss.model.OSSObjectSummary;
importcom.aliyun.oss.model.ObjectListing;
importjava.util.ArrayList;
publicclassOSSFileName {
publicstaticArrayList<String>getOSSFileName(Stringds){
// Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。Stringendpoint="https://oss-cn-xxx.aliyuncs.com";
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。StringaccessKeyId="xxx";
StringaccessKeySecret="xxx";
// 填写Bucket名称,例如examplebucket。StringbucketName="your bucket name";
// 创建OSSClient实例。OSSossClient=newOSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
//结果集ArrayList<String>list=newArrayList<>();
try {
// ossClient.listObjects返回ObjectListing实例,包含此次listObject请求的返回结果。ObjectListingobjectListing=ossClient.listObjects(bucketName,String.format("videos/%s",ds));
// objectListing.getObjectSummaries获取所有文件的描述信息。for (OSSObjectSummaryobjectSummary : objectListing.getObjectSummaries()) {
Stringkey=objectSummary.getKey();
Stringname=key.split("/")[2];
list.add(name);
            }
        } catch (OSSExceptionoe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "+"but was rejected with an error response for some reason.");
System.out.println("Error Message:"+oe.getErrorMessage());
System.out.println("Error Code:"+oe.getErrorCode());
System.out.println("Request ID:"+oe.getRequestId());
System.out.println("Host ID:"+oe.getHostId());
        } catch (ClientExceptionce) {
System.out.println("Caught an ClientException, which means the client encountered "+"a serious internal problem while trying to communicate with OSS, "+"such as not being able to access the network.");
System.out.println("Error Message:"+ce.getMessage());
        } finally {
if (ossClient!=null) {
ossClient.shutdown();
            }
        }
returnlist;
    }
}

将读取的数据写入ODPS

因获取OSS数据的方法是Java写的,这里要转换为Scala的集合或数据集需要

import scala.collection.convert.decorateAll._

packagecn.xxximportjava.utilimportcn.soterea.utils.{OSS, OSSFileName}
importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
importscala.collection.convert.decorateAll._//java与scala转换需要objectSparkOSS2ODPS {
defmain(args: Array[String]): Unit= {
valsparkSession=SparkSession      .builder()
      .config("spark.master", "local[8]")//本地运行需要设置spark.master为local[N]才能直接运行,N为并发数      .config("spark.hadoop.odps.project.name","your Aliyun DataWorks Project name")
      .config("spark.hadoop.odps.access.id", "your Access id")
      .config("spark.hadoop.odps.access.key", "your Access key")
      .config("spark.hadoop.odps.end.point", "http://service.cn-xxx.maxcompute.aliyun.com/api")
      .appName("SparkOSS2ODPS")
      .getOrCreate()
importsparkSession._importsparkSession.implicits._valsc=sparkSession.sparkContext//日期参数valds=args(0)
//Java集合转DFvallist: util.ArrayList[String] =OSSFileName.getOSSFileName(ds)
list.asScala.toDF("file_name").createTempView("tb1")
valdf=sql(
s"""|select | *,| '${ds}' as ds|from tb1|""".stripMargin    )
df.write.mode(SaveMode.Overwrite).insertInto("your Maxcomputer table name")
sc.stop()
sparkSession.stop()
  }
}

直接运行就会读出OSS数据写入指定ODPS表中

2.通过DataWorks

使用DataWorks时,注意以下事项

1.依赖中需要将Jar可见性为provided

2.主类中注释掉spark.master和spark.hadoop.odps配置项,因为在Maxcomputer集群中运行,不需要再配置

3.将代码工程打包上传至资源处提交即可


以上就是Spark读取OSS目录下文件名写入ODPS的过程,当然有很多中方式可以实现,我只是用了其中一种,若有不足之处欢迎指正

拜了个拜

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
目录
相关文章
|
6月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
6月前
|
存储 分布式计算 Serverless
|
2月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
4月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
184 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
5月前
|
分布式计算 安全 OLAP
7倍性能提升|阿里云AnalyticDB Spark向量化能力解析
AnalyticDB Spark如何通过向量化引擎提升性能?
|
5月前
|
存储 机器学习/深度学习 弹性计算
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
|
6月前
|
监控 Java Serverless
函数计算产品使用问题之对于OSS打包的zip的保存目录,该如何操作
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7月前
|
运维 Serverless 应用服务中间件
Serverless 应用引擎产品使用合集之关于OSS映射目录的大小限制,如何可以跳过
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
Serverless 应用引擎产品使用合集之关于OSS映射目录的大小限制,如何可以跳过
|
6月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在使用MaxCompute进行数据集成同步到OSS时,出现表名和OSS文件名不一致且多了后缀,该如何处理
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。