开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-系统监控-创建计算方法及监控实现思路】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11674
数据预处理-系统监控-创建计算方法及监控实现思路
内容介绍:
一、需求
二、设计
三、代码实现
上节讲解了如何获取 json 中的 key 结束时间和开始时间:先获取整个 json,再获取以 gauges 为节点的所有值,再以key获取结束时间和开始时间。该key需要用到任务唯一id+driver+任务名称+开始/结束时间,后获取 value 值。
一、需求
数据中存储了很多的任务指标,其中有批处理的开始时间、批处理的结束时间,再加上计算出批处理的数据量,就可以得知批处理的平均计算速度(速度=数据量/时间),从而对任务的速度进行监控。
当然,除了时间,还有很多其他的指标可供统计,请自行使用。
二、设计
1、解析监控 json 数据
2、获取 gauges 节点下的数据
3、拿到批处理的开始时间和结束时间,计算批处理花费的时间
4、计算 rdd 的 count
5、计算批处理的平均计算速度
6、将指标 appid.appname、endtime、rddcount、花费时间、速度、serversCountMap 封装到 map 中
7、将 map 存储到 Redis,(预处理的监控功能完成,写入后首页中的图自动显示数据)key 设计为包含时间戳的
三、代码实现
在代码DataProcessLauncher.scala
中,首先创建工程SparkStreamingMonitor.streamMonitor
,输入
SparkStreamingMonitor.streamMonitor()
会标红,是因为此时还不存在,在工程 dataprocess下的 businessprocess 下右键新建一个 Scala Class,Name为SparkStreamingMonitor.streamMonitor,Kind为Object
创建出的该代码主要用于实现系统监控的过程。
创建完成后回到代码DataProcessLauncher.scala
中,方法已经引入,来介绍需要哪些参数:解析数据需要拿到所有数据,拿到所有数据需要 url。再解析结束时间和开始时间,解析需要拼接 key 和任务 id、名称,而任务id与名称存在 SparkContext 中。在数据预处理的程序代码中存在 SparkContext:
//数据预处理的程序
def setupSsc(sc: sparkContext,kafkaParams: Map[String,String], topics: Set[String]): StreamingContext
//程序初始化阶段
//实例redis为后续使用
val redis =JedisconnectionUtil.getJedisCluster
//3 、创建Streaming context
val ssc=new StreamingContext(sc,Seconds(2))
在监控过程中任务的id与名称都设置在任务的 Spark 中,所以首先需要传入 SparkContext,因为后续计算需要用到任务名称和任务 id,所以直接调用 SparkContext 获取到。sc存在后就可以使用拼接获取到key值,第一个参数完成。获取到开始时间和结束时间可以计算出时间差,拥有时间差计算出速度还需要数据总量。
现在这个监控功能在数据循环中:
//5、消费数据
KafkaValue.foreachRDD(rdd=>{//迭代运行(每2秒运行一次)})
在 rdd 中,即此处获取到的数据会有多个。此处是一个rdd。上述代码:
//4、读取 kafka 内的数据 ssc, kafkaParams , topics)
val kafkaData=KafkaUtils.createDirectStream[ String,String,StringDecoder ,StringDecoder](ssc ,kafka
//真正的数据
val KafkaValue=kafkaData.map(_._2)
读取原始数据,过滤出第二个数据,然后遍历此处的 rdd,为最原始的 rdd。后续使用rdd进行一个操作:
//2数据清洗功能
//定义方法,参数为一条数据和广播变量
val filteredData=rdd.filter(message=>URLFilter.filterURL(message,broadcastFilterRuleList.value))
//filteredData.foreach(println)
过滤清洗掉其他数据后又返回一个 rdd,rdd.filter 为没有经过过滤的,原始数据,其中还存在无效数据。filteredData 为过滤后的有用的数据。需要以没有经过处理的rdd为准。
系统监控监控的为从链路统计后所有链路都经过的监控,只要预处理的板块包括的,都是预处理完成的工作,所以计算效率时都应该包括。数据清洗也是工作,也需要包括。所以我们要获取的是总的数据量。
所以需要第二个参数 rdd,通过 rdd 拿到数据的总数,通过sc求取总时间差,进而求出速度。
在数据进行采集过程中,实时流量转发可以计算出,当前系统运营状态为前端代码,还存在各链路流量转发情况。之前做过链路统计功能:
//1链路统计功能
LinkCount.LinkCount(rdd)
链路采集功能统计了第一个结果值,第二个结果值,然后把它写到数据库里。在我们当前监控过程当中,其中需要各链路流量的转发情况,即每一链路被访问了多少次需要进行统计。链路统计中已经计算出每个服务器在一个批次里面被访问了多少次。现在要使用该数据可以直接使用,但是链路统计没有返回值。
//每个服务器本批次数据访问了多少次
//1 遍历 rdd 获取到每条数据
val serverCount=rdd.map(message=>{
现在将该数据进行返回即可。获取计算出来的每一个服务器当前批次被访问的总数放到最后。输入
serverCount.collectAsMap()
转化为 map 返回。返回时不能为unit数据类型。所以将代码
def linkCount(rdd: RDD[String]): Unit={
修改为:
def linkcount(rdd: RDD[String]): collection.Map[String,Int] = {
返回就需要有接收,将链路统计功能的代码:
LinkCount.linkcount(rdd)
修改为
val serverCount=LinkCount.linkcount(rdd)
就完成了将每个服务器本批次数据访问了多少次返回。
目前 serverCount 获取了总数,而数据库中需要展示数据。所以也需要放入到监控的方法中作为第三个参数,即
SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,)
计算好的流程需要放入 redis 中,所以还需要 redis 的实例。在数据预处理的程序下 redis 进行实例:
//程序初始化阶段
//实例 redis 为后续使用
val redis=JedisConnectionUtil.getJedisCluster
将该 redis 也传入方法中:
SparkStreamingMonitor.streamMonitor(sc,rdd, serverCount,redis)
用该四个参数就可以实现数据的监控功能。最终的 redis 为写入数据库做准备。
写入参数后再来创建方法,打开代码
SparkStreamingMonitor.scala,
进入后在object SparkStreamingMonitor{中输入
def streamMonitor(sc,rdd,serverCount,redis): Unit={
}
继续添加数据类型,sc 的数据类型为 SparkContext,进行传入;rdd 数据类型为 RDD[String]类型,
进行引入;serverCount 数据类型为 collection.Map[String,Int],进行引入;redis 数据类型为 JedisCluster,
进行引入。即
//实现系统监控功能
def streamMonitor(sc:SparkContext,rdd:RDD[String],serverCount:collection.Map[String,Int],redis:JedisCluster): Unit={
}
以上就搭建好了集群监控的代码,主要实现系统监控功能。按照架构需要以下思路:
思路与关键代码:
1开启任务监控功能(SparkConf)
在代码DataProcessLauncher.scala
中已经开启了任务监控:创建 Spark conf 已经开启了集群监控功能
2、通过http://localhost:4040/metrics/json/ url
获取 json 数据
3、获取“gauges"对应的数据
4、拼接结束时间的 path
5、拼接开始时间的 path
6、获取出开始时间和结束时间
两个 key 拼接完成后获取到两个value值
7、求出时间差(结束时间-开始时间)
8、计算任务运行的速度(数据总量/时间)
再添加步骤计算总的数据量
9、封装展现需要的数据
10、将数据写入 redis
以上就是实现思路。
首先任务开启后,去程序中根据 url 把 json 读出来,再去获取 gauges 对应的数据,拼接开始时间和结束时间的path,去获取出开始时间和结束时间两个key拼接完把值获取出来,用结束时间减开始时间等于时间差,再去计算任务运行的速度(数据总量/时间),数据的总量用 RDD,添加步骤//计算总的数据量,再封装展现需要的数据,除了速度外还需要其他的数据,这些数据都需要封装到数据库里,最后把数据写入 redis 中,以上为实现思路。