开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据预处理-链路统计-效果与总结】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11635
数据预处理-链路统计-效果与总结
内容介绍:
一、设想运行结果
二、数据运行效果
三、总结
一、设想运行结果
Key 是一个前缀加时间戳,
前缀是 cluster.key.monitor.
linkprocess=csanti_Monitor_LP,
后面再加一个当前系统的时间戳,
即 system.current.timeMillis,时间看不到,Value 就是以 serversCountMap 为一个值和以activeNumMap 为key一个值,就是说 redis 里面会有一个 KEY 是前缀加时间戳,而里面的值,按照 serversCountMap 和 Activenum Map这个格式来写进数据,这是预想的结果。
二、数据运行效果
LinkCount 做了一个引用,直接执行 DataProcessLauncher,把程序跑起来,这时发现redis里面还是没有数据,因为没有数据流进来,这时候把爬虫跑起来,跑一个单程就足够,右键进行执行,但是在 dataprocessLauncher 里面没有看到数据写入,是因为没有做输出。此时看一下数据有没有写入redis里面。
只要把链路统计的功能写入redis里面,然后跑起来不报错,把爬虫也运行起来,这时候就可以看到redis里面有数据流入进来,都是lp加了一个时间戳,每一个文件里面都有一部分数据,数据都是按照 server count map 和 activeNumMap 格式发过来的数据,
如下所示
{
" serverscountMap":
{
“192.168.18.16e"
10
(当前被访问了10次)
activeNumMap":
(
“192.168.180.16e" : "21"
(当前活跃连接数)
}
}
在redis里面看到了以上数据,马上去看一下爬虫监控平台数据管理的数据采集,进行刷新。
发现刷新没有数据,是因为在前面 resource 里面的 conflig.properties 配置文件里面只配置了 mysql 的路径
jdbc_ur1=jdbc:mysq1://192.168.100.160:3306Vgciantispider?useUnicode=true&characterEncokjdbc_username=root
jdbc_password=123456
并没有对 redis 进行配置
#rediscluster 实例地址
#servers = 192.1$8.30.56:6379,192.168.30.56:6380,192.168.30.56:8379,192.168.30.57:6379
#servers = 10.108.151.91:7380,10.108.151.91:8380
#192.168.2.134
servers = 192.168.2.141;7001,192.168.2.141:7002,192.168.2.141:7003
(目前的集群,这是不对的,redis里面是160.7001)
#servers = 192.168.2.128:7001,192.168.p.128:7002,192.168.2.128:7000
所以需要修改IP为
servers = 192.168.100.160:7001,192.168.2.141:7002,192.168.2.141:7003
(把集群换成100.160)
反爬虫监控平台的 web 界面是在 Resources 里面的 Config.properties 进行读操作。
上面是配置 mysal,下面就可以配置 redis 的实例地址,配置好集群即可。配置完成以后重启服务,这时候,在 web 界面进行刷新,就会出现相应的数据,会出现部署服务器和当前活跃连接数以及最近三天采集数据量。
每一次刷新数据,都会进行更新,这就是链路统计的实际效果。这个做完之后就实现了从数据一步一步进行计算,从 reducebykey 到 server_ip n,到时间戳,时间戳打到了 redis 里面,但是数据里面并没有时间戳,当前活跃连接数也打进了redis,只要写到了 redis 里面,前端界面的效果自动就会出现,剩下的操作都是由前端工程师进行操作的。
三、总结
*目标:企业需要实时了解每个链路的运行情况、数据采集量、活跃链接数等信息。需在数据在采集时实时展现相关的信息在前台界面(管理界面)*
1、统计每个批次每台服务器访问的总量
获取到—条数据,使用#CS#""对数据进行切割,获取切分后的第十个数据(角标是9),将第十个数据和1,进行输出。
调用reducebykey (下划线+下划线)求去除这个批次每台服务器的放总量
//1遍历rdd获取到每条数据
val serverCount=rdd.map(message=>{
//2抽取出服务器的IP
var ip=""
if (message.split( regex = "#CS#" , limit =-1).length>9){
ip=message.split( regex = "#CS#" ,limit = -1)(9)
}
//3 将ip和1返回
(ip,1)
})//4调用 reducebykey 计算出 ip 和总数
2.统计每个批次每台服务器当前活跃链接数的量
获取到—条数据,使用“#CS#"对数据进行切割,
获取切分后的第十二个数据(角标是11)和第十个数据(角标是9) ,将第十个数据和第十二个数据,进行输出
调用 reducebykey ((k,v)=>v) 求去每个服务器多个数据中的最后一个数据
//1获取到一条数据,使用“#CS#”对数据进行切割
val activeusercount= rdd .map(message=>{
var ip="""
var activeUserCount=""
//切分数据
if (message.split(""#CS#"").length>11){
//2获取切分后的第十二个数据(角标是11)和第十个数据(角标是9)
//截取当前活跃连接数
activeusercount=message.split(""#CS#"")(11)
//截取IP
ip=message.sp1it("#CS#"")(9)}
//3将第十个数据和第十二个数据,进行输出 (ip ,activeusercount)
}).reduceByKey((k, v)=>v)
//4调用reducebykey ((k,v)=>v)
求去每个服务器多个数据中的最后一个
3.将两批数据写入 redis
在两个数据不为空的前提下,将两个数据转换成两个小的 map
if ( ! servercount.isEmpty)&& ! activeUsercount.isEmpty()){
//将两个数据转换成两个小的 map
val serverCountMap=servercount.co1lectAsMap()
val activeUserCountMap=activeUserCount.collectAsMap()
封装最终要写入redis的数据(将两个小的 MAP 封装成一个大的 MAP)
val Maps=Map(
"serverscountMap" ->serverCountMap ,"activeNumMap"->activeUsercountMap)
在配置文件中读取出数据key的前缀,+时间戳(redis 中对数据的 key)
va1
key=PropertiesUtil.getstringByKey("cluster .key.monitor .linkProcess" , "jedisConfig.properties ")+System.currentTimeMi1lis().tostring
在配置文件中读取出数据的有效存储时间
val
time=Propertiesutil.getStringByKey("cluster . exptime.monitor" , "jedisconfig . properties").
toInt
将数据写入 redis
redis.setex(key ,time , 3son(DefaultFormats). write(Maps))