数据预处理-系统监控-创建计算方法及监控实现思路|学习笔记

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 快速学习数据预处理-系统监控-创建计算方法及监控实现思路

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段数据预处理-系统监控-创建计算方法及监控实现思路】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/672/detail/11674


数据预处理-系统监控-创建计算方法及监控实现思路


内容介绍:

一、需求

二、设计

三、代码实现


上节讲解了如何获取 json 中的 key 结束时间和开始时间:先获取整个 json,再获取以 gauges 为节点的所有值,再以key获取结束时间和开始时间。该key需要用到任务唯一id+driver+任务名称+开始/结束时间,后获取 value 值。


一、需求

数据中存储了很多的任务指标,其中有批处理的开始时间、批处理的结束时间,再加上计算出批处理的数据量,就可以得知批处理的平均计算速度(速度=数据量/时间),从而对任务的速度进行监控。

当然,除了时间,还有很多其他的指标可供统计,请自行使用。

图片1.png

 

二、设计

1、解析监控 json 数据

2、获取 gauges 节点下的数据

3、拿到批处理的开始时间和结束时间,计算批处理花费的时间

4、计算 rdd 的 count

5、计算批处理的平均计算速度

6、将指标 appid.appname、endtime、rddcount、花费时间、速度、serversCountMap 封装到 map 中

7、将 map 存储到 Redis,(预处理的监控功能完成,写入后首页中的图自动显示数据)key 设计为包含时间戳的


三、代码实现

在代码DataProcessLauncher.scala中,首先创建工程SparkStreamingMonitor.streamMonitor,输入

SparkStreamingMonitor.streamMonitor()

会标红,是因为此时还不存在,在工程 dataprocess下的 businessprocess 下右键新建一个 Scala Class,Name为SparkStreamingMonitor.streamMonitor,Kind为Object

创建出的该代码主要用于实现系统监控的过程。

创建完成后回到代码DataProcessLauncher.scala中,方法已经引入,来介绍需要哪些参数:解析数据需要拿到所有数据,拿到所有数据需要 url。再解析结束时间和开始时间,解析需要拼接 key 和任务 id、名称,而任务id与名称存在 SparkContext 中。在数据预处理的程序代码中存在 SparkContext:

//数据预处理的程序

def setupSsc(sc: sparkContext,kafkaParams: Map[String,String], topics: Set[String]): StreamingContext

//程序初始化阶段

//实例redis为后续使用

val redis =JedisconnectionUtil.getJedisCluster

//3 、创建Streaming context

val ssc=new StreamingContext(sc,Seconds(2))

在监控过程中任务的id与名称都设置在任务的 Spark 中,所以首先需要传入 SparkContext,因为后续计算需要用到任务名称和任务 id,所以直接调用 SparkContext 获取到。sc存在后就可以使用拼接获取到key值,第一个参数完成。获取到开始时间和结束时间可以计算出时间差,拥有时间差计算出速度还需要数据总量。

现在这个监控功能在数据循环中:

//5、消费数据

