开发者学堂课程【大数据实战项目 - 反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第七阶段:最终整体回顾总结(代码-预处理及爬虫识别)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/675/detail/11744
最终整体回顾总结(代码-预处理及爬虫识别)
内容简介:
一、数据采集
二、数据预处理
三、爬虫识别
一、数据采集
代码:
[root@c]intnode testlua]#pwd
/usr/local/openresty/test lua
G
etDataToKa
.lua test01/lua test03.lua test05.lua test07.lua test.09.lua TestRedis.lua
Hello.lua test02 test04.lua test06.lua test08.lua test10,lua
[root@clintnode testlua]#
首先,我们采集阶段做了一个阈值的判断,那到了一定的阈值以后我就不采集数据了。
然后引入了一个我的 Kafka 的依赖包,实例了一个 Kafka 的 broker-list,然后实例 Kafka 的一个生产者。
实际上这个过程很简单,引入第3方包,后面是创建 Kafka 的生产者在第3步,走到我们的最后一步,发送数据即可。
基本上就这三步。但是呢,我们在这里面做了两件事情,这里面生产者创建出来以后,在发送数据的时候我们需要发送有两个数据,有两个数据,一个是 topic,还有一个就是分区的编号以及 message。
topic 我们已经定死,分区编号我们则需要花了很多的时间。
我们在取分区编号的时候,先获取了共享字典的变量,然后我去变量里面找 cont 对应的值,这个值代表的数据的行号,我用行号与我的分区数量取余积,我就把数据放在哪个分区里面。
只不过这个行号怎么取呢?数据没有行号,我们需要自己设置一个行号。然后共享字典,来一条数据,我加个1。来一条数据我加一个1,加了以后,我用数据编号与数据的分析术语写完。
所以我们在分区编号这个地方我们多花了一点时间,然后再往后就是获取数据,获取数据就按照这个格式把你想要的数据全部都获取过来,然后拼接成一个字符串,用一个固定的模式,我们用的是“#cs#“字形拼接完了以后分区有了,topic 有了数据,那么直接发送数据就可以了,那这样的数据就推送到 Kafka 里面了,这个就是我们的数据采集。
二、数据预处理
(1)、预处理前期准备工作
代码
Def main (args:Array[String]):Ubit={
/
/
添加日志级别的设置
L
oggerLevels.setStreamingLogLevels()
/
/
当应用被停止的时候,进行如下设置可以保证当前批次执行完之后在停止应用
S
ystem.setProperty(“spark.streaming.stopGracefullyOnShutdom”,”true”)
首先写一个 main 方法,添加日志级别的设置。随后创建 Spark conf 和 SparkContext
之后,再设置集群监控任务的开启,然后再往后就写了爬虫的数据预处理的一个代码号,setupSsc,然后将 Kafka 的配置文件和相关的信息传到这个方法里面,我在这个方法里面来读取数据
(2)、链路统计功能和数据清洗功能
代码
/
/1
链路统计功能
/
/
将每个服务器本批次数据访问了多少次返回
/
al serverCount=LinkCount.linkCount(rdd)
/
/2
数据清洗功能
/
/
定义方法,参数为一条数据和广播变量
Val filteredData=rdd.filter(message=>URLFilter.filterURL(message,broadcastFilterRuleList.value))
/
/filteredData.foreach(println)
然后,我们读取过来数据以后。拿到第2个数据就是 value 数据进行计算,计算的第1个功能模块叫做链路统计,打开第1个功能模块叫做链路统计功能模块主要计算,链路统计模板就是我当前活跃用户连接数以及我服务器近几天采集的数据的一个情况。
第2个是数据的清洗,清洗里面,我涉及到规则的读取,我在数据库里面,企业里面的一些规则在马赛克数据库,
所以,我要运行计算的时候,先在规则里面把数据读取过来,筹码之后读到数据,从码字后读到我的程序里面,读过来以后我进行广播变量的判断,读取过滤规则,就是类似这些读取规则,然后读取过滤规则过去以后,放到广播变量,广播变量这里面涉及到可能需要更新,有一个更新的逻辑,更新完以后我们去进行数据的清洗,清洗掉以后,去用 URL,用我数据截取出来的 URL,用 uio 去匹配,匹配一下,不是需要过滤掉,如果是,那就要返回,如果是匹配上了就返回 false,没有就返回错误,返回是表示我要过滤掉的数据。
(3)、数据脱敏功能和数据拆分功能
代码
/
/3
数据脱敏功能
/
/3-1
手机号码脱敏
Val encryptedPhone=EncryptedDAta.encryptedPhone(message)
/
/3-2
身份证号码脱敏
Val encryptedId=EncrytedData.encryptedId(encryptedPhone)
/
/encryptedId
清晰完数据以后,就是对数据进行脱敏。这里以手机号脱敏和身份证号的脱敏为例,这里有一个关键的地方,就是你怎么样来判断它到底是手机号还是不是手机号呢?
前面一个不是数字,后面一个不是数字,那它就是手机号。前面一个不是数字,后面一个没有了数据的最后一个数据的最后一个阶段,这一批数据就是手机号,那它就一定是手机号,然后我来替换就行了,身份证号的逻辑和他是一模一样的,再往后就是进行数据的拆分,拆分的目的是后面有这么多的模块,要算的时候会用到数据里面的一个一个的小数据,先把它拆分开,放着用的时候直接拿。
(4)、数据分类功能(打标签)和数据解析
数据的分类就是我的分类,分为飞行的形式,国内查询,是飞行,还是国内和国际,查询操作的操作类型是查询和预订,他们分别两两组合,就是国内查询,国际查询,国内一定和国际一定这4种情况,我要判断数据到底是哪一个业务,场景使用谁判断呢?
使用 URL,UIO,与这4个规则,独立的分别判断,匹配上哪个它就是哪个业务场景,这个判断再往后,找单程和往返单程和往返,那这里面要做的事情就是判断你的数据,你的数据当中 HTTP 的水分里面有日期的数量,一个就是三成,两个就是往返没有,就是其他。
数据的解析,解析的目的就是为了解析出发地,目的地,起飞时间,成人,婴儿儿童的成绩等一系列数据。解析数据需要应用 URL,拿到 URL 去匹配出你的数据到底是哪个业务场景来之前的业务场景,业务场景确定了以后,我再确定你要哪个规则,而这个解析规则,实际上同学们只要知道给它这几个参数,它自动会将我们想要的数据给我们返回,包括触发节目,滴滴,起飞时间等等数据,解析又分为查询数据的解析和预定数据的解析。
(5)、历史爬虫判断
历史爬虫数据,就是判断我的每一条数据是否在历史出现过,我历史出现过的爬虫数据备份到数据库里,我从数据库里面拿过来以后,我用当前 IP 和历史出现过的去判断一下它是不是相等。
(6)、数据结构化
数据结构化就是把数原本“#CS#”字形拼接的数据拆开,拆开以后,我要写到数据库之前,还得把它再拼成一个字符串,拼这个字符串,只不过这里面有一个核心请求参数,核心请求参数我们是需要计算的。怎么计算呢?
用出发地目的地和起飞时间封装成核心请求参数,这样我们就为我的这个 Pro 这个结构化的数据全部准备好了,封装完以后我就拿到了封装库的数据,再往后端进行推送。
(7)、数据推送
查询的数据推送到查询的 topic,预订的数据推送到预定的 topic,这样就结束了,然后后面是监控,我们实际上就需要数据的总量,数据总量获取到,然后再拿到系统运作我每一个批次运营的时间。要获取时间我们通过uio拿到一个大的 Jason。
Jason 里面包含了我的任务的开始时间和结束时间,我想办法把这个这段时间抽取出来做差,它就是运行时间了,数据量有了,运行时间有了,这个商就是要速度。然后再看一下企业里面你到底要哪些数据,你要什么数据我给你什么数据,这样我们就把监控做完了。
最后将数据按照固定的格式写到Redis里面就可以了。
三、爬虫识别
(1)数据封装
/
/ kafkavalues.foreachRDD9rdd=>rdd.foreach(println))
/
/
数据加载封装
/
/
将kafka内的数据使用“#
cs#”
拆分,拆分后封装成p
rocessedData
Val processedData=QueryDataPackage.queryDataLoadAndPackage(kafkaValues)
/
/processedData=QueryDataPackage.queryDataLoadAndPackage(kafkaValues)
/
/processedData.foreachRDD(rdd=>rdd.foreah(println))
首先读取数据,读取过来的爬虫数据,先要将数据封装成 process date,我就通过后面的这个,plus 对的对一下点的形式来进行获取数据更方便,这个是封装,封装完以后。
接下来就是8个指标的计算,也就是我们流程当中的这8个指标的计算过程。
(2)按 IP 段聚合-5分钟内的 IP 段(IP前两位)访问量
代码
Val ipBlockCounts=CoreRule.ipBlockCounts(processedData)
/
/
将最终数据转换为Map,便于后续提取使用。
Var ipBlockCountsMap:collection.Map[String,Int]=nu11
i
pBlockCounts.foreachRDD(rdd=>{
i
pBlockCountsMap=rdd.collectAsMap()
}
)
/
/ipBlockCounts.foreachRDD(rdd=>rdd.foreach(printLn))
第1个指标,某个IP段的访问量,那我拿出获取出数据当中的 IP,这个是计算方获取出数据端的IP,我再截取出 IP 段,之后将 IP 段和1输出,返回,返回以后调用 read 或者是 readByAndwindow 去进行计算,这样就能够计算出 IP 段的访问总量,它就类似于我的 count,而我们这里是 IP 段的 count,这是第1个指标。
(3)、按 IP 地址聚合-某个 IP,5分钟内访问总量
代码
Val ipCounts=CoreRule.ipCounts(processedData)
/
/
将最终数据转换为Map:
collection.Map[string,Int]=nu11
i
pCounts.foreachRDD
(
rdd=>{ipCountsMap=rdd.collectAsMap()})
/
/ipCounts.foreachRDD(rdd=>rdd.foreach(println))
第二个指标,按 IP 地址聚合,某个 IP,5分钟内总访问量。拿到 IP 和1,直接返回,做一个reduceByKeyAndWindow,就返回了我的总量。
(4)、按 IP 地址聚合-某个 IP,5分钟内的关键页面访问总量
时间问题关键页面的访问,流量在它的第2个前提下加了一个关键页面,所以要在 IP 和1输出之前先判断你是不是关键页面,而关键页面的数据已经在数据库里面了,我们从数据库里面读过来以后去进行判断,用 URL 数据当中的第3个,从数据当中抽取出IP和URL,我用URL匹配规则,如果匹配上了,就是关键页面,就输出IP和1,不是就返回IP和0,然后再调用 reduceByKeyAndWindow,这样就计算出来了关键页面的访问总量。
(5)、按IP地址聚合-某个 IP,5分钟内的UA种类数统计
第四个指标是按IP地址聚合,某个IP,5分钟内的 UA 种类数。先对 UA 要进行去重,统计 UA 里的将 IP 作为 Ku 则 AD 作为value,然后我去进行 grow up by and key,grow up by Mt 输出进行 grow up grow up 以后,我单独抽取出IP和user agent的例子,然后去将它的 list 进行 distinct,然后求size就是我们想要的UA的种类数。
(6)、按IP地址数据转换为 Map,便于后续提取、使用
第五个指标是不同行程的次数,和第4步非常像,只不过我将这里面是 UA 制的,我换成了出发地和目的地拼接这个字符串。
(7)、按IP地址聚合-某个 IP,5分钟内访问关键页面的 Cookie 数
第6个指标是cookie。
cookie 的逻辑和这个 UA 的逻辑非常相似,只不过这里面又加了一个前提条件,关键页面,先判断是关键页面,然后再输出 IP 和 cookie,然后再输入 alphabet。
(8)、按 IP 地址聚合-某个 IP,5分钟内的关键页面最短访问间隔
第7个指标是求最小访问时间间隔,代码量虽多,但是业务不复杂。它还是关键页面的前提下判断,是不是关键页面?
如果是关键页面我输出 IP 和时间,然后,我拿到多个时间,我就有时间差,求最小时间差,那就要拿到一个月的时间,然后将它转换成时间差,然后我对时间的list进行排序,排序以后我得到的是相邻的两个时间,我在相邻的两个求时间差,求完时间差,就将那个差,放在一个list里面,然后我在对这个时间差的 list 排序,排完序。
因为是升序的排名,以后第1个它就是最小的值,这里面IP和最小值就有了,然后我返回就可以了,实际上代码挺多,但是逻辑不复杂,就一步一步的做就行了。
(9)、按 IP 地址聚合-某个IP,5分钟内小于最短访问间隔(自设)的关键查询次数,
第8个指标,按 IP 地址聚合,某个IP,5分钟内小于最短访问间隔(自设)的关键页面查询次数。
小于某个自设值,自设阈值的这个关键页面的次数,而且必须是这个两次访问的时间差小于阈值。操作方法基本是上一样的,那前提依然要判断你是关键页面,关键页面判断一番。
最小阈值我们从系统里面已经配好了,我们先把这个最小的阈值读取出来,最小阈值有了以后,是关键页面,是不是关键页面,关键页面有在输出时间,而后面的操作全部都跟第7步非常相似,只不过这里面我不需要时间差,我有了时间差,我拿着这个差直接去与我的最小值比对,如果小于的值,那我就要在计数器加1,然后我再返回IP和这个计数器的值就行了,这就是我们第8个指标。
(10)、按照八个指标进行爬虫识别
代码
/
/1
指标碰撞
/
/2
最终的打分
/
/3
爬虫判断(结论:是/不是)
Val antiCalculateResults=processedData.map(message=>{
/
/
获取数据中ip
Val ip=message.remoteAddr
/
/
返回结果是最终的返回结果A
ntiCalculateResult
Val antiCalculateResult=RuleUtil.calculateAntiResult(message,ip,ipBlockCountsMap,ipCountsMap,criticalPagesCountsMap,udifferentJourneysCountsMap,ipCookCountMap,minTimeDiffMap,IessDefaultTimesMap,broadcastFlowList.value)
a
ntiCalculateResult
}
)
利用这8个指标进行爬虫的判断,先准备数据。这里是 IP 和8个 map,要拿到的是map里面的 IP 对应的值,把这个值抽取出来,IP在这7个 map里面,通过IP把它抽出来,分出一个 map,去做指标碰到的第1个数据,就有了第1个数据,然后有了第2个数据,这些数据就是我的阈值数据库里面配置的这些阈值。
之后我们传入到碰撞算法里面,碰撞算法会最终给我们算出,你和选择是否无关的值,还有在必须支持选定的同时,指定大于阈值的数据值,把两个结果发送给最终的打分算法,它就会给我们一个分数。
(11)、爬虫判断
分数出来以后,我就和阈值进行比较,是爬虫输出 true,不是输出 false。判断完以后进行封装即可。
(12)、数据入库
识别完爬虫以后,就进行非爬虫的过滤。只关注于它是不是爬虫,普通用户直接放行即可。
爬虫数据拿到了以后,要进行爬虫的去重,一个批次里面可能有一个爬虫的多条数据,每一条数据都判断一次,就要一个数据即可,拿一个数据,去除即可,AP和普洛斯提取出来,然后调用reduceByKey,拿到最后一个结果,这样就去重成功,之后将数据写到 redis,一步一步写到 redis,拿到 Key,但实际上这个可以完全取决于我们和前端做的约定,之后剔除的工作交给前端工程师即可。