开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-封装 ProcessedData 上】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11667
数据预处理-封装 ProcessedData 上
数据结构化
历史爬虫判断结束以后进入到数据结构化过程
1、目标
将经过拆分的数据和与前面计算出来的各个指标(国内查询、国际预定、单程、往返、解析出的数据、是否是历史爬虫)封装成 ProcessedData
拆分后的数据实际上是 split 的,把数据拆分以及用拆分出来的数据去计算出来指标,Request type、 travel type、预定的数据、查询类的解析数据和预订类的解析数据以及是否是高频 IT,也是是否在历史的爬虫中出现过。封装模块将前面的解析拆出来的数据和封装出来的数据封装成 ProcessedData
2、需求
将数据按照 ProcessedData 格式封装,后期会将数据处理后的最终结果 ProcessedData 发送 kafka 供后续 rulecompute 业务进行处理
打开 common-bean- ProcessedData
*requesttype:请求类型 travelType:往返类型
*requestParams:核心请求参数,飞行时间、目的地、出发地
*cookieValue JSESSIONID:cookie 中的 jessionid
* cookieValue USERID:cookie 中的 userid
*queryRequestData:查询请求的 form 数据
*bookRequestData:预定请求的 body 数据
*httpReferrer: refer
*/
保存结构化数据
case class ProcessedData(sourceData: String,requestMethod: String, request: string,
remoteAddr:String,httpUserAgent:String,timeIso8601:string, serverAddr:string,highFrqiPGroup:Boolean,
requestType:RequestType,travelType:TravelTypeEnum,
requestParams:CoreRequestParams,cookieValue JSESSIONID:StringcookieValue USERID: String
queryRequestData:Option[QueryRequestData],bookRequest DataOption[BookRequestData],
httpReferrer: String){
//用 null 替换空数据
implicit class stringutils(s: string){
def repEmptystr(replacement:String ="NULL"):string = {
if(s.isEmpty)replacement else s
}
}
case class 是样例类,里面很多的参数,ProcessDatad 是数据预处理阶段也是蓝色的部分,最终需要封装的结果,把里面所需要用到的参数全部提供出来封装 ProcessDatad,数据预处理基本就做完了分析操作就做完了。封装需要用到很多的参数,实际上大部分现在已经有了,把数据传进来。
3、代码
//8 数据结构化
DataPackage.dataPackage()
新建数据预处理的 businessprocess Scala object
:
package com.air.antispider.stream.dataprocess.businessproc ess
//用于实现数据非封装封装成ProcessedData
object DataPackage {
}
数据和计算数据包装成 ProcessedData,然后 process 的格式所需要的参数有一部分有一部分没有,在方法里面去封装,拆分出来的数据都在这,而前面计算出来的各个指标数据也都在这里,把这些数据都传递到里面来去封装 ProcessedData,顺序按照它来,所以既然要数据而且要顺序,里面现在数据基本都有,就按照顺序直接复制粘贴,删掉 String 类型。
//8 数据结构化
DataPackage.dataPackage()
“”, requestMethod, requestUrl,
remoteAddr,httpUserAgent,timeIso8601,
serverAddr, IsfreIP,
requestType, travelType,
cookieValue JSESSIONID,cookieValue USERID
queryRequestData,bookRequestData,
httpReferrer
sourceData 没有,process 对着里面,sourceData 指的是请求的原始数据,原始数据是我一个循环里面的 message,或者经过手机号码脱敏以后的数据是原始数据。虽然要原始数据但是先不给,做一个预留字段,先给个空如果后面有业务需求,真的需要原始数据,再传过来,如果不需要先给个空做预留。
然后 requestMethod 现在有拆分方法,request 想要的数据是真正的数据当中的 url。
有高频 IP 的组,实际上是刚刚计算出来的也是历史爬虫数据,是不是历史爬虫中出现,也是高频 IP 里面直接切过来把它替换掉。
requestParams 的数据类型是一个核心的请求参数,实际上在ProcessedData scholar 的文件中:
//封装请求类型:航线类别(0-国内,1-国际,-1-其他)和操作类别(0-查询,1-预定,-1-其他)
case class RequestType(flightType:FlightTypeEnum,behavior Type:BehaviorTypeEnum)
用于封装核心请求信息:飞行时间、出发地、目的地,
case class CoreRequestParams(flightDate: String,depcity: Stri ng, arrcity: String)
拖到最后有核心的请求参数也是一个样例,需要用到起飞时间、出发地和目的地来解析。出发时间、出发地和目的地封装出来的核心请求参数前面没有用到,先删掉,删掉以后在封装 ProcessDate 里面,现在缺少一个核心的请求参数,缺少 requestParams 缺的在 data package 里面来实现,添加进来就可以,然后在 package 里面封装 ProcessedData。
//6 数据的解析
//6-1 查询类数据的解析
//1前面
//2前面
//3对数据进行解析(在多种解析规则的情况下,确定最终使用哪一个规则进行解析)
val queryRequestData =AnalyzeRequest.analyzeQueryRequest (requestType,requestMethod,contentType
requesturl,requestBody,travelTypebroadcastQueryRulesvalue)
val data=queryRequestData match{
case Some(datas)=> datas.flightDate
data
//6-2 预定类数据的解析
val bookRequestData =AnalyzeBookRequest.analyzeBokReq uest(requestType,requestMethod,
contentType,requestUrlrequestBody,travelType,broadcastBookRules.value)
//7 历史爬虫判断
//1
//2
//3将数据中的 ip 与历史出现过的黑名单 IP 数据进行对比,判断是否相等
Ipoperation.isFreIP(remoteAddr,broadcastBlackIpList.value)
//4若有任意一个是相等的返回 true,反之返回 false
//8 数据结构化
DataPackage.dataPackage("",requestMethod,requestUrl,remoteAddr, httpUserAgent, timeIso8601,serverAddr, isFreIP,reque stType,travelTypecookieValue_JSESSIONID,cookievalue USER ID,queryRequestData, bookRequestData,httpReferrer)
创建方法:
package com.air.antispider.stream.dataprocess.businessproc ess
import com.air.antispider.stream.commonbean{BookReuestD ataQueryRequestData,RequestType
import com.air.antispiderstream.dataprocess.constantsTrave lTypeEnum.TravelTypeEnum
//用于实现数据非封装封装成
ProcessedData object DataPackage {
//封装ProcessedData
def dataPackage(str: String,requestMethod:String,requestur l:String,remoteAddr: String,httpUserAgent: StringtimeIso8601:stringserverAddr:string
,isFreIp:BooleanrequestType:Reque stType, travelType: TravelTypeEnumcookieValue JSESSIONID: StringcookieValueUSERID:StringqueryRequestData:Option[QueryRequestDatal,bookRequestData:OptionBookRequestDa tal, httpReferrer:String):Unit ={
假设封装完以后,把前面准备好的参数直接复制过来,然后粘到里面,爆红的原因是缺少核心的请求参数。假设核心参数已经有了以后把它引进来,让后面封装 process 不报错,在 Travel type 的后面和 cookie 的前面,找到相应的位置,然后把封装添加进来以后就不报错。然后让它在代码的最后一行返回来,把封装好的 ProcessData 返回,只不过现在核心的请求参数还没有,接下来就来封装。
def dataPackage(str: String,requestMethod:String,requestur l:String,remoteAddr: String,httpUserAgent: StringtimeIso8601: string,serverAddr:string,isFreIp:BooleanrequestType:Reque stType, travelType: TravelTypeEnumcookieValue JSESSIONID: StringcookieValue USERID: String queryRequestData:Option[Q ueryRequestDatal,bookRequestData:OptionBookRequestDa tal, httpReferrer:String) ={
//封装reauestParams:CoreReauestParams
val requestParams:CoreRequestParams
ProcessedData("", requestMethod,requestUrl,remoteAddr,httpUserAgent,timeIso8601,serverAddr, isFreIP requestType,travelTyperequestparam scookieValue JSESSIONID,cookieValue USERIDqueryRequestDa ta bookRequestData,httpReferrer)
}
}