4. Spark SQL: Operating on Hive Tables
4.1 Configuration Files
Copy the MySQL connector JAR from Hive's lib directory into Spark's jars directory, and hive-site.xml from Hive's conf directory into Spark's conf directory:
[root@zj1 sbin]# cd /opt/soft/hive110/lib/
[root@zj1 lib]# cp mysql-connector-java-5.1.39-bin.jar /opt/soft/spark234/jars/
[root@zj1 hive110]# cd conf/
[root@zj1 conf]# cp hive-site.xml /opt/soft/spark234/conf/
Edit Spark's copy of hive-site.xml and add the following property:
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://zj1:9083</value>
</property>
Then start the Hive metastore service:
hive --service metastore
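As a sketch of an alternative (not required if hive-site.xml is already in Spark's conf directory), the metastore URI can also be supplied directly on the SparkSession builder; this assumes the same thrift address as configured above.

import org.apache.spark.sql.SparkSession

// Sketch: point Spark at the Hive metastore in code instead of via hive-site.xml
val spark = SparkSession.builder()
  .appName("spark-hive")
  .config("hive.metastore.uris", "thrift://zj1:9083")  // same URI as in hive-site.xml above
  .enableHiveSupport()
  .getOrCreate()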
4.2 Working with Hive Tables
// The already-active spark session was built without Hive support, so stop it first
scala> spark.stop

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark = SparkSession.builder().appName("spark-hive").enableHiveSupport.getOrCreate

// Operate on Hive tables directly with SQL commands
scala> spark.sql("select * from mydemo.order").show
+----+----------+----+
|name| orderdate|cost|
+----+----------+----+
|jack|2015-04-03|  23|
|jack|2015-01-01|  10|
|tony|2015-01-02|  15|
|mart|2015-04-11|  75|
|neil|2015-06-12|  80|
|mart|2015-04-13|  94|
+----+----------+----+

scala> val spk = spark.sql("select * from mydemo.order")

// Write the query result to HDFS as a CSV file
scala> spk.repartition(1).write.format("csv").save("hdfs://192.168.56.137:9000/20200109")
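For readability you may also want a header row in the CSV, plus a quick way to check what was written. A minimal sketch along the same lines (same HDFS path as above; header and mode are standard DataFrameWriter/Reader options):

// Write with a header row, then read the output directory back to verify
spk.repartition(1)
  .write.mode("overwrite")
  .option("header", "true")
  .format("csv")
  .save("hdfs://192.168.56.137:9000/20200109")

val check = spark.read.option("header", "true").csv("hdfs://192.168.56.137:9000/20200109")
check.show()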
// Write the filtered result to Hive as a new table
scala> spk.filter($"name".startsWith("jack")).write.saveAsTable("xxx")
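saveAsTable uses Spark's default data source and a save mode of ErrorIfExists unless told otherwise; both can be made explicit. A hedged sketch (the table name xxx_parquet is made up for illustration):

// Pin the save mode and storage format explicitly when creating the Hive table
spk.filter($"name".startsWith("jack"))
  .write
  .mode("overwrite")            // replace the table if it already exists
  .format("parquet")            // explicit format (parquet is also the usual default data source)
  .saveAsTable("xxx_parquet")   // hypothetical table name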
Querying in Hive, we find that the new table "xxx" now exists.
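The same check can be done without leaving Spark, since the session shares the Hive metastore. A minimal sketch:

// Confirm from Spark that the new table is visible and query it
spark.sql("show tables").show
spark.sql("select * from xxx").show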
We can also insert data into the table through Spark:
// Insert a row into the xxx table
scala> spark.sql("insert into xxx values('jm','2020-09-09',99)")

5. Spark SQL: Connecting to MySQL

// Start spark-shell with the MySQL connector JAR on the classpath
[root@zj1 bin]# ./spark-shell --jars /opt/soft/spark234/jars/mysql-connector-java-5.1.39-bin.jar

scala> val prop = new java.util.Properties
prop: java.util.Properties = {}

scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res0: Object = null

scala> prop.setProperty("user","root")
res1: Object = null

scala> prop.setProperty("password","ok")
res2: Object = null

// Read a table from MySQL
scala> val jdbcDF = spark.read.jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","users",prop)

scala> jdbcDF.show
+---+--------+----------+
| id|username|  birthday|
+---+--------+----------+
|  1|      zs|1999-09-09|
|  2|      ls|1999-09-08|
|  4|      zl|1989-09-08|
+---+--------+----------+

// Filter, then append the result to another MySQL table
scala> jdbcDF.filter($"username".endsWith("s")).write.mode("append").jdbc("jdbc:mysql://192.168.56.137:3306/mydemo","myuser",prop)
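For larger MySQL tables, read.jdbc also has an overload that splits the read into parallel partitions on a numeric column. A sketch, assuming id is a numeric key and that the bounds 1 to 1000 (made-up values) roughly cover its range:

// Parallel JDBC read: 4 partitions split on the id column
val usersDF = spark.read.jdbc(
  "jdbc:mysql://192.168.56.137:3306/mydemo",
  "users",
  "id",    // partition column (assumed numeric)
  1L,      // lowerBound (assumed)
  1000L,   // upperBound (assumed; bounds only control the split, they do not filter rows)
  4,       // numPartitions
  prop)
usersDF.rdd.getNumPartitions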
6. Spark SQL Built-in Functions
// Build an array of sample log data: "date,userid"
val mylog = Array("2019-12-27,001","2019-12-27,001","2019-12-27,002","2019-12-28,001","2019-12-28,002","2019-12-28,002")

// Imports
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Create an RDD of Rows from the collection
scala> val rdd = sc.parallelize(mylog).map(x=>{
     |   val sp = x.split(",")
     |   Row(sp(0),sp(1).toInt)
     | })

// Define the DataFrame schema
val struct = StructType(Array(
  StructField("day",StringType,true),
  StructField("userid",IntegerType,true)
))

val df = spark.createDataFrame(rdd,struct)

scala> df.show
+----------+------+
|       day|userid|
+----------+------+
|2019-12-27|     1|
|2019-12-27|     1|
|2019-12-27|     2|
|2019-12-28|     1|
|2019-12-28|     2|
|2019-12-28|     2|
+----------+------+

import org.apache.spark.sql.functions._

// PV: number of records per day
scala> df.groupBy("day").agg(count("userid").as("pv")).show
+----------+---+
|       day| pv|
+----------+---+
|2019-12-28|  3|
|2019-12-27|  3|
+----------+---+

// UV: number of distinct users per day
scala> df.groupBy("day").agg(countDistinct("userid").as("pv")).show
+----------+---+
|       day| pv|
+----------+---+
|2019-12-28|  2|
|2019-12-27|  2|
+----------+---+
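The same aggregations can be expressed in SQL by registering the DataFrame as a temporary view, which some readers find easier to follow. A sketch using the df built above (the view name userlog is made up):

// Register a temp view and compute PV / UV per day with SQL
df.createOrReplaceTempView("userlog")
spark.sql("""
  select day,
         count(userid)          as pv,
         count(distinct userid) as uv
  from userlog
  group by day
""").show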
7. Spark SQL User-Defined Functions
scala> val df = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.137:9000/20200102/events.csv")

scala> df.printSchema

// Register the DataFrame as a temp view so the SQL below can refer to it as "events"
scala> df.createOrReplaceTempView("events")

// Register a user-defined function that concatenates event_id and user_id
scala> spark.udf.register("eaddu",(eid:String,uid:String)=>eid+uid)

scala> spark.sql("select event_id,eaddu(event_id,user_id) as fullid from events").show(3)
+----------+-------------------+
|  event_id|             fullid|
+----------+-------------------+
| 684921758|6849217583647864012|
| 244999119|2449991193476440521|
|3928440935|3928440935517514445|
+----------+-------------------+
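spark.udf.register makes the function available to SQL. If you only need it on the DataFrame API side, the functions.udf helper does the same job without registration. A minimal sketch on the same events DataFrame (column names as above; the val name eadduUdf is made up):

import org.apache.spark.sql.functions.udf

// Same concatenation as eaddu, used directly as a Column expression
val eadduUdf = udf((eid: String, uid: String) => eid + uid)
df.select($"event_id", eadduUdf($"event_id", $"user_id").as("fullid")).show(3)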