一、实验目的
掌握流式DataFrame的选择、投影和聚合操作。
掌握流式DataFrame的JOIN连接操作。
二、实验内容
1、读取json格式的文件数据源,并执行转换操作。
2、连接两个DataFrame。
三、实验原理
使用流数据格式的DataFrame,可以将任何select和filter转换应用到它,以及应用任何在个别列上操作的Spark SQL函数。此外,基本聚合和高级分析函数也可用于流DataFrame。
在流DataFrame中,不支持以下DataFrame转换,因为它们太过复杂,无法维护状态,或者由于流数据的无界性。
• 在流DataFrame上的多个聚合或聚合链。
• limit和take N行。
• distinct转换。但是,有一种方法可以使用唯一标识符来删除重复数据。
• 在没有任何聚合的情况下对流DataFrame进行排序。然而,在某种形式的聚合之后排序是得到支持的。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8.162,Spark-2.3.2,Hadoop-2.7.3
五、实验步骤
5.1 启动Spark集群
1、在终端窗口下,输入以下命令,分别启动Spark集群:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
2、在HDFS上面创建文件
1. hdfs dfs -mkdir -p /data/dataset/streaming
3、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)
1. $ spark-shell --master spark://localhost:7077
5.2 数据源说明
创建一个数据源文件,文件存放位置位于”/data/dataset/streaming/“目录下。在终端窗口下,执行以下命令:
1. $ cd /data/dataset/streaming/ 2. $ vi language.json 文件内容如下: 1. {"name":"python","age":12} 2. {"name":"scala","age":23} 3. {"name":"python","age":15}
5.3 结构化流DataFrame基本操作:投影和过滤
1、首先创建一个Schema。在spark-shell窗口下,分别执行如下代码:
1. import org.apache.spark.sql.types._ 2. 3. val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false)
2、读取文件数据源。在spark-shell窗口下,分别执行如下代码:
1. val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming/")
3、过滤和投影。选择查看数据中的name 和 age 字段,并且过滤 age 大于 12的人。在spark-shell窗口下,执行如下代码:
1. val result1 = data.select($"name",$"age").where($"age" > 12)
4、将流查询结果输出到控制台,并启动流程序。在spark-shell窗口下,执行如下代码:
1. val query1 = result1.writeStream.format("console").outputMode("append").start()
5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:
1. $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/
6. 切换回spark-shell窗口,可以看到如下的输出结果:
7、结束流程序运行。在spark-shell窗口下,执行如下代码:
1. query1.stop
5.4 结构化流DataFrame基本操作:数据聚合操作
1、删除HDFS目录下的文件
1. $ hdfs dfs -rm /data/dataset/streaming/language.json
2、按name字段对数据进行分组,并分组对age字段进行求和。在spark-shell窗口下,执行如下代码:
1. import org.apache.spark.sql.types._ 2. 3. val personSchema = new StructType()add("name", StringType,false).add("age", IntegerType,false) 4. 5. val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming") 6. 7. val result2 = words.groupBy($"name").sum("age") 8. 9. val query2 = result2.writeStream.format("console").outputMode("complete").start()
3、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:
1. $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/
4. 切换回spark-shell窗口,可以看到如下的输出结果:
由上面结果可以看出对两条python的数据中的age值进行了求和。
5、结束流程序运行。在spark-shell窗口下,执行如下代码:
1. query2.stop
5.3 连接两个流DataFrame
可以用一个streaming DataFrame来做最酷的事情之一,就是join一个静态的DataFrame或者另一个streaming DataFrame。从Spark 2.3开始,结构化流支持join两个streaming DataFrames。
1、删除HDFS目录下的文件
1. $ hdfs dfs -rm /data/dataset/streaming/language.json
2、读取流数据。在spark-shell窗口下,执行如下代码:
1. import org.apache.spark.sql.types._ 2. 3. val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false) 4. 5. val dataDF = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming")
3、通过join把两个DataFrame连接在一起(按name字段进行连接)。在spark-shell窗口下,执行如下代码:
1. val df = dataDF.join(dataDF, "name")
4、输出连接后的DataFrame。在spark-shell窗口下,执行如下代码:
1. val query = df.writeStream.format("console").outputMode("append").start()
5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:
1. $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/
6. 切换回spark-shell窗口,可以看到如下的输出结果:
7、结束流程序运行。在spark-shell窗口下,执行如下代码:
1. query.stop
— END —