Spark-数据读取与保存(Scala版)

简介: Spark-数据读取与保存(Scala版)

文件格式

Spark对文件的读取和保存方式都很简单,会根据文件的扩展名选择对应的处理方式

image.png

文本文件

当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。


读取文本文件


只需要使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件


读取一个文本文件

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

如果文件足够小,可以使用SparkContext,wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名

val input = sc.wholeTextFiles("file:///home/holden/salesFiles")
val result = input.mapValues{y =>
    val nums = y.split(" ").map(x => x.toDouble)
    nums.sum / nums.size.toDouble
}

JSON

JSON是一种使用较广的半结构化数据格式,读取JSON数据的最简单的方法可以在所有支持的编程语言中使用。然后使用JSON解释器来对RDD中的值进行映射操作,Scala中也可以使用一个自定义Hadoop格式来操作JSON数据。


在scala中读取JSON

import com.fasterxml.jackson.moudle.scala.DefaultScalaMoudle
import com.fasterxml.jackson.moudle.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMappr
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name:String,lovesPandas:Boolean //必须是顶级类
...
//将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
//来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result =input.flatMap(record =>{
  try{
  Some(mapper.readValue(record,classOf[Person]))
  }catch{
   case e:Exception => None
  }})

在scala中保存为JSON

result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)

逗号分隔值(CSV)与制表符分割值(TSV)

读取CSV


读取CSV/TSV数据和读取JSON数据相似,都需要先把文件当做普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。


如果你的CSV的所有数据字段均没有包含换行符,你也可以使用textFile()读取并解析数据

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{line => 
  val reader = new CSVReader(new StringReader(line));
  reader.readNext();
 }

如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析。

case class Person(name: String,favoriteAnimal:String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{case (_,txt) =>
  val reader = new CSVReader(new StringReader(txt));
  reader.readAll().map(x => Person(x(0),x(1)))
}

保存CSV

写出CSV/TSV数据很简单,可以通过重用输出编码器来加速

pandaLovers.map(persion => List(person.name,person.favoriteAnimal).toArray).mapPartitions{
  people => val stringWriter = new StringWriter();
  val csvWriter = new CSVWriter(stringWriter);
  csvWriter.writeAll(people.toList)
  Iterator(stringWriter.toString)
  }.saveAsTextFile(outFile)

上述的例子中只能在我们知道所有要输出的字段时使用,然而如果一些字段名是在运行时由用户输入决定的,就要使用别的方法了,最简单的方法是遍历所有的数据,提取不同的键,然后分别输出。


SequenceFile

SequenceFile是由没有相对关系结构的键值对文件组成常用的Hadoop格式,SequenceFile文件有同步标记,Spark可以用它来定位到文件中的某个点,然后再与记录的边界对其。这可以让Spark使用多个节点高效的并行读取SequenceFile文件。


读取SequenceFile


Spark有专门用来读取SequenceFile的接口。SparkContext中,可以调用sequenceFile(path,keyClass,valueClass,minPartitions)

val data = sc.sequenceFile(infile,classOf[Text],classOf[IntWritable]).
  map{case(x,y)=> (x.toString,y.get())}

保存SequenceFile

val data = sc.parallelize(List(("panda",3),("key",6),("Snail",2)))
data.saveAsSequenceFile(outputFile)


相关文章
|
4月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
70 3
|
4月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
77 0
|
4月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
124 0
|
4月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
107 0
|
4月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
95 0
|
4月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
79 0
|
机器学习/深度学习 分布式计算 资源调度
《Scala机器学习》一一3.5 Spark的性能调整
本节书摘来自华章出版社《Scala机器学习》一 书中的第3章,第3.5节,作者:[美] 亚历克斯·科兹洛夫(Alex Kozlov)著 ,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1433 0
|
机器学习/深度学习 分布式计算 调度
|
4月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
92 5
|
4月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
73 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方