Spark读取阿里云OSS指定目录下文件名写入Maxcomputer

本文涉及的产品
对象存储 OSS,20GB 3个月
大数据开发治理平台DataWorks,资源组抵扣包 750CU*H
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 针对OSS存储的视频文件,这里介绍下使用Spark如何获取出来其文件名

1.概述

最近刚好有个需求,需要将阿里云对象存储OSS里的视频文件名称拿出来,以便数仓这边输出使用。其实方法有很多Java、Python都可以使用,因本人使用阿里云DataWorks+Maxcomputer,且Maxcomputer可以直接使用ODPS Spark,所以这里采用Spark读取

2.实现

1.通过IntelliJ IDEA工具

首先先创建一个工程,这里就不详细展开怎么创建了;导入需要的依赖,本地运行时Jar可见性需注释掉provided

<properties><spark.version>2.3.0</spark.version><cupid.sdk.version>3.3.8-public</cupid.sdk.version><scala.version>2.11.8</scala.version><scala.binary.version>2.11</scala.binary.version><oss.version>3.10.2</oss.version></properties><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.binary.version}</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>cupid-sdk</artifactId><version>${cupid.sdk.version}</version><scope>provided</scope></dependency><dependency><groupId>com.aliyun.odps</groupId><artifactId>hadoop-fs-oss</artifactId><version>${cupid.sdk.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>${oss.version}</version><scope>provided</scope></dependency>

因要外部读取OSS,这里采用Java SDK调用OSS暴露的API,写个方法返回OSS的数据

packagecn.xxx.utils;
importcom.aliyun.oss.ClientException;
importcom.aliyun.oss.OSS;
importcom.aliyun.oss.OSSClientBuilder;
importcom.aliyun.oss.OSSException;
importcom.aliyun.oss.model.OSSObjectSummary;
importcom.aliyun.oss.model.ObjectListing;
importjava.util.ArrayList;
publicclassOSSFileName {
publicstaticArrayList<String>getOSSFileName(Stringds){
// Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。Stringendpoint="https://oss-cn-xxx.aliyuncs.com";
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。StringaccessKeyId="xxx";
StringaccessKeySecret="xxx";
// 填写Bucket名称,例如examplebucket。StringbucketName="your bucket name";
// 创建OSSClient实例。OSSossClient=newOSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
//结果集ArrayList<String>list=newArrayList<>();
try {
// ossClient.listObjects返回ObjectListing实例,包含此次listObject请求的返回结果。ObjectListingobjectListing=ossClient.listObjects(bucketName,String.format("videos/%s",ds));
// objectListing.getObjectSummaries获取所有文件的描述信息。for (OSSObjectSummaryobjectSummary : objectListing.getObjectSummaries()) {
Stringkey=objectSummary.getKey();
Stringname=key.split("/")[2];
list.add(name);
            }
        } catch (OSSExceptionoe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "+"but was rejected with an error response for some reason.");
System.out.println("Error Message:"+oe.getErrorMessage());
System.out.println("Error Code:"+oe.getErrorCode());
System.out.println("Request ID:"+oe.getRequestId());
System.out.println("Host ID:"+oe.getHostId());
        } catch (ClientExceptionce) {
System.out.println("Caught an ClientException, which means the client encountered "+"a serious internal problem while trying to communicate with OSS, "+"such as not being able to access the network.");
System.out.println("Error Message:"+ce.getMessage());
        } finally {
if (ossClient!=null) {
ossClient.shutdown();
            }
        }
returnlist;
    }
}

将读取的数据写入ODPS

因获取OSS数据的方法是Java写的,这里要转换为Scala的集合或数据集需要

import scala.collection.convert.decorateAll._

