我正在尝试通过在EMR上执行的spark应用程序读取s3上目录中的所有文件。
数据以典型格式存储,如“s3a://Some/path/yyyy/mm/dd/hh/blah.gz”
如果我使用深度嵌套的通配符(例如“s3a:// SomeBucket / SomeFolder / / / / *。gz”),性能非常糟糕,需要大约40分钟才能读取几万个小的gzip压缩文件。
我的另外两种方法,我的研究告诉我,它的性能要高得多。
使用hadoop.fs库(2.8.5)我尝试读取我提供的每个文件路径。
private def getEventDataHadoop(
eventsFilePaths: RDD[String]
)(implicit sqlContext: SQLContext): Try[RDD[String]] =
Try(
{
val conf = sqlContext.sparkContext.hadoopConfiguration
eventsFilePaths.map(eventsFilePath => {
val p = new Path(eventsFilePath)
val fs = p.getFileSystem(conf)
val eventData: FSDataInputStream = fs.open(p)
IOUtils.toString(eventData)
})
}
)
这些文件路径由以下代码生成:
private[disneystreaming] def generateInputBucketPaths(
s3Protocol: String,
bucketName: String,
service: String,
region: String,
yearsMonths: Map[String, Set[String]]
): Try[Set[String]] =
Try(
{
val days = 1 to 31
val hours = 0 to 23
val dateFormatter: Int => String = buildDateFormat("00")
yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
for {
month: String <- yearMonth._2
day: Int <- days
hour: Int <- hours
} yield
s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
}.toSet
}
)
hadoop.fs代码失败,因为Path类不可序列化。我想不出怎么能解决这个问题。
所以这导致我使用AmazonS3Client的另一种方法,我只是要求客户端给我文件夹(或前缀)中的所有文件路径,然后将文件解析为字符串,由于它们被压缩可能会失败:
private def getEventDataS3(bucketName: String, prefix: String)(
implicit sqlContext: SQLContext
): Try[RDD[String]] =
Try(
{
import com.amazonaws.services.s3._, model._
import scala.collection.JavaConverters._
val request = new ListObjectsRequest()
request.setBucketName(bucketName)
request.setPrefix(prefix)
request.setMaxKeys(Integer.MAX_VALUE)
val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))
val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
sqlContext.sparkContext
.parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
.flatMap { key =>
Source
.fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
.getLines()
}
}
)
此代码产生null异常,因为配置文件不能为null(“java.lang.IllegalArgumentException:配置文件不能为null”)。请记住,此代码在AWS中的EMR上运行,因此如何提供所需的凭据?其他人如何使用此客户端在EMR上运行spark作业?
Path在后来的Hadoop版本中是可序列化的,因为它可以在Spark RDD中使用。在此之前,将路径转换为URI,对其进行编组,并在闭包内从该URI创建新路径。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。