Spark SQL 与Hive集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 笔记

一、Spark SQL 与Hive集成(spark-shell)


(1)添加配置项目

第一步:

Hadoop集群的core-site.xml,hdfs-site.xml和hive的配置文件hive-site.xml拷贝到spark的conf的目录下

cp hive-site.xml /opt/Hadoop/spark/conf

第二步:

添加hive-site.xml中metastore的url的配置

<property>
        <name>hive.metastore.uris</name>
        <value>thrift://node1:9083</value>
</property>

第三步:

把hive中的MySQL的jar包上传到spark的jars目录下

cp mysql-connector-java-5.1.48-bin.jar /opt/Hadoop/spark/jars

第四步:

检查spark-env.sh文件中的Hadoop的配置项

HADOOP_CONF_DIR=/opt/Hadoop/hadoop/etc/hadoop


(2)启动服务

第一步:

检查mysql是否启动:

service mysqld status
Redirecting to /bin/systemctl status mysqld.service
● mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since 三 2020-09-23 14:48:53 CST; 1h 10min ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
  Process: 1712 ExecStart=/usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid $MYSQLD_OPTS (code=exited, status=0/SUCCESS)
  Process: 1179 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
 Main PID: 1714 (mysqld)
   CGroup: /system.slice/mysqld.service
           └─1714 /usr/sbin/mysqld --daemonize --pid-file=/var/run/mys...
9月 23 14:48:50 node1 systemd[1]: Starting MySQL Server...
9月 23 14:48:53 node1 systemd[1]: Started MySQL Server.

第二步:

启动hive中的metastore

bin/hive --service metastore
Starting Hive Metastore Server


(3)数据测试

第一步:

启动hive

bin/hive

第二步:

创建kfk数据库

create database kfk;

第三步:

创建test表

use kfk;
create table if not exists test(userid string,username string) 
row format delimited fields terminated by ' ' stored as textfile;

第四步:

准备数据

0001 java
0002 python
0003 c
0004 hadoop
0005 php
0006 linux
0007 spark

第五步:

导入数据

load data local inpath "/opt/datas/test1.txt" into table test;
hive (kfk)> select * from test;
OK
test.userid test.username
0001  java
0002  python
0003  c
0004  hadoop
0005  php
0006  linux
0007  spark
Time taken: 0.055 seconds, Fetched: 7 row(s)

第六步:

通过spark-shell查看数据

spark.sql("select * from kfk.test")
res0: org.apache.spark.sql.DataFrame = [userid: string, username: string]
scala> spark.sql("select * from kfk.test").show
+------+--------+
|userid|username|
+------+--------+
|  0001|    java|
|  0002|  python|
|  0003|       c|
|  0004|  hadoop|
|  0005|     php|
|  0006|   linux|
|  0007|   spark|
+------+--------+


(4)将数据集写入到MySQL

首先进入mysql数据库,并且创建test数据库

mysql -u root -p
mysql> create database test;

然后进入spark shell ,将spark sql分析hive中的数据写入到mysql中

scala> import java.util.Properties
import java.util.Properties
scala> val pro = new Properties()
pro: java.util.Properties = {}
scala> pro.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null
scala> val df = spark.sql("select * from kfk.test")
df: org.apache.spark.sql.DataFrame = [userid: string, username: string]
scala> df.write.jdbc("jdbc:mysql://node1/test?user=root&password=199911","spark1",pro)

最后查看mysql数据库中表spark1的数据

mysql> select * from spark1;
+--------+----------+
| userid | username |
+--------+----------+
| 0001   | java     |
| 0002   | python   |
| 0003   | c        |
| 0004   | hadoop   |
| 0005   | php      |
| 0006   | linux    |
| 0007   | spark    |
+--------+----------+
7 rows in set (0.00 sec)


二、Spark SQL 与Hive集成(spark-sql)


第一步:启动hive中的metastore

bin/hive --service metastore
Starting Hive Metastore Server

第二步:启动spark-sql

bin/spark-sql

显示数据库,我们可以发现是和hive中是一样的,命令也是使用SQL语句

