Spark读取阿里云OSS指定目录下文件名写入Maxcomputer

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
大数据开发治理平台DataWorks,资源组抵扣包 750CU*H
简介: 针对OSS存储的视频文件,这里介绍下使用Spark如何获取出来其文件名

1.概述

最近刚好有个需求,需要将阿里云对象存储OSS里的视频文件名称拿出来,以便数仓这边输出使用。其实方法有很多Java、Python都可以使用,因本人使用阿里云DataWorks+Maxcomputer,且Maxcomputer可以直接使用ODPS Spark,所以这里采用Spark读取

2.实现

1.通过IntelliJ IDEA工具

首先先创建一个工程,这里就不详细展开怎么创建了;导入需要的依赖,本地运行时Jar可见性需注释掉provided

<properties><spark.version>2.3.0</spark.version><cupid.sdk.version>3.3.8-public</cupid.sdk.version><scala.version>2.11.8</scala.version><scala.binary.version>2.11</scala.binary.version><oss.version>3.10.2</oss.version></properties><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>cupid-sdk</artifactId><version>${cupid.sdk.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>hadoop-fs-oss</artifactId><version>${cupid.sdk.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>${oss.version}</version><scope>provided</scope></dependency>

因要外部读取OSS,这里采用Java SDK调用OSS暴露的API,写个方法返回OSS的数据

packagecn.xxx.utils;
importcom.aliyun.oss.ClientException;
importcom.aliyun.oss.OSS;
importcom.aliyun.oss.OSSClientBuilder;
importcom.aliyun.oss.OSSException;
importcom.aliyun.oss.model.OSSObjectSummary;
importcom.aliyun.oss.model.ObjectListing;
importjava.util.ArrayList;
publicclassOSSFileName {
publicstaticArrayList<String>getOSSFileName(Stringds){
// Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。Stringendpoint="https://oss-cn-xxx.aliyuncs.com";
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。StringaccessKeyId="xxx";
StringaccessKeySecret="xxx";
// 填写Bucket名称,例如examplebucket。StringbucketName="your bucket name";
// 创建OSSClient实例。OSSossClient=newOSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
//结果集ArrayList<String>list=newArrayList<>();
try {
// ossClient.listObjects返回ObjectListing实例,包含此次listObject请求的返回结果。ObjectListingobjectListing=ossClient.listObjects(bucketName,String.format("videos/%s",ds));
// objectListing.getObjectSummaries获取所有文件的描述信息。for (OSSObjectSummaryobjectSummary : objectListing.getObjectSummaries()) {
Stringkey=objectSummary.getKey();
Stringname=key.split("/")[2];
list.add(name);
            }
        } catch (OSSExceptionoe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "+"but was rejected with an error response for some reason.");
System.out.println("Error Message:"+oe.getErrorMessage());
System.out.println("Error Code:"+oe.getErrorCode());
System.out.println("Request ID:"+oe.getRequestId());
System.out.println("Host ID:"+oe.getHostId());
        } catch (ClientExceptionce) {
System.out.println("Caught an ClientException, which means the client encountered "+"a serious internal problem while trying to communicate with OSS, "+"such as not being able to access the network.");
System.out.println("Error Message:"+ce.getMessage());
        } finally {
if (ossClient!=null) {
ossClient.shutdown();
            }
        }
returnlist;
    }
}

将读取的数据写入ODPS

因获取OSS数据的方法是Java写的,这里要转换为Scala的集合或数据集需要

import scala.collection.convert.decorateAll._

packagecn.xxximportjava.utilimportcn.soterea.utils.{OSS, OSSFileName}
importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
importscala.collection.convert.decorateAll._//java与scala转换需要objectSparkOSS2ODPS {
defmain(args: Array[String]): Unit= {
valsparkSession=SparkSession      .builder()
      .config("spark.master", "local[8]")//本地运行需要设置spark.master为local[N]才能直接运行,N为并发数      .config("spark.hadoop.odps.project.name","your Aliyun DataWorks Project name")
      .config("spark.hadoop.odps.access.id", "your Access id")
      .config("spark.hadoop.odps.access.key", "your Access key")
      .config("spark.hadoop.odps.end.point", "http://service.cn-xxx.maxcompute.aliyun.com/api")
      .appName("SparkOSS2ODPS")
      .getOrCreate()
importsparkSession._importsparkSession.implicits._valsc=sparkSession.sparkContext//日期参数valds=args(0)
//Java集合转DFvallist: util.ArrayList[String] =OSSFileName.getOSSFileName(ds)
list.asScala.toDF("file_name").createTempView("tb1")
valdf=sql(
s"""|select | *,| '${ds}' as ds|from tb1|""".stripMargin    )
df.write.mode(SaveMode.Overwrite).insertInto("your Maxcomputer table name")
sc.stop()
sparkSession.stop()
  }
}

直接运行就会读出OSS数据写入指定ODPS表中

2.通过DataWorks

使用DataWorks时,注意以下事项

1.依赖中需要将Jar可见性为provided

2.主类中注释掉spark.master和spark.hadoop.odps配置项,因为在Maxcomputer集群中运行,不需要再配置

3.将代码工程打包上传至资源处提交即可


以上就是Spark读取OSS目录下文件名写入ODPS的过程,当然有很多中方式可以实现,我只是用了其中一种,若有不足之处欢迎指正

拜了个拜

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 产品官网 https://www.aliyun.com/product/bigdata/ide 大数据&amp;AI体验馆 https://workbench.data.aliyun.com/experience.htm#/ 帮助文档https://help.aliyun.com/zh/dataworks 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
目录
打赏
0
0
0
0
106
分享
相关文章
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
148 15
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
244 3
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
阿里云EMR数据湖文件系统问题之OSS-HDFS全托管服务的问题如何解决
函数计算产品使用问题之对于OSS打包的zip的保存目录,该如何操作
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
8月前
|
阿里云OSS的优势
【7月更文挑战第19天】阿里云OSS的优势
299 2
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等