5万字长文!搞定Spark方方面面(四)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 5万字长文!搞定Spark方方面面

9、RDD 数据源

cd23da6be04cf7dea662cba8a6910d21.png

9.1 普通文本文件
sc.textFile("./dir/*.txt")
如果传递目录,则将目录下的所有文件读取作为RDD。文件路径支持通配符。
但是这样对于大量的小文件读取效率并不高,应该使用wholeTextFiles
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
返回值RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。
9.2 JDBC[掌握]

Spark 支持通过 Java JDBC 访问关系型数据库。需要使用 JdbcRDD

代码演示

package cn.itcast.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Desc 演示使用Spark操作JDBC-API实现将数据存入到MySQL并读取出来
  */
object JDBCDataSourceTest {
  def main(args: Array[String]): Unit = {
    //1.创建SparkContext
    val config = new SparkConf().setAppName("JDBCDataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    //2.插入数据
    val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
    //调用foreachPartition针对每一个分区进行操作
    //data.foreachPartition(saveToMySQL)
    //3.读取数据
    def getConn():Connection={
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    }
    val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,
      getConn,
   "select * from t_student where id >= ? and id <= ? ",
      4,
      6,
      2,
      rs => {
        val id: Int = rs.getInt("id")
        val name: String = rs.getString("name")
        val age: Int = rs.getInt("age")
        (id, name, age)
      }
    )
    println(studentRDD.collect().toBuffer)
 }
  /*
      CREATE TABLE `t_student` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
   */
  def saveToMySQL(partitionData:Iterator[(String, Int)] ):Unit = {
    //将数据存入到MySQL
    //获取连接
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    partitionData.foreach(data=>{
      //将每一条数据存入到MySQL
      val sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (NULL, ?, ?);"
      val ps: PreparedStatement = conn.prepareStatement(sql)
      ps.setString(1,data._1)
      ps.setInt(2,data._2)
      ps.execute()//preparedStatement.addBatch()
    })
//ps.executeBatch()
    conn.close()
  }
}
9.3 HadoopAPI[了解]
https://blog.csdn.net/leen0304/article/details/78854530
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。
HadoopRDD、newAPIHadoopRDD、saveAsHadoopFile、saveAsNewAPIHadoopFile 是底层API
其他的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.

a1abd553c8724d94fb03c737b88d6830.png

1429367c26ecb9dab0f6546a341a69ff.png

9.4 SequenceFile 文件[了解]
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
https://blog.csdn.net/bitcarmanlee/article/details/78111289

1c48cb79846a12d1293eebbbcc6303cb.png

读sc.sequenceFile keyClass, valueClass
写RDD.saveAsSequenceFile(path)
要求键和值能够自动转为Writable类型。

d3312c343fec9ccec2f7db6ac6d08dcb.png

