【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

本文涉及的产品
RDS AI 助手,专业版
RDS Agent(兼容OpenClaw),2核4GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

7) 新建scala类,界面如下:

此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Long)
object sparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //创建Spark运行环境
    val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
    val sc = spark.sparkContext;
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //显示DataFrame的schema信息
    personDF.printSchema()
    //显示DataFrame记录数
    println(personDF.count())
    //显示DataFrame的所有字段
    personDF.columns.foreach(println)
    //取出DataFrame的第一行记录
    println(personDF.head())
    //显示DataFrame中name字段的所有值
    personDF.select("name").show()
    //过滤出DataFrame中年龄大于20的记录
    personDF.filter($"age" > 20).show()
    //统计DataFrame中年龄大于20的人数
    println(personDF.filter($"age" > 20).count())
    //统计DataFrame中按照年龄进行分组,求每个组的人数
    personDF.groupBy("age").count().show()
    //将DataFrame注册成临时表
    personDF.createOrReplaceTempView("t_person")
    //传入sql语句,进行操作
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='王五'").show()
    spark.sql("select * from t_person order by age desc").show()
    //DataFrame转换成Dataset
    var ds=personDF.as[Person]
    ds.show()
    //关闭操作
    sc.stop()
    spark.stop()
  }
}

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
object sparkSqlMysql {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSqlMysql")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    //读取数据
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
    //RDD关联Person
    val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
    //导入隐式转换
    import spark.implicits._
    //将RDD转换成DataFrame
    val personDF: DataFrame = personRdd.toDF()
    personDF.show()
    //创建Properties对象,配置连接mysql的用户名和密码
    val prop =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    //将personDF写入MySQL
    personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
    //从数据库里读取数据
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person",                   prop)
    mysqlDF.show()
    spark.stop()
  }
}

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
object  sparksqlToHIVE {
  def main(args: Array[String]): Unit = {
    //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建sparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksqlToHIVE")
      .config("executor-cores",1)
      .master("local")
      .enableHiveSupport() //开启支持Hive
      .getOrCreate()
    val sc = spark.sparkContext
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //将DataFrame注册成临时表t_person
    personDF.createOrReplaceTempView("t_person")
    //显示临时表t_person的数据
    spark.sql("select * from t_person").show()
    //使用Hive中bigdata的数据库
    spark.sql("use bigdata")
    //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
    spark.sql("insert into person select * from t_person")
    //显示用Hive中bigdata数据库下的person表数据
    spark.sql("select * from person").show()
    spark.stop()
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
7月前
|
存储 人工智能 大数据
云栖2025|阿里云开源大数据发布新一代“湖流一体”数智平台及全栈技术升级
阿里云在云栖大会发布“湖流一体”数智平台,推出DLF-3.0全模态湖仓、实时计算Flink版升级及EMR系列新品,融合实时化、多模态、智能化技术,打造AI时代高效开放的数据底座,赋能企业数字化转型。
1336 0
|
9月前
|
数据采集 人工智能 分布式计算
ODPS在AI时代的发展战略与技术演进分析报告
ODPS(现MaxCompute)历经十五年发展,从分布式计算平台演进为AI时代的数据基础设施,以超大规模处理、多模态融合与Data+AI协同为核心竞争力,支撑大模型训练与实时分析等前沿场景,助力企业实现数据驱动与智能化转型。
582 4
|
10月前
|
存储 分布式计算 Hadoop
Hadoop框架解析:大数据处理的核心技术
组件是对数据和方法的封装,从用户角度看是实现特定功能的独立黑盒子,能够有效完成任务。组件,也常被称作封装体,是对数据和方法的简洁封装形式。从用户的角度来看,它就像是一个实现了特定功能的黑盒子,具备输入和输出接口,能够独立完成某些任务。
|
11月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
595 0
|
7月前
|
数据可视化 大数据 关系型数据库
基于python大数据技术的医疗数据分析与研究
在数字化时代,医疗数据呈爆炸式增长,涵盖患者信息、检查指标、生活方式等。大数据技术助力疾病预测、资源优化与智慧医疗发展,结合Python、MySQL与B/S架构,推动医疗系统高效实现。
|
9月前
|
SQL 分布式计算 大数据
我与ODPS的十年技术共生之路
ODPS十年相伴,从初识的分布式计算到共生进化,突破架构边界,推动数据价值深挖。其湖仓一体、隐私计算与Serverless能力,助力企业降本增效,赋能政务与商业场景,成为数字化转型的“数字神经系统”。
|
9月前
|
存储 人工智能 算法
Java 大视界 -- Java 大数据在智能医疗影像数据压缩与传输优化中的技术应用(227)
本文探讨 Java 大数据在智能医疗影像压缩与传输中的关键技术应用,分析其如何解决医疗影像数据存储、传输与压缩三大难题,并结合实际案例展示技术落地效果。
|
9月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据在智能物流运输车辆智能调度与路径优化中的技术实现(218)
本文深入探讨了Java大数据技术在智能物流运输中车辆调度与路径优化的应用。通过遗传算法实现车辆资源的智能调度,结合实时路况数据和强化学习算法进行动态路径优化,有效提升了物流效率与客户满意度。以京东物流和顺丰速运的实际案例为支撑,展示了Java大数据在解决行业痛点问题中的强大能力,为物流行业的智能化转型提供了切实可行的技术方案。
|
10月前
|
数据采集 自然语言处理 分布式计算
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
|
10月前
|
存储 分布式计算 算法
Java 大视界 -- Java 大数据在智能教育在线考试监考与作弊检测中的技术创新(193)
本文探讨了Java大数据技术在智能教育在线考试监考与作弊检测中的创新应用。随着在线考试的普及,作弊问题日益突出,传统监考方式难以应对。通过Java大数据技术,可实现考生行为分析、图像识别等多维度监控,提升作弊检测的准确性与效率。结合Hadoop与Spark等技术,系统能实时处理海量数据,构建智能监考体系,保障考试公平性,推动教育评价体系的数字化转型。

推荐镜像

更多