开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段):第三阶段总结】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/671/detail/11655
第三阶段总结
内容介绍:
一、数据清洗
二、数据脱敏
三、数据拆分
四、数据分类
五、数据解析
之前学习了查询类和预定类的数据解析。数据解析完之后,第三阶段也就结束了,下面马上就要进入数据的第四阶段。所以对第三阶段都做了哪些事做总结。
第三阶段要做的事情有数据的清洗、脱敏、拆分、分类打标签以及数据的解析过程。
前面的第一个模块儿做的是链路统计,这个是第二阶段做的事情。
代码:LinkCount.linkCount(rdd);
一、数据清洗
第三阶段的第一个模块叫做数据清洗功能。
数据清洗所要做的事情很简单:在数据采集的第一阶段采集到的数据什么样的都有;第二阶段发送数据到 Kafka 也是一样;第三阶段的3-1也是把所有数据读到预处理的程序里面。但是这些数据里包含了一些需要过滤掉的无用数据。
在这里有一个数据库叫做 itcast_filter-rule 如下图所示,里面的 html,js,css,jpq,png,gif 都是被列入黑名单的,也就是如果数据的 filterURL 是这几种格式,那么数据清洗就把它过滤掉。
这就是数据清洗要做的。
因为数据采集的时候所有的数据都有,3-1也是读取了全部的数据。然后在3-2中需要在数据库中把过滤规则读取到程序里面,然后用一条数据当中的 URL 和这个规则匹配。匹配成功就返回 false 没有成功就返回 true 。
代码:
//数据清洗功能
//定义方法,参数为一条数据和广播变量
val
filteredData=rdd.filter(message=>URLFilter.filterURL(message ,broadcastFilterRuleList.value))
//filteredData.foreach(println)
这个就是数据的清洗功能,调用 rdd 的 filter 功能来进行过滤,在读取 filter 方法之前需要把数据库(message)读过来。将数据的规则(broadcastFilterRuleList.value)也就是黑名单的数据读过来。
它的位置看下面的代码:
//读取数据库中的数据过滤规则
var filterRulelist:
ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()
点击进入 queryFilterRule,我们的规则就在这里。
代码://实现数据预处理阶段 数据过滤规则的读取
def queryFilterRule(): ArrayBuffer[string] = {
//读物数据过滤规则的 sql
val sql="select value from itcast_filter_rule"
//接受数据的字段
val field="value"
//调用 QueryDB 读取数据
val filterRuleList=QueryDB.queryData(sql,field)
//返回过滤数据
filterRuleList
}
读取数据的过滤规则就在 itcast_filter_rule 这个表里。把它们都查过来,查过来以后把它收集过来,然后添加到广播变量里。
代码:@volatile var broadcastFilterRuleList=
sc.broadcast(filterRuleList)
在程序初始化阶段,就要读取一遍全新的数据。然后后面每一个批次都要循环判断,在redis里面添加一个标记,这个标记表示它是否需要更新。如果更新为 true 就要重新读取规则,然后将广播变量清空,然后将我新规则重新写入到广播变量里面。然后将标记改成 false。
这就是需要更新的流程。
代码://到 redis 读取是否需要更新的标记
val NeedUpDateFilterRule=redis.get("NeedupDateFilterRule")
//判断是否需要更新 若数据不为空并且数据转成 Boolean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){
//若需要更新,那么在数据库中重新读取新的过滤规则到程序中
filterRuleList=AnalyzeRuleDB.queryFilterRule()
//将广播变量清空
broadcastFilterRuleList.unpersist()
//将新的规则重新加载到广播变量
broadcastFilterRuleList= sc.broadcast(filterRuleList)
//将 redis 内是否需要更新规则的标识改为 ”false"
redis.set("NeedupDateFilterRule" , "false")}
那么更新流程有了,广播变量里的数据有了,message 数据也有了,那么接下来就要调用URLfilter 里面的 filterURL 这个方法去进行匹配。
代码://用于数据过滤object URLFilter {
//实现数据的过滤功能
def filterURL(message: String,FilterRuleList:
ArrayBuffer[String]): Boolean = {
//默认所有的数据都是需要保留的
var save=true
//截取出数据中的 url
val request= if(message.split( regex = “#CS#").length>1)
message.split(regex ="#CS#")(1)else ""
//遍历数据过滤的规则,拿 request 与过滤规则进行匹配
for(rule<-FilterRuleList){
//若匹配成功表示,需要删除,返回false
if ( request.matches(rule)){
//将标记改为false
save=false
}}
//若匹配不成功表示,表示不需要删除,返回true
Save}
在数据当中,先用”#CS#”拆分,用拆分出来的第一个数据去匹配每一个规则,如果匹配成功,返回一个 false,如果不成功就是 true ,这样就是做了一个是否需要过滤的一个匹配。返回完以后就要调用 filter 方法,这样就可以将 false 过滤掉,只保留 true 的,这就是数据清洗的过程。
二、数据脱敏
代码://数据脱敏建立在数据清洗之后
val DataProcess = filteredData.map(message=>{
数据脱敏就是拿到清洗过后的数据,再遍历出其中的某一条。
拿到每一条数据,调用加密数据,然后加密手机号。
代码://手机号的脱敏
val encrytedPhone=
EncryptedData.encryptedPhone(message)
点击进入 encryptedphone。
加密手机号不需要从数据库里面读配置,通过下面这行代码拿到数据:
def encryptedPhone(message: String):String ={
然后直接用一个手机号的正则表达式去匹配:
//获取手机号的正则表达式
val phonePattern =
Pattern.compile( regex = “((13[0-9])|(14[5|7])|(15([0-3][5-9]))|(17[0-9])|(18[0,5-9]))\\d{8}”)
//使用正则匹配数据获取出又可能是手机号的数据:
val phones=phonePattern .matcher(encryptedData)
找到有可能是手机号的数据,然后遍历迭代把每个数据都抽出来。这样这些抽出来的数据就有可能是也可能不是手机号。
遍历数据代码:
//遍历每一个可能是手机号的数据
while(phones.find(){
//过滤出一个可能手机号数据(不一定是手机号)
val phone= phones.group()
//获取到手机号码首位数字 前一位的 index
val befIndex-encryptedData .indexOf( phone)-1
//获取到手机号码最后一位数字 后面一位的 index
val aftIndex=encryptedData.indexOf( phone)+11
//获取到手机号码首位数字前一位的字符
val befLetter=encryptedData.charAt( befIndex).toString
怎么判断到底是不是手机号?
有两种方法,就是分别根据数据前面一个位置的字符和最后一位位置的字符来进行判断,如果前面一位不是数字,后面一位也不是数字,那么它就一定是手机号;前面一位不是数字,后面一位没有,也就是手机号码是最后一个数据,那么它一定是手机号。
判断的整体代码如下:
//3判断出一定是手机号的数据
//3-1手机号码前一个位置不是数字,并且手机号码是一条数据中的最后一个数据,那么表示这个一定是手机号
if ( ! befLetter.matches( regex =”^[0-9]$")){
//若手机号的最后一位后面的角标大于数据的总长度 那么表示这个手机号是数据的最后一位
if(aftIndex>encryptedData.length){
//确定出是手机号后,将手机号加密,替换原始的数据
encryptedData=encryotedData.replace( phone , md5.getMD5ofstr(phone))
}else{//表示数据不是最后一位
//3-2手机号码前一个位置不是数字,并且后一位也不是数字,那么表示这个一定是手机号
//获收到手机号码最后一位数字后面一位的字符
val aftLetter=encryptedData.charAt(aftIndex ).toString
if(!aftLetter.matches( regex= "^[0-9]$")){
//确定出是手机号后,将手机号加密,替换原始的数据
encryptedData=encryptedData.replace(phone ,md5.getMD
5ofstr( phone))
所以一共只有两种情况,也就是要确定它一定是手机号,首先要知道这个数据当中的前一个位置,也就是手机号的前面一个位和后面一位它字符。要找到字符就得先拿到前面一个位置的角标(index)。就是找到手机号前一位和后一位的角标。找到前面一个位置字符,然后进行判断。前面一位不是数字,后面一位的角标超过了数据最长的长度,也就是手机号是最后一位数据,那么就一定是手机号。确定是手机号后,对数据来做一个替换。这是第一种情况,第二种情况是前面一位不是数字,后面一位也不是数字,那么它一定是手机号,然后同样做一个替换。这就是手机号的脱敏工作,最后把替换完的数据进行返回就行了。
身份证号的脱敏思路跟手机号一模一样。只不过身份证号码更换了正则表达式。代码如下:
//用于实现身份证号码的脱敏实现
def encryptedId(message: String) : String = {
//定义临时接受数据的变量
var encryptedData=message
//实例 MD5
val md5=new MD5(
//获取身份证号码时正则表达式
val idPattern = Pattern.compile( regex ="(\\d(18))|(\\d(17)
(\\d|xx))I(\\d[15))")
//使用正则匹配数据据获取出有可能是身份证号码的数据
val ids=idPattern.matcher(encryptedData)
//遍历每一个可能是身份证号码的数据
while(ids.find({
//过滤出一个可能身纷证号码数据(不一定是身份证号码)
val id-=ids.group()
//获取到身份量号码店位数字 前一位的index
val befIndex=encryptedData.indexOf(id)-1
//获取到身份证号码最后一位数字 后面一位的index
val aftIndex=encryptedData.indexOf(id)+18
//获取到身份证号码首位数字 前一位的字符
val befLetter=encryptedData.charAt(befIndex ).tostring
同时,因为身份证号的长度是18位,而手机号的长度是11位。除了这两个其他的思路都是一样的。但这里面稍微要注意的地方,就是手机号脱敏是在原始数据进行的脱敏,如果把手机号脱敏了,下面的身份证脱敏就不能在原始数据上脱敏了。也就是要在手机号脱敏完以后的数据去脱敏身份证号,这样就将两个全都脱敏了。那脱敏工作就结束了。
三,数据拆分
接下来就是数据的拆分,数据拆分是比较简单的,代码如下:
val(requesturl ,requestNethod ,contentType ,requestBotdy,
httpReferrer ,remoteAddr ,httpUserAgent ,timeIso8601,
serverAddr ,cookiesstr ,cookievalue_JESSIONID,
cookievalue_usERID) =Datasplit.datasplit(encryptedId)
这个要调用一个 dataSplit 方法,这个方法里面数据是用 CS# 拼接进来的,拼接到程序里面,然后导入到 kafka。然后将数据读取到程序里时还是拼接后的数据,那么就要用 cs# 去进行分割。
代码://用 CS# 分割数据
val values = message.split(regex =”#CS#”,limit = -1)
分割以后单独抽出数据当中的 url ,以及第二个参数
requestMethod,contentType,requestBody 等等。
//分割出 request中的url
val requesturl = if (request.split( regex = " ").length > 1)
{request.split( regex = “”)(1)
}else {
“”
}
//请求方式GET/POST
val requestMethod = if (valuesLength > 2) values(2) else””
//content_type
val contentType = if (valuesLength > 3) values(3) else ""
//Post 提交的数据体
val requestBody = if (valuesLength > 4) values(4) else ""
抽出来后每个用 cs#拆分,拆分完抽出来的数据以后。把它们封装成一个 tup,然后把这个 tup 返回,返回以后在前面再定义一个空的 tup 进行接收。这样就把数据经过 #CS# 拆分完以后的数据全部拿到了。为什么要做数据拆分?因为后续的很多步骤都需要用到数据当中的这些数据。先把它们拆分开,这样用的时候直接拿就行了,这样就不用等到后面用一次拆一次,那样会很麻烦。这个就是数据的拆分。
四,数据分类
数据拆分以后要做的就是数据分类。分类实际上就是打标签,这里打标签分为两种:一种情况是根据飞行类型或者说航班类型以及操作类型,针对这两种业务场景可以分为国内查询和国际查询,国内预定和国际预定四种情况。这个需求是要把数据一条一条读到程序里,然后确定每一条数据的业务场景到底是哪一个。因为真实的数据当中并没有这些信息,那么后期业务需求里面需要用到这些数据时就要把它计算出来。
计算的方法是,根据数据库当中配置的一个规则,代码是:
//读取数据分类规剩(四种规剩,每种单独读取)到预处理程序
var RuleMaps=AnalyzeRuleDB.queryRuLeMap( )
@volatile var broadcastRuleMaps=sc.broadcast(RuleMaps)
也就是要先找一下广播变量,在广播变量里面有一个 queryRuleMap。分为四种情况:国内查询、国际查询、国内预定和国际预定。
这几种情况分别在 itcast_classify_rule 表里面已经明确的找出来了,下图就是这个表:
这个表里面已经明确告诉国内查询、国际查询、国内预定、国际预定这四种情况它们的 Url正则是什么,就是图中的 expression 。就是说数据里面肯定有 url ,用 url 和正则匹配,匹配上哪个就是哪个业务场景。
这个过程还是很简单的。
先把这个规则按照四种场景把 sql 写出来。
然后有一个接收的字段:Val field=”expression”
然后调用 QueryDB 里面的 queryData 执行 sql 语句
//国内查询规则读取
val nqRuleList= QueryDB.queryData(nqSQL,field)
//国际查询规则读取
val iqRuleList=QueryDB.queryData(iqSQL,field)
//国内预定规则读取
val nbRulelist= QueryDB.queryData(nbSQL ,field)
//国际预定规则读取
val ibRuleList= QueryDB.queryData( ibSQL,field)
然后就拿到了一个结果,也就拿到了四种业务场景的规则,再将四种业务场景的规则封装成一个map,并且返回。
//将四种业务的规网封装到一个 Map 内
val RuleMaps=new
util.HashMap[ string,ArrayBuffer[string]]()
RuleMaps.put("nqRuleList" , nqRuleList)
RuleMaps.put( “iqRuleList",iqRuleList)
RuleMaps.put ("nbRuleList",nbRuleList)
RuleMaps.put ("ibRuleList",ibRuleList)
//将Map返回
RuleMaps}
这样就一次性拿到四个业务场景的规则,然后添加到广播变量里。
//读取数据分类规则(四种规则,每种单独读取)到预处理程序
var RuleMaps=AnalyzeRuleDB.queryRuLeMap( )
@volatile var broadcastRuleMaps=sc.broadcast(RuleMaps)
这里要注意读取数据过程,第一次读取依然是在程序初始化阶段。那么就把分类的这四类规则拿到了,拿到以后来进行判断,判断它是否需要更新。更新的逻辑,在这就不在重复解释了。那么规则已经得到了并且放到了广播变量里面,然后 url 前面也已经解析出来,接下来就是传到 classifyRequest 里面去进行判断。
val requestType: RequestType=
RequestTypeclassifier.cLassifyRequest(requesturl ,broadcastRulemaps.value)
requestType
该怎么判断?从我的广播变量当中把四个规则都抽出来。获得这四种规则:
/ /1先在分类的广播变量中获取四种业务场景的规则
//国内查询规则
val nqRuleList=RuleMaps.get( "nqRuleList")
//国际查询规则
val iqRulelist=RuleMaps.get( "iqRuleList")
//国内预定规则
val nbRuleList=RuleMaps.get( "nbRuleList")
//国际预定规则
val ibRuleList=RuleMaps.get( "ibRuleList"")
//定义返回的类型
var requestType : RequestType=null
//守护进程 true 表示继续执行
var flag=true
然后分别遍历每一个规则,用数据的 url 与每一个规则匹配,匹配上哪个就是哪个业务场景。如果一个都没匹配到。那就是 others ,表示未知或者称为其他,然后返回。这样就返回了它到底是哪种业务场景。那么这个飞行类型操作类型的标签就打完了。
//遍历四种规划与数据进行匹配
//遍历国内查询规则
for( nqRule<-nqRuleList if flag){
//使用数据匹配国内查询的规则 若匹配成功表示 这个url 所在的数据就是”国内查询"
if (requesturl.matches( nqRule)){
flag=false
/ /到了这一步就可以断定这个数据的业务类型
requestType=RequestType(FlightTypeEnum.National,BehaviorTypeEnum.Query)
//遍历国际查询规购
for(iqRule<-iqRuleList if flag){
if (requesturl.matches(iqRule)){
flag=false
requestType=RequestType(FlightTypeEnum.International ,BehaviorTypeEnum.Query)}}
//遍历国内预定规则
for( nbRule<-nbRuleList if flag){
if (requesturl.matches( nbRule)){
flag=false
requestType=RequestType(FlightTypeEnum.National ,BehaviorTypeEnum.Book)
//遍历国际预定规则
for(ibRule<-ibRuleList if flag){
if (requesturl.matches( ibRule)){
Flag=false
requestType=RequestType(FlightTypeEnum.International ,BehaviorTypeEnum.Book)}}
//数据没有匹配上任意一个规则
if(flag){
requestType=RequestType(FlightTypeEnum.other,BehaviorTypeEnum.other)}
//3最终返回类型(国内查询国内预定国际查询国际预定)
requestType
第二个分类标签是单程跟往返。这个就根据数据当中,它的来源、路径里面包含日期类型的数量。一个就是单程两个就是往返。这里不需要规则,直接把数据传进来就行,然后传进来就是原始数据。
//5-2 单程往返
val travelType:
TravelTypeEnum=TravelTypeClassifier.classifyByRefererAndRequestBody(httpReferrer)
travelType
对原数据,先用问号切割,然后再用与字符(&)切分,与字符切分以后拿到多个值,再遍历每一个数据。遍历的数据再用等号切分,如果切分完以后的长度大于一,也就是有两个值。那就拿到第二个值去和日期格式的正则去匹配。每匹配上一次,计数器做一个加一操作。
代码如下:
//使用问号“?”对数据进行切割,获取切割后的第二个数据
if (httpReferrer.contains("?") && httpReferrer.split( regex = "\\?"").length>1){
//2在第一步的基础上,对数据使用"&”进行切割,获取切割后的所有数据
val params=httpReferrer.split( regex ="\\?"")
(1).split( regex = “&")
//3遍历每一个数据,在使用”=”进行切割,获取切割后的第二个数据
for(param<-params){
val keyAndvalue=param.split( regex=”=”)
//4用第三步的数据匹配日期的正则
if ( keyAndvalue.length>1 &&
keyAndvalue(1 ) .matches(regex)){
//5匹配成功计数器加一(计数器用于记录日期格式的数据出现的次)
datecounts+=1
}}}
最终拿到一个日期的数量,最开始默认是零个。然后如果匹配上一个就是一,两个就是二,没匹配上就是零,一共只有这三种情况,根据这三种情况,零返回其他,一返回单程,二返回往返。返回完以后就得到了想要的结果。前面定义一个 TravelType 来接收。这样单程和往返的规则就匹配完了
五,数据解析
最后一个叫做数据解析,解析工作的代码很多,但是需要写的代码不多,业务逻辑也比较复杂。数据解析的主要的目的就是,如下图界面,里面有出发地、目的地、起飞时间、乘机人数、成人乘机人数,儿童乘机人数,婴儿乘机人数这些数据。
那么数据解析的目的是为了解析它们,但直接使用原始数据是没有办法解析出来的。
因为国内查询、国际查询、国内预定、国际预定这四种业务场景,每一个业务场景解析出发境,目的地的规则都不一样。也就说解析数据有多种选择。它们都在数据库当中,比如下图所示的表格:
要解析查询数据,那么有3种选择。那么到底解析哪个?
这个界面就是告诉我们,先找到业务场景,然后再去判断到底用哪个解析,这个表就明确告诉我们如果是国内查询,就用这个正则去匹配,然后去解析这些内容。
上图显示如果是国际查询,就用图例的正则去匹配,然后用下面的规则去解析。正则对应表格里的 RequestMethodExpression ,其他的规则也能从表格里找到。
当有多种选择时,到底以哪个为准?哪么这个业务需求就是解析这些数据。这样就又涉及到了从数据库当中读取数据到程序里。下面是读取数据解析的规则的代码:
//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)
//数据解析规刚--查询类
var queryRule=AnalyzeRuleDB.queryRule( behaviorType =0)
//将规则加载到广播变量
@volatile var broadcastQueryRules=sc.broadcast(queryRule)
//数据解析规则--预定类
var bookRule=AnalyzeRuleDe.queryRule( behaviorType = 1)
@volatile var broadcastBookRules=sc.broadcast( bookRule)
数据解析分查询型和预定型。查询的 behaviorType 是0,预定是1。在 analyzeRuleDB 里面,
代码:val sql: String = "select*from analyzerule where behavior_type =" + behaviorType
的作用是把数据库当中的 analyzerule 这个表里面的所有的数据、字段全都查过来,查过来以后全部加载到查询类跟预定类的代码中。
这样查询类的和预定类的数据就能全部拿到。接下来就是判断是否需要更新,如果需要更新就重新读取数据库,清空广播面量,重新加载,并且把 redis 的 true 改成 false。然后就是解析数据的过程。调用我们给大家提供好的 analyzeRequest ,这里面有个方法,就是当解析有三种情况不知道到底该用哪一个时。
就通过下面这几段代码来确定出到底以哪一个为准。
//数据库中有四条解析媒规则,需要通过传过来的这一条数据,确定数据匹配上的解析规则,然后用这个解析规则解柄数据
val matchedRules = analyzeRules.filter {rule =>
//先根据请求方式和请求类型过滤出的是属于查询的规则
if(rule.requestMethod.equalsIgnoreCase(PequestMethod)&
& rule.BehaviorType == requestTypeL abel,
behaviorType.id)
true
Else
false
}filter { rule ->
//然后根据 url 正则表达式。过滤出属于国内查询还是国际查询 requost .matches(rule.repuestMatchExpression)}
确定了以哪个为准去解析,只需要知道用后面的代码去解析,解析出来以后把数据反馈给我们就行。查询数据的解析和预定数据的解析是一模一样的流程。以上就是第三阶段实现的几个模块的回顾。