【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

7) 新建scala类,界面如下:

此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Long)
object sparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //创建Spark运行环境
    val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
    val sc = spark.sparkContext;
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //显示DataFrame的schema信息
    personDF.printSchema()
    //显示DataFrame记录数
    println(personDF.count())
    //显示DataFrame的所有字段
    personDF.columns.foreach(println)
    //取出DataFrame的第一行记录
    println(personDF.head())
    //显示DataFrame中name字段的所有值
    personDF.select("name").show()
    //过滤出DataFrame中年龄大于20的记录
    personDF.filter($"age" > 20).show()
    //统计DataFrame中年龄大于20的人数
    println(personDF.filter($"age" > 20).count())
    //统计DataFrame中按照年龄进行分组,求每个组的人数
    personDF.groupBy("age").count().show()
    //将DataFrame注册成临时表
    personDF.createOrReplaceTempView("t_person")
    //传入sql语句,进行操作
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='王五'").show()
    spark.sql("select * from t_person order by age desc").show()
    //DataFrame转换成Dataset
    var ds=personDF.as[Person]
    ds.show()
    //关闭操作
    sc.stop()
    spark.stop()
  }
}

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
object sparkSqlMysql {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSqlMysql")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    //读取数据
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
    //RDD关联Person
    val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
    //导入隐式转换
    import spark.implicits._
    //将RDD转换成DataFrame
    val personDF: DataFrame = personRdd.toDF()
    personDF.show()
    //创建Properties对象,配置连接mysql的用户名和密码
    val prop =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    //将personDF写入MySQL
    personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
    //从数据库里读取数据
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person",                   prop)
    mysqlDF.show()
    spark.stop()
  }
}

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
object  sparksqlToHIVE {
  def main(args: Array[String]): Unit = {
    //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建sparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksqlToHIVE")
      .config("executor-cores",1)
      .master("local")
      .enableHiveSupport() //开启支持Hive
      .getOrCreate()
    val sc = spark.sparkContext
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //将DataFrame注册成临时表t_person
    personDF.createOrReplaceTempView("t_person")
    //显示临时表t_person的数据
    spark.sql("select * from t_person").show()
    //使用Hive中bigdata的数据库
    spark.sql("use bigdata")
    //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
    spark.sql("insert into person select * from t_person")
    //显示用Hive中bigdata数据库下的person表数据
    spark.sql("select * from person").show()
    spark.stop()
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
104 2
|
5天前
|
SQL Oracle 数据库
使用访问指导(SQL Access Advisor)优化数据库业务负载
本文介绍了Oracle的SQL访问指导(SQL Access Advisor)的应用场景及其使用方法。访问指导通过分析给定的工作负载,提供索引、物化视图和分区等方面的优化建议,帮助DBA提升数据库性能。具体步骤包括创建访问指导任务、创建工作负载、连接工作负载至访问指导、设置任务参数、运行访问指导、查看和应用优化建议。访问指导不仅针对单条SQL语句,还能综合考虑多条SQL语句的优化效果,为DBA提供全面的决策支持。
28 11
|
4天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
24 2
|
19天前
|
SQL 关系型数据库 MySQL
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。
|
17天前
|
SQL 监控 安全
SQL Servers审核提高数据库安全性
SQL Server审核是一种追踪和审查SQL Server上所有活动的机制,旨在检测潜在威胁和漏洞,监控服务器设置的更改。审核日志记录安全问题和数据泄露的详细信息,帮助管理员追踪数据库中的特定活动,确保数据安全和合规性。SQL Server审核分为服务器级和数据库级,涵盖登录、配置变更和数据操作等事件。审核工具如EventLog Analyzer提供实时监控和即时告警,帮助快速响应安全事件。
|
18天前
|
SQL 运维 大数据
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
|
27天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
63 4
|
1月前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
1月前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
27天前
|
SQL Java 数据库连接
canal-starter 监听解析 storeValue 不一样,同样的sql 一个在mybatis执行 一个在数据库操作,导致解析不出正确对象
canal-starter 监听解析 storeValue 不一样,同样的sql 一个在mybatis执行 一个在数据库操作,导致解析不出正确对象

热门文章

最新文章