开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段):数据预处理-数据解析-读取规则及加载到广播变量】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/671/detail/11661
数据预处理-数据解析-读取规则及加载到广播变量
前面已经把数据解析的思路和目的确定过了,本节课学习到底该怎么实现这个流程。
首先,在 kafka 中把数据读出来,再从数据库当中读取解析的规则,尤其是查询性解析规则。因为我们现在要做的是查询业务数据的解析,所以要把查询的规则读取过来。
数据在 mysql 数据库当中,这涉及到了数据的读取流程。
先要在数据库当中把规则读到程序里,然后数据有了规则就可以去解析。把规则读取过来并把它放到广播变量里面,涉及到需要从 Redis 当中添加一个是否更新的标识,如果需要更新就走更新的流程,如果不需要更新就直接跳过。
然后用数据库的规则去进行过滤。过滤出对应的数据解析规则。
也就是从数据库当中把规则读过来,然后放到广播变量中,拿出所有查询的解析规则。在数据库当中。查询型用 Behaviortype 0 表示:
package com.air.antispider.stream.dataprocess.constants
//操作标记类别0-查询,1-预定,-1-其他
object BehaviorTypeEnum extends Enumeration{
type BehaviorTypeEnum = Value
val Query = Value(0,"Query")
val Book = value(1,"Book")
val Other = value(-1, "Other")}
那么 behavior_type 代表等于零的有三种情况.
那么这三种到底该使用哪一种去解析出发地,目的地和起飞时间这些数据?
也就是要过滤出对应的这条数据的解析规则,过滤完了以后就能够确定该用哪个解析,这样就从三个选择变成了一个选择。规则确定了,后面就开始进行解析。
第五步,数据库规则都在 analyzerule 这个表里面。
接下来看一下具体的实现思路。
先看查询性数据解析具体该怎么实现,按照什么顺序来:
查询类数据解析工作的目的就是解析出数据当中的出发地和目的地和起飞时间。具体的做法:
首先解析出 requestType 标记的一个业务场景,也就是国内查询、国际查询,国内预定、国际预定。这些在之前的课程已经解析完了,还要 travelType 也一样。这两个解析完以后,读取数据库内的解析规则到程序里边,这个是第一步。其实就是又读取数据库。第二步就是把这个规则加载到广播变量,第三步是对数据进行解析,一共就这三步。
第一步,读取数据库内的规则到我的程序当中。
将表内所有的查询规则全部都读取到程序里面。这里虽然查询的就这三条数据,但是里面有很多字段都能用得上。但是 book 的用不了。为了方便,要把这些数据一次性拿过来,包括后面的 depCity,arrCity 这些数据,以及前面的这些指标。解析的时候就要用这些指标从三个里面确定出到底用哪一个?
第一步读取数据库,那读取数据库的数据。
读取数据库的规则:
//读取数据分类规则(四种规则,每种单独读取)到预处理程厅
var RuleMaps=AnalyzeRuleDB.queryRuLeNap()
@volatile "var
broadcastRuleMaps=sc.broadcast(RuleMaps)
读取数据库要先把它放在程序初始化阶段:
//数据预处理的程序
def setupSsc(sc: SparkContext,kafkaPar
//程序初始化阶段
读过来以后,第二步需要将规则加载到广播分量里循环判断是否需要更新。
是否需要更新里也有很多步骤:更新流程在每一次循环里面,
val ssc=new StreamingContext(sc,Seconds(2))
这行代码表示每两秒钟迭代一次。每一次迭代都要去判断是否需要更新,具体流程就类似下面的代码,所以在这里面走第二步。
kafkaValue.foreachRDD( rdd=>{//迭代运行(每2秒运行一次)
//到 redis 读取是否需要更新的标记
val
NeedupDateFilterRule=redis.get("NeedupDateFilterRule")
//判断是否需要更新 若数据不为空并且数据转成 BooLean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){
//若需要更新,那么在数据库中重新读取新的过滤规则到程序中
filterRuleList=AnalyzeRuleDB.queryFilterRule()
//将广播变量清空
broadcastFilterRuleList.unpersist(
//将新的规则重新加载到广播变量
broadcastFilterRuleList= sc.broadcast(filterRuleList)
//将redis内是否需要更新规则的标识改为”false”
redis.set(“NeedupDateFilterRule"",false"))}
第三步,做数据解析。解析工作实际分为这三步:第一步,把它放在数据初始化阶段来执行。代码如下:
//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)
//数据解析规则--查询类
var queryRule=AnalyzeRulgDB.queryRule(0)
@volatile var
broadcastQueryRules=sc.broadcast(queryRule)
//数据解析规则--预定类
var bookRule=AnalyzeRuleDB.queryRule(1)
@volatile var broadcastBookRules=sc.broadcast(bookRule)
查询类为0,预定类为1,AnalyzeRulgDB 方法还没有,说明还没有创建这个方法,创建。
代码如下:
这段代码就是来实现数据解析工作的。
能够看出里面很多爆红的,这里需要把它引进来。具体怎么做?
看下图的操作,AnalyzeRule 引入
com.air.antispider.streamcommon.bean.AnalyzeRule? Alt+Enter
同理:connect 引入 java.sql.Connection;Preparedstatement 引
入 java.sql.Preparedstatement;
ResultSet 引入 java.sql.ResultSet;c3p0UTil 引入 com.air.antispider.streamcommon.database.c3p0UTil
这样就不报错了,实现数据解析规则的读取的这个方法需要一个参数。
在 mysql 中解析出去的规则 Behaviortype ,0是查询,1是预定。
看下面这行代码:
val sql: String = "select * from analyzerule where behavior_type =" + behaviorType
数据库当中有 behaviortype 这个表。现在要做查询型的业务需求。那么就把查询的规则查出来,也就是 behavior_type 等于零的查过来,有下面三种情况。
先看 behavior_type,sql 语句是:
val sql: String = "select * from analyzerule where
behavior_type =" + behaviorType
它等于一个值,这个值就是 int 类型的参数。
//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)
//数据解析规则询类
var queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0 )
@volatile var broadcastQueryRules=sc.broadcast(queryRule)
//数揶解析规则--预定类
var bookRule=AnalyzeRuleDB.queryRuLe( behaviorType = 1)
@volatile var broadcastBookRules=sc.broadcast(bookRule)
这里面为什么要传个零进去,因为在这里要做的是查询型的数据,而查询型 behavior 在这里是零,所以就传一个零进去。所以把查询型的数据 behaviortype 等于零就查出来了。
前面不是用 queryDB 里面的 queryData 获取数据?
/ /国内查询规则读取
val nqRuleList=QueryDB.queryData(nqsQL,field)
/ /国际查询规则读取
val iqRuleList= QueryDB.queryData(iqsQL,field)
//国内预定规则读取
val nbRuleList= QueryDB.queryData(nbsQL,field)
//国际预定规则读取
val ibRuleList= QueryDe.queryData( ibsQL,field)
这样给个sql,给个字段就可以获取数据了,为什么要写一个 sql ,然后实现Connection,preparedStatement,ResultSet 为什么要通过这种方式?
因为这个属性,它这里面只能有一个字段。
Object QueryDB {
def queryData(sql: String,field: String):
ArrayBuffer[String] = {
//创建ab,用来封装数据
val arr = new ArrayBuffer[string](
//获取连接
val conn = c3pautil.getconnection[
//执行sqL语句
val ps = conn.preparestatement(sql)
val rs = ps.executeouery()
//封装数据
while (rs.next(){
arr.i=(rs.getstring(field))}
c3p0Util.close(conn,ps,rs)
/ /返回结果
arr
通过上面的代码发现,这个方法只能有一个字段:
arr.i=(rs.getstring(field))} 所以只有一个字段的时候可以用它。但是这里面是把所有的数据都查出来。所以不能用这个方法,只能用最原始的方法连接 preparedStatement 去查,查完以后拿到的结果就是一个一个的字段,然后这就解析完了,接下来就是去连接,然后执行并返回的结果。
代码如下:
conn = c3p0Util.getConnection
ps =conn.prepareStatement(sql)
rs = ps.executeQuery()
这里有一个 Analyzerule ,它是一个对象,来看一下这个类型,
点击进入:
这个是用来封装 analyzerule 的。也就是按这个表里的这些字段,里面的 id,flightType,BehaviorType等等全都是查询类的,下面也都是查询类的,然后把对应数据当中的这个表里的字段。把查过来的数据,获取 id 塞给这个对象的 id 里面。查过来的数据库当中的 flight_type 塞到 flightType 里面 ,behavior_type塞到BehaviorType当中:
val analyzeRule = new AnalyzeRule()
analyzeRule.id = rs.getstring( columnLabel = "id")
analyzeRule.flightType = rs.getstring( columnLabel = "flight_type").toInt
analyzeRule.BehaviorType = rs.getstring( columnLabel = "behavior_type”).toInt
analyzeRule.requestMatchExpression m rs.getstring( columnLabel =“requestNatchExpression")
把所有查出来的数据塞给 analyzeRule ,然后再塞到:analyzeRuleList += analyzeRule 里面。
然后转化成 tolist 进行返回:analyzeRuleList.toList,那么到这里就拿到了查询型的规则。
第二步,将规则加载到广播变量。
//将规则加载到广播变量,并循环判断是否需要更新
@volatile var broadcastQueryRules=sc.broadcast(queryRule)
查过来以后就已经加载到广播变量了,但是还没有启动更新。
//环判断是否需要更新(内含多个步骤,此处省略)
val needUpDataAnalyzeRule=redis.get("NeedupDataAnalyzeRule"")
//如果获取的数据是非空的,并且这个值是 true,那么就进行数据的更新操作(在数据库中重新读取数据加载到Redis )
if( !needupDataAnalyzeRule.isEmpty&&
needuUpDataAnalyzeRule.toBoolean){
//重新读取 mysql 的数据
queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0)
bookRule=AnalyzeRuleDB.queryRule( behaviorType =1)
//清空广播变量中的数据
broadcastQueryRules.unpersist()
broadcastBookRules.unpersist()
//重新载入新的过滤数据
broadcastQueryRules=sc.broadcast(queryRule)
broadcastBookRules-sc.broadcast( bookRule)
//更新定华后,将 Redis中的true 改成 false
redis.set("NeedupDataAnalyzeRule" , "false"")}
在 redis 里面要加一个字段叫做 NeedupDataAnalyzeRule,读过来
之后要么是 true 要么是
False(!NeedupDataAnalyzeRule 不为空并且转化成布尔类型为 true 的时候表示需要重新读取规则,那么要重新读取两个规则:查询的和预定,重新清空两个广播变量。清空以后再重新将读过来的规则加载到广播变量里面,然后把 AnalyeRuleNeedUpData 更新掉,以 NeedupDataAnalyzeRule 为准。
这个有了,更新规就有了。现在要把它添加到广播变量里。这个做完
后要添加到我的 redis 里面了,操作如下图:
这样第二步就做完了,接下来是第三步:解析数据,数据怎么解析?
代码如下:
//对数据进行解析(在多种解断规则的情况下,确定最终使用哪一个规则进行解析)
val queryRequestData =
AnalyzeRequest.analyzeQueryRequest( requestTypeLabel,
requestMethod,contentType,request,requestBody,travelType, broadcastQueryRules.value)
这里面有几个参数显示红色,那是因为这个参数 requestTypeLabel 实际就是前面解析出来的 requestType ,这里把它删掉即可。request 就是前面解析出来的 url ,把它替换成 requesturl 就解决了。以上就是实际的解析过程。