1. Spark SQL and Hive Integration (spark-shell)
(1) Add the configuration items
Step 1:
Copy the Hadoop cluster's core-site.xml and hdfs-site.xml, together with Hive's configuration file hive-site.xml, into Spark's conf directory.
cp hive-site.xml /opt/Hadoop/spark/conf
Step 2:
Add the metastore URI configuration to hive-site.xml:
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://node1:9083</value>
</property>
Step 3:
Copy the MySQL JDBC driver jar used by Hive into Spark's jars directory:
cp mysql-connector-java-5.1.48-bin.jar /opt/Hadoop/spark/jars
Step 4:
Check the Hadoop configuration entry in spark-env.sh:
HADOOP_CONF_DIR=/opt/Hadoop/hadoop/etc/hadoop
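Once these four items are in place, a quick sanity check from spark-shell is to list the databases Spark's catalog sees through the Hive metastore. A minimal sketch, run inside spark-shell (which provides the spark session automatically):

// If the integration is wired up, the Hive databases should be visible here.
spark.catalog.listDatabases().show(false)
// And the tables of a specific database, e.g. the default one:
spark.catalog.listTables("default").show(false)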
(2) Start the services
Step 1:
Check whether MySQL is running:
service mysqld status
Redirecting to /bin/systemctl status mysqld.service
● mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since Wed 2020-09-23 14:48:53 CST; 1h 10min ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
  Process: 1712 ExecStart=/usr/sbin/mysqld --daemonize --pid-file=/var/run/mysqld/mysqld.pid $MYSQLD_OPTS (code=exited, status=0/SUCCESS)
  Process: 1179 ExecStartPre=/usr/bin/mysqld_pre_systemd (code=exited, status=0/SUCCESS)
 Main PID: 1714 (mysqld)
   CGroup: /system.slice/mysqld.service
           └─1714 /usr/sbin/mysqld --daemonize --pid-file=/var/run/mys...

Sep 23 14:48:50 node1 systemd[1]: Starting MySQL Server...
Sep 23 14:48:53 node1 systemd[1]: Started MySQL Server.
Step 2:
Start the Hive metastore:
bin/hive --service metastore
Starting Hive Metastore Server
(3) Test with data
Step 1:
Start Hive:
bin/hive
Step 2:
Create the kfk database:
create database kfk;
Step 3:
Create the test table:
use kfk;
create table if not exists test(userid string,username string) row format delimited fields terminated by ' ' stored as textfile;
Step 4:
Prepare the data (test1.txt):
0001 java
0002 python
0003 c
0004 hadoop
0005 php
0006 linux
0007 spark
Step 5:
Load the data:
load data local inpath "/opt/datas/test1.txt" into table test;
hive (kfk)> select * from test;
OK
test.userid	test.username
0001	java
0002	python
0003	c
0004	hadoop
0005	php
0006	linux
0007	spark
Time taken: 0.055 seconds, Fetched: 7 row(s)
Step 6:
Query the data from spark-shell:
spark.sql("select * from kfk.test")
res0: org.apache.spark.sql.DataFrame = [userid: string, username: string]
scala> spark.sql("select * from kfk.test").show
+------+--------+
|userid|username|
+------+--------+
|  0001|    java|
|  0002|  python|
|  0003|       c|
|  0004|  hadoop|
|  0005|     php|
|  0006|   linux|
|  0007|   spark|
+------+--------+
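The same table can also be read through the DataFrame API rather than a SQL string. A small sketch for the same spark-shell session (the filter is only an illustrative extra step):

import spark.implicits._

// spark.table resolves the table through the Hive catalog, like the SQL above.
val testDf = spark.table("kfk.test")
testDf.show()
// DataFrame operations compose as usual, e.g. a lexicographic filter on userid:
testDf.filter($"userid" > "0003").show()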
(4) Write the dataset to MySQL
First, log in to MySQL and create the test database:
mysql -u root -p
mysql> create database test;
Then, in spark-shell, write the Hive data queried with Spark SQL into MySQL:
scala> import java.util.Properties
import java.util.Properties

scala> val pro = new Properties()
pro: java.util.Properties = {}

scala> pro.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null

scala> val df = spark.sql("select * from kfk.test")
df: org.apache.spark.sql.DataFrame = [userid: string, username: string]
scala> df.write.jdbc("jdbc:mysql://node1/test?user=root&password=199911","spark1",pro)
Finally, check the data in the spark1 table in MySQL:
mysql> select * from spark1;
+--------+----------+
| userid | username |
+--------+----------+
| 0001   | java     |
| 0002   | python   |
| 0003   | c        |
| 0004   | hadoop   |
| 0005   | php      |
| 0006   | linux    |
| 0007   | spark    |
+--------+----------+
7 rows in set (0.00 sec)
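The same write can also be expressed with the DataFrameWriter's format("jdbc")/option(...) style, which spells out every connection parameter. A sketch assuming the same node1 MySQL instance and the root/199911 credentials used above:

// Equivalent to df.write.jdbc(...), with each option made explicit.
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://node1/test")
  .option("dbtable", "spark1")
  .option("user", "root")
  .option("password", "199911")
  .option("driver", "com.mysql.jdbc.Driver")
  .mode("append")   // "overwrite" would drop and recreate the table
  .save()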
2. Spark SQL and Hive Integration (spark-sql)
Step 1: Start the Hive metastore
bin/hive --service metastore
Starting Hive Metastore Server
Step 2: Start spark-sql
bin/spark-sql
List the databases; we can see they are the same as in Hive, and the commands are ordinary SQL statements:
spark-sql (default)> show databases;
20/09/23 10:38:58 INFO CodeGenerator: Code generated in 164.478292 ms
databaseName
default
kfk
Time taken: 1.338 seconds, Fetched 2 row(s)
20/09/23 10:38:58 INFO SparkSQLCLIDriver: Time taken: 1.338 seconds, Fetched 2 row(s)
spark-sql (default)> use kfk;
spark-sql (default)> show tables;
20/09/23 10:39:34 INFO CodeGenerator: Code generated in 8.452303 ms
database	tableName	isTemporary
kfk	test	false
Time taken: 0.059 seconds, Fetched 1 row(s)
20/09/23 10:39:34 INFO SparkSQLCLIDriver: Time taken: 0.059 seconds, Fetched 1 row(s)
spark-sql (default)> select * from test;
userid	username
0001	java
0002	python
0003	c
0004	hadoop
0005	php
0006	linux
0007	spark
Time taken: 0.806 seconds, Fetched 7 row(s)
In summary, the Spark SQL and Hive integration works.
3. Spark SQL and Hive Integration (IDEA)
Copy the Hadoop cluster's core-site.xml and hdfs-site.xml, together with Hive's hive-site.xml, into the project's resources directory.
package com.kfk.spark.sql

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/9
 * @time : 4:01 PM
 */
object HiveSpark {

    def main(args: Array[String]): Unit = {

        val spark = SparkSession
            .builder
            .appName("Spark Hive Example")
            .master("local")
            .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
            .enableHiveSupport()
            .getOrCreate

        spark.sql("select * from hivespark.person").show()
    }
}
Run result:
+------+------+--------+------+
|deptid|userid|username|salary|
+------+------+--------+------+
|dept-1|   001|  cherry|  1900|
|dept-1|   002|    alex|  5600|
|dept-1|   003|    jack|  7800|
|dept-2|   004|    jone|  2000|
|dept-2|   005|    lucy|  4500|
|dept-2|   006|    lili|  6300|
|dept-2|   007|   carry|  9000|
+------+------+--------+------+
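Results computed in the application can also be written back to Hive as a managed table with saveAsTable. A minimal sketch continuing the example above (the target table name hivespark.person_by_dept is hypothetical):

import org.apache.spark.sql.functions.avg

// Average salary per department, persisted as a new Hive table.
val byDept = spark.table("hivespark.person")
    .groupBy("deptid")
    .agg(avg("salary").alias("avg_salary"))

byDept.write
    .mode("overwrite")                       // replace the table if it already exists
    .saveAsTable("hivespark.person_by_dept") // hypothetical target table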
4. Spark SQL: Using the Thrift Server and beeline
Spark SQL can also act as a distributed query engine through its JDBC/ODBC interface or its command-line interface.
Differences between the Thrift server and spark-shell/spark-sql:
Each spark-shell or spark-sql session is a separate Spark application.
With the Thrift server, no matter how many clients you start (beeline or code), as long as they connect to the same Thrift server they all run inside a single Spark application. This solves the data-sharing problem: multiple clients can share data.
With the Thrift server, you can see the SQL execution plans directly in the web UI, which makes tuning easier.
Summary
Accessing Hive data through Spark's Thrift server lets multiple clients connect to the same server while running in the same application.
The Thrift server acts as the server side and beeline as the client; multiple clients can connect at the same time, which enables data sharing among them.
Usage steps:
Step 1: Start the metastore service
bin/hive --service metastore
Starting Hive Metastore Server
Step 2: Start the Thrift server
sbin/start-thriftserver.sh
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/Hadoop/spark/logs/spark-caizhengjie-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-node1.out
Step 3: Connect with the beeline client
[caizhengjie@node1 spark]$ bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://node1:10000
Connecting to jdbc:hive2://node1:10000
Enter username for jdbc:hive2://node1:10000: caizhengjie
Enter password for jdbc:hive2://node1:10000: ******
20/09/24 01:26:56 INFO Utils: Supplied authorities: node1:10000
20/09/24 01:26:56 INFO Utils: Resolved authority: node1:10000
20/09/24 01:26:56 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://node1:10000
Connected to: Spark SQL (version 2.4.6)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Now we can access the Hive tables with SQL commands:
show databases;
+---------------+--+
| databaseName  |
+---------------+--+
| default       |
| kfk           |
+---------------+--+
use kfk;
+---------+--+
| Result  |
+---------+--+
+---------+--+
show tables;
+-----------+------------+--------------+--+
| database  | tableName  | isTemporary  |
+-----------+------------+--------------+--+
| kfk       | test       | false        |
+-----------+------------+--------------+--+
select * from test;
+---------+-----------+--+
| userid  | username  |
+---------+-----------+--+
| 0001    | java      |
| 0002    | python    |
| 0003    | c         |
| 0004    | hadoop    |
| 0005    | php       |
| 0006    | linux     |
| 0007    | spark     |
+---------+-----------+--+
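Besides beeline, any JDBC client can talk to the Thrift server the same way. A minimal Scala sketch, assuming the Hive JDBC driver (org.apache.hive:hive-jdbc) is on the classpath and using the node1:10000 endpoint from above (the username and blank password are placeholders; use whatever your cluster requires):

import java.sql.DriverManager

// Open a HiveServer2-protocol connection to the Spark Thrift server.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://node1:10000", "caizhengjie", "")
val stmt = conn.createStatement()
val rs   = stmt.executeQuery("select * from kfk.test")
while (rs.next()) {
  println(rs.getString("userid") + "\t" + rs.getString("username"))
}
rs.close(); stmt.close(); conn.close()

Every statement submitted this way runs as jobs inside the single Thrift server application, which is exactly what the web UI check below shows.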
Finally, in my test I opened two beeline client connections and checked the web monitoring page.
There is in fact only one application; each beeline connection merely shows up as jobs inside it.