数据预处理-系统监控-创建计算方法及监控实现思路|学习笔记

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
全局流量管理 GTM,标准版 1个月
简介: 快速学习数据预处理-系统监控-创建计算方法及监控实现思路

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段数据预处理-系统监控-创建计算方法及监控实现思路】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/672/detail/11674


数据预处理-系统监控-创建计算方法及监控实现思路


内容介绍:

一、需求

二、设计

三、代码实现


上节讲解了如何获取 json 中的 key 结束时间和开始时间:先获取整个 json,再获取以 gauges 为节点的所有值,再以key获取结束时间和开始时间。该key需要用到任务唯一id+driver+任务名称+开始/结束时间,后获取 value 值。


一、需求

数据中存储了很多的任务指标,其中有批处理的开始时间、批处理的结束时间,再加上计算出批处理的数据量,就可以得知批处理的平均计算速度(速度=数据量/时间),从而对任务的速度进行监控。

当然,除了时间,还有很多其他的指标可供统计,请自行使用。

图片1.png

 

二、设计

1、解析监控 json 数据

2、获取 gauges 节点下的数据

3、拿到批处理的开始时间和结束时间,计算批处理花费的时间

4、计算 rdd 的 count

5、计算批处理的平均计算速度

6、将指标 appid.appname、endtime、rddcount、花费时间、速度、serversCountMap 封装到 map 中

7、将 map 存储到 Redis,(预处理的监控功能完成,写入后首页中的图自动显示数据)key 设计为包含时间戳的


三、代码实现

在代码DataProcessLauncher.scala中,首先创建工程SparkStreamingMonitor.streamMonitor,输入

SparkStreamingMonitor.streamMonitor()

会标红,是因为此时还不存在,在工程 dataprocess下的 businessprocess 下右键新建一个 Scala Class,Name为SparkStreamingMonitor.streamMonitor,Kind为Object

创建出的该代码主要用于实现系统监控的过程。

创建完成后回到代码DataProcessLauncher.scala中,方法已经引入,来介绍需要哪些参数:解析数据需要拿到所有数据,拿到所有数据需要 url。再解析结束时间和开始时间,解析需要拼接 key 和任务 id、名称,而任务id与名称存在 SparkContext 中。在数据预处理的程序代码中存在 SparkContext:

//数据预处理的程序

def setupSsc(sc: sparkContext,kafkaParams: Map[String,String], topics: Set[String]): StreamingContext

//程序初始化阶段

//实例redis为后续使用

val redis =JedisconnectionUtil.getJedisCluster

//3 、创建Streaming context

val ssc=new StreamingContext(sc,Seconds(2))

在监控过程中任务的id与名称都设置在任务的 Spark 中,所以首先需要传入 SparkContext,因为后续计算需要用到任务名称和任务 id,所以直接调用 SparkContext 获取到。sc存在后就可以使用拼接获取到key值,第一个参数完成。获取到开始时间和结束时间可以计算出时间差,拥有时间差计算出速度还需要数据总量。

现在这个监控功能在数据循环中:

//5、消费数据

