Spark + OSS
Spark 接入 OSS
当前E-MapReduce支持
MetaService服务,支持用户在E-MapReduce环境免AK访问OSS数据源。旧的显式写AK和Endpoint方式也支持,但需要注意OSS Endpoint请使用内网域名,所有的Endpoint可以参考
OSS Endpoint。
下面这个例子演示了Spark如何免AK从OSS中读入数据,并将处理完的数据写回到OSS 中。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在阿里云E-MapReduce环境中,Spark作业可以非常方便地读取和写入OSS(对象存储服务)中的数据。您提到的示例代码展示了一个基本流程,我将为您解析这段代码并补充一些关键点。
首先,确保您的E-MapReduce集群已经配置了对OSS的访问权限,特别是MetaService服务,它允许用户无需显式提供AccessKey和SecretKey(即AK/SK)就能访问OSS资源。这是通过内部的授权机制实现的,提高了安全性并简化了配置。
val conf = new SparkConf().setAppName("Test OSS")
val sc = new SparkContext(conf)
这里初始化了Spark的配置和上下文,setAppName
用于设置应用的名字,这对于监控和日志记录很有帮助。
val pathIn = "oss://bucket/path/to/read"
val inputData = sc.textFile(pathIn)
这一部分指定了从OSS读取数据的路径,格式为oss://bucket-name/path/to/object
。textFile
方法会创建一个包含该OSS路径下所有文本文件内容的RDD(弹性分布式数据集)。
val cnt = inputData.count()
println(s"count: $cnt")
计算输入数据集中元素的数量并打印出来。
val outputPath = "oss://bucket/path/to/write"
val outpuData = inputData.map(e => s"$e has been processed.")
outpuData.saveAsTextFile(outputPath)
这部分代码定义了输出到OSS的路径,并使用map
函数对输入的每行数据进行处理(本例中只是简单地添加后缀),然后通过saveAsTextFile
方法将处理后的数据保存回OSS。
通过这种方式,您可以充分利用阿里云E-MapReduce与OSS的集成优势,高效、安全地处理大数据分析任务。