开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-系统监控-本地监控介绍】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11673
数据预处理-系统监控-本地监控介绍
内容介绍:
一.系统监控功能(数据预处理的监控功能)
二.任务实时监控
之前完成了数据推送中的查询类和预定类功能的实现。接着来查看数据预处理流程。
预处理前、中、后已经处理完成,并且最终推送到 Kafka 中,还需要完成最后模块-系统监控模块。
一.系统监控功能(数据预处理的监控功能)
从第一批数据的链路统计到数据清洗、脱敏、拆分、分类,解析、判断、结构化、推送,这几个过程是所有流程中对它整个进行一个监控,该预处理的整个流程处理速度是什么以及每个电路有多少请求、最后一次请求是多少及集群是否健康,这些数据都要监控起来后写到数据库中。
二.任务实时监控
目标:监控每个链路(在流程中存在多个服务器,每一台都为一个链路)。一个批次内采集的数据量(每个链路被访问了多少次)、数据预处理程序的健康状况、预处理的速度等信息,最终以图表的形式直观的展示。(在首页中找到系统监控可以看见实时流量转发的情况的每个链路流量转发的曲线)
需求:
在任务运行过程当中,需要对任务进行实时的监控,每处理一个批次,看下处理的速度和时间还有状态,从而能够显著的提高跟踪任务的效率。
spark 当中提供了一个监控功能,路径为 http://localhost:4040/metrics/json/,(在任务运行起来以后,有一个本地目录,访问 local 下的4040,有一个 metrics/json 路径,在该路径下会看到一个 json 格式的数据如下。)
该数据是spark任务在运行的时候的一个监控界面,作为监控数据最为合适。该数据中记录一些相应日期看数据记录信息。先运行任务:在 DataProcessLauncher.scala 中在程序主入口下有代码设置日志
级别:
LoggerLevels.setStreamingLogLevels()
//当应用被停止的时候,进行如下设置可以保证当前批次执行完之后
再停止应用。
System.setProperty("spark.streaming.stopGracefullyOnShutdow
n","true")
//1、创建 Sparkconf
valconf=new
SparkConf().setAppName("DataProcess").setMaster("local[2]")
.set("spark.metrics.conf.executor.source.jvm.class",
"org.apache.spark.metrics.source.JvmSource")//
开启集群监控功能在设置数据入口时添加了 Sparkconf 功能,只有开启了任务监控功能,才能够在任务运行起来后访问4040/metrics/json,才能够看到上述界面。如果不开启这个功能看不到。
接着执行任务,开启监控,执行完任务再访问路径,打开网址 http://localhost:4040/metrics/json/显示如图,该界面为 spark 提供的监控界面。刷新后数据发生改变(日期时间戳发生变化)是因为两秒钟迭代一次。监控的指标记录每一次迭代起始时间和结束时间。图中有 processingEndTime 和 processingStartTime。每次迭代使用结束时间减去开始时间则是每个批次运行的时间,即为总长度,再用总数据量除以运行时间可以得出速度。
可以看到图中数据比较杂乱,可以进行整理。提供的素材中的资料包中有数据样本,其中有监控数据样例。
打开显示如图:
该监控数据的样本是通过数据格式化后的结果,是一个json格式。
现在要获取结束时间和开始时间的值,即拿到图中的“value”:1517307625168和”value”:1517307625012,而key值为value前的内容。其中开始时间和结束时间的key值不会发生改变,只有日期会发生变化。所以要得到结束时间应该先求出key值,可以看到图中的key值为几个内容进行拼接:
“local-1517307598019.driver.streaming-data-process.Streami
ngMetrics.streaming.lastCompletedBath_processingEndTime”:{
“value”:1517307625168}
目前执行的任务为 SparkStreaming 任务,local-1517307598019是 streaming 任务的唯一id,任务不停止,id
则不会变化,刷新保持不变。.driver.固定。在代码DataProcessLauncher.scala
中初始化了任务名称:valconf=new
SparkConf().setAppName("DataProcess").setMaster("local[2]")
其中 DataProcess 设置为当前任务的名称。
.StreamingMetrics.streaming.lastCompletedBath_processingEndTime
为固定。
即结束时间开始时间两者为任务 id+driver+任务名称+开始/结束的字符串。
如果获取到了任务 id 和任务名称,那么key值即为将四个内容拼接起来,得到key值后获取 value 值。在整个任务中为一个大范围的 json,在 json 中有很多值例如 gauges、counters 等。gauges 为 json 中的一个节点,而{}中的所有值都属于该节点。要得到结束时间应该先得到节点的所有数据才能获取key值。总结读取开始时间和结束时间:先获取所有的 key 值,即浏览器中 url 反映出的所有数据,再获取到以 gauges 为 key 值的节点,以 gauges 为节点的所有数据。再去
获取拼接出的 key,再拿到 key 对应的 value 值。
以上就是获取数据中的开始时间和结束时间的内容。