SparkSQL 读写_JDBC_写入数据 | 学习笔记

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 快速学习 SparkSQL 读写_JDBC_写入数据

开发者学堂课程【大数据 Spark 2020版(知识精讲与实战演练)第三阶段SparkSQL 读写_JDBC_写入数据】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/690/detail/12065


SparkSQL 读写_JDBC_写入数据

 

内容介绍:

一、步骤

二、实操

 

通过上节课的学习,我们配置好了相关的环境,就可以使用 SparkSQL 访问 MySQL 当中的数据了。

在实际操作中,应该先写数据再读数据,因为先把数据写到数据库里,再使用 SparkSQL 读取,这样读出的数据才会比较多。本节课就学习直接使用 SparkSQL 来去写入数据。

 

一、步骤

1、创建 SparkSession

无论是编写何种程序,第一步都应创建创建 SparkSession。

val spark=SparkSession

.builder()

.appName("hive example")

.master("local[6]")

.getOrCreate()

2、读取数据

要读取数据,首先要先有数据集,才能将其写到 MySQL 中,故要先去读该数据集 studenttab10k,因为创建表时也是使用这个 schema 来完成的,因此,此时需要去指定一个 schema。

val schema=StructType(

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

)

val studentDF=spark.read

.option("delimiter","\t")

.schema(schema)

.csv("dataset/studenttabl0k")

3、数据写入

studentDF.write.format("jdbc").mode(SaveMode.Overwrite)

.option("url","jdbc:mysql://node01:3306/spark test")

.option("dbtable","student")

.option("user","spark")

.option("password","Spark123!")

.save()

先指定 format 是 JDBC,指定写入模式,最后 save,这是一个标准的场景。

但是要解决数据应写在哪里、URL 是什么、表应该是怎样的等一系列问题,还需要通过一些参数配置信息,配置这些信息的参数在这里都有显示。

 

image.png

首先,可以通过 url 指定要连接的 JDBC 的 url;也通过 dbtable 可以指定要访问的表;在读取数据时,可以使用 fetchsize 来进行数据抓取大小设置,即一次最多抓取的数据的条数;在进行数据写入时,可以使用 batchsize 规定一次最多写的数据的条数;最后可以通过 isolation level 来指定事物隔离级别。

事物隔离级别不是经常设置的参数,因为数据库一般都有自己默认的,如果有进行修改的需求,也一般会改一些公共的,如改数据库或者默认的参数。

 

二、实操

进入到 idea 实现步骤。首先创建 Scala Class,并将其指定为 object,为其命名为 MySQLWrite。

package cn.itcast.spark.sql

//MySQL 的访问方式有两种,一种是使用本地运行的方式运行,另一种是提交到集群中运行。设置这样两种不同的访问方式的原因在于在本地运行和提交到集群中运行的部分内容存在差异。

//为了能体现这两种访问方式,在写入 MySQL 数据时,使用两种访问方式操作,即使用本地运行的访问方式,而读取数据时使用集群运行。

