开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据预处理-链路统计实现思路】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11631
数据预处理-链路统计实现思路
内容介绍:
一、数据预处理流程
二、链路统计
一、数据预处理流程
将数据预处理分成前中后三个阶段,在数据预处理前我们需要将采集到的数据打入到 kafka 中进行读取数据,接下来要做的是数据的链路统计功能。
预处理中期要做的是数据清洗、数据脱敏、数据拆分、数据分类、数据解析、历史爬虫判断、数据结构化。
之后进入到数据预处理后期,把已经结构化的爬虫数据推送到kafka 里,再推送回去。
推送后再进行一个系统监控,监控的是从链路统计一直到数据推送的过程中预处理的指标和链路的数量等等。
二、链路统计
目标:
企业需要实时了解每个链路的运行情况、数据采集量、活跃链接数等信息。需在数据在采集时实时展现相关的信息在前台界面(管理界面)
1、需求
统计lua 链路数据
实际效果:
在数据管理模块中有一个数据采集,数据采集模块中部署服务器、当前活跃连接数、最近三天采集数据量,就是现在要实践的列入统计功能。
采集多少台服务器在响应,每台服务器此时此刻的当前活跃连接数以及最近三天的数据量。计算这些指标,即上图结果。
2、设计
(1)、部署服务器:
server_ip,可以从 kafka 的数据中获取
(2)、当前服务器的活跃连接数:
需要增加 lua 采集指标,对接到kafka 中
首先写入原始数据按照该格式体现到kafka 里
然后目标效果是
要从原始数据生成目标结果,
从上到下的顺序:
先看第一个点,将服务器的ip导入,IP:server_addr,将ip结果直接截取出来放在服务器中
当前活跃用户连接数已经采集,就是
activeUserNumber=ngx.var.connections_active,
之前没有这个数据,所以要在“实例将要打入kafka 的数据”里面追加前面的activeUserNumber=ngx.var.connections_active
这样原始数据就被替换成了这样。
——实例将要打入 kafka 的数据
local messa
g
e = t
i
me-loca
l ..
”
#CS#
”
..
request_method
..
“#CS#”
..
content_type
..
”
#CS#
”
.. request_body ..
”
#CS#
”
..
http_referer
..
”
#CS#
”
..
remote-addr
..
”#CS#”
..
http_user_agent
..
”
#CS#
”
..time_iso8601 ..
”
#CS#
”
.. server_addr ..
“
#CS#
”
.. http_cookie ..
“
#CS#
”
..activeUserNumber
这个不需要计算,拆分后,拿到这个结果,直接就是当前活跃连接数
在此并不需要计算,采集出来的数就是最终结果。
①、被访问次数的算法:
先拿到 server_ip ,一条数据一个1,两条数据两个1......n条数据n个1。用前面用过的 reducebykey,把数据放在一起处理完就是server_ip n。
在数据当中加入一个时间戳,时间戳有后就可以让前端工程师进行计算。将算出结果写到 redis 里面,还有另外一个模块mysql,将redis 处理后的数据导入到 mysql。Redis 数据就会到当前活跃连接数里面,而 mysql 数据会进入到最近三天采集数据量和部署服务器中
在 streaming 程序中获取数据,数据采集到Redis中,写到数据库。
(3)、最近三天采集数据量
昨天:每天的采集数据在 stream 中统计,存储到 Redis 中(过期时间24小时),web 端定时任务每天24点统计数据到数据库
前天:24点定时任务把昨天的数据更新到前天,重新计算昨天的数据
前三天:累加上面两个数据和当天Redis 中的数据