9.5 对象文件[了解]
对象文件是将对象序列化后保存的文件
读sc.objectFilek,v //因为是序列化所以要指定类型
写RDD.saveAsObjectFile()
9.6 HBase[了解]
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。
这个输入格式会返回键值对数据,
其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,
而值的类型为org.apache.hadoop.hbase.client.Result。
https://github.com/teeyog/blog/issues/22
9.7 扩展阅读
package cn.itcast.core
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DataSourceTest {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    System.setProperty("HADOOP_USER_NAME", "root")
    //1.HadoopAPI
    println("HadoopAPI")
    val dataRDD = sc.parallelize(Array((1,"hadoop"), (2,"hive"), (3,"spark")))
    dataRDD.saveAsNewAPIHadoopFile("hdfs://node01:8020/spark_hadoop/",
      classOf[LongWritable],
      classOf[Text],
      classOf[TextOutputFormat[LongWritable, Text]])
    val inputRDD: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(
      "hdfs://node01:8020/spark_hadoop/*",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf = sc.hadoopConfiguration
    )
    inputRDD.map(_._2.toString).foreach(println)
    //2.读取小文件
    println("读取小文件")
    val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\spark\\files", minPartitions = 3)
    val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    //3.操作SequenceFile
    println("SequenceFile")
    val dataRDD2: RDD[(Int, String)] = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
    dataRDD2.saveAsSequenceFile("D:\\data\\spark\\SequenceFile")
    val sdata: RDD[(Int, String)] = sc.sequenceFile[Int, String]("D:\\data\\spark\\SequenceFile\\*")
    sdata.collect().foreach(println)
    //4.操作ObjectFile
    println("ObjectFile")
    val dataRDD3 = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
    dataRDD3.saveAsObjectFile("D:\\data\\spark\\ObjectFile")
    val objRDD = sc.objectFile[(Int, String)]("D:\\data\\spark\\ObjectFile\\*")
    objRDD.collect().foreach(println)
    sc.stop()
  }
}
package cn.itcast.core
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DataSourceTest2 {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
    val fruitTable = TableName.valueOf("fruit")
    val tableDescr = new HTableDescriptor(fruitTable)
    tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
    val admin = new HBaseAdmin(conf)
    if (admin.tableExists(fruitTable)) {
      admin.disableTable(fruitTable)
      admin.deleteTable(fruitTable)
    }
    admin.createTable(tableDescr)
    def convert(triple: (String, String, String)) = {
      val put = new Put(Bytes.toBytes(triple._1))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, put)
    }
    val dataRDD: RDD[(String, String, String)] = sc.parallelize(List(("1","apple","11"), ("2","banana","12"), ("3","pear","13")))
    val targetRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(convert)
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
    //写入数据
    targetRDD.saveAsHadoopDataset(jobConf)
    println("写入数据成功")
    //读取数据
    conf.set(TableInputFormat.INPUT_TABLE, "fruit")
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count: Long = hbaseRDD.count()
    println("hBaseRDD RDD Count:"+ count)
    hbaseRDD.foreach {
      case (_, result) =>
        val key = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
        val color = Bytes.toString(result.getValue("info".getBytes, "price".getBytes))
        println("Row key:" + key + " Name:" + name + " Color:" + color)
    }
    sc.stop()
  }
}

四、SparkSQL 入门详解

1、Spark SQL概述

1.1 Spark SQL官方介绍

官网

http://spark.apache.org/sql/

7d87af684d7b7b15cc640fb93171aa07.png

Spark SQL是Spark用来处理结构化数据的一个模块。
Spark SQL还提供了多种使用方式,包括DataFrames API和Datasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此你可以在不同的API之间随意切换,它们各有各的特点。
1.2 Spark SQL 的特点
1.易整合
可以使用java、scala、python、R等语言的API操作。
2.统一的数据访问
连接到任何数据源的方式相同。
3.兼容Hive
支持hiveHQL的语法。
兼容hive(元数据库、SQL语法、UDF、序列化、反序列化机制)
4.标准的数据连接
可以使用行业标准的JDBC或ODBC连接。
1.3 SQL优缺点
1)SQL的优点
表达非常清晰, 比如说这段 SQL 明显就是为了查询三个字段,条件是查询年龄大于 10 岁的
难度低、易学习。
2)SQL的缺点
复杂分析,SQL嵌套较多:试想一下3层嵌套的 SQL维护起来应该挺力不从心的吧
机器学习较难:试想一下如果使用SQL来实现机器学习算法也挺为难的吧
1.4 Hive和SparkSQL
Hive是将SQL转为MapReduce
SparkSQL可以理解成是将SQL解析成'RDD' + 优化再执行

45ada2628f63beaf35e46bf553f1885f.png

1.5 Spark SQL数据抽象

1.5.1 DataFrame

什么是DataFrame?

DataFrame是一种以RDD为基础的带有Schema元信息的分布式数据集,类似于传统数据库的二维表格 。


3af0dbbf801c4ce14a780bf92d242f43.png


1.5.2 DataSet

什么是DataSet?

