【Spark】(八)Spark SQL 应用解析2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【Spark】(八)Spark SQL 应用解析2

四、Spark SQL 操作Hive表


4.1 文件配置


分别复制 hive lib、conf 目录下文件到 spark 的jars 目录下

[root@zj1 sbin]# cd /opt/soft/hive110/lib/
[root@zj1 lib]# cp mysql-connector-java-5.1.39-bin.jar /opt/soft/spark234/jars/
[root@zj1 hive110]# cd conf/
[root@zj1 conf]# cp hive-site.xml /opt/soft/spark234/conf/


修改 spark hive-site.xml


加入

<property>
                <name>hive.metastore.uris</name>
                <value>thrift://zj1:9083</value>
        </property>

   

执行

hive --service metastore


4.1 操作 Hive 表

// 原有spark不支持 原有激活状态的spark先stop
scala> spark.stop
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("spark-hive").enableHiveSupport.getOrCreate
// 通过SQL 命令直接操作 hive 表
scala> spark.sql("select * from mydemo.order").show
+----+----------+----+
|name| orderdate|cost|
+----+----------+----+
|jack|2015-04-03|  23|
|jack|2015-01-01|  10|
|tony|2015-01-02|  15|
|mart|2015-04-11|  75|
|neil|2015-06-12|  80|
|mart|2015-04-13|  94|
+----+----------+----+
scala> val spk= spark.sql("select * from mydemo.order")
scala> spk.repartition(1).write.format("csv").save("hdfs://192.168.56.137:9000/20200109")
// 如下 csv文件写到hdfs上

image.png

// 如下 表写到hive上
scala> spk.filter($"name".startsWith("jack")).write.saveAsTable("xxx")


我们到 hive 中查询结果 , 发现 hive 中出现 “xxx” 表

image.png


我们还可以通过spark 往表中插入数据

// 往 XXX 表中插入数据 
scala> spark.sql("insert into xxx values('jm','2020-09-09',99)")
1
2
五、Spark SQL 连 MySQL
// 启动 带jar 包
[root@zj1 bin]# ./spark-shell --jars /opt/soft/spark234/jars/mysql-connector-java-5.1.39-bin.jar
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}
scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null
scala> prop.setProperty("user","root")
res1: Object = null
scala> prop.setProperty("password","ok")
res2: Object = null
// 从mysql中读取表
scala> val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","users",prop)
scala> jdbcDF.show
+---+--------+----------+                                                       
| id|username|  birthday|
+---+--------+----------+
|  1|      zs|1999-09-09|
|  2|      ls|1999-09-08|
|  4|      zl|1989-09-08|
+---+--------+----------+
// 过滤 
scala> jdbcDF.filter($"username".endsWith("s")).write.mode("append").jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","myuser",prop)

image.png


六、Spark SQL 内置函数


// 建一个数组
val mylog = Array("2019-12-27,001","2019-12-27,001","2019-12-27,002","2019-12-28,001","2019-12-28,002","2019-12-28,002")
// 导包
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 根据集合数据生成RDD
scala> val rdd = sc.parallelize(mylog).map(x=>{
     | val sp = x.split(",")
     | Row(sp(0),sp(1).toInt)
     | })
 // 定义DataFrame的结构
