开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第五阶段:爬虫识别-关键页面数据读取】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/673/detail/11691
爬虫识别-关键页面数据读取
目录:
一、指标计算
二、需求
三、设计
四、代码
一、指标计算
单位时间内的 IP 段访问量(前两位)
单位时间内访问关键页面的 Cookie 数
单位时间内的访问总量
单位时间内查询不同行程的次数
单位时间内的关键页面访问总量
单位时间内的关键页面最短访问间隔
单位时间内的 UA 种类数统计
单位时间内小于最短访问间隔(自设)的关键页面查询次数
八个指标一部分需要用到关键页面,数据的关键页面在数据库已经配好。
数据封装为 processedData,效果没有问题。
二、需求
在之前 dataprocess 中,加载和很多的数据库规则,在 rulecomput
e 时,也需要加载数据库规则,供计算指标使用,提前加载
规则 1:关键页面;爬虫识别流程里计算总量时要知道谁是关键页面,关键页面在数据库配置。后期分析关键页面的访问量,加载进来,并广播
规则 2:流程规则;后期需要根据流程规则对数据打分。假如八个指标计算完成进行指标碰撞,八个指标是数据计算得出,怎样知道根据数据判断是否为爬虫需要参考期,企业配置的8个阈值
八个阈值:
5分钟内的 IP 访问量
5分钟内的关键页面访问总量
5分钟内的 UA 出现次数统计
5分钟内的关键页面最短访问间隔
5分钟内小于最短访问间隔()的关键页面查询次数
5分钟内关键页面的访问次数的 Cookie 数
5分钟内查询不同行程的次数
5分钟内的 IP 段访问总量
八个阈值在企业端数据库配置,八个指标算出与企业配置的八个阈值对比,从数据库里将八个指标读到程序,现在还在数据库里。
第二个流程规则是八个阈值数据,从数据库读到爬虫识别程序。关键页面、流程规则要读到数据库。
三、设计
1、在 sc 初始化后,kafka 数据加载前初始化 mysgl 数据,并广播。读取数据后加载到广播变量
2、在加载 kafka 数据后,需要根据数据库标识,标识是否需要更新决定什么时候需要更新广播变量
3、关键页面规则表 itcast_query_critical_pages
找到关键页面规则表双击打开
前面为 id 后面为关键页面,后面字段都是关键页面,设计表无说明。
判断是否为关键页面将后面加载,用数据的 URL 和三个循环对比、匹配,匹配任意一个都表示是关键页面。
4、流程规则策略配置
a) itcast_process_info,
b) itcast_strategy
c)itcast_rule,
d)itcast_rules_maintenance_table
四、代码:关键页面(itcast_query_critical_pages)
.广播
//获取查询关键页面
var queryCriticalPages=AnalyzeRuleDB.queryCriticalPages()
@volatile var broadcastQueryCriticalPages=sc.broadcast(queryCrit
icalPages
数据库值读到程序要先加载,AnalyzeRuleDB 里有关键的查询页面
queryCriticalPages()
根据经验在初始化阶段读取数据
setupRuleComputeSsc 初始化
Ssc=new StreamingContext(sc,Seconds(2))实例 StreamingContext
引入进 AnalyzeRuleDB.queryCriticalPages(),没有此方法,创建方法。
def queryCriticalPages():Nothing=??? 剪切到最后
object AnalyzeRuleDB{ 预处理的 AnalyzeRuleDB,爬虫识别与预处理使用同一个 object
//读取关键页面到程序
def queryCriticalPages():Unit={
读取关键页面需要 sql 语句,复制粘贴
//读取黑名单的 sql
val sql="select ip name from itcast_ip_blacklist'
//接受数据的字段
val field="ip_namel
//调用 QueryDB 读取数据
val blackIpList=QueryDB.queryData(sql,field)
//返回过滤数据
blackIpList
黑名单改为关键页面,sql 改为 criticalPageMatchExpression,表名改为 itcast_query_critical_pages,接收为 criticalPageMatchExpres
sion,blackIpList 改为 criticalPageList 返回过滤数据为 criticalPag
eList
def queryCriticalPages():ArrayBuffer[String]={
//读取关键页面的 sql
val sql="select criticalPageMatchExpression from itcast_query_criti
cal_pages“
//接受数据的字段
val field="criticalPageMatchExpression
//调用 QueryDB 读取数据
val criticalPageList=QueryDB.queryData(sql,field)
//返回过滤数据
criticalPageList
返回程序接收
//读取关键页面到程序
val criticalPageList=AnalyzeRuleDB.queryCriticalPages()
//将数据添加到广播变量
数据拿来广播,前面做接收 val broadcastcriticalPageList,前面加修饰不稳定 @volatile
@volatileval broadcastcriticalPageList=sc.broadcast(criticalPageLi
st)
判断是否需要更新,初始化阶段读取程序,两秒钟迭代一次。
用 kafkaValues 调用 foreachRDD,在 RDD 做大括号,每两秒钟循环一次,放入是否需要更新
kafkaValues.foreachRDD(rdd=>{
val needUpdateQueryCriticalPages=jedis.get("NeedUpDateQueryC
riticalPages"
if(!needUpdateQueryCriticalPages.isEmpty()&&needUpdateQueryCriticalPages.toBoolean){
queryCriticalPages=AnalyzeRuleDB.queryCriticalPages() broadcastQueryCriticalPages.unpersist()
broadcastQueryCriticalPages=sc.broadcast(queryCriticalPages) jedis.set("NeedUp DateQueryCriticalPages", "false")
没有 jedis 创建 jedis,爬虫预处理阶段有,复制粘贴
//实例 redis 为后续使用
val redis =JedisConnectionUtil.getJediscluster
初始化时创建,引入
将 redis 改为 jedis,queryCriticalPages 改为 criticalPageList,val 改为 var 可变类型,broadcastQueryCriticalPages 改为 broadcast
CriticalPagesList
//读取关键页面到程序
var criticalPagesList=AnalyzeRuleDBqueryCriticalPages()
//将数据添加到广播变量
@volatilevar broadcastCriticalPagesList=scbroadcast(criticalPages
List)
kafkaValues.foreachrDd(rdd=>{/
/关键页面是否改变标识
val needUpdateQueryCriticalPages=jedis.get("NeedUpDateQueryC
riticalPages")
if(!needUpdateQueryCriticalPages.isEmpty()&&needUpdateQueryCriticalPages.toBoolean){
criticalPagesList =AnalyzeRuleDB.queryCriticalPages()
broadcastCriticalPagesList.unpersist()
broadcastCriticalPagesList =sc.broadcast(queryCriticalPages)
jedis.set("NeedUpDateQueryCriticalPages","false")
}
逻辑
kafkaValues RDD 两秒钟循环,将数据添加到广播变量后判断数据是否需要更新,redis 里添加 key two or false。
查询关键页面是否需要更新,如果不等于空!查询转换为 toBoolean 表示需要更新,重新读取数据库规则、关键页面,广播变量进行清空,重新加载数据到广播变量里,将值改为 false。
关键页面数据读取成功。