开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-系统监控_监控代码上】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11675
数据预处理-系统监控_监控代码上
实现系统监控功能
上节课已经搭建好 SparkStreamingMonitor.streamMonitor,方法和参数都已经设置成功,并且整理出了系统监控功能思路。接着根据思路来进行实现。第一个计算数据的总量,一个批次中到底存在多少数据,先来定义:输入
//1计算总的数据量
valdataCount=rdd.count()
rdd.count()
返回 rdd 中有多少数据
第二步根据url来获取 json 数据,首先定义一个url,再定义jsondata来获取所有数据:输入
//2通过http://localhost:4040/metrics/json/url
获取 json 数据
valurl=”http://localhost:4040/metrics/json/”
valjsonData=SparkMetricsUtils.getMetricsJson(url)
第三获取节点下所对应的数据,来获取全部括号中的数据,定义一个接收数据的变量名称,调用 json 提供的 getObject 方法:输入//3获取”gauges”对应的数据
valgaugesJson=jsonData.getJSONObject(key=”gauges”)
第四步第五步拼接结束时间和开始时间 path,首先要有任务的id和任务的名称再进行拼接获取。通过 SparkContext进行获取,获取到任务id和名称后进行拼装。id 加 driver,再加上appname再加上结束时间或者开始时间。
输入:
//获取任务的ID任务的名称
valappID=sc.applicationID
valappName=sc.appName
//4拼接结束时间的path
valendTimePath=appId+”.driver”
+appName+”.StreamingMetrics.streaming.lastCompletedBatch_pr
ocessingEndTime”
//5拼接开始时间的path
valstartTimePath=appId+”.driver”
+appName+”.StreamingMetrics.streaming.lastCompletedBatch_pr
ocessingStartTime”
第六步获取开始时间和结束时间,已经获取到了路径,先获取开始时间。依然调用 gaugesJson,然后输入startTimePath,此处不能添加引号。加了引号代表着它是一个字符串,而此处是代表一个变量,不能加双引号,此处就获取到了时间。接着添加接收,先临时定义为tmp,数据当中获取的 value 不为最终的数据,所以是一个临时数据。
接下来获取最终的数据,需要做一个判断,因为这个数据有可能是空的。所以这里要做一个判断:如果获取的值不为空才去获取。数据类型设置成 long 初始化,通过时间戳获取到 key 值。输入:
//6获取出开始时间和结束时间
//获取出开始时间
valtmpstartTime=gaugesJson.getJSONObject(startTimePath)
//获取最终的开始时间戳
varstartTime:Long=0
if(tmpStartTime!=null){
StartTime=tmpStartTime.getLong(key=”value”)
同样思路:使用 tmpEndTime,结束时间路径获取后获取临时结束时间。再用临时的结束时间判断,若不为空再定义endTime,Long 类型=0,再用long类型接收结束时间,再用getLong拿到时间戳就为 endTime
//获取出结束时间
valtmpEndTime=gaugesJson.getJSONObject(endTimePath)
//获取最终的开始时间戳
varendTime:Long=0
if(tmpEndTime!=null){
endTime=tmpEndTime.getLong(key=”value”)
第七个是求时间差,时间差等于结束时间减去开始时间。用运行时间进而求出速度,输入:
//7求出时间差(结束时间-开始时间)
valrunTime=endTime-startTime
第八步求运行速度,但需要做一个判断,时间不能为0。如果为0则报错。并且此处数据总量/时间得到的结果可能非常小,所以将两个都定义为 float 类型。输入
//8计算任务运行的速度(数据总量/时间)
if(runTime>0){
valrunSpeed=dataCount.toFloat/runTime.toFloat
}
第九步进行封装数据。需要哪些数据取决于业务需要哪些数据。业务需要的数据在页面中,即前端需要展现哪些数据。在实时流量转发的情况中,数据是从数据库中读取的。此时需要与前端工程师协同,后端按照哪种规则写代码,前端需要按照规则进行读取然后并且进行展现。之前在数据采集中已经做了一个约定,现在在此处也需要做一个约定,该界面已经开发好,即格式已经规定好。可以在代码
IndexSystemMonitorController中进行查看之前的约定。
/**
*
*获取实时流量转发情况
*/
@RequestMapping(value="/getRealTimeTraffic",method=
RequestNethod.POST)
@ResponseBody
publicMap<String,Object>
getRealTimeTraffic(HttpServletRequestrequest){
Map<string,objectmap=newHashMap<~>();
try{
map=TrafficUtil.getForwardInfo(minutes:
-20,Constants.CSANTI_MONITOR_DP);
}
点击getForwardInfo进入查看:
/**
*获取minutes时间内对应的值
*@paramminutes时间
*@paramkey键值
*@return
*/
publicstaticMap<String,Object>getForwardInfo(int
minutes,Stringkey){
Map<String,Object>mapS=newHashMap<~>();
//20分钟内的jsonVO
List<JsonVO>jsonVOs=getTrafficInfoByMinute(minutes,key);
List<String>endTimeList=newArrayList<~>();
找到JsonVO点击进入:
/**
*用于从redis缓存数据库中读取数据的Bean
*/
publicclassJsonVOimplements
Comparable<JsonVO>,java.io.Serializable{
privateStringkey;//redis系统内的key
privateStringcostTime;//时间差﹑任务运行时间(end-start)
privateMap<String,Object>serversCountMap;//链路统计中的结
果
privateStringapplirationId;//appID
privateStringcountPerMillis;//speed
privateStringapplicationUniqueName;//appname
privateStringendTime;//任务运行结尾时间
privateStringsourceCount;//一批次数据数据总量
将数据展现到前端需要用到这些数据,这些数据是从数据库中读取出,即约定好的数据。按着该数据读取,就要按着顺序来编写。
其中时间差任务运行时间 runTime、链路统计中的结果 serverCount、appID、speed、appname、任务运行结尾时间 endTime(此处为时间戳,需要将时间戳转换为年月日格式)、一批次数据数据总量(RDD 总数 rdd.count())这些数据都已经存在,只需要进行封装。