packagecn.xxximportjava.utilimportcn.soterea.utils.{OSS, OSSFileName}
importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
importscala.collection.convert.decorateAll._//java与scala转换需要objectSparkOSS2ODPS {
defmain(args: Array[String]): Unit= {
valsparkSession=SparkSession      .builder()
      .config("spark.master", "local[8]")//本地运行需要设置spark.master为local[N]才能直接运行,N为并发数      .config("spark.hadoop.odps.project.name","your Aliyun DataWorks Project name")
      .config("spark.hadoop.odps.access.id", "your Access id")
      .config("spark.hadoop.odps.access.key", "your Access key")
      .config("spark.hadoop.odps.end.point", "http://service.cn-xxx.maxcompute.aliyun.com/api")
      .appName("SparkOSS2ODPS")
      .getOrCreate()
importsparkSession._importsparkSession.implicits._valsc=sparkSession.sparkContext//日期参数valds=args(0)
//Java集合转DFvallist: util.ArrayList[String] =OSSFileName.getOSSFileName(ds)
list.asScala.toDF("file_name").createTempView("tb1")
valdf=sql(
s"""|select | *,| '${ds}' as ds|from tb1|""".stripMargin    )
df.write.mode(SaveMode.Overwrite).insertInto("your Maxcomputer table name")
sc.stop()
sparkSession.stop()
  }
}

直接运行就会读出OSS数据写入指定ODPS表中

2.通过DataWorks

使用DataWorks时,注意以下事项

1.依赖中需要将Jar可见性为provided

2.主类中注释掉spark.master和spark.hadoop.odps配置项,因为在Maxcomputer集群中运行,不需要再配置

3.将代码工程打包上传至资源处提交即可


以上就是Spark读取OSS目录下文件名写入ODPS的过程,当然有很多中方式可以实现,我只是用了其中一种,若有不足之处欢迎指正

拜了个拜

相关实践学习
基于Hologres轻量实时的高性能OLAP分析
本教程基于GitHub Archive公开数据集,通过DataWorks将GitHub中的项⽬、行为等20多种事件类型数据实时采集至Hologres进行分析,同时使用DataV内置模板,快速搭建实时可视化数据大屏,从开发者、项⽬、编程语⾔等多个维度了解GitHub实时数据变化情况。
目录
相关文章
|
2月前
|
存储 运维 安全
阿里云国际站OSS与自建存储的区别
阿里云国际站对象存储OSS提供海量、安全、低成本的云存储解决方案。相比自建存储,OSS具备易用性强、稳定性高、安全性好、成本更低等优势,支持无限扩展、自动冗余、多层防护及丰富增值服务,助力企业高效管理数据。
|
2月前
|
存储 域名解析 前端开发
震惊!不买服务器,还可以用阿里云国际站 OSS 轻松搭建静态网站
在数字化时代,利用阿里云国际站OSS可低成本搭建静态网站。本文详解OSS优势及步骤:创建Bucket、上传文件、配置首页与404页面、绑定域名等,助你快速上线个人或小型业务网站,操作简单,成本低廉,适合初学者与中小企业。
|
6月前
|
分布式计算 运维 搜索推荐
立马耀:通过阿里云 Serverless Spark 和 Milvus 构建高效向量检索系统,驱动个性化推荐业务
蝉妈妈旗下蝉选通过迁移到阿里云 Serverless Spark 及 Milvus,解决传统架构性能瓶颈与运维复杂性问题。新方案实现离线任务耗时减少40%、失败率降80%,Milvus 向量检索成本降低75%,支持更大规模数据处理,查询响应提速。
290 57
|
2月前
|
存储 安全 API
某网盘不好用?有没有类似某网盘的存储软件?阿里云国际站 OSS:云存储的全能助手,你 get 了吗?
在数据爆炸时代,阿里云国际站OSS提供海量、安全、低成本的云存储服务,支持多种数据类型存储与灵活访问,助力企业与个人高效管理数据,降低存储成本。开通简便,操作友好,是理想的云端数据解决方案。
|
4月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
152 4
|
4月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
202 1
|
8月前
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
620 58
|
8月前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
366 15
|
8月前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
172 0