object MySQLWrite(

//直接定义一个 object:MySQLWrite

def main(args:Array[string]):unit = (

//创建 main 方法

//1、创建 SparkSession 对象

val spark = New Spark/SparkSession.builder

//可以使用 New Spark 或直接使用 SparkSession 的静态方法 builder 来直接获取 builder 对象

.master("local[6]")

//设置 builder 对象的 master 为 local[6]

.appName("mysql write")

//设置 appName 为 mysql write

.gettoCreate()

//通过 builder 对象创建 SparkSession 对象

//2、从文件中读取数据,创建 DateFrame

(1)拷贝文件

进入到/spark/Files/Dataset/,将 studenttab10k 文件拷贝到 idea 中,放在 Dataset 目录下。打开该文件,依旧没有相关的 schema 信息,因此,还需要手动指定 schema,并指定分隔符 delimiter 为 1 个 tab。

(2)读取

val schema = StructType(

//通过创建 StructType 来创建 schema,注意 StructType 在导入包时候应导入的是 sql.types 下的 StructType,而不是 ColumMetaData 下的 StructType

List(

StructField("name",StringType),

StructField("age",IntegerType),

StructField("gpa",FloatType)

)

//在这个 play 方法中,应传入 List,在 List 中使用 StructField 指定每一列的信息,在打包 StructField 时要注意导入 spark 的包

)

val df = spark.read

.schema(schema)

//指定 schema 为 schema

.option("delimiter","\t")

//指定 option,指定读取文件的制表符为\t

.csv("dataset/studenttabl0k")

//指定路径

//3、处理数据,是一般的数据处理流程

val resultDF = df.where("age < 30")

//where 中不只可以使用表达式,也可使用 sql 的字串

//创建了变量 resultDF,获取到了 DateFrame

//4、落地数据,即将数据落地到 MySQL 中

//将 DateFrame 落地到数据库中,并指定相关信息

resultDF.write

.format("jdbc")

//指定 format 是 jdbc

.option("url"jdbc:mysql://node01:3306/spark02")

//指定 url 为 jdbc:mysql://node01:3306

//指定数据库名为 spark02

.option("dbtable","student")

//指定数据最终落地的表 dbtable 为 student

.option("user","spark03")

//指定用户名,即访问的方式为 spark

.option("password","Spark03!")

//指定密码 password 为“Spark03!”

.save()

//保存

5、运行纠

运行这段代码,运行结果显示有两处错误。

其一是 Table or view “student”already exist,即表 student 已经存在,则应指定写入模式是追加或是覆盖。因此,可以通过在 save(保存)之前指定写入方式直接覆盖掉原 student 表,即:

.mode(Savemode.OverWrite)

其二是 java.sql.SQL exception:No suitable driver,即没有 driver。现在要访问 sql,但编码没有给定 driver,因此要指定 driver。

driver 实际上是 Mysql 的 JDBC 的 driver。如果是本地运行,那么就要去修改 pom 文件。打开 pom 文件指定 dependency。

将 Maven 依赖的 dependency 拷贝到 pom 文件中的任意位置,且版本为 5.1.47:

mysql

mysql-connector-

java5.1.47

等待加载结束,如果加载时间较长,可以使用 Maven 进行 compare 或 package,使用 Maven 加载会更快一些,而单使用 idea 加载速度较慢。

再次运行程序,运行结束后,进入代码编辑进行相应的查看。

mysql -u root -p ;

输入密码之后,再输入代码进行查询:

select * from spark02.student limit100;

image.png

可以发现数据中人的年龄全小于 30 岁,说明数据处理没有任何问题。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
安全 Java 数据库连接
jdbc解析excel文件,批量插入数据至库中
jdbc解析excel文件,批量插入数据至库中
21 0
|
4月前
|
Java 数据库连接 数据库
使用原生JDBC动态解析并获取表格列名和数据
使用原生JDBC动态解析并获取表格列名和数据
|
7月前
|
SQL Java 关系型数据库
JDBC插入数据详解
在Java应用程序中,与数据库交互是一项常见的任务。其中,插入数据操作是一种基本的数据库操作之一。本文将详细介绍如何使用Java JDBC(Java Database Connectivity)来执行插入数据操作。无论您是初学者还是有一定经验的开发人员,都能从本文中获得有关插入数据的重要信息。
98 0
|
7月前
|
SQL Java 大数据
Hive实战(03)-深入了解Hive JDBC:在大数据世界中实现数据交互
Hive实战(03)-深入了解Hive JDBC:在大数据世界中实现数据交互
239 1
|
11天前
|
Java 关系型数据库 MySQL
JDBC实现往MySQL插入百万级数据
JDBC实现往MySQL插入百万级数据
|
11天前
|
SQL Java 数据库连接
Springboot框架整合Spring JDBC操作数据
JDBC是Java数据库连接API,用于执行SQL并访问多种关系数据库。它包括一系列Java类和接口,用于建立数据库连接、创建数据库操作对象、定义SQL语句、执行操作并处理结果集。直接使用JDBC涉及七个步骤,包括加载驱动、建立连接、创建对象、定义SQL、执行操作、处理结果和关闭资源。Spring Boot的`spring-boot-starter-jdbc`简化了这些步骤,提供了一个在Spring生态中更便捷使用JDBC的封装。集成Spring JDBC需要添加相关依赖,配置数据库连接信息,并通过JdbcTemplate进行数据库操作,如插入、更新、删除和查询。
|
11天前
|
SQL Java 关系型数据库
JDBC批量插入mysql数据
JDBC批量插入mysql数据
|
12天前
|
Java 关系型数据库 MySQL
【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术
【JDBC编程】基于MySql的Java应用程序中访问数据库与交互数据的技术
|
2月前
|
安全 Java 数据库连接
jdbc实现批量给多个表中更新数据(解析Excel表数据插入到数据库中)
jdbc实现批量给多个表中更新数据(解析Excel表数据插入到数据库中)
156 0
|
7月前
|
存储 Java 数据库连接
云数据仓库ADB不管是jdbc写入或者dts同步,均会存在丢数据的情况?
云数据仓库ADB不知道是不是磁盘出问题了不管是jdbc写入或者dts同步,均会存在丢数据的情况?
57 2