1. Overview
I recently had a requirement to pull the names of the video files stored in Alibaba Cloud OSS (Object Storage Service) so the data warehouse side could use them downstream. There are plenty of ways to do this (Java or Python would both work), but since I use Alibaba Cloud DataWorks + MaxCompute, and MaxCompute can run ODPS Spark directly, I chose to read the data with Spark.
2. Implementation
1. Using IntelliJ IDEA
First create a project (I won't go into the details of project creation here), then add the required dependencies. When running locally, comment out the provided scopes so the jars are visible on the classpath:
<properties>
    <spark.version>2.3.0</spark.version>
    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <oss.version>3.10.2</oss.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-sdk</artifactId>
    <version>${cupid.sdk.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>hadoop-fs-oss</artifactId>
    <version>${cupid.sdk.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-actors</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>${oss.version}</version>
    <scope>provided</scope>
</dependency>
Since we are reading OSS from the outside, we call the API exposed by OSS through its Java SDK. Write a method that returns the data from OSS:
package cn.xxx.utils;

import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;

import java.util.ArrayList;

public class OSSFileName {

    public static ArrayList<String> getOSSFileName(String ds) {
        // The endpoint here uses China East 1 (Hangzhou) as an example; fill in your actual region.
        String endpoint = "https://oss-cn-xxx.aliyuncs.com";
        // An Alibaba Cloud account AccessKey has access to every API and is high-risk.
        // It is strongly recommended to create a RAM user in the RAM console and use it
        // for API access and day-to-day operations instead.
        String accessKeyId = "xxx";
        String accessKeySecret = "xxx";
        // Bucket name, e.g. examplebucket.
        String bucketName = "your bucket name";
        // Create the OSSClient instance.
        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        // Result set.
        ArrayList<String> list = new ArrayList<>();
        try {
            // ossClient.listObjects returns an ObjectListing instance that contains
            // the result of this listObjects request.
            ObjectListing objectListing = ossClient.listObjects(bucketName, String.format("videos/%s", ds));
            // objectListing.getObjectSummaries returns the metadata of all listed objects.
            for (OSSObjectSummary objectSummary : objectListing.getObjectSummaries()) {
                String key = objectSummary.getKey();
                // Keys look like videos/<ds>/<file name>; take the third segment and
                // skip the directory placeholder key (videos/<ds>/), which has no file name.
                String[] parts = key.split("/");
                if (parts.length > 2) {
                    list.add(parts[2]);
                }
            }
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught a ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return list;
    }
}
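One caveat: a single listObjects call returns only one page of results (max-keys defaults to 100 and caps at 1000), so a date prefix with many files would be silently truncated. Below is a minimal paged variant, sketched in Scala against the same SDK, assuming the same hypothetical bucket layout (videos/<ds>/<file name>) and placeholder credentials as above:

import com.aliyun.oss.OSSClientBuilder
import com.aliyun.oss.model.ListObjectsRequest
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object OSSFileNamePaged {
  // Paged variant of getOSSFileName: follows nextMarker until the listing is no
  // longer truncated, so prefixes with more than one page of keys are fully read.
  def getOSSFileNamePaged(ds: String): Seq[String] = {
    val ossClient = new OSSClientBuilder()
      .build("https://oss-cn-xxx.aliyuncs.com", "xxx", "xxx")
    val names = ArrayBuffer[String]()
    try {
      var marker: String = null
      var truncated = true
      while (truncated) {
        val request = new ListObjectsRequest("your bucket name")
          .withPrefix(s"videos/$ds")
          .withMarker(marker) // resume where the previous page ended
          .withMaxKeys(1000)  // OSS caps a single page at 1000 keys
        val listing = ossClient.listObjects(request)
        for (summary <- listing.getObjectSummaries.asScala) {
          val parts = summary.getKey.split("/")
          if (parts.length > 2) names += parts(2) // skip the directory placeholder key
        }
        truncated = listing.isTruncated
        marker = listing.getNextMarker
      }
    } finally {
      ossClient.shutdown()
    }
    names
  }
}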
Next, write the data we read into ODPS.
Because the method that fetches the OSS data is written in Java, converting its result into a Scala collection or dataset requires
import scala.collection.convert.decorateAll._
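(scala.collection.JavaConverters._ is the equivalent, more commonly seen import in Scala 2.11.) As a quick standalone illustration of what the import provides, with made-up file names:

import scala.collection.convert.decorateAll._

object ConversionDemo {
  def main(args: Array[String]): Unit = {
    val javaList = new java.util.ArrayList[String]()
    javaList.add("video001.mp4") // hypothetical file names
    javaList.add("video002.mp4")
    // asScala decorates the Java list so it behaves like a Scala collection
    val scalaSeq: Seq[String] = javaList.asScala
    println(scalaSeq.mkString(", "))
  }
}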
package cn.xxx

import java.util

import cn.xxx.utils.OSSFileName
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.convert.decorateAll._ // needed for Java <-> Scala conversion

object SparkOSS2ODPS {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      // For local runs, spark.master must be set to local[N], where N is the parallelism.
      .config("spark.master", "local[8]")
      .config("spark.hadoop.odps.project.name", "your Aliyun DataWorks Project name")
      .config("spark.hadoop.odps.access.id", "your Access id")
      .config("spark.hadoop.odps.access.key", "your Access key")
      .config("spark.hadoop.odps.end.point", "http://service.cn-xxx.maxcompute.aliyun.com/api")
      .appName("SparkOSS2ODPS")
      .getOrCreate()

    import sparkSession._
    import sparkSession.implicits._

    val sc = sparkSession.sparkContext

    // Date argument.
    val ds = args(0)

    // Convert the Java collection to a DataFrame and register it as a temp view.
    val list: util.ArrayList[String] = OSSFileName.getOSSFileName(ds)
    list.asScala.toDF("file_name").createTempView("tb1")

    val df = sql(
      s"""
         |select
         |  *,
         |  '${ds}' as ds
         |from tb1
         |""".stripMargin
    )

    df.write.mode(SaveMode.Overwrite).insertInto("your MaxCompute table name")

    sc.stop()
    sparkSession.stop()
  }
}
Run it directly, passing the date as the first program argument, and it reads the OSS data and writes it into the specified ODPS table.
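One thing worth noting: insertInto resolves columns by position, not by name, so the target table's schema must line up with the DataFrame (file_name first, then ds). A minimal sketch of a matching table, run in the same session as the job above, with a hypothetical table name:

// Hypothetical target table: insertInto is positional, so the column order
// must match the DataFrame produced above (file_name, then ds).
sparkSession.sql(
  """
    |create table if not exists video_file_names (
    |  file_name string,
    |  ds        string
    |)
    |""".stripMargin)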
2. Using DataWorks
When using DataWorks, note the following:
1. In the dependencies, set the jar scopes back to provided.
2. In the main class, comment out the spark.master and spark.hadoop.odps.* settings; the job runs inside the MaxCompute cluster, so they are not needed there (see the sketch after this list for a way to avoid toggling them by hand).
3. Package the project, upload the jar as a resource, and submit it.
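Instead of commenting configuration in and out between local runs and cluster submissions, one option is to gate the local-only settings behind a flag. A minimal sketch, assuming a hypothetical RUN_LOCAL environment variable:

import org.apache.spark.sql.SparkSession

object SessionFactory {
  // Hypothetical toggle: apply the local-only settings when RUN_LOCAL=true,
  // so the same main class runs unchanged both in IDEA and on MaxCompute.
  def build(): SparkSession = {
    val builder = SparkSession.builder().appName("SparkOSS2ODPS")
    if (sys.env.get("RUN_LOCAL").contains("true"))
      builder
        .config("spark.master", "local[8]")
        .config("spark.hadoop.odps.project.name", "your Aliyun DataWorks Project name")
        .config("spark.hadoop.odps.access.id", "your Access id")
        .config("spark.hadoop.odps.access.key", "your Access key")
        .config("spark.hadoop.odps.end.point", "http://service.cn-xxx.maxcompute.aliyun.com/api")
        .getOrCreate()
    else
      builder.getOrCreate()
  }
}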
That wraps up reading the file names under an OSS directory with Spark and writing them into ODPS. There are of course many ways to do this; this is just the one I used. If anything is off, corrections are welcome.
Bye for now!