一、业务场景
通过实时计算所有计算机机架在一个滑动窗口上的平均温度,从而发现温度异常的机架,及时检测故障的发生。
二、数据集说明
本案例用到的实时数据说明如下:
- 原始数据文件位于本地/data/dataset/streaming/iot/file1.json和/data/dataset/streaming/iot/file2.json
以下为数据中心的两个数据传感器检测到的两个机架的温度数据:
file1.json:
{“rack”:”rack1”,”temperature”:99.5,”ts”:”2017-06-02T08:01:01”} {“rack”:”rack1”,”temperature”:100.5,”ts”:”2017-06-02T08:06:02”} {“rack”:”rack1”,”temperature”:101.0,”ts”:”2017-06-02T08:11:03”} {“rack”:”rack1”,”temperature”:102.0,”ts”:”2017-06-02T08:16:04”}
file2.json:
{“rack”:”rack2”,”temperature”:99.5,”ts”:”2017-06-02T08:01:02”} {“rack”:”rack2”,”temperature”:105.5,”ts”:”2017-06-02T08:06:04”} {“rack”:”rack2”,”temperature”:104.0,”ts”:”2017-06-02T08:11:06”} {“rack”:”rack2”,”temperature”:108.0,”ts”:”2017-06-02T08:16:08”}
三、操作步骤
阶段一、启动HDFS、Spark集群服务
1、启动HDFS集群
在Linux终端窗口下,输入以下命令,启动HDFS集群:
1. $ start-dfs.sh
2、启动Spark集群
在Linux终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
3、验证以上进程是否已启动
在Linux终端窗口下,输入以下命令,查看启动的服务进程:
1. $ jps
如果显示以下5个进程,则说明各项服务启动正常,可以继续下一阶段。
1. 2288 NameNode 2. 2402 DataNode 3. 2603 SecondaryNameNode 4. 2769 Master 5. 2891 Worker
阶段二、准备案例中用到的数据集
1、在HDFS上创建数据存储目录。在Linux终端窗口下,输入以下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/streaming/iot-input
2、将iot数据文件上传到HDFS上。在Linux终端窗口下,输入以下命令:
1. $ hdfs dfs -put /data/dataset/streaming/iot/* /data/dataset/streaming/iot-input/
3、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经有了iot目录:
1. $ hdfs dfs -ls /data/dataset/streaming/iot-input
这时应该看到该目录下有两个文件:file1.json和file2.json。
阶段三、处理实时数据
1、启动spark shell。在Linux终端窗口中,输入以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)
1. $ spark-shell --master spark://localhost:7077
执行以上代码,进入到Spark Shell交互开发环境:
2、首先需要导入案例代码运行所依赖的包。在paste模式下,输入以下代码:
1. // 首先导入包 2. import org.apache.spark.sql.types._ 3. import org.apache.spark.sql.functions._
同时按下【Ctrl + D】按键,执行以上代码,输出结果如下:
3、构造数据Schema模式,注意其中温度和时间的数据类型。在paste模式下,输入以下代码:
1. // 定义schema 2. val iotDataSchema = new StructType().add("rack", StringType, false) 3. .add("temperature", DoubleType, false) 4. .add("ts", TimestampType, false)
同时按下 【Ctrl + D】 键,执行以上代码,输出结果如下:
4、加载数据文件到DataFrame中。在paste模式下,输入以下代码:
1. // 读取温度数据 2. val dataPath = "hdfs://localhost:9000/data/dataset/streaming/iot-input" 3. val iotSSDF = spark.readStream.schema(iotDataSchema).json(dataPath)
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
由以上输出内容可以看出,采集到的机架温度(json格式)数据被读取到了DataFrame中。
5、创建10分钟大小的滑动窗口,并在temperature列上求机架温度的平均值。在paste模式下,输入以下代码:
1. // group by一个滑动窗口,并在temperature列上求平均值 2. val iotAvgDF = iotSSDF.groupBy(window($"ts", "10 minutes", "5 minutes")) 3. .agg(avg("temperature") as "avg_temp")
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
6、将数据写出到memory data sink。在paste模式下,输入以下代码:
1. // 将数据写出到memory data sink,使用查询名称iot 2. val iotMemorySQ = iotAvgDF.writeStream 3. .format("memory") 4. .queryName("iot") 5. .outputMode("complete") 6. .start()
同时按下 【Ctrl + D】 键,执行以上代码,输出内容如下:
7、在iot上执行sql查询,显示数据,以start时间排序。在paste模式下,输入以下代码:
1. // 显示数据,以start时间排序 2. spark.sql("select * from iot").orderBy($"window.start").show(false)
同时按下【 Ctrl + D 】键,执行以上代码,输出结果如下:
从实时检测的结果来看,机架的平均温度一直在上升,其中8:15-8:25这十分钟内的机架温度最高。
8、停止该流查询,命令行下执行如下命令:
1. iotMemorySQ.stop
按下【Enter】键,执行以上代码。
阶段四、自行练习
1、在本案例的基础上,请继续分析,找出是哪些机架的温度在上升。