开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据预处理-链路统计-serverCoverCount计算】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11632
数据预处理-链路统计-serverCoverCount计算
内容介绍:
一、原始数据
二、数据实现
一、原始数据
实例将要打入 kafka 的数据
local message = time_ loca
l
.."#CS#".. request . .”#CS#".. request_ method . . " #CS# '
content type.."#CS#"..request_ body"#CS#". . http_ referer 。. ' #CS#". . remote_ addr'#CS#"
http- user agent . ."#CS#".. time_ iso8601 . ."#CS#". . server addr .."#CS#".. http- cookie. . "#CS#". activeUserNumber
二、数据实现
//消费数据
kafkavalue.foreachRDD(rdd=>{
//链路统计功能
Linkcount.linkcount()
由于 LinkCount 是红色,
需要进行创建:
Project->src->main->scala->dataprocess->businessprocess->new->Scala Class
Name:LinkCount Kind:Object
需要进行引入,引入在这个方法里面没有,所以需要创建一下,这个方法里面要做链路统计功能需要有数据,数据就是 rdd,把 rdd 传进去,让 idea 主动创建Linkcount.
根据前面的流程需求,需要统计的是服务器的名称,服务器的名称直接截过来就可以,需要统计当前活跃连接数,服务器的 IP 以及它的数值,所以一个是当前活跃连接数,还有就是服务器IP出现的次数,即服务器的 count。
先把服务器的IP拿出来,再把衣拿出来,然后 reducebykey,再加入 redis 的时候,加入时间段。
具体流程:
//实现链路统计功能
object LinkCount {
//链路统计代码
def linkCount(rdd: RDD[String]): Unit = {
//每个服务器本批次数据访问了多少次
//1.遍历 rdd 获取到每条数据
val serverCount=rdd . map( message=>{
注意:
此处用 map 还是 foreach 取决于要不要返回结果,map需要返回结果,foreach 是直接去操作,不需要返回结果
//2.抽取出服务器的 IP
var ip=""
if (message.split( regex= "#CS#", limit= -1).1ength>9){
ip=message.split( regex= "#CS#", limit= -1)(9)
在原始数据中,服务器ip(server_addr)是第十个,当前活跃连接数(activeUserNumber) 是第十二个,第十个数据角标是9,第十二个数据角标是11。
//3.将ip 和1返回
(ip,1)
}).reduceByKey(_ .+_ )
//4.调用 reducebykey 计算出ip 和总数
//5.当前活跃连接数的计算
/6.将it算出来的两个结果写 Aredis