开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-封装 ProcessData 中(封装核心请求参数)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11668
数据预处理-封装 ProcessData 中(封装核心请求参数
数据结构化
CorsRequestParams 在 ProcessData 需要参数而且是这个类型,选中走到代码的文件走到最后一行,就是核心请求算数要封装的核心,会用到起飞时间、出发地以及目的地这几个数据,然后就可以去封装核心请求参数。
//保存结构化数据
case class ProcessedData(sourceData: String,requestMethod: String,request: String,
remoteAddr: string,httpUserAgent: String,timeIso8601:string, serverAddr:String,highFrqIPGroup;Boolean,
requestType:RequestType,travelType:TravelTypeEnum
requestparams:CoreRequestParams,cookieValue JSESSIONID: String,cookieValue USERID: String
queryRequestData:OptiomQueryRequestData],bookRequest Data:Option[BookRequestData], httpReferrer:string){
//用 null 替换空数据
implicit class Stringutils(s: string) {
def repEmptystr(replacement: String ="NULL"):string = {
if(s.isEmpty) replacement else s
}
}
//推送到 kafka 的数据格式,使用 #CS# 分隔数据
def tokafkastring(fieldSeparator: String ="#CS#"):String ={
//转换查询参数和预订参数对象为 JSON
val mapper =new objectMapper();
mapper.registerModule(DefaultscalaModule)
val queryRequestDatastr=queryRequestData match {
case Some(value)=>
try {
mapper.writeValueAsstring(value)
} catch {
case _: Throwable => ""
)
case _ => ""
}
val bookRequestDataStr=bookRequestData match{
case Some(value)=>
try {
mapperwriteValueAsString(value)
} catch {
case _: Throwable => ""
}
case _ => ""
}
//_0-原始数据
sourceData.repEmptystr()+fieldSeparator+
//_1-请求类型GET/POST
requestMethod.repEmptystr()+fieldseparator+
//_2-请求http://xxxxx
request.repEmptystr()+ fieldseparator+
//_3-客户端地址(IP)
remoteAddr.repEmptystr()+fieldseparator+
//_4-客户端浏览器(UA)
httpUserAgent.repEmptystr()+fieldseparator+
//_5-服务器时间的ISO 8610格式
timeIso8601.repEmptystr()+fieldseparator+
//_6-服务器地址
serverAddr.repEmptystr()+fieldSeparator+
//_7-是否属于高频IP段
highFrqIPGroup+fieldSeparator+
//_8-航班类型-National/International/other
requestType.flightType+fieldSeparator+
//_9-消求行为-Query/Book/other
requestType.behaviorType+fieldseparator+
//_10-行程类型-OneWay/RoundTrip/Unknown
travelType+fieldseparator+
//_11-航班日期 -requestParams.flightDate.repEmptystr()
+fieldseparator
//_12-始发地 -
requestParams.depcity.repEmptystr()+fieldseparator+
//13-目的地 -
requestParams.arrcity.repEmptystr(()+fieldseparator+
//14-关键cookie-JSESSIONID
cookieValue JSESSIONID.repEmptystr()+fieldseparator+
//15-关键Cookie-用户ID
cookieValue USERID.repEmptystr()+fieldseparator+
//16-解析的查询参数对象JSON
aueryReauestDatastr.repEmptystr()+fieldseparator+
//17-解析的购票参数对象JSON
bookRequestDataStr.repEmptystr()+fieldseparator+
//18-当前请求是从哪个请求跳转过来的 httpReferrer.repEmptystr()
}
}
//封装请求类型:航线类别(0-国内,1-国际,-1-其他) 和操作类别(0-查询,1-预定,-1-其他)
case class RequestType(flightType:FlightTypeEnumbehavior Type:BehaviorTvpeEnum)
//用于封装核心请求信息:飞行时间、出发地、目的地、
case class CoreRequestParams(flightDate:Stringdepcity:String, arrcity:string)
里面不是没有三个数据出发地、目的地,起飞时间,只是隐藏在数据里面,数据的解析时专门去解析出数据当中出发地、目的地、起飞时间以及成绩。
//6数据的解
//6-1查询类数据的解析
//1前面
//2前面
//3对数据进行解析(在多种解析规则的情况下,确定最终使用哪一个规则进行解析)
val queryRequestData=AnalyzeRequest.analyzeQueryReques t(requestType,requestMethod,contentType
requestUrl,requestBody,travelType,broadcastQueryRules.value
val data=queryRequestData match
case Some(datas)=> datas.flightDate
}
data
//6-2 预定类数据的解析
val bookRequestData =AnalyzeBookRequestanalyzeBookR equest(requestType,requestMethod,
contentTyperequestUrl,requestBody,travelType,broadcastBookRules.value)
//7 历史爬虫判断
//1
//2
//3将数据中的 ip 与历史出现过的黑名单 IP 数据进行对比,判断是否相等
val isFreIP=IpOperation.isFreIP(remoteAddrbroadcastBlackIpLi st.value)
//8 数据结构化
DataPackage.dataPackaae(str=“"reauestMethod,reauesturl.remoteAddr,httpUserAgent,timeIso8601.serverAddr, isFre
requestType,travelTypecookievalue JSESSIONID,cookieValue USERIDqueryRequestData,bookRequestData,httpReferrer
})
DataProcess.foreach(println)
查询里面有可能会有出发地和目的地,预定的里面也可能会有,query 的 request 里面查询的解析数据结果以及预定的检测数据结果中,包含了出发地和目的地以及起飞时间以及其他的数据,所以三个数据在里面的,而参数也已经传递到 datapackage 里面。
//用于实现数据非封装封装成ProcessedData object DataPackage {
//封装 ProcessedData
def dataPackage(str: String, requestMethod: String, request Url:String, remoteAddr: String, httpUserAgent: String,timeIso 8601:String,serverAddr:String,isFreIp:Boolean,requestTy pe:RequestType,travelType: TravelTypeEnum,cookieValue JSE SSIONID:StringcookieValue USERID: String queryRequestDa ta:Option[QueryRequestData]bookRequestData:Option[BookRequestData], httpReferrer:String):ProcessedData={
//封装 requestParams:CoreRequestParams
//买例出发地、目的地、起飞时间的变量
var depcity=""
var arrcity =""
var flightDate=""
//在查询解析后的数据中抽取出出发地、目的地、起飞时间 queryrequestData match {
case Some(datas)=>
depcity=datas.depcity
arrcity=datas.arrcity
flightDate=datas.flightDate
case None=>
}
}
//在预定解析后的数据中抽取出出发地、目的地、起飞时间 bookRequestData match {
case Some(datas)=>
depcity=datas.depcity
arrcity=datas.arrcity
flightDate=datas.flightDate
case None=>
}
标红线里面有list buffer string 而返回类型是 string ,上面调用的是 query request data 查询返回的数据,里面 depCity 、arr、flightDate 是字符串。
预定的 request data,出发地、目的地、起飞时间是一个 buffer。
//实例出发地、目的地、起飞时间的变量
var depcity=""
var arrcity =""
var flightDate=
//用于实现数据非封装封装成ProcessedData object DataPackage {
//封装ProcessedData
def dataPackage(str: String, requestMethod: String, request Url:String, remoteAddr: String, httpUserAgent: String,timeIso 8601:String,serverAddr:String,isFreIp:Boolean,requestTy pe:RequestType,travelType: TravelTypeEnum,cookieValue JSE SSIONID:StringcookieValue USERID: String queryRequestDa ta:Option[QueryRequestData]bookRequestData:Option[BookRequestData], httpReferrer:String):ProcessedData={
//封装 requestParams:CoreRequestParams
//买例出发地、目的地、起飞时间的变量
var depcity=""
var arrcity =""
var flightDate=""
//在查询解析后的数据中抽取出出发地、目的地、起飞时间 queryrequestData match {
case Some(datas)=>
depcity=datas.depcity
arrcity=datas.arrcity
flightDate=datas.flightDate
case None=>
}
}
//若数据在查询的结果中已经封装完了(flightDate有值),则不使用预定的数据封装
if(flightDate==""){
//在预定解析后的数据中抽取出出发地、目的地、起飞时间
bookRequestData match {
case Some(datas)=>
depcity=datas.depcity.tostring()
arrcity=datas.arrcity.tostring()
flightDate=datas.flightDate.tostring()
case None=>
}
}
//封装核心请求参数
val requestParams:CoreRequestParams=CoreRequestParams (flightDatedepcity,arrcity)
ProcessedData("", requestMethod,requestUrl,remoteAddr, htt pUserAgent, timeIso8601,serverAddr, isFreip,requestType, travelType,requestParams,cookievalue JSESSIONID, cookieva lue USERID,queryRequestData, bookRequestData,httpReferrer)
}
默认返回前面做接收:
//8 数据结构化
val processedData=DataPackage.dataPackage("",requestMe thod,requestUrl,rem oteAddr, httpUserAgent, timeIso86 01,serverAddr, isFreIPrequ estType,travelType,cookieValue JS ESSIONID,cookievalue USE RID,queryRequestData,bookRequ estData,httpreferrer)
})
DataProcess.foreach(println)
打包数据:
//封装 requestParams:CoreRequestParams
//实例出发地、目的地、起飞时间的变量
var depcity=""
var arrcity =""
var flightDate=""
//在查询解析后的数据中抽取出出发地、目的地、起飞时间
queryRequestData match {
case Some(datas)=>
depcity=datas.depcity arrcity=datas.arrcity
flightDate=datas.flightiate case None=>
//若数据在查询的结果中已经封装完了(flightDate有值),则不使用预定的数据封装 if(flightDate==""){
//在预定解析后的数据中抽取出出发地、目的地、起飞时间 bookRequestData match {
case Some(datas)=>
depcity=datas.depcity.tostring()
arrcity=datas.arrcity.tostring()
flightDate=datas.flightDate.tostring()
case None=>
}
}
//封装核心请求参数
val requestparams: coreRequestParams=CoreRequestParam s(flightDatedepcity,arrcity) ProcessedData("",requestMethod,requesturl,remoteAddr,httpUserAgent, timeIso8601,serverAddr, isFreiprequestType,travelTyperequestParamscookieValue_JSESSIONIDcookieValue_USERIDqueryRequestData, bookRequestData httpReferrer)
}
参数都是已知的,封装 ProcessDate 用到这些参数,但是缺核心请求参数,核心请求参数需要用到起飞时间、出发地和目的地,上面的代码是来封装出发地和目的地的,获取出来然后再去封装,有可能在查询解析出来的数据当中,也有可能在预定解析的数据中查询出来,查询出来以后封装核心请求参数,封装 ProcesseDate 的所有参数都有可以去封装,封装完毕前面做接收,接收以后再把它粘过来,返回到 data process 里面,process 后面又做了一个 print line。
//8数据结构化
valprocessedData=DataPackage.dataPackage(str=""request Method,requestUrlremoteAddr,httpUserAgent, timeIso8601requestTypetravelType,cookieValue JSESSIONID,cookieVal ueUSERID,queryRequestData,bookRequestData,httpReferrer)
processedData
})
DataProcess.foreach(println)
运行:
程序跑起来了没有报错,
爬虫跑起来:
ProcessedData 数据类型按照 url,服务器的 ip,一步一步的输出,这就是封装完的 ProcessedData,ProcessedData的封装就完成了。停掉爬虫和预处理程序,以上就是封装 ProcessedData 代码实现过程。