如何使用EMR上的spark有效地读取/解析s3文件夹中.gz文件的负载-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

如何使用EMR上的spark有效地读取/解析s3文件夹中.gz文件的负载

2019-04-22 15:45:07 2988 1

我正在尝试通过在EMR上执行的spark应用程序读取s3上目录中的所有文件。

数据以典型格式存储,如“s3a://Some/path/yyyy/mm/dd/hh/blah.gz”

如果我使用深度嵌套的通配符(例如“s3a:// SomeBucket / SomeFolder / / / / *。gz”),性能非常糟糕,需要大约40分钟才能读取几万个小的gzip压缩文件。
我的另外两种方法,我的研究告诉我,它的性能要高得多。

使用hadoop.fs库(2.8.5)我尝试读取我提供的每个文件路径。

private def getEventDataHadoop(

eventsFilePaths: RDD[String]

)(implicit sqlContext: SQLContext): Try[RDD[String]] =

Try(
  {
    val conf = sqlContext.sparkContext.hadoopConfiguration

    eventsFilePaths.map(eventsFilePath => {
      val p                            = new Path(eventsFilePath)
      val fs                           = p.getFileSystem(conf)
      val eventData: FSDataInputStream = fs.open(p)
      IOUtils.toString(eventData)
    })
  }
)

这些文件路径由以下代码生成:

private[disneystreaming] def generateInputBucketPaths(

s3Protocol: String,
bucketName: String,
service: String,
region: String,
yearsMonths: Map[String, Set[String]]

): Try[Set[String]] =

Try(
  {
    val days                         = 1 to 31
    val hours                        = 0 to 23
    val dateFormatter: Int => String = buildDateFormat("00")

    yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
      for {
        month: String <- yearMonth._2
        day: Int      <- days
        hour: Int     <- hours
      } yield
        s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
          s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
    }.toSet
  }
)

hadoop.fs代码失败,因为Path类不可序列化。我想不出怎么能解决这个问题。

所以这导致我使用AmazonS3Client的另一种方法,我只是要求客户端给我文件夹(或前缀)中的所有文件路径,然后将文件解析为字符串,由于它们被压缩可能会失败:

private def getEventDataS3(bucketName: String, prefix: String)(

implicit sqlContext: SQLContext

): Try[RDD[String]] =

Try(
  {
    import com.amazonaws.services.s3._, model._
    import scala.collection.JavaConverters._

    val request = new ListObjectsRequest()
    request.setBucketName(bucketName)
    request.setPrefix(prefix)
    request.setMaxKeys(Integer.MAX_VALUE)
    val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))

    val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
    sqlContext.sparkContext
      .parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
      .flatMap { key =>
        Source
          .fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
          .getLines()
      }
  }
)

此代码产生null异常,因为配置文件不能为null(“java.lang.IllegalArgumentException:配置文件不能为null”)。请记住,此代码在AWS中的EMR上运行,因此如何提供所需的凭据?其他人如何使用此客户端在EMR上运行spark作业?

取消 提交回答
全部回答(1)
  • 小六码奴
    2019-07-17 23:33:58

    Path在后来的Hadoop版本中是可序列化的,因为它可以在Spark RDD中使用。在此之前,将路径转换为URI,对其进行编组,并在闭包内从该URI创建新路径。

    0 0
相关问答

1

回答

Spark序列化组件中的Java序列化是什么意思啊?

2021-12-13 20:02:58 298浏览量 回答数 1

1

回答

怎么使用Spark分析Parquet文件?

2021-12-12 12:03:51 111浏览量 回答数 1

1

回答

Spark程序中使用SQL语句读取ORC文件的方法是什么?

2021-12-09 20:17:07 125浏览量 回答数 1

1

回答

Hadoop中调用文件系统(FS)Shell命令应使用什么样的语法进行呢?

2021-12-05 14:14:32 162浏览量 回答数 1

1

回答

使用Spark从同一区域的多个s3桶中读取

2019-04-22 17:06:10 3115浏览量 回答数 1

1

回答

我使用spark thrift jdbc 已经在spark配置文件设置了

2019-03-18 16:06:08 4223浏览量 回答数 1

1

回答

Spark DataFrame处理损坏的记录

2018-12-12 11:28:33 1479浏览量 回答数 1

1

回答

如何使用scala解压缩和解压缩.gz.tar文件在文件夹中?

2018-12-11 16:50:06 3044浏览量 回答数 1

1

回答

拆分spark DataFrame列

2018-12-06 15:42:40 2238浏览量 回答数 1

3

回答

请问如何使用JAVA的SDK创建文件夹以及上传文件到指定目录

2014-07-16 16:50:24 11734浏览量 回答数 3
+关注
8
文章
487
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载