开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段:SparkSQL 读写_JDBC_写入数据】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12065
SparkSQL 读写_JDBC_写入数据
内容介绍:
一、步骤
二、实操
通过上节课的学习,我们配置好了相关的环境,就可以使用 SparkSQL 访问 MySQL 当中的数据了。
在实际操作中,应该先写数据再读数据,因为先把数据写到数据库里,再使用 SparkSQL 读取,这样读出的数据才会比较多。本节课就学习直接使用 SparkSQL 来去写入数据。
一、步骤
1、创建 SparkSession
无论是编写何种程序,第一步都应创建创建 SparkSession。
val spark=SparkSession
.builder()
.appName("hive example")
.master("local[6]")
.getOrCreate()
2、读取数据
要读取数据,首先要先有数据集,才能将其写到 MySQL 中,故要先去读该数据集 studenttab10k,因为创建表时也是使用这个 schema 来完成的,因此,此时需要去指定一个 schema。
val schema=StructType(
List(
StructField("name",StringType),
StructField("age",IntegerType),
StructField("gpa",FloatType)
)
)
val studentDF=spark.read
.option("delimiter","\t")
.schema(schema)
.csv("dataset/studenttabl0k")
3、数据写入
studentDF.write.format("jdbc").mode(SaveMode.Overwrite)
.option("url","jdbc:mysql://node01:3306/spark test")
.option("dbtable","student")
.option("user","spark")
.option("password","Spark123!")
.save()
先指定 format 是 JDBC,指定写入模式,最后 save,这是一个标准的场景。
但是要解决数据应写在哪里、URL 是什么、表应该是怎样的等一系列问题,还需要通过一些参数配置信息,配置这些信息的参数在这里都有显示。
首先,可以通过 url 指定要连接的 JDBC 的 url;也通过 dbtable 可以指定要访问的表;在读取数据时,可以使用 fetchsize 来进行数据抓取大小设置,即一次最多抓取的数据的条数;在进行数据写入时,可以使用 batchsize 规定一次最多写的数据的条数;最后可以通过 isolation level 来指定事物隔离级别。
事物隔离级别不是经常设置的参数,因为数据库一般都有自己默认的,如果有进行修改的需求,也一般会改一些公共的,如改数据库或者默认的参数。
二、实操
进入到 idea 实现步骤。首先创建 Scala Class,并将其指定为 object,为其命名为 MySQLWrite。
package cn.itcast.spark.sql
//MySQL 的访问方式有两种,一种是使用本地运行的方式运行,另一种是提交到集群中运行。设置这样两种不同的访问方式的原因在于在本地运行和提交到集群中运行的部分内容存在差异。
//为了能体现这两种访问方式,在写入 MySQL 数据时,使用两种访问方式操作,即使用本地运行的访问方式,而读取数据时使用集群运行。
object MySQLWrite(
//直接定义一个 object:MySQLWrite
def main(args:Array[string]):unit = (
//创建 main 方法
//1、创建 SparkSession 对象
val spark = New Spark/SparkSession.builder
//可以使用 New Spark 或直接使用 SparkSession 的静态方法 builder 来直接获取 builder 对象
.master("local[6]")
//设置 builder 对象的 master 为 local[6]
.appName("mysql write")
//设置 appName 为 mysql write
.gettoCreate()
//通过 builder 对象创建 SparkSession 对象
//2、从文件中读取数据,创建 DateFrame
(1)拷贝文件
进入到/spark/Files/Dataset/,将 studenttab10k 文件拷贝到 idea 中,放在 Dataset 目录下。打开该文件,依旧没有相关的 schema 信息,因此,还需要手动指定 schema,并指定分隔符 delimiter 为 1 个 tab。
(2)读取
val schema = StructType(
//通过创建 StructType 来创建 schema,注意 StructType 在导入包时候应导入的是 sql.types 下的 StructType,而不是 ColumMetaData 下的 StructType
List(
StructField("name",StringType),
StructField("age",IntegerType),
StructField("gpa",FloatType)
)
//在这个 play 方法中,应传入 List,在 List 中使用 StructField 指定每一列的信息,在打包 StructField 时要注意导入 spark 的包
)
val df = spark.read
.schema(schema)
//指定 schema 为 schema
.option("delimiter","\t")
//指定 option,指定读取文件的制表符为\t
.csv("dataset/studenttabl0k")
//指定路径
//3、处理数据,是一般的数据处理流程
val resultDF = df.where("age < 30")
//where 中不只可以使用表达式,也可使用 sql 的字串
//创建了变量 resultDF,获取到了 DateFrame
//4、落地数据,即将数据落地到 MySQL 中
//将 DateFrame 落地到数据库中,并指定相关信息
resultDF.write
.format("jdbc")
//指定 format 是 jdbc
.option("url"jdbc:mysql://node01:3306/spark02")
//指定 url 为 jdbc:mysql://node01:3306
//指定数据库名为 spark02
.option("dbtable","student")
//指定数据最终落地的表 dbtable 为 student
.option("user","spark03")
//指定用户名,即访问的方式为 spark
.option("password","Spark03!")
//指定密码 password 为“Spark03!”
.save()
//保存
5、运行纠错
运行这段代码,运行结果显示有两处错误。
其一是 Table or view “student”already exist,即表 student 已经存在,则应指定写入模式是追加或是覆盖。因此,可以通过在 save(保存)之前指定写入方式直接覆盖掉原 student 表,即:
.mode(Savemode.OverWrite)
其二是 java.sql.SQL exception:No suitable driver,即没有 driver。现在要访问 sql,但编码没有给定 driver,因此要指定 driver。
driver 实际上是 Mysql 的 JDBC 的 driver。如果是本地运行,那么就要去修改 pom 文件。打开 pom 文件指定 dependency。
将 Maven 依赖的 dependency 拷贝到 pom 文件中的任意位置,且版本为 5.1.47:
mysql
mysql-connector-
java5.1.47
等待加载结束,如果加载时间较长,可以使用 Maven 进行 compare 或 package,使用 Maven 加载会更快一些,而单使用 idea 加载速度较慢。
再次运行程序,运行结束后,进入代码编辑进行相应的查看。
mysql -u root -p ;
输入密码之后,再输入代码进行查询:
select * from spark02.student limit100;
可以发现数据中人的年龄全小于 30 岁,说明数据处理没有任何问题。