KafkaValue.foreachRDD(rdd=>{//迭代运行(每2秒运行一次)})

在 rdd 中,即此处获取到的数据会有多个。此处是一个rdd。上述代码:

//4、读取 kafka 内的数据 ssc, kafkaParams , topics)

val kafkaData=KafkaUtils.createDirectStream[ String,String,StringDecoder ,StringDecoder](ssc ,kafka

//真正的数据

val KafkaValue=kafkaData.map(_._2)

读取原始数据,过滤出第二个数据,然后遍历此处的 rdd,为最原始的 rdd。后续使用rdd进行一个操作:

//2数据清洗功能

//定义方法,参数为一条数据和广播变量

val filteredData=rdd.filter(message=>URLFilter.filterURL(message,broadcastFilterRuleList.value))

//filteredData.foreach(println)

过滤清洗掉其他数据后又返回一个 rdd,rdd.filter 为没有经过过滤的,原始数据,其中还存在无效数据。filteredData 为过滤后的有用的数据。需要以没有经过处理的rdd为准。

系统监控监控的为从链路统计后所有链路都经过的监控,只要预处理的板块包括的,都是预处理完成的工作,所以计算效率时都应该包括。数据清洗也是工作,也需要包括。所以我们要获取的是总的数据量。

图片1.png

所以需要第二个参数 rdd,通过 rdd 拿到数据的总数,通过sc求取总时间差,进而求出速度。

在数据进行采集过程中,实时流量转发可以计算出,当前系统运营状态为前端代码,还存在各链路流量转发情况。之前做过链路统计功能:

//1链路统计功能

LinkCount.LinkCount(rdd)

链路采集功能统计了第一个结果值,第二个结果值,然后把它写到数据库里。在我们当前监控过程当中,其中需要各链路流量的转发情况,即每一链路被访问了多少次需要进行统计。链路统计中已经计算出每个服务器在一个批次里面被访问了多少次。现在要使用该数据可以直接使用,但是链路统计没有返回值。

//每个服务器本批次数据访问了多少次

//1 遍历 rdd 获取到每条数据

val serverCount=rdd.map(message=>{

现在将该数据进行返回即可。获取计算出来的每一个服务器当前批次被访问的总数放到最后。输入

serverCount.collectAsMap()

转化为 map 返回。返回时不能为unit数据类型。所以将代码

def linkCount(rdd: RDD[String]): Unit={

修改为:

def linkcount(rdd: RDD[String]): collection.Map[String,Int] = {

返回就需要有接收,将链路统计功能的代码:

LinkCount.linkcount(rdd)

修改为

val serverCount=LinkCount.linkcount(rdd)

就完成了将每个服务器本批次数据访问了多少次返回。

目前 serverCount 获取了总数,而数据库中需要展示数据。所以也需要放入到监控的方法中作为第三个参数,即

SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,)

计算好的流程需要放入 redis 中,所以还需要 redis 的实例。在数据预处理的程序下 redis 进行实例:

//程序初始化阶段

//实例 redis 为后续使用

val redis=JedisConnectionUtil.getJedisCluster

将该 redis 也传入方法中:

SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,redis)

用该四个参数就可以实现数据的监控功能。最终的 redis 为写入数据库做准备。

写入参数后再来创建方法,打开代码

SparkStreamingMonitor.scala,

进入后在object SparkStreamingMonitor{中输入

def streamMonitor(sc,rdd,serverCount,redis): Unit={

}

继续添加数据类型,sc 的数据类型为 SparkContext,进行传入;rdd 数据类型为 RDD[String]类型,

图片1.png

 

进行引入;serverCount 数据类型为 collection.Map[String,Int],进行引入;redis 数据类型为 JedisCluster,

图片1.png

进行引入。即

//实现系统监控功能

def streamMonitor(sc:SparkContext,rdd:RDD[String],serverCount:collection.Map[String,Int],redis:JedisCluster): Unit={

}

以上就搭建好了集群监控的代码,主要实现系统监控功能。按照架构需要以下思路:

思路与关键代码:

1开启任务监控功能(SparkConf)

在代码DataProcessLauncher.scala中已经开启了任务监控:创建 Spark conf 已经开启了集群监控功能

2、通过http://localhost:4040/metrics/json/ url获取 json 数据

3、获取“gauges"对应的数据

4、拼接结束时间的 path

5、拼接开始时间的 path

6、获取出开始时间和结束时间

两个 key 拼接完成后获取到两个value值

7、求出时间差(结束时间-开始时间)

8、计算任务运行的速度(数据总量/时间)

再添加步骤计算总的数据量

9、封装展现需要的数据

10、将数据写入 redis

以上就是实现思路。

首先任务开启后,去程序中根据 url 把 json 读出来,再去获取 gauges 对应的数据,拼接开始时间和结束时间的path,去获取出开始时间和结束时间两个key拼接完把值获取出来,用结束时间减开始时间等于时间差,再去计算任务运行的速度(数据总量/时间),数据的总量用 RDD,添加步骤//计算总的数据量,再封装展现需要的数据,除了速度外还需要其他的数据,这些数据都需要封装到数据库里,最后把数据写入 redis 中,以上为实现思路。

相关文章
|
8月前
|
数据采集 SQL 运维
巧用指标平台DataIndex,五步法轻松实现指标管理
在业务发展初期,企业需要做好规范的指标管理,以保证随着业务的不断发展,数据化决策能够成为业务强有力的支撑。本文将为大家详解如何通过袋鼠云指标管理平台DataIndex 进行规范化的指标开发管理,轻松开发指标,避免各类指标问题。
405 0
|
1月前
|
机器学习/深度学习 搜索推荐 算法
推荐系统离线评估方法和评估指标,以及在推荐服务器内部实现A/B测试和解决A/B测试资源紧张的方法。还介绍了如何在TensorFlow中进行模型离线评估实践。
推荐系统离线评估方法和评估指标,以及在推荐服务器内部实现A/B测试和解决A/B测试资源紧张的方法。还介绍了如何在TensorFlow中进行模型离线评估实践。
237 0
|
1月前
|
C++
【SPSS】两独立样本T检验分析详细操作教程(附案例实战)
【SPSS】两独立样本T检验分析详细操作教程(附案例实战)
573 0
|
7月前
|
数据采集 数据可视化 数据挖掘
数据抽样技术全面概述
抽样是研究和数据收集中不可或缺的方法,能够从更大数据中获得有意义的见解并做出明智的决定的子集。
76 2
|
编解码 自然语言处理 数据可视化
MIM方法为什么简单高效?可视化和大规模实验给出了答案
MIM方法为什么简单高效?可视化和大规模实验给出了答案
139 0
MIM方法为什么简单高效?可视化和大规模实验给出了答案
|
数据采集 存储 JSON
数据预处理-系统监控-监控代码下|学习笔记
快速学习数据预处理-系统监控-监控代码下
67 0
数据预处理-系统监控-监控代码下|学习笔记
|
数据采集 JSON 缓存
数据预处理-系统监控_监控代码上|学习笔记
快速学习数据预处理-系统监控_监控代码上
82 0
|
数据采集 监控 开发者
网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路| 学习笔记
快速学习网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路
151 0
网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路| 学习笔记
|
数据采集 消息中间件 监控
数据预处理-链路统计实现思路|学习笔记
快速学习数据预处理-链路统计实现思路
96 0
数据预处理-链路统计实现思路|学习笔记
|
数据采集 消息中间件 分布式计算
数据预处理-链路统计-serverCoverCount计算|学习笔记
快速学习数据预处理-链路统计-serverCoverCount计算
74 0
数据预处理-链路统计-serverCoverCount计算|学习笔记

热门文章

最新文章