Spark的数据读取与保存之文件类数据读取与保存

简介: Spark的数据读取与保存之文件类数据读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

Text文件

1)数据读取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24

2)数据保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

Json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。
(1)导入解析json所需的包

scala> import scala.util.parsing.json.JSON

(2)上传json文件到HDFS

[yongheng@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /

(3)读取文件

scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24

(4)解析json数据

scala> val result  = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27

(5)打印

scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

Sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile keyClass, valueClass
注意:SequenceFile文件只针对PairRDD
(1)创建一个RDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

(2)将RDD保存为Sequence文件

scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")

(3)查看该文件

[yongheng@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile

[yongheng@hadoop102 seqFile]$ ll
总用量 8
-rw-r--r-- 1 yongheng yongheng 108 10月  9 10:29 part-00000
-rw-r--r-- 1 yongheng yongheng 124 10月  9 10:29 part-00001
-rw-r--r-- 1 yongheng yongheng   0 10月  9 10:29 _SUCCESS

[yongheng@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritable

(4)读取Sequence文件

scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24

(5)打印读取后的Sequence文件

scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFilek,v 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
(1)创建一个RDD

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24

(2)将RDD保存为Object文件

scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")

(3)查看该文件

[yongheng@hadoop102 objectFile]$ pwd
/opt/module/spark/objectFile

[yongheng@hadoop102 objectFile]$ ll
总用量 8
-rw-r--r-- 1 yongheng yongheng 142 10月  9 10:37 part-00000
-rw-r--r-- 1 yongheng yongheng 142 10月  9 10:37 part-00001
-rw-r--r-- 1 yongheng yongheng   0 10月  9 10:37 _SUCCESS

[yongheng@hadoop102 objectFile]$ cat part-00000 
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l

(4)读取Object文件

scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24

(5)打印读取后的Sequence文件

scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
目录
相关文章
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
57 2
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
106 1
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
3月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
3月前
|
JSON 分布式计算 关系型数据库
Spark中使用DataFrame进行数据转换和操作
Spark中使用DataFrame进行数据转换和操作
|
3月前
|
存储 分布式计算 调度
Spark任务调度与数据本地性
Spark任务调度与数据本地性
|
4月前
|
分布式计算 Java Spark
Spark Driver和Executor数据传递使用问题
Spark Driver和Executor数据传递使用问题
31 0
|
4月前
|
SQL 分布式计算 Apache
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
83 0
|
4月前
|
分布式计算 资源调度 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
65 0
|
4月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
75 0