KafkaValue.foreachRDD(rdd=>{//迭代运行(每2秒运行一次)})

在 rdd 中,即此处获取到的数据会有多个。此处是一个rdd。上述代码:

//4、读取 kafka 内的数据 ssc, kafkaParams , topics)

val kafkaData=KafkaUtils.createDirectStream[ String,String,StringDecoder ,StringDecoder](ssc ,kafka

//真正的数据

val KafkaValue=kafkaData.map(_._2)

读取原始数据,过滤出第二个数据,然后遍历此处的 rdd,为最原始的 rdd。后续使用rdd进行一个操作:

//2数据清洗功能

//定义方法,参数为一条数据和广播变量

val filteredData=rdd.filter(message=>URLFilter.filterURL(message,broadcastFilterRuleList.value))

//filteredData.foreach(println)

过滤清洗掉其他数据后又返回一个 rdd,rdd.filter 为没有经过过滤的,原始数据,其中还存在无效数据。filteredData 为过滤后的有用的数据。需要以没有经过处理的rdd为准。

系统监控监控的为从链路统计后所有链路都经过的监控,只要预处理的板块包括的,都是预处理完成的工作,所以计算效率时都应该包括。数据清洗也是工作,也需要包括。所以我们要获取的是总的数据量。

图片1.png

所以需要第二个参数 rdd,通过 rdd 拿到数据的总数,通过sc求取总时间差,进而求出速度。

在数据进行采集过程中,实时流量转发可以计算出,当前系统运营状态为前端代码,还存在各链路流量转发情况。之前做过链路统计功能:

//1链路统计功能

LinkCount.LinkCount(rdd)

链路采集功能统计了第一个结果值,第二个结果值,然后把它写到数据库里。在我们当前监控过程当中,其中需要各链路流量的转发情况,即每一链路被访问了多少次需要进行统计。链路统计中已经计算出每个服务器在一个批次里面被访问了多少次。现在要使用该数据可以直接使用,但是链路统计没有返回值。

//每个服务器本批次数据访问了多少次

//1 遍历 rdd 获取到每条数据

val serverCount=rdd.map(message=>{

现在将该数据进行返回即可。获取计算出来的每一个服务器当前批次被访问的总数放到最后。输入

serverCount.collectAsMap()

转化为 map 返回。返回时不能为unit数据类型。所以将代码

def linkCount(rdd: RDD[String]): Unit={

修改为:

def linkcount(rdd: RDD[String]): collection.Map[String,Int] = {

返回就需要有接收,将链路统计功能的代码:

LinkCount.linkcount(rdd)

修改为

val serverCount=LinkCount.linkcount(rdd)

就完成了将每个服务器本批次数据访问了多少次返回。

目前 serverCount 获取了总数,而数据库中需要展示数据。所以也需要放入到监控的方法中作为第三个参数,即

SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,)

计算好的流程需要放入 redis 中,所以还需要 redis 的实例。在数据预处理的程序下 redis 进行实例:

//程序初始化阶段

//实例 redis 为后续使用

val redis=JedisConnectionUtil.getJedisCluster

将该 redis 也传入方法中:

SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,redis)

用该四个参数就可以实现数据的监控功能。最终的 redis 为写入数据库做准备。

写入参数后再来创建方法,打开代码

SparkStreamingMonitor.scala,

进入后在object SparkStreamingMonitor{中输入

def streamMonitor(sc,rdd,serverCount,redis): Unit={

}

继续添加数据类型,sc 的数据类型为 SparkContext,进行传入;rdd 数据类型为 RDD[String]类型,

图片1.png

 

进行引入;serverCount 数据类型为 collection.Map[String,Int],进行引入;redis 数据类型为 JedisCluster,

图片1.png

进行引入。即

//实现系统监控功能

def streamMonitor(sc:SparkContext,rdd:RDD[String],serverCount:collection.Map[String,Int],redis:JedisCluster): Unit={

}

以上就搭建好了集群监控的代码,主要实现系统监控功能。按照架构需要以下思路:

思路与关键代码:

1开启任务监控功能(SparkConf)

在代码DataProcessLauncher.scala中已经开启了任务监控:创建 Spark conf 已经开启了集群监控功能

2、通过http://localhost:4040/metrics/json/ url获取 json 数据

3、获取“gauges"对应的数据

4、拼接结束时间的 path

5、拼接开始时间的 path

6、获取出开始时间和结束时间

两个 key 拼接完成后获取到两个value值

7、求出时间差(结束时间-开始时间)

8、计算任务运行的速度(数据总量/时间)

再添加步骤计算总的数据量

9、封装展现需要的数据

10、将数据写入 redis

以上就是实现思路。

首先任务开启后,去程序中根据 url 把 json 读出来,再去获取 gauges 对应的数据,拼接开始时间和结束时间的path,去获取出开始时间和结束时间两个key拼接完把值获取出来,用结束时间减开始时间等于时间差,再去计算任务运行的速度(数据总量/时间),数据的总量用 RDD,添加步骤//计算总的数据量,再封装展现需要的数据,除了速度外还需要其他的数据,这些数据都需要封装到数据库里,最后把数据写入 redis 中,以上为实现思路。

相关文章
|
数据采集 SQL 运维
巧用指标平台DataIndex,五步法轻松实现指标管理
在业务发展初期,企业需要做好规范的指标管理,以保证随着业务的不断发展,数据化决策能够成为业务强有力的支撑。本文将为大家详解如何通过袋鼠云指标管理平台DataIndex 进行规范化的指标开发管理,轻松开发指标,避免各类指标问题。
874 0
|
6月前
|
机器学习/深度学习 搜索推荐 算法
推荐系统离线评估方法和评估指标,以及在推荐服务器内部实现A/B测试和解决A/B测试资源紧张的方法。还介绍了如何在TensorFlow中进行模型离线评估实践。
推荐系统离线评估方法和评估指标,以及在推荐服务器内部实现A/B测试和解决A/B测试资源紧张的方法。还介绍了如何在TensorFlow中进行模型离线评估实践。
421 0
|
28天前
|
JSON 算法 数据可视化
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
这篇文章是关于如何通过算法接口返回的目标检测结果来计算性能指标的笔记。它涵盖了任务描述、指标分析(包括TP、FP、FN、TN、精准率和召回率),接口处理,数据集处理,以及如何使用实用工具进行文件操作和数据可视化。文章还提供了一些Python代码示例,用于处理图像文件、转换数据格式以及计算目标检测的性能指标。
53 0
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
|
数据采集 数据可视化 数据挖掘
数据抽样技术全面概述
抽样是研究和数据收集中不可或缺的方法,能够从更大数据中获得有意义的见解并做出明智的决定的子集。
430 2
|
数据采集 存储 JSON
数据预处理-系统监控-监控代码下|学习笔记
快速学习数据预处理-系统监控-监控代码下
数据预处理-系统监控-监控代码下|学习笔记
|
数据采集 消息中间件 监控
数据预处理-链路统计实现思路|学习笔记
快速学习数据预处理-链路统计实现思路
数据预处理-链路统计实现思路|学习笔记
|
数据采集 监控 开发者
网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路| 学习笔记
快速学习网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路
网站流量日志分析--数据预处理--点击流模型 pageviews 编程实现思路| 学习笔记
|
SQL 数据采集 移动开发
网站流量日志分析--统计分析--漏斗模型转化分析实现|学习笔记
快速学习网站流量日志分析--统计分析--漏斗模型转化分析实现
230 0
网站流量日志分析--统计分析--漏斗模型转化分析实现|学习笔记
|
数据采集 消息中间件 分布式计算
数据预处理-链路统计-serverCoverCount计算|学习笔记
快速学习数据预处理-链路统计-serverCoverCount计算
数据预处理-链路统计-serverCoverCount计算|学习笔记
|
数据采集 JSON 缓存
数据预处理-系统监控_监控代码上|学习笔记
快速学习数据预处理-系统监控_监控代码上
下一篇
无影云桌面