Spark连接JDBC数据源

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS AI 助手,专业版
简介: 在实际的项目环境中,成熟的技术体系对关系型数据库的依赖远远超过hdfs,而且对大数据运算的结果,实践中也倾向于保存到数据库中,以便进行多种方式的可视化。所以本次实践主要完成spark从mysql中读取和写入数据。

在实际的项目环境中,成熟的技术体系对关系型数据库的依赖远远超过hdfs,而且对大数据运算的结果,实践中也倾向于保存到数据库中,以便进行多种方式的可视化。所以本次实践主要完成spark从mysql中读取和写入数据。一般这个操作有两种方式,一种是自己建立jdbc连接,像一般数据库操作一样的写法,一种就是利用spark自带的jdbc操作函数。

首先要把mysql jdbc connector的jar包上传到集群中每台机器的spark/jars目录,这是一个讨巧的办法,因为spark运行之前一定把这里面所有的jar都加到CALSS_PATH里面去了。

通过spark.read.jdbc读取出来的返回值是DataFrame,如下代码所示。`

val rfidCardMap = spark.read.jdbc(mysqlHelper.DB_URL_R,"t_rfid_card",Array("org_id="+ ORG_ID), mysqlHelper.PROPERTIES).map(row => {
  (row.getAs[String]("card_id"), row.getAs[String]("card_label"))
}).rdd.collect() toMap`

此函数需要传入参数依次为:数据库连接url,表名,过滤条件表达式列表,带有用户名密码信息的属性对象。读取了数据之后,形成一个(String,String)对象返回。这里有两个要注意的:

  1. getAs的类型必须和数据库中列的类型严格匹配
  2. 返回元组类型的对象比返回自定义类的对象写法要轻松一些。如果是返回自定义类的对象,编译会出错,一般说法是语句之前加入import spark.implicits._会有效,但未必见得。尚待进一步探索。

如下是一个比较复杂的解析处理代码示例。`

val teamWeightMapRDD = dfMedicalWaste.map(row => {
  (rfidCardMap.get(row.getAs[String]("team_id")) toString,
  sdf.format(new Date(row.getAs[Timestamp]("rec_ts").getTime)) toInt,
  row.getAs[Double]("mw_weight"))
}).rdd.cache()`

这里sdf就是java里面常用的SimpleDateFormat,它把一个时间戳字段转化成了6个长度的整型。

处理完成后,将结果回写数据库时采用的是本地jdbc连接写法,这块内容很普通了。

这次实践有个特别清晰的理解就是scala的类型推断,由于要统计某个地点一段时间之内的产量总和、平均产量、最大和最小单位时间产量,使用到了DoubleRDDFunctions,代码如下:`

val weightArrayRDD = teamWeightMapRDD.filter(teamWeight => {
  teamWeight._1 == teamName && teamWeight._2 >= week._1 && teamWeight._2 < week._2
}).map(teamWeight => {
  (teamWeight._2, teamWeight._3)
}).reduceByKey((a, b) =>
  a + b
).map(item => {
  item._2
}).cache()`

使用的时候如下:`

line.append(weightArrayRDD sum).append("\t")
line.append(weightArrayRDD mean).append("\t")
line.append(weightArrayRDD max).append("\t")
line.append(weightArrayRDD min).append("\t")`

scala会根据返回值类型进行类型推断,从而匹配可以使用的函数,同样是RDD或者DataFram,包含的类型不同,可以使用的函数也不同,这一切都是透明的。

相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
Java 数据库连接 数据库
【YashanDB知识库】WAS配置YashanDB JDBC连接
【YashanDB知识库】WAS配置YashanDB JDBC连接
|
安全 Java 数据库连接
gbase8a JDBC常用特性-Kerberos认证方式连接
JDBC常用特性-Kerberos认证方式连接
|
Java 数据库连接 网络安全
JDBC常用特性-SSH隧道连接
JDBC常用特性-SSH隧道连接
|
Java 数据库连接
JDBC连接复习
JDBC连接复习
203 1
|
SQL Java 数据库连接
Java开发者必知:JDBC连接数据库的“三大法宝”
Java开发者必知:JDBC连接数据库的“三大法宝”
168 7
|
SQL Java 数据库连接
JDBC连接SQL Server2008 完成增加、删除、查询、修改等基本信息基本格式及示例代码
这篇文章提供了使用JDBC连接SQL Server 2008数据库进行增加、删除、查询和修改操作的基本步骤和示例代码。
|
SQL 存储 Java
完整java开发中JDBC连接数据库代码和步骤
该博客文章详细介绍了使用JDBC连接数据库的完整步骤,包括加载JDBC驱动、提供连接URL、创建数据库连接、执行SQL语句、处理结果以及关闭JDBC对象的过程,并提供了相应的示例代码。
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
380 0
|
Java 关系型数据库 MySQL
使用JDBC连接ADB
【8月更文挑战第6天】
475 0
|
分布式计算 Java 数据库连接