DataSet是保存了更多的描述信息,类型信息的分布式数据集。
与RDD相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表。
与DataFrame相比,保存了类型信息,是强类型的,提供了编译时类型检查,
调用Dataset的方法先会生成逻辑计划,然后被spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行!

1f23ca33cbcbdfbad7051e589723b3f5.png

DataSet包含了DataFrame的功能,
Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。
DataFrame其实就是Dateset[Row]

97c1d6e5fe4973dad1ce8968e6ca79be.png

1.5.3 RDD、DataFrame、DataSet的区别结构图解

4d3f05efc7a2c4dd18627bef0f3b4b93.png

RDD[Person]
以Person为类型参数,但不了解 其内部结构。
DataFrame
提供了详细的结构信息schema列的名称和类型。这样看起来就像一张表了
DataSet[Person]
![](https://files.mdnice.com/user/37735/a21c2a86-9dbc-41cc-837f-2ff8c538d01a.png)
不光有schema信息,还有类型信息

2、Spark SQL初体验

2.1 入口-SparkSession
在spark2.0版本之前
SQLContext是创建DataFrame和执行SQL的入口
HiveContext通过hive sql语句操作hive表数据,兼容hive操作,hiveContext继承自SQLContext。
在spark2.0之后
SparkSession 封装了SqlContext及HiveContext所有功能。通过SparkSession还可以获取到SparkConetxt。
SparkSession可以执行SparkSQL也可以执行HiveSQL.

b4f4bd81113f095b084e97ebd56c7f41.png

2.2. 创建DataFrame

2.2.1. 创读取文本文件

1.在本地创建一个文件,有id、name、age三列,用空格分隔,然后上传到hdfs上
vim /root/person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
上传数据文件到HDFS上:
hadoop fs -put /root/person.txt  /
2.在spark shell执行下面命令,读取数据,将每一行的数据使用列分隔符分割
打开spark-shell 
/export/servers/spark/bin/spark-shell 
创建RDD
val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" ")) 
//RDD[Array[String]]
3.定义case class(相当于表的schema)
case class Person(id:Int, name:String, age:Int)
4.将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) 
//RDD[Person]
5.将RDD转换成DataFrame
val personDF = personRDD.toDF 
//DataFrame
6.查看数据和schema
personDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+
personDF.printSchema
7.注册表
personDF.createOrReplaceTempView("t_person")
8.执行SQL
spark.sql("select id,name from t_person where id > 3").show
9.也可以通过SparkSession构建DataFrame
val dataFrame=spark.read.text("hdfs://node01:8020/person.txt") 
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema

2.2.2 读取json文件

1.数据文件
使用spark安装包下的json文件
more /export/servers/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
2.在spark shell执行下面命令,读取数据
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
3.接下来就可以使用DataFrame的函数操作
jsonDF.show 
//注意:直接读取json文件有schema信息,因为json文件本身含有Schema信息,SparkSQL可以自动解析

2.2.3. 读取parquet文件

1.数据文件
使用spark安装包下的parquet文件
more /export/servers/spark/examples/src/main/resources/users.parquet
2.在spark shell执行下面命令,读取数据
val parquetDF=spark.read.parquet("file:///export/servers/spark/examples/src/main/resources/users.parquet")
3.接下来就可以使用DataFrame的函数操作
parquetDF.show 
//注意:直接读取parquet文件有schema信息,因为parquet文件中保存了列的信息
2.3 创建DataSet
1.通过spark.createDataset创建Dataset
val fileRdd = sc.textFile("hdfs://node01:8020/person.txt") //RDD[String]
val ds1 = spark.createDataset(fileRdd)  //DataSet[String] 
ds1.show
2.通RDD.toDS方法生成DataSet
case class Person(name:String, age:Int)
val data = List(Person("zhangsan",20),Person("lisi",30)) //List[Person]
val dataRDD = sc.makeRDD(data)
val ds2 = dataRDD.toDS  //Dataset[Person]
ds2.show
3.通过DataFrame.as[泛型]转化生成DataSet
case class Person(name:String, age:Long)
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
val jsonDS = jsonDF.as[Person] //DataSet[Person]
jsonDS.show
4.DataSet也可以注册成表进行查询
jsonDS.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show
2.4 两种查询风格[先了解]