spark-sql (default)> show databases;
20/09/23 10:38:58 INFO CodeGenerator: Code generated in 164.478292 ms
databaseName
default
kfk
Time taken: 1.338 seconds, Fetched 2 row(s)
20/09/23 10:38:58 INFO SparkSQLCLIDriver: Time taken: 1.338 seconds, Fetched 2 row(s)
spark-sql (default)> use kfk;
spark-sql (default)> show tables;
20/09/23 10:39:34 INFO CodeGenerator: Code generated in 8.452303 ms
database  tableName isTemporary
kfk test  false
Time taken: 0.059 seconds, Fetched 1 row(s)
20/09/23 10:39:34 INFO SparkSQLCLIDriver: Time taken: 0.059 seconds, Fetched 1 row(s)
spark-sql (default)> select * from test;
userid  username
0001  java
0002  python
0003  c
0004  hadoop
0005  php
0006  linux
0007  spark
Time taken: 0.806 seconds, Fetched 7 row(s)

综上,Spark SQL 与Hive集成成功。


三、Spark SQL 与Hive集成(IDEA工具)


把Hadoop集群的core-site.xml,hdfs-sit.xml和hive的hive-site.xml文件拷贝到项目的resources目录下

package com.kfk.spark.sql
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/9
 * @time : 4:01 下午
 */
object HiveSpark {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
                .builder
                .appName("Spark Hive Example")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .enableHiveSupport()
                .getOrCreate
        spark.sql("select * from hivespark.person").show()
    }
}

运行结果:

+------+------+--------+------+
|deptid|userid|username|salary|
+------+------+--------+------+
|dept-1|   001|  cherry|  1900|
|dept-1|   002|    alex|  5600|
|dept-1|   003|    jack|  7800|
|dept-2|   004|    jone|  2000|
|dept-2|   005|    lucy|  4500|
|dept-2|   006|    lili|  6300|
|dept-2|   007|   carry|  9000|
+------+------+--------+------+


四、Spark SQL之ThirftServer和beeline使用


Spark SQL也可以使用其JDBC / ODBC或命令行界面充当分布式查询引擎。

thriftserver和spark-shell/spark sql的区别:


spark-shell,spark-sql都是一个spark application

thriftserver,不管你启动多少个客户端(beeline/code),只要是连在一个thriftserver上,永远都是一个spark application,解决了一个数据共享的问题,多个客户端可以共享数据。

用thriftserver,在UI中能直接看到sql的执行计划,方便优化

总结

基于Spark的thirftserver来访问hive中的数据,可以让多个客户端连接到同一个服务器端,跑的是同一个application

Thirftserver作为服务端,beeline作为客户端来访问服务端,支持多个客户端同时访问,有助于多个客户端之间数据的共享


使用步骤:

第一步:启动metastore服务

bin/hive --service metastore
Starting Hive Metastore Server

第二步:启动thriftserver

sbin/start-thriftserver.sh


starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/Hadoop/spark/logs/spark-caizhengjie-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-node1.out


第三步:通过客户端beeline来连接

[caizhengjie@node1 spark]$ bin/beeline 
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://node1:10000
Connecting to jdbc:hive2://node1:10000
Enter username for jdbc:hive2://node1:10000: caizhengjie
Enter password for jdbc:hive2://node1:10000: ******
20/09/24 01:26:56 INFO Utils: Supplied authorities: node1:10000
20/09/24 01:26:56 INFO Utils: Resolved authority: node1:10000
20/09/24 01:26:56 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://node1:10000
Connected to: Spark SQL (version 2.4.6)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ

下面就可以通过SQL命令来来访问hive中的数据表

show databases;
+---------------+--+
| databaseName  |
+---------------+--+
| default       |
| kfk           |
+---------------+--+
use kfk;
+---------+--+
| Result  |
+---------+--+
+---------+--+
show tables;
+-----------+------------+--------------+--+
| database  | tableName  | isTemporary  |
+-----------+------------+--------------+--+
| kfk       | test       | false        |
+-----------+------------+--------------+--+
select * from test;
+---------+-----------+--+
| userid  | username  |
+---------+-----------+--+
| 0001    | java      |
| 0002    | python    |
| 0003    | c         |
| 0004    | hadoop    |
| 0005    | php       |
| 0006    | linux     |
| 0007    | spark     |
+---------+-----------+--+

最后,我通过测试,使用了2个客户端beeline的连接,查看web监控页面

30.png

