开发者学堂课程【大数据实战项目 - 反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第六阶段:爬虫识别-爬虫写入 Redis】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/674/detail/11728
爬虫识别-爬虫写入 Redis
实现流程
在爬虫数据去重操作完成之后,两条数据最终生成一个。接下来需要做的就是数据入库,也是在流程中的最后一步。
数据入库也就是写入到Redis中,从总体流程来说是第六步:
5-1是读取数据,5-2进行处理,处理完之后,计算8个指标以及识别爬虫和去重以及非爬虫去除,去除之后将数据写入到Redis中,也就是第六步。数据入库的总体需求就是将黑名单数据写入Redis中。目的就是将识别到的爬虫数据写入Redis,为剔除爬虫准备数据。实现该目标和需求需要先遍历出最终爬虫数据及打分数据,也就是取出 IP 和 FLOW score,由于之前已经将IP和 FLOW score 返回,所以在入库时先将2个参数取出,取出之后,封装写入Redis所需要的Key,然后封装写入Redis所需要的 value 以及 Key和 value需要在Redis中存储的时间周期,读取完Key、周期和value之后,将数据写入到Redis中上就是整体流程。以下是详细的流程:
1、遍历数据中的 RDD,将其序列化(collect ())(不序列化汇报错: Task not serializable)
2、遍历序列后的每一个数据,获取出数据中的 ip 和 flowsScore
3、遍历 flowsScore,为写入Redis准备数据
4、准备写入 redis 的key
在配置文件中读取出key的前缀+数据的IP+流程id+时间戳
5、准备写入 redis 的 time
在配置文件中读取出数据的保留时间
6、准备写入redis 的value
value包括计算的最终分数+命中的规则+命中的时间
7、将数据写入redis
对于第一步序列化,如果不序列化就会报错:Task not serializable,所以需要先将其调用collec序列化,然后再遍历序列化之后的每一个数据,获取出IP和float score。获取到IP和FLOW score之后,遍历FLOW score,因为FLOW School是一个array为写入Redis准备数据, FLOW School中有所需要的流程ID、分数
预值、规则时间、命令规则时间,以上参数都有可能会用到。接下来准备写入Redis所需要的Key,准备写入Redis所需要的value,以及准备Redis所需要的Time,此时在配置文件中取出Key的前缀,在链路统计时有链路统计的前缀:打开链路统计的前缀是lp,爬虫识别预处理阶段的前缀是dp,写入Redis数据的黑名单,数据中也有前缀将其取出,然后将数据的IP加流程ID加时间戳,将其拼接后的结果作为Key。最终的分数、命中的规则以及命中的时间作为value。 Key和value如何设计取决于和前端所做的约定。例如约定好按照以上格式写入数据,前端获取数据就按照以上格式获取,约定好之后执行即可,此时Key和时间、value准备完毕,准备数据完成之后写入到Redis中,以上就是详细流程。按照以上流程,在代码中做实现。
写入数据第一步,遍历数据中的RDD,将其序列化(collect ())(不序列化汇报错: Task not serializable)
。第二步,遍历序列后的每一个数据,获取出数据中的ip和flowsScore。第三步,准备数据,准备数据就是准备所需要的key,在配置文件中读取出key的前缀+数据的IP+流程id+时间戳。第五步是读取时间,准备写入redis 的time。在配置文件中读取出数据的保留时间。第六步,准备写入redis 的value。value包括计算的最终分数+命中的规则+命中的时间。第七步,将数据写入redis。
第一步遍历数据,BLACK datas
是之前去重之后的结果,调用方法 foreachRdd,将数据写入到Redis中,不需要返回结果,直接遍历Rdd,获取每一个Rdd,以下是第一步遍历Rdd的代码:
blackDatas.foreachRDD(rdd=>i{
})
遍历完成之后将其序列化,所以使用 Rdd 调用方法 collect:
blackDatas.foreachRDD(rdd=>i{
rdd.collect()
})
调用完成之后将数据进行接收:
blackDatas.foreachRDD(rdd=>i{
val blackDatas: Array[(string
,Array[FlowScoreResult])] =rdd.collect()
})
数据类型是array类型。以上就是转化完collect之后进行序列化的结果。
获取到BLACK datas之后,进行第二步遍历序列化的数据,获取数据中的IP和FLOW Score。 BLACK datas是一个array,
所以需要做遍历:
for(blackData<- blackDatas){
}
由以上代码获取到黑名单数据,然后获取数据中的IP:
for(blackData<- blackDatas){
// 获取数据中的IP
val ip=blackData._ 1
//获取数据中的flowsScore
val flowsScore=blackData._ 2
通过以上代码获取出数据。前面第一个是IP,第二个是Flow score。
第三步遍历FLOW score。此时需要将数据类型补全,依然是array。数据类型取决于之前定义的代码。虽然是一个array类型,但是正常只会有一个。刷新结果如下:
以上代码是遍历流程当中的FLOW score。因为可能有多个,所以在float score后加S,然后遍历其中的流程:
for(flowsScore<-flowsScores){
}
获取流程之后准备Redis的Key的数据。在配置文件中读取key的前缀,先准备:
Val=key
key的前缀存在于redis中,黑名单数据如下:
cluster.key.anti_black_list = CSANTI_ ANTI_ _BLACK
以上内容作为前缀,所以只需要将cluster.key.anti_black_list
读出即可。在此时需用到配置文件的读取,配置文件的读取存在于jedis的 Propertiesutil中,key第一个值就是前缀,第2个参数是jedis中的配置文件名称。此时获取到前缀,代码如下:
Val key=PropertiesUtil.getStringByKey(
key="cluster.key.anti _black_ list"
,
jedisConfig.properties)
也可以进行直接拼接,拼接数据的IP,加竖线作为分割,再加上IP,然后,再加分割符,用竖线将数据分开,IP加完以后加流程ID,流程的ID在FLOW score中,加上FLOW score点FLOW ID,再加上一个分隔符,加上时间戳,最后将其转化为string类型。到此为止,前缀已经读取,数据IP和流程ID、时间戳都已经具备,key准备完成:
val key=PropertiesUtil .getStringByKey(
key = "cluster .key. anti_ black_ list",
propName = "jedisConfig. properties")
+"I"+
ip+
" |"
+flowsScore. flowId
+" |"
+System. currentTimeMillis(). Tostring
第五步,准备所需要的时间,在配置文件中取出数据的保留时间。方法与上相同,但是参数不同。时间如下:
cluster. exptime. anti black list = 3600
3600是一个小时,黑名单数据在Redis中存储的周期是一个小时,超过一个小时的数据就全部删除。将前置换掉之后,后面的内容不需要更换,因为名字相同,都在一个配置文件中。代码如下:
val time =PropertiesUtil . getstringByKey(
key = "cluster . exptime .anti_ black_ _list",
propName = "jedisconfig. Properties")
第六步准备写入Redis的value, value包括计算的最终分数、命名的规则和命中的时间。分数在FLOW score中调用Flaws score。命中的规则是hitrules。命中的时间是HITTime。此时拼接完成,代码如下:
val value=
flowsScore.flowScore+
"I"
+flowsScore.hitRules+
"I"
+flowsScore . hitTime
拼接完成之后,进行第七步,将数据写入Redis中。此时已经存在radius的实例。用Jedis调用setex,在其中输入Key和value,时间需要将其转换为INT类型:
jedis. setex(key, time , vaalue )
此时不报错,代码准备完成。以上是爬虫数据写入Redis中的实现思路和实现过程。