使用Spark从同一区域的多个s3桶中读取-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

使用Spark从同一区域的多个s3桶中读取

2019-04-22 17:06:10 3039 1

我正在尝试从多个s3存储桶中读取文件。

最初桶应该在不同的区域,但看起来这是不可能的。

所以现在我已经将另一个桶复制到与要读取的第一个桶相同的区域,这与我正在执行spark作业的区域相同。

SparkSession设置:

val sparkConf = new SparkConf()

      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Event]))

    SparkSession.builder
      .appName("Merge application")
      .config(sparkConf)
      .getOrCreate()

使用创建SparkSession中的SQLContext调用的函数:

private def parseEvents(bucketPath: String, service: String)(

implicit sqlContext: SQLContext

): Try[RDD[Event]] =

Try(
  sqlContext.read
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
    .json(bucketPath)
    .toJSON
    .rdd
    .map(buildEvent(_, bucketPath, service).get)
)

主流程:

for {

  bucketOnePath               <- buildBucketPath(config.bucketOne.name)
  _                           <- log(s"Reading events from $bucketOnePath")
  bucketOneEvents: RDD[Event] <- parseEvents(bucketOnePath, config.service)
  _                           <- log(s"Enriching events from $bucketOnePath with originating region data")
  bucketOneEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
    bucketOneEvents,
    config.bucketOne.region
  )

  bucketTwoPath               <- buildBucketPath(config.bucketTwo.name)
  _                           <- log(s"Reading events from $bucketTwoPath")
  bucketTwoEvents: RDD[Event] <- parseEvents(config.bucketTwo.name, config.service)
  _                           <- log(s"Enriching events from $bucketTwoPath with originating region data")
  bucketTwoEventsWithRegion: RDD[Event] <- enrichEventsWithRegion(
    bucketTwoEvents,
    config.bucketTwo.region
  )

  _                        <- log("Merging events")
  mergedEvents: RDD[Event] <- merge(bucketOneEventsWithRegion, bucketTwoEventsWithRegion)
  if mergedEvents.isEmpty() == false
  _ <- log("Grouping merged events by partition key")
  mergedEventsByPartitionKey: RDD[(EventsPartitionKey, Iterable[Event])] <- eventsByPartitionKey(
    mergedEvents
  )

  _ <- log(s"Storing merged events to ${config.outputBucket.name}")
  _ <- store(config.outputBucket.name, config.service, mergedEventsByPartitionKey)
} yield ()

我在日志中得到的错误(实际存储桶名称已更改,但实际名称确实存在):

19/04/09 13:10:20 INFO SparkContext: Created broadcast 4 from rdd at MergeApp.scala:141
19/04/09 13:10:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs:someBucket2
我的stdout日志显示主要代码在失败之前走了多远:

Reading events from s3://someBucket/////*.gz
Enriching events from s3://someBucket/////*.gz with originating region data
Reading events from s3://someBucket2/////*.gz
Merge failed: Path does not exist: hdfs://someBucket2
奇怪的是,无论我选择哪个桶,第一次读取总是有效。但是第二次读取总是失败,无论是什么桶。这告诉我水桶没什么问题,但是在使用多个s3水桶时会有些奇怪。

我只能看到从单个s3存储桶读取多个文件的线程,而不是来自多个s3存储桶的多个文件。

取消 提交回答
全部回答(1)
  • 小六码奴
    2019-07-17 23:34:00

    你在someBucket2路径中缺少一个s3://前缀,所以它试图(默认)在hdfs中找到它

    0 0
相关问答

5

回答

Spark 【问答合集】

社区小助手 2019-05-29 14:13:40 130250浏览量 回答数 5

10

回答

【精品问答合集】Hbase热门问答

hbase小能手 2019-05-29 14:37:26 123870浏览量 回答数 10

119

回答

OSS存储服务-客户端工具

newegg11 2012-05-17 15:37:18 302828浏览量 回答数 119

3

回答

Logstash采集文件名称以及自定义格式的日志文件采集

Snorlax 2019-06-18 11:51:28 116073浏览量 回答数 3

5

回答

java.lang.UnsupportedOperationException: This is supposed to be overridden by subclasses.

迷茫君 2019-07-16 09:26:11 120885浏览量 回答数 5

1

回答

出现这个问题求解决This XML file does not appear to have any

淘合肥 2017-04-08 21:07:00 100383浏览量 回答数 1

43

回答

【精品问答集锦】Python热门问题

小六码奴 2019-05-30 15:27:34 144862浏览量 回答数 43

24

回答

【精品问答】python技术1000问(1)

问问小秘 2019-11-15 13:25:00 486587浏览量 回答数 24

23

回答

【精品问答合集】Redis热门问答

李博 bluemind 2019-05-29 16:36:15 131382浏览量 回答数 23

2

回答

区域选择帮助

fanyue88888 2012-12-07 15:54:30 205803浏览量 回答数 2
+关注
8
文章
487
问答
问答排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载