SparkSQL 读写_JDBC_写入数据 | 学习笔记

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 快速学习 SparkSQL 读写_JDBC_写入数据

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段SparkSQL 读写_JDBC_写入数据】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12065


SparkSQL 读写_JDBC_写入数据

 

内容介绍:

一、步骤

二、实操

 

通过上节课的学习,我们配置好了相关的环境,就可以使用 SparkSQL 访问 MySQL 当中的数据了。

在实际操作中,应该先写数据再读数据,因为先把数据写到数据库里,再使用 SparkSQL 读取,这样读出的数据才会比较多。本节课就学习直接使用 SparkSQL 来去写入数据。

 

一、步骤

1、创建 SparkSession

无论是编写何种程序,第一步都应创建创建 SparkSession。

val spark=SparkSession

.builder()

.appName("hive example")

.master("local[6]")

.getOrCreate()

2、读取数据

要读取数据,首先要先有数据集,才能将其写到 MySQL 中,故要先去读该数据集 studenttab10k,因为创建表时也是使用这个 schema 来完成的,因此,此时需要去指定一个 schema。

val schema=StructType(

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

)

val studentDF=spark.read

.option("delimiter","\t")

.schema(schema)

.csv("dataset/studenttabl0k")

3、数据写入

studentDF.write.format("jdbc").mode(SaveMode.Overwrite)

.option("url","jdbc:mysql://node01:3306/spark test")

.option("dbtable","student")

.option("user","spark")

.option("password","Spark123!")

.save()

先指定 format 是 JDBC,指定写入模式,最后 save,这是一个标准的场景。

但是要解决数据应写在哪里、URL 是什么、表应该是怎样的等一系列问题,还需要通过一些参数配置信息,配置这些信息的参数在这里都有显示。

 

image.png

首先,可以通过 url 指定要连接的 JDBC 的 url;也通过 dbtable 可以指定要访问的表;在读取数据时,可以使用 fetchsize 来进行数据抓取大小设置,即一次最多抓取的数据的条数;在进行数据写入时,可以使用 batchsize 规定一次最多写的数据的条数;最后可以通过 isolation level 来指定事物隔离级别。

事物隔离级别不是经常设置的参数,因为数据库一般都有自己默认的,如果有进行修改的需求,也一般会改一些公共的,如改数据库或者默认的参数。

 

二、实操

进入到 idea 实现步骤。首先创建 Scala Class,并将其指定为 object,为其命名为 MySQLWrite。

package cn.itcast.spark.sql

//MySQL 的访问方式有两种,一种是使用本地运行的方式运行,另一种是提交到集群中运行。设置这样两种不同的访问方式的原因在于在本地运行和提交到集群中运行的部分内容存在差异。

//为了能体现这两种访问方式,在写入 MySQL 数据时,使用两种访问方式操作,即使用本地运行的访问方式,而读取数据时使用集群运行。

object MySQLWrite(

//直接定义一个 object:MySQLWrite

def main(args:Array[string]):unit = (

//创建 main 方法

//1、创建 SparkSession 对象

val spark = New Spark/SparkSession.builder

//可以使用 New Spark 或直接使用 SparkSession 的静态方法 builder 来直接获取 builder 对象

.master("local[6]")

//设置 builder 对象的 master 为 local[6]

.appName("mysql write")

//设置 appName 为 mysql write

.gettoCreate()

//通过 builder 对象创建 SparkSession 对象

//2、从文件中读取数据,创建 DateFrame

(1)拷贝文件

进入到/spark/Files/Dataset/,将 studenttab10k 文件拷贝到 idea 中,放在 Dataset 目录下。打开该文件,依旧没有相关的 schema 信息,因此,还需要手动指定 schema,并指定分隔符 delimiter 为 1 个 tab。

(2)读取

val schema = StructType(

//通过创建 StructType 来创建 schema,注意 StructType 在导入包时候应导入的是 sql.types 下的 StructType,而不是 ColumMetaData 下的 StructType

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

//在这个 play 方法中,应传入 List,在 List 中使用 StructField 指定每一列的信息,在打包 StructField 时要注意导入 spark 的包

)

val df = spark.read

.schema(schema)

//指定 schema 为 schema

.option("delimiter","\t")

//指定 option,指定读取文件的制表符为\t

.csv("dataset/studenttabl0k")

//指定路径

//3、处理数据,是一般的数据处理流程

val resultDF = df.where("age < 30")

//where 中不只可以使用表达式,也可使用 sql 的字串

//创建了变量 resultDF,获取到了 DateFrame

//4、落地数据,即将数据落地到 MySQL 中

//将 DateFrame 落地到数据库中,并指定相关信息

resultDF.write

.format("jdbc")

//指定 format 是 jdbc

.option("url"jdbc:mysql://node01:3306/spark02")

//指定 url 为 jdbc:mysql://node01:3306

//指定数据库名为 spark02

.option("dbtable","student")

//指定数据最终落地的表 dbtable 为 student

.option("user","spark03")

//指定用户名,即访问的方式为 spark

.option("password","Spark03!")

//指定密码 password 为“Spark03!”

.save()

//保存

5、运行纠

运行这段代码,运行结果显示有两处错误。

其一是 Table or view “student”already exist,即表 student 已经存在,则应指定写入模式是追加或是覆盖。因此,可以通过在 save(保存)之前指定写入方式直接覆盖掉原 student 表,即:

.mode(Savemode.OverWrite)

其二是 java.sql.SQL exception:No suitable driver,即没有 driver。现在要访问 sql,但编码没有给定 driver,因此要指定 driver。

driver 实际上是 Mysql 的 JDBC 的 driver。如果是本地运行,那么就要去修改 pom 文件。打开 pom 文件指定 dependency。

将 Maven 依赖的 dependency 拷贝到 pom 文件中的任意位置,且版本为 5.1.47:

mysql

mysql-connector-

java5.1.47

等待加载结束,如果加载时间较长,可以使用 Maven 进行 compare 或 package,使用 Maven 加载会更快一些,而单使用 idea 加载速度较慢。

再次运行程序,运行结束后,进入代码编辑进行相应的查看。

mysql -u root -p ;

输入密码之后,再输入代码进行查询:

select * from spark02.student limit100;

image.png

可以发现数据中人的年龄全小于 30 岁,说明数据处理没有任何问题。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
87 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
40 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
48 0
|
3月前
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
58 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
|
3月前
|
SQL Java 关系型数据库
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
138 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
|
6月前
|
Java 关系型数据库 MySQL
JDBC实现往MySQL插入百万级数据
JDBC实现往MySQL插入百万级数据
|
3月前
|
SQL 关系型数据库 MySQL
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
39 6
|
3月前
|
存储 关系型数据库 MySQL
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
50 4
|
3月前
|
SQL 关系型数据库 MySQL
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)(中)
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
31 3
下一篇
无影云桌面