其实就是一个application,每个beeline只作为一个job


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
11天前
|
SQL HIVE
【Hive SQL 每日一题】环比增长率、环比增长率、复合增长率
该文介绍了环比增长率、同比增长率和复合增长率的概念及计算公式,并提供了SQL代码示例来计算商品的月度增长率。环比增长率是相邻两期数据的增长率,同比增长率是与去年同期相比的增长率,复合增长率则是连续时间段内平均增长的速率。文章还包含了一组销售数据用于演示如何运用这些增长率进行计算。
|
11天前
|
SQL 分布式计算 关系型数据库
使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)
在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。
|
11天前
|
SQL HIVE
【Hive SQL 每日一题】统计用户连续下单的日期区间
该SQL代码用于统计用户连续下单的日期区间。首先按`user_id`和`order_date`分组并去除重复,然后使用`row_number()`标记行号,并通过`date_sub`与行号计算潜在的连续日期。接着按用户ID和计算后的日期分组,排除连续订单数少于2的情况,最后提取连续下单的起始和结束日期。输出结果展示了用户连续下单的日期范围。
|
11天前
|
SQL HIVE
【Hive SQL】字符串操作函数你真的会用吗?
本文介绍了SQL中判断字符串是否包含子串的几种方法。`IN`函数判断元素是否完全等于给定元素组中的某项,而非包含关系。`INSTR`和`LOCATE`函数返回子串在字符串中首次出现的位置,用于检测是否存在子串。`SUBSTR`则用于提取字符串的子串。`LIKE`用于模糊匹配,常与通配符配合使用。注意`IN`并非用于判断子串包含。
|
12天前
|
SQL 分布式计算 Java
Spark 为什么比 Hive 快
Spark与Hive在数据处理上有显著区别。Spark以其内存计算和线程级并行提供更快的速度,但稳定性受内存限制。相比之下,Hive虽较慢,因使用MapReduce,其稳定性更高,对内存需求较小。在Shuffle方式上,Spark的内存 Shuffle 比Hive的磁盘 Shuffle 更高效。综上,Spark在处理速度和Shuffle上占优,Hive则在稳定性和资源管理上更胜一筹。
|
12天前
|
SQL 关系型数据库 HIVE
【Hive SQL 每日一题】统计最近1天/7天/30天商品的销量
这段内容是关于SQL查询的示例,目标是统计`sales`表中最近1天、7天和30天的商品销量和销售次数。表结构包含`id`、`product_id`、`quantity`和`sale_date`字段。初始查询方法通过三个独立的子查询完成,但效率较低。优化后的查询使用了`lateral view explode`将数据炸裂,通过一次查询同时获取所有所需时间段的数据,提高了效率。示例中展示了优化前后的SQL代码及结果对比。
|
12天前
|
SQL HIVE
【Hive SQL 每日一题】统计最近7天内连续下单3日的用户量
创建了一个名为`sales`的测试表,包含`user_id`、`product_id`、`quantity`和`sale_date`字段,插入了多条销售数据。需求是找出最近7天内连续下单3天的用户数量。SQL查询通过分组和窗口函数`row_number()`检查日期连续性,最终计算满足条件的唯一用户数。示例结果显示有3名用户符合条件。
|
12天前
|
SQL BI HIVE
【Hive SQL 每日一题】统计用户留存率
用户留存率是衡量产品成功的关键指标,表示用户在特定时间内持续使用产品的比例。计算公式为留存用户数除以初始用户数。例如,游戏发行后第一天有10000玩家,第七天剩5000人,第一周留存率为50%。提供的SQL代码展示了如何根据用户活动数据统计每天的留存率。需求包括计算系统上线后的每日留存率,以及从第一天开始的累计N日留存率。通过窗口函数`LAG`和`COUNT(DISTINCT user_id)`,可以有效地分析用户留存趋势。
|
12天前
|
SQL HIVE 索引
【Hive SQL 每日一题】行列转换
该文介绍了如何使用SQL进行数据的行列转换。首先展示了行转列的例子,通过创建一个学生成绩表,利用`IF`和`SUM`函数按学生ID分组,将每个学生的各科成绩转换为独立列。然后,文章讲述了列转行的需求,利用`LATERAL VIEW`和`POSEXPLODE`将已转换的表格恢复为原始行格式,通过索引匹配过滤笛卡尔积避免错误结果。此外,还提到了使用`UNION ALL`的另一种列转行方法。
|
12天前
|
SQL HIVE
【Hive SQL 每日一题】分组排名取值
创建了一个名为`sales_data`的测试表,包含商品ID、销售额和销售日期。展示了部分示例数据。接着,提供了三个SQL查询:1) 查找每个商品销售额最高的记录;2) 获取每个商品最近和最远的销售记录;3) 求每个商品距今第二近的销售记录。每个查询都利用了窗口函数来处理数据,并给出了相应的查询结果图。