【Spark】(八)Spark SQL 应用解析2

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【Spark】(八)Spark SQL 应用解析2

四、Spark SQL 操作Hive表


4.1 文件配置


分别复制 hive lib、conf 目录下文件到 spark 的jars 目录下

[root@zj1 sbin]# cd /opt/soft/hive110/lib/
[root@zj1 lib]# cp mysql-connector-java-5.1.39-bin.jar /opt/soft/spark234/jars/
[root@zj1 hive110]# cd conf/
[root@zj1 conf]# cp hive-site.xml /opt/soft/spark234/conf/


修改 spark hive-site.xml


加入

<property>
                <name>hive.metastore.uris</name>
                <value>thrift://zj1:9083</value>
        </property>

   

执行

hive --service metastore


4.1 操作 Hive 表

// 原有spark不支持 原有激活状态的spark先stop
scala> spark.stop
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().appName("spark-hive").enableHiveSupport.getOrCreate
// 通过SQL 命令直接操作 hive 表
scala> spark.sql("select * from mydemo.order").show
+----+----------+----+
|name| orderdate|cost|
+----+----------+----+
|jack|2015-04-03|  23|
|jack|2015-01-01|  10|
|tony|2015-01-02|  15|
|mart|2015-04-11|  75|
|neil|2015-06-12|  80|
|mart|2015-04-13|  94|
+----+----------+----+
scala> val spk= spark.sql("select * from mydemo.order")
scala> spk.repartition(1).write.format("csv").save("hdfs://192.168.56.137:9000/20200109")
// 如下 csv文件写到hdfs上

image.png

// 如下 表写到hive上
scala> spk.filter($"name".startsWith("jack")).write.saveAsTable("xxx")


我们到 hive 中查询结果 , 发现 hive 中出现 “xxx” 表

image.png


我们还可以通过spark 往表中插入数据

// 往 XXX 表中插入数据 
scala> spark.sql("insert into xxx values('jm','2020-09-09',99)")
1
2
五、Spark SQL 连 MySQL
// 启动 带jar 包
[root@zj1 bin]# ./spark-shell --jars /opt/soft/spark234/jars/mysql-connector-java-5.1.39-bin.jar
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}
scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null
scala> prop.setProperty("user","root")
res1: Object = null
scala> prop.setProperty("password","ok")
res2: Object = null
// 从mysql中读取表
scala> val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","users",prop)
scala> jdbcDF.show
+---+--------+----------+                                                       
| id|username|  birthday|
+---+--------+----------+
|  1|      zs|1999-09-09|
|  2|      ls|1999-09-08|
|  4|      zl|1989-09-08|
+---+--------+----------+
// 过滤 
scala> jdbcDF.filter($"username".endsWith("s")).write.mode("append").jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","myuser",prop)

image.png


六、Spark SQL 内置函数


// 建一个数组
val mylog = Array("2019-12-27,001","2019-12-27,001","2019-12-27,002","2019-12-28,001","2019-12-28,002","2019-12-28,002")
// 导包
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 根据集合数据生成RDD
scala> val rdd = sc.parallelize(mylog).map(x=>{
     | val sp = x.split(",")
     | Row(sp(0),sp(1).toInt)
     | })
 // 定义DataFrame的结构
val struct = StructType(Array(
StructField("day",StringType,true),
StructField("userid",IntegerType,true)
))
val df = spark.createDataFrame(rdd,struct)
scala> df.show
+----------+------+                                                             
|       day|userid|
+----------+------+
|2019-12-27|     1|
|2019-12-27|     1|
|2019-12-27|     2|
|2019-12-28|     1|
|2019-12-28|     2|
|2019-12-28|     2|
+----------+------+
import org.apache.spark.sql.functions._
scala> df.groupBy("day").agg(count("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  3|
|2019-12-27|  3|
+----------+---+
scala> df.groupBy("day").agg(countDistinct("userid").as("pv")).show
+----------+---+                                                                
|       day| pv|
+----------+---+
|2019-12-28|  2|
|2019-12-27|  2|
+----------+---+


七、Spark SQL 自定义函数


scala> val df = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv")
scala> df.printSchema
// 设置 自定义函数
scala> spark.udf.register("eaddu",(eid:String,uid:String)=>eid+uid)
scala> spark.sql("select event_id,eaddu(event_id,user_id) as fullid from events").show(3)
+----------+-------------------+
|  event_id|             fullid|
+----------+-------------------+
| 684921758|6849217583647864012|
| 244999119|2449991193476440521|
|3928440935|3928440935517514445|
+----------+-------------------+



目录
相关文章
|
18天前
|
机器学习/深度学习 人工智能 自然语言处理
AI技术深度解析:从基础到应用的全面介绍
人工智能(AI)技术的迅猛发展,正在深刻改变着我们的生活和工作方式。从自然语言处理(NLP)到机器学习,从神经网络到大型语言模型(LLM),AI技术的每一次进步都带来了前所未有的机遇和挑战。本文将从背景、历史、业务场景、Python代码示例、流程图以及如何上手等多个方面,对AI技术中的关键组件进行深度解析,为读者呈现一个全面而深入的AI技术世界。
91 10
|
9天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
30天前
|
机器学习/深度学习 搜索推荐 API
淘宝/天猫按图搜索(拍立淘)API的深度解析与应用实践
在数字化时代,电商行业迅速发展,个性化、便捷性和高效性成为消费者新需求。淘宝/天猫推出的拍立淘API,利用图像识别技术,提供精准的购物搜索体验。本文深入探讨其原理、优势、应用场景及实现方法,助力电商技术和用户体验提升。
|
2月前
|
编译器 PHP 开发者
PHP 8新特性解析与实战应用####
随着PHP 8的发布,这一经典编程语言迎来了诸多令人瞩目的新特性和性能优化。本文将深入探讨PHP 8中的几个关键新功能,包括命名参数、JIT编译器、新的字符串处理函数以及错误处理改进等。通过实际代码示例,展示如何在现有项目中有效利用这些新特性来提升代码的可读性、维护性和执行效率。无论你是PHP新手还是经验丰富的开发者,本文都将为你提供实用的技术洞察和最佳实践指导。 ####
34 1
|
2月前
|
存储 供应链 算法
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
60 0
|
2月前
|
存储 监控 API
深入解析微服务架构及其在现代应用中的实践
深入解析微服务架构及其在现代应用中的实践
47 0
|
2月前
|
SQL Java 数据库连接
canal-starter 监听解析 storeValue 不一样,同样的sql 一个在mybatis执行 一个在数据库操作,导致解析不出正确对象
canal-starter 监听解析 storeValue 不一样,同样的sql 一个在mybatis执行 一个在数据库操作,导致解析不出正确对象
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
156 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
78 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
53 0

推荐镜像

更多