【Spark】(八)Spark SQL 应用解析2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【Spark】(八)Spark SQL 应用解析2

四、Spark SQL 操作Hive表


4.1 文件配置


分别复制 hive lib、conf 目录下文件到 spark 的jars 目录下

[root@zj1 sbin]# cd /opt/soft/hive110/lib/
[root@zj1 lib]# cp mysql-connector-java-5.1.39-bin.jar /opt/soft/spark234/jars/
[root@zj1 hive110]# cd conf/
[root@zj1 conf]# cp hive-site.xml /opt/soft/spark234/conf/


修改 spark hive-site.xml


加入

<property>
                <name>hive.metastore.uris</name>
                <value>thrift://zj1:9083</value>
        </property>

   

执行

hive --service metastore


4.1 操作 Hive 表

// 原有spark不支持 原有激活状态的spark先stop
scala> spark.stop
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("spark-hive").enableHiveSupport.getOrCreate
// 通过SQL 命令直接操作 hive 表
scala> spark.sql("select * from mydemo.order").show
+----+----------+----+
|name| orderdate|cost|
+----+----------+----+
|jack|2015-04-03|  23|
|jack|2015-01-01|  10|
|tony|2015-01-02|  15|
|mart|2015-04-11|  75|
|neil|2015-06-12|  80|
|mart|2015-04-13|  94|
+----+----------+----+
scala> val spk= spark.sql("select * from mydemo.order")
scala> spk.repartition(1).write.format("csv").save("hdfs://192.168.56.137:9000/20200109")
// 如下 csv文件写到hdfs上

image.png

// 如下 表写到hive上
scala> spk.filter($"name".startsWith("jack")).write.saveAsTable("xxx")


我们到 hive 中查询结果 , 发现 hive 中出现 “xxx” 表

image.png


我们还可以通过spark 往表中插入数据

// 往 XXX 表中插入数据 
scala> spark.sql("insert into xxx values('jm','2020-09-09',99)")
1
2
五、Spark SQL 连 MySQL
// 启动 带jar 包
[root@zj1 bin]# ./spark-shell --jars /opt/soft/spark234/jars/mysql-connector-java-5.1.39-bin.jar
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}
scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null
scala> prop.setProperty("user","root")
res1: Object = null
scala> prop.setProperty("password","ok")
res2: Object = null
// 从mysql中读取表
scala> val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","users",prop)
scala> jdbcDF.show
+---+--------+----------+                                                       
| id|username|  birthday|
+---+--------+----------+
|  1|      zs|1999-09-09|
|  2|      ls|1999-09-08|
|  4|      zl|1989-09-08|
+---+--------+----------+
// 过滤 
scala> jdbcDF.filter($"username".endsWith("s")).write.mode("append").jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","myuser",prop)

image.png


六、Spark SQL 内置函数


// 建一个数组
val mylog = Array("2019-12-27,001","2019-12-27,001","2019-12-27,002","2019-12-28,001","2019-12-28,002","2019-12-28,002")
// 导包
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 根据集合数据生成RDD
scala> val rdd = sc.parallelize(mylog).map(x=>{
     | val sp = x.split(",")
     | Row(sp(0),sp(1).toInt)
     | })
 // 定义DataFrame的结构
val struct = StructType(Array(
StructField("day",StringType,true),
StructField("userid",IntegerType,true)
))
val df = spark.createDataFrame(rdd,struct)
scala> df.show
+----------+------+                                                             
|       day|userid|
+----------+------+
|2019-12-27|     1|
|2019-12-27|     1|
|2019-12-27|     2|
|2019-12-28|     1|
|2019-12-28|     2|
|2019-12-28|     2|
+----------+------+
import org.apache.spark.sql.functions._
scala> df.groupBy("day").agg(count("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  3|
|2019-12-27|  3|
+----------+---+
scala> df.groupBy("day").agg(countDistinct("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  2|
|2019-12-27|  2|
+----------+---+


七、Spark SQL 自定义函数


scala> val df = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv")
scala> df.printSchema
// 设置 自定义函数
scala> spark.udf.register("eaddu",(eid:String,uid:String)=>eid+uid)
scala> spark.sql("select event_id,eaddu(event_id,user_id) as fullid from events").show(3)
+----------+-------------------+
|  event_id|             fullid|
+----------+-------------------+
| 684921758|6849217583647864012|
| 244999119|2449991193476440521|
|3928440935|3928440935517514445|
+----------+-------------------+



目录
相关文章
RS-485网络中的标准端接与交流电端接应用解析
RS-485,作为一种广泛应用的差分信号传输标准,因其传输距离远、抗干扰能力强、支持多点通讯等优点,在工业自动化、智能建筑、交通运输等领域得到了广泛应用。在构建RS-485网络时,端接技术扮演着至关重要的角色,它直接影响到网络的信号完整性、稳定性和通信质量。
|
9天前
|
自然语言处理 并行计算 数据可视化
免费开源法律文档比对工具:技术解析与应用
这款免费开源的法律文档比对工具,利用先进的文本分析和自然语言处理技术,实现高效、精准的文档比对。核心功能包括文本差异检测、多格式支持、语义分析、批量处理及用户友好的可视化界面,广泛适用于法律行业的各类场景。
|
3天前
|
存储 供应链 物联网
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
|
3天前
|
存储 供应链 安全
深度解析区块链技术的核心原理与应用前景
深度解析区块链技术的核心原理与应用前景
10 0
|
7天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
7天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
20 0
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0

推荐镜像

更多