val struct = StructType(Array(
StructField("day",StringType,true),
StructField("userid",IntegerType,true)
))
val df = spark.createDataFrame(rdd,struct)
scala> df.show
+----------+------+                                                             
|       day|userid|
+----------+------+
|2019-12-27|     1|
|2019-12-27|     1|
|2019-12-27|     2|
|2019-12-28|     1|
|2019-12-28|     2|
|2019-12-28|     2|
+----------+------+
import org.apache.spark.sql.functions._
scala> df.groupBy("day").agg(count("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  3|
|2019-12-27|  3|
+----------+---+
scala> df.groupBy("day").agg(countDistinct("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  2|
|2019-12-27|  2|
+----------+---+


七、Spark SQL 自定义函数


scala> val df = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv")
scala> df.printSchema
// 设置 自定义函数
scala> spark.udf.register("eaddu",(eid:String,uid:String)=>eid+uid)
scala> spark.sql("select event_id,eaddu(event_id,user_id) as fullid from events").show(3)
+----------+-------------------+
|  event_id|             fullid|
+----------+-------------------+
| 684921758|6849217583647864012|
| 244999119|2449991193476440521|
|3928440935|3928440935517514445|
+----------+-------------------+



目录
相关文章
|
2月前
|
SQL 数据可视化 关系型数据库
MCP与PolarDB集成技术分析:降低SQL门槛与简化数据可视化流程的机制解析
阿里云PolarDB与MCP协议融合,打造“自然语言即分析”的新范式。通过云原生数据库与标准化AI接口协同,实现零代码、分钟级从数据到可视化洞察,打破技术壁垒,提升分析效率99%,推动企业数据能力普惠化。
253 3
|
7月前
|
机器学习/深度学习 文字识别 监控
安全监控系统:技术架构与应用解析
该系统采用模块化设计,集成了行为识别、视频监控、人脸识别、危险区域检测、异常事件检测、日志追溯及消息推送等功能,并可选配OCR识别模块。基于深度学习与开源技术栈(如TensorFlow、OpenCV),系统具备高精度、低延迟特点,支持实时分析儿童行为、监测危险区域、识别异常事件,并将结果推送给教师或家长。同时兼容主流硬件,支持本地化推理与分布式处理,确保可靠性与扩展性,为幼儿园安全管理提供全面解决方案。
359 3
|
7月前
|
SQL 安全 关系型数据库
SQL注入之万能密码:原理、实践与防御全解析
本文深入解析了“万能密码”攻击的运行机制及其危险性,通过实例展示了SQL注入的基本原理与变种形式。文章还提供了企业级防御方案,包括参数化查询、输入验证、权限控制及WAF规则配置等深度防御策略。同时,探讨了二阶注入和布尔盲注等新型攻击方式,并给出开发者自查清单。最后强调安全防护需持续改进,无绝对安全,建议使用成熟ORM框架并定期审计。技术内容仅供学习参考,严禁非法用途。
1123 0
|
4月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
6月前
|
SQL 存储 自然语言处理
SQL的解析和优化的原理:一条sql 执行过程是什么?
SQL的解析和优化的原理:一条sql 执行过程是什么?
SQL的解析和优化的原理:一条sql 执行过程是什么?
|
8月前
|
人工智能 API 开发者
HarmonyOS Next~鸿蒙应用框架开发实战:Ability Kit与Accessibility Kit深度解析
本书深入解析HarmonyOS应用框架开发,聚焦Ability Kit与Accessibility Kit两大核心组件。Ability Kit通过FA/PA双引擎架构实现跨设备协同,支持分布式能力开发;Accessibility Kit提供无障碍服务构建方案,优化用户体验。内容涵盖设计理念、实践案例、调试优化及未来演进方向,助力开发者打造高效、包容的分布式应用,体现HarmonyOS生态价值。
506 27
|
8月前
|
存储 弹性计算 安全
阿里云服务器ECS通用型规格族解析:实例规格、性能基准与场景化应用指南
作为ECS产品矩阵中的核心序列,通用型规格族以均衡的计算、内存、网络和存储性能著称,覆盖从基础应用到高性能计算的广泛场景。通用型规格族属于独享型云服务器,实例采用固定CPU调度模式,实例的每个CPU绑定到一个物理CPU超线程,实例间无CPU资源争抢,实例计算性能稳定且有严格的SLA保证,在性能上会更加稳定,高负载情况下也不会出现资源争夺现象。本文将深度解析阿里云ECS通用型规格族的技术架构、实例规格特性、最新价格政策及典型应用场景,为云计算选型提供参考。
|
8月前
|
数据采集 机器学习/深度学习 存储
可穿戴设备如何重塑医疗健康:技术解析与应用实战
可穿戴设备如何重塑医疗健康:技术解析与应用实战
324 4
|
8月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
332 4
|
8月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~

推荐镜像

更多
  • DNS
  • 下一篇
    oss云网关配置