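Study notes for the Developer Academy course "Big Data in Practice: Anti-Crawler System (Lua + Spark + Redis + Hadoop Framework Setup), Stage 5: Crawler Identification - Reading Flow Rules". The notes follow the course closely so that readers can pick up the material quickly.
Course URL: https://developer.aliyun.com/learning/course/673/detail/11692
Crawler Identification - Reading Flow Rules
Contents:
I. Flow rules
II. Reading the four tables into the program
III. How the four tables are related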
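I. Flow rules
The key-page data has already been read into the program, together with the check for whether it needs updating.
1. Goal:
Use SQL to load the flow data from the database into the crawler identification program, so that the data needed to identify crawlers is available.
2. The four tables
The flow computes eight metrics. The business defines a threshold for each of the eight results, and each result is compared against its threshold. The program therefore has to read those eight thresholds, along with some other data.
Database tables:
a) itcast_process_info
b) itcast_strategy
c) itcast_rule
d) itcast_rules_maintenance_table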
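Refresh the page and log back in to the flow screen. You might define one flow just for state-owned enterprises, another for New Year's Day, and another for Spring Festival; any number of flows can be defined. Flows can be added, deleted, queried, and modified.
Four tables are involved. The flow name, model name, creation time, creator, and operation are stored in one separate table.
Exit the flow.
Select a flow and click the pencil icon: the eight rules shown there are stored in a separate table too.
The thresholds are also in a separate table.
Each of the eight rules has its own name, and those names are stored in yet another table. Thresholds, rules, flow names, and rule names make four tables in total; these are the four tables listed above.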
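In the database, itcast_process_info is the flow table: it stores the flow's creation time, name, and creator.
itcast_strategy stores the thresholds used in the flow.
For example, enter 120 as the threshold in the UI; after a refresh, the table shows the updated value.
Find itcast_rule in the database and double-click to open it: this table holds the eight rules.
Find itcast_rules_maintenance_table and double-click to open it: it holds the names of the eight metrics. The values 0 through 7 give eight entries, each mapped to the corresponding metric name.
Clicking into a flow shows the eight metrics; the next step shows the final threshold and the eight metric names. The final threshold, the eight metrics, and the eight metric names all belong to the flow. In other words, a flow contains the eight rules, the eight rule names, and the threshold. Save and exit.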
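The containment described above (a flow owns its rules, their names, and a final threshold) can be sketched as two case classes. This is only an illustration: the field names are assumptions inferred from the constructor calls that appear later in these notes, not the course's actual class definitions, and the sample values are made up.

```scala
// Hypothetical field names, inferred from how the notes construct these objects.
final case class RuleCollocation(
  ruleId: String,
  flowId: String,
  ruleName: String,     // rule_real_name from itcast_rules_maintenance_table
  ruleType: String,
  ruleStatus: Int,
  ruleCrawlerType: Int, // 0 = anti-crawler
  ruleValue0: Double,   // arg0
  ruleValue1: Double,   // arg1
  ruleScore: Int
)

final case class FlowCollocation(
  flowId: String,
  flowName: String,
  rules: List[RuleCollocation],
  flowLimitScore: Double, // final threshold from itcast_strategy
  strategyCode: String    // the notes pass the flow id a second time here
)

object FlowModelSketch {
  // Made-up values, for illustration only.
  val sampleRule: RuleCollocation =
    RuleCollocation("r1", "f1", "sampleRule", "0", 0, 0, 5.0, 0.0, 10)
  val sampleFlow: FlowCollocation =
    FlowCollocation("f1", "demo flow", List(sampleRule), 100.0, "f1")

  def main(args: Array[String]): Unit =
    println(s"${sampleFlow.flowName}: ${sampleFlow.rules.size} rule(s), threshold ${sampleFlow.flowLimitScore}")
}
```

Modelling the rules as a list inside the flow mirrors the UI, where the rules and threshold are only reachable by opening a flow.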
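II. Reading the four tables into the program
// Load the flow rule/strategy configuration
var flowList = AnalyzeRuleDB.createFlow(0)
@volatile var broadcastFlowList = sc.broadcast(flowList)
Copy this in. The first read happens during program initialization. The createFlow method does not exist yet, so copy and paste it in: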
// Imports needed by createFlow and createRuleList
import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/**
 * Get the flow list.
 * n = 0: anti-crawler flow
 * n = 1: anti-seat-hoarding flow
 * @return ArrayBuffer[FlowCollocation]
 */
def createFlow(n: Int): ArrayBuffer[FlowCollocation] = {
  var array = new ArrayBuffer[FlowCollocation]
  var sql: String = ""
  if (n == 0) {
    sql = "select itcast_process_info.id,itcast_process_info.process_name,itcast_strategy.crawler_blacklist_thresholds " +
      "from itcast_process_info,itcast_strategy where itcast_process_info.id=itcast_strategy.id and status=0"
  } else if (n == 1) {
    sql = "select itcast_process_info.id,itcast_process_info.process_name,itcast_strategy.occ_blacklist_thresholds " +
      "from itcast_process_info,itcast_strategy where itcast_process_info.id=itcast_strategy.id and status=1"
  }
  var conn: Connection = null
  var ps: PreparedStatement = null
  var rs: ResultSet = null
  try {
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while (rs.next()) {
      val flowId = rs.getString("id")
      val flowName = rs.getString("process_name")
      if (n == 0) {
        // Anti-crawler: the final threshold is crawler_blacklist_thresholds
        val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
        array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
      } else if (n == 1) {
        // Anti-seat-hoarding: the final threshold is occ_blacklist_thresholds
        val flowLimitScore = rs.getDouble("occ_blacklist_thresholds")
        array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    c3p0Util.close(conn, ps, rs)
  }
  array
}
This reports an error: createRuleList is also a separate method, and it does not exist yet. Create it by copying and pasting:
/**
 * Get the rule list.
 * @param process_id flow id used to look up the rules
 * @param n crawler type (0 = anti-crawler, 1 = anti-seat-hoarding)
 * @return the list of rules
 */
def createRuleList(process_id: String, n: Int): List[RuleCollocation] = {
  var list = new ListBuffer[RuleCollocation]
  val sql = "select * from(select itcast_rule.id,itcast_rule.process_id,itcast_rules_maintenance_table.rule_real_name,itcast_rule.rule_type,itcast_rule.crawler_type," +
    "itcast_rule.status,itcast_rule.arg0,itcast_rule.arg1,itcast_rule.score from itcast_rule,itcast_rules_maintenance_table where itcast_rules_maintenance_table." +
    "rule_name=itcast_rule.rule_name) as tab where process_id = '" + process_id + "' and crawler_type=" + n
  // and status="+n
  var conn: Connection = null
  var ps: PreparedStatement = null
  var rs: ResultSet = null
  try {
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while (rs.next()) {
      val ruleId = rs.getString("id")
      val flowId = rs.getString("process_id")
      val rule_real_name = rs.getString("rule_real_name")
      val ruleType = rs.getString("rule_type")
      val ruleStatus = rs.getInt("status")
      val ruleCrawlerType = rs.getInt("crawler_type")
      val ruleValue0 = rs.getDouble("arg0")
      val ruleValue1 = rs.getDouble("arg1")
      val ruleScore = rs.getInt("score")
      val ruleCollocation = new RuleCollocation(ruleId, flowId, rule_real_name, ruleType, ruleStatus, ruleCrawlerType, ruleValue0, ruleValue1, ruleScore)
      list += ruleCollocation
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    c3p0Util.close(conn, ps, rs)
  }
  list.toList
}
The errors are gone.
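createRuleList builds its SQL by concatenating process_id into the string. A common alternative is to bind the values through PreparedStatement placeholders, which avoids quoting problems and SQL injection. The following is a minimal sketch of that idea, not the course's code: RuleQuerySketch and loadRules are hypothetical names, the join is written without the subquery on the assumption that the result is the same, and only three columns are returned to keep the example self-contained.

```scala
import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.collection.mutable.ListBuffer

object RuleQuerySketch {
  // Same tables and join condition as createRuleList, with ? placeholders.
  val RuleSql: String =
    "select r.id, r.process_id, m.rule_real_name, r.rule_type, r.crawler_type, " +
      "r.status, r.arg0, r.arg1, r.score " +
      "from itcast_rule r, itcast_rules_maintenance_table m " +
      "where m.rule_name = r.rule_name and r.process_id = ? and r.crawler_type = ?"

  /** Returns (rule id, rule_real_name, score) per matching row. The caller owns conn. */
  def loadRules(conn: Connection, processId: String, n: Int): List[(String, String, Int)] = {
    val rows = new ListBuffer[(String, String, Int)]
    var ps: PreparedStatement = null
    var rs: ResultSet = null
    try {
      ps = conn.prepareStatement(RuleSql)
      ps.setString(1, processId) // bound value, never spliced into the SQL text
      ps.setInt(2, n)
      rs = ps.executeQuery()
      while (rs.next()) {
        rows += ((rs.getString("id"), rs.getString("rule_real_name"), rs.getInt("score")))
      }
    } finally {
      if (rs != null) rs.close()
      if (ps != null) ps.close()
    }
    rows.toList
  }
}
```

The sketch lets exceptions propagate instead of printing the stack trace, so a failed query is not mistaken for an empty rule list; whether that suits the project is a design decision for the reader.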
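This reads the flow data. A flow spans four tables, and the later crawler judgment relies on that data, so it is read into the program.
The method that reads the flow rules is called as createFlow(0). Look at the 0: it is the parameter n, where 0 means anti-crawler and 1 means anti-seat-hoarding. For anti-crawler work, pass 0.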
def createFlow(n: Int): ArrayBuffer[FlowCollocation] = {
  var array = new ArrayBuffer[FlowCollocation]
The SQL statements for n = 0 and n = 1 (cut off here; the full n = 0 statement is shown in Section III):
  var sql: String = ""
  if (n == 0) { sql = "select itcast_process_info.id,itcast_process_info.process_name,itcast_strategy.crawler_blacklist_thre...
  else if (n == 1) { sql = "select itcast_process_info.id,itcast_process_info.process_name,itcast_strategy.occ_blacklist_thre...
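There are four tables, but the UI shows a single flow. The flow covers all four tables, so the tables must be related. A reasonable guess is that the flow has a flow id, and that this id links it to the other tables.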
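How n selects the query can be sketched as a small pure function. This is an illustrative variant rather than the course's code: FlowSqlSketch, thresholdColumn, and flowSql are hypothetical names. In the method from the notes, any n other than 0 or 1 leaves sql as an empty string, which would presumably fail when the statement is prepared; the sketch makes that case explicit by returning None.

```scala
object FlowSqlSketch {
  // Maps the flow type n to the threshold column to read.
  def thresholdColumn(n: Int): Option[String] = n match {
    case 0 => Some("crawler_blacklist_thresholds") // anti-crawler
    case 1 => Some("occ_blacklist_thresholds")     // anti-seat-hoarding
    case _ => None                                 // unknown flow type
  }

  // Builds the flow query; n is also used as the status filter,
  // as in the two statements shown in these notes.
  def flowSql(n: Int): Option[String] =
    thresholdColumn(n).map { col =>
      "select itcast_process_info.id, itcast_process_info.process_name, " +
        s"itcast_strategy.$col " +
        "from itcast_process_info, itcast_strategy " +
        s"where itcast_process_info.id = itcast_strategy.id and status = $n"
    }
}
```

A caller can then write FlowSqlSketch.flowSql(0) and handle the None case up front, before any database connection is opened.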
III. How the four tables are related
var sql: String = ""
if (n == 0) { sql = "select itcast_process_info.id,itcast_process_info.process_name," +
  "itcast_strategy.crawler_blacklist_thresholds from itcast_process_info,itcast_strategy " +
  "where itcast_process_info.id=itcast_strategy.id and status=0" }
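itcast_process_info is the flow table: the select reads its id, and process_name reads the flow's name.
crawler_blacklist_thresholds in itcast_strategy is the final threshold. The from clause names two tables, itcast_process_info and itcast_strategy.
The condition is itcast_process_info.id = itcast_strategy.id. The flow's id ends in 4f33, and the threshold row's id also ends in 4f33. That equality is the relationship joining the two tables.
This reads out the flow id and the final threshold, so the flow name, flow id, and final threshold have all been read.
Three fields are read here. For a single field you could call queryData in QueryDB, but that does not work for three fields, so the data is read by hand with Connection, PreparedStatement, and ResultSet.
Read id, process_name, and crawler_blacklist_thresholds: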
var conn: Connection = null
var ps: PreparedStatement = null
var rs: ResultSet = null
// ... (connection setup and the while (rs.next()) loop are omitted in this excerpt)
val flowId = rs.getString("id")
val flowName = rs.getString("process_name")
if (n == 0) {
  val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
n = 0 means anti-crawler, so the following branch runs:
  val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
  array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
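At this point we have the flow id (flowId), flowName, and the final threshold. The FlowCollocation constructor receives the flow id twice, plus the threshold and flowName. createRuleList is called with flowId and n = 0; step into createRuleList: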
def createRuleList(process_id: String, n: Int): List[RuleCollocation] = {
  var list = new ListBuffer[RuleCollocation]
  val sql = "select * from(select itcast_rule.id,itcast_rule.process_id,itcast_rules_maintenance_table.rule_real_name," +
    "itcast_rule.rule_type,itcast_rule.crawler_type,itcast_rule.status,itcast_rule.arg0,itcast_rule.arg1," +
    "itcast_rule.score from itcast_rule,itcast_rules_maintenance_table where itcast_rules_maintenance_table." +
    "rule_name=itcast_rule.rule_name) as tab where process_id = '" + process_id + "' and crawler_type=" + n
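Note where the parentheses after "select * from" open and close. Inside them, itcast_rule holds the eight rules. Many rules are configured, and the ones for this flow have a process_id ending in 4f33.
The inner query reads the fields id, process_id, rule_real_name, rule_type, crawler_type, status, arg0, arg1, and score.
Its from clause, "from itcast_rule,itcast_rules_maintenance_table", names the two tables being joined.
The join condition is "where itcast_rules_maintenance_table.rule_name=itcast_rule.rule_name". In itcast_rule, rule_name takes the values 0 to 7, and the two tables are matched on it: a rule with rule_name = 0 is paired with the name registered for 0.
So the inner query reads the listed itcast_rule fields plus rule_real_name from itcast_rules_maintenance_table. The outer select * then filters on process_id = '" + process_id + "' and crawler_type=" + n. Here process_id is the flow id passed in by the caller, and with n = 0 only rows whose crawler_type is 0 in the database are kept.
Right-click itcast_rule and open the table design view: in the crawler_type field, 0 means anti-crawler, so rows inserted with 0 are anti-crawler rules.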
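The join and filter just described can be mimicked on in-memory collections, which makes the relationship between the two rule tables easier to see. This is a sketch under assumptions: RuleRow and NameRow are hypothetical, trimmed-down row types, the sample rows are made up, and rule_name is assumed to be unique in the name table.

```scala
object RuleJoinSketch {
  // Hypothetical row types carrying only the columns needed for the join.
  final case class RuleRow(id: String, processId: String, ruleName: String, crawlerType: Int, score: Int)
  final case class NameRow(ruleName: String, ruleRealName: String)

  /** Inner join on rule_name, then keep rows for one flow and one crawler type. */
  def rulesFor(rules: Seq[RuleRow], names: Seq[NameRow], processId: String, n: Int): Seq[(String, String, Int)] = {
    // Lookup table: rule_name -> rule_real_name (assumes rule_name is unique).
    val realName: Map[String, String] = names.map(r => r.ruleName -> r.ruleRealName).toMap
    for {
      r <- rules
      if r.processId == processId && r.crawlerType == n
      real <- realName.get(r.ruleName).toSeq // rules without a name row are dropped
    } yield (r.id, real, r.score)
  }

  // Made-up rows, for illustration only.
  val rules: Seq[RuleRow] = Seq(
    RuleRow("r1", "4f33", "0", 0, 10),
    RuleRow("r2", "4f33", "1", 1, 20),
    RuleRow("r3", "other", "0", 0, 30)
  )
  val names: Seq[NameRow] = Seq(NameRow("0", "ruleA"), NameRow("1", "ruleB"))
}
```

Calling RuleJoinSketch.rulesFor(RuleJoinSketch.rules, RuleJoinSketch.names, "4f33", 0) keeps only r1: r2 has a different crawler type and r3 belongs to another flow.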
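With the relationships among the four tables identified and the data queried, the results are wrapped up and returned. The queried data is wrapped into array:
array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
Finally the result is returned to var flowList = AnalyzeRuleDB.createFlow(0)
and placed in a broadcast variable:
@volatile var broadcastFlowList = sc.broadcast(flowList)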
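Next, check whether an update is needed. Find the update code and copy and paste it:
// Flag: have the flow rules/strategy changed?
val needUpDateflowList = jedis.get("NeedUpDateflowList")
// MySQL: reload the flow rules/strategy if the flag says they changed
if (!needUpDateflowList.isEmpty() && needUpDateflowList.toBoolean) {
  flowList = AnalyzeRuleDB.createFlow(0)
  broadcastFlowList.unpersist()
  broadcastFlowList = sc.broadcast(flowList)
  jedis.set("NeedUpDateflowList", "false")
}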
This checks whether the data from the four tables needs updating and, if so, performs the update.
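One detail of the flag check is worth a closer look. Jedis returns null from get when the key does not exist, so calling isEmpty() on the result would throw a NullPointerException, and Scala's toBoolean throws on any string other than "true" or "false". The sketch below is a defensive variant under those assumptions; RefreshFlagSketch, needsRefresh, and refreshIfNeeded are hypothetical names, and the Redis and MySQL calls are passed in as plain functions so that the logic can run without either service.

```scala
object RefreshFlagSketch {
  /** True only when the flag is present and equals "true" (case-insensitive). */
  def needsRefresh(flag: String): Boolean =
    Option(flag).exists(_.trim.equalsIgnoreCase("true"))

  /**
   * Returns freshly loaded data when the flag is set, otherwise the current data.
   * readFlag, reload, and clearFlag stand in for jedis.get, createFlow, and jedis.set.
   */
  def refreshIfNeeded[A](current: A, readFlag: () => String, reload: () => A, clearFlag: () => Unit): A =
    if (needsRefresh(readFlag())) {
      val fresh = reload()
      clearFlag() // reset the flag only after the reload succeeded
      fresh
    } else current
}
```

In the streaming job, the value returned by refreshIfNeeded would then be rebroadcast, following the unpersist and sc.broadcast steps from the notes.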