2.4.1 准备工作

读取文件并转换为DataFrame或DataSet
val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" "))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
//val personDS = personRDD.toDS
//personDS.show

2.4.2 DSL风格

SparkSQL提供了一个领域特定语言(DSL)以方便操作结构化数据

1.查看name字段的数据
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
2.查看name和age字段数据
personDF.select("name", "age").show
3.查询所有的name和age,并将age+1
personDF.select(personDF.col("name"), personDF.col("age") + 1).show
personDF.select(personDF("name"), personDF("age") + 1).show
personDF.select(col("name"), col("age") + 1).show
personDF.select("name","age").show
//personDF.select("name", "age"+1).show 
personDF.select($"name",$"age",$"age"+1).show
4.过滤age大于等于25的,使用filter方法过滤
personDF.filter(col("age") >= 25).show
personDF.filter($"age" >25).show
5.统计年龄大于30的人数
personDF.filter(col("age")>30).count()
personDF.filter($"age" >30).count()
6.按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show

2.4.3 SQL风格

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL查询,结果将作为一个DataFrame返回

如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
personDF.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show
1.显示表的描述信息
spark.sql("desc t_person").show
2.查询年龄最大的前两名
spark.sql("select * from t_person order by age desc limit 2").show
3.查询年龄大于30的人的信息
spark.sql("select * from t_person where age > 30 ").show
4.使用SQL风格完成DSL中的需求
spark.sql("select name, age + 1 from t_person").show
spark.sql("select name, age from t_person where age > 25").show
spark.sql("select count(age) from t_person where age > 30").show
spark.sql("select age, count(age) from t_person group by age").show
2.5 总结
1.DataFrame和DataSet都可以通过RDD来进行创建
2.也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过RDD+Schema
3.通过josn/parquet会有完整的约束
4.不管是DataFrame还是DataSet都可以注册成表,之后就可以使用SQL进行查询了! 也可以使用DSL!
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
运维 前端开发 安全
万字长文搞懂产品模式和项目模式
万字长文搞懂产品模式和项目模式
196 0
|
8月前
|
存储 编译器 C++
【非常详细!】QT基础【二万字长文】
【非常详细!】QT基础【二万字长文】
|
消息中间件 分布式计算 Kafka
Spark面试干货总结!(8千字长文、27个知识点、21张图)
Spark面试干货总结!(8千字长文、27个知识点、21张图)
332 1
|
存储 编解码 自然语言处理
史诗级计算机字符编码知识分享,万字长文,一文即懂!
前一阵跟同事碰到了字符乱码的问题,了解后发现这个问题存在两年了,我们程序员每天都在跟编码打交道,但大家对字符编码都是一知半解:“天天吃猪肉却很少见过猪跑”,今天我就把它彻底讲透!
10687 3
|
存储 Java 编译器
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!3
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
存储 JSON 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!1
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
SQL 分布式计算 监控
5万字长文!搞定Spark方方面面(六)
5万字长文!搞定Spark方方面面
215 0
5万字长文!搞定Spark方方面面(六)
|
消息中间件 存储 分布式计算
5万字长文!搞定Spark方方面面(七)
5万字长文!搞定Spark方方面面
229 0
5万字长文!搞定Spark方方面面(七)
|
分布式计算 资源调度 并行计算
5万字长文!搞定Spark方方面面(二)
5万字长文!搞定Spark方方面面
289 0
5万字长文!搞定Spark方方面面(二)
|
分布式计算 资源调度 算法
5万字长文!搞定Spark方方面面(一)
5万字长文!搞定Spark方方面面
398 0
5万字长文!搞定Spark方方面面(一)

热门文章

最新文章