开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段):数据预处理-数据清洗-读取过滤规则到程序代码】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/671/detail/11643
数据预处理-数据清洗-读取过滤规则到程序代码
内容介绍:
一、数据清理
二、实际操作展示
一、数据清理
前面将数据清理的思路介绍了一遍,接下来根据此流程写代码。
目标:数据中含有 html、cssjs、jpg、png 等无用的数据,为使预处理更高效需将无用的数据过滤掉,只保留有效的数据。
1、思路与关键代码:
(1)读取 Kafka 中的数据到预处理程序中。
(2)读取 mysgl 数据库中的数据过滤规则到预处理程序。
(3)将数据过滤规则加载到广播变量,可以让所有的 executer 都能访问到。
(4)在 redis 内添加是否需要更新数据过滤规则的标识,每批次都从 redis 中读取这个标识,判断是否需要更新数据过滤规则。
(5)若不需要更新,那么直接过滤数据。
(6) 若需要更新,那么在数据库中重新读取新的过滤规则到程序中。
①将广播变量清空。
②将新的规则重新加载到广播变量。
③将 redis 内是否需要更新规则的标识改为“false”。
(7)进行数据的过滤。
①定义方法,参数为一条数据和广播变量。
②截取出数据中的 url。
③遍历数据过滤的规则,拿 url 与过滤规则进行匹配。
④若匹配成功表示,要要删除,返回 false。
二、实际操作展示
打开项目,实际上第一步是读取 Kafka 数据到程序当中,createDirectStream 即读取卡 Kafka 里的数据,之后进行处理,每隔两秒钟读过来的一批数据就是 rdd,即第一批数据已经读过来了。
接下来就是读取 MYSQL 数据库当中的过滤规则到数据预处理程序当中,实现第二个功能,读取数据清洗功能。读取数据的过滤规律,过滤规则在哪里读取这个问题一定要搞明白,非常关键。刚刚已经学习过,KafkaVlue 数据刚刚读过来,经过处理后到 kafkaValue.foreachRDD(rdd=>{ 这是数据的一个便利 foreachRDD,从此处开始,一个 foreachRDD 是 streaming 的一个批处理,每两秒钟执行一段代码。而执行哪一段必须要清楚,即两秒钟执行一次,哪一段代码是两秒钟跑几遍,而哪一段代码只跑一遍。即从 kafkaValue.foreachRDD(rdd=>{ 到下面系统监控功能(数据处理的监控功能)这段代码每两秒钟跑一次。而前面的部分,数据预处理程序 setupSsc 行开始到 val kafkaValue=kafkaData.map(_.2)这段代码肯定是只跑一遍。即上面的这段代码只跑一遍,而下面的这两个代码会连续不断的迭代跑。而现在要做的第二件事情,就是读取 mysql 数据库的过滤规则到预处理程序当中。
思考在哪里读取,只读一遍还是在迭代当中一遍一遍的执行。消费数据是迭代运行,每次都跑一遍,每两秒钟运行一次。即 kafkaValue.foreachRDD(rdd=>{ //迭代运行(每2秒运行一次)。现在应该易懂读取数据库的规则程序,到底是写在每次迭代还是写在上面代码中。如果放在每次迭代里,就意味着每隔两秒钟要读取一次马克斯库数据库,此时其压力比较大,现在是两秒钟,若更快马斯克数据库压力会非常大。所以,在程序初始化的时候,先把规则读起来,即下面代码“程序初始化阶段”至 val
kafkaValue=kafkaData.map(_.2)就是程序初始化阶段的时候。初始化时只跑一遍,在 val ssc=new
StreamingContext(scseconds(2))里将数据库的数据过滤规则读取后,刚才讲了有一个需要加载广播到广播变量里面,还需要运行 radis,每隔两秒钟在“消费数据”里去判断 radis,从其中判断它是否需要更新,如果需要更新,再重新读取数据库。在数据初始化阶段读取一次数据库的规则,然后在每一个迭代的时候去 radis 里判断是否需要更新,如果需要更新,再重新读即可。此时 Radis 高效的特点运用上了且压力也小了,这就是全部过程。
//数据预处理的程序
Def setupSsc(sc:SparkContext,
kafkaParams:Map[String,String],topics:Set[String]):StreamingCon
//程序初始化阶段
//3、创建 streaming Context
val ssc=new StreamingContext(scseconds(2))
//读取数据库中的数据过滤规则
//4、读取 kafka 内的数据 ssc,kafkaParam stopics)
//jssc:JavastreamingContext
//接收 kafka 的数据(key,value)
val
kafkaData=KafkaUtils.createDirectstream[String,string,stringDecoder,stringDecoder](ssc,kafkal
//真正的数据
val kafkaValue=kafkaData.map(_.2)
//5、消费数据
kafkaValue.foreachRDD(rdd=>{ //迭代运行(每2秒运行一次)
//1 链路统计功能
LinkCount.linkCount(rdd)
//2 数据清洗功能
//3 数据脱敏功能
//3-1手机号码脱敏/3-2身份证号码脱敏
//4 数据拆分功能
接下来实现一下这个功能,在实例化 streaming Context 后面实现读取功能,在讲义中,即下图“读取数据库中的过滤规则”在“实例化 ssc”之后,“读取数据之前”把数据加载到广播变量中。即//3、创建 streaming Context val ssc=new
StreamingContext(scseconds(2))是实例化读取 Context,在下方“//读取数据库中的数据过滤规则”。需要写一个单独的方法来读取此过滤规则,实际上定义好的这个数据过滤规则,即
“AnalyzeRuleDB.queryFilterRule()”里面已经定义好了方法,直接 CTRLC 复制粘贴。在写代码的时候也可以直接复制过来。
//数据预处理的程序
def setupssc(sc:SparkContext,kafkaParams:Map[stringstring],topics:set[st
/3、创建 istreaming Context
val ssc=new StreamingContext(sc,seconds(2))
//读取数据库中的过滤规则
val filterRuleList=AnalyzeRuleDB.queryFilterRule()
//将数据添加到广播变量
val broadcastFilterRuleListsc=sc.broadcast(filterRuleList)
//4、读取 kafka 内的数据 ssc,kafkaParams,topics)
val kafkaData=KafkaUtils.createDirectstream[string,string,stringDecoder,strir
//初始化的时候
读取数据库中的过滤规则
val filterRuleList=AnalyzeRuleDB.queryFilterRule()
//将数据添加到广播变量
val broadcastFilterRuleListsc=sc.broadcast(filterRuleList)
AnalyzeRuleDB 里面有一个 queryFilterRule(),这个方法就是来读取数据库中的过滤规则,而现在 AnalyzeRuleDB.queryFilterRule()报红说明现在还没有这个方法。没有这个 object 就创建即可, CTRLC 复制方法的名称,然后在数据预处理 dataprocess 里面有 businessprocess 业务,右键新建一个 scala object 并粘贴叫做 AnalyzeRuleDB,这个 object 就是用于实现在数据库中读取规则到程序中。而下面是本项目,即反爬虫项目,“本项目所有的数据库读取操作都放在这里”AnalyzeRuleDB 都放在其中,而这里面就包括数据过滤规则。Object 创建好了引入,而这个方法现在还没有,需要创建,让程序自动进行创建,选中然后创建。现在将大括号补全,queryFilterRule()主要实现数据预处理阶段数据过滤规则的读取。这就是queryFilterRule()实现代码的目标所需要实现过程的代码。即实现数据预处理阶段,数据过滤规则的读取就在此代码中实现。
//用于实现在数据库中读取规则到程序中
//本项目所有的数据库读取操作都放在这里
object AnalyzeRuleDB {
//实现数据预处理阶段 数据过滤规则的读取
def queryFilterRule():Unit={
}
此时不报错,接下来写代码,需要读数据库,在提供的方法当中,util 工具类里面有 database,当中有 Query DB,实际上 Query DB 在前面介绍项目时,有两个是需要掌握的,一个是读取配置文件,另外一个就是读取数据库的规则,也就是 Query DB。Object 里 querydate 方法里面就提供了一个参数是 sql,另一个参数是 field,读取哪个数据的 sql 语句以及用谁来进行接收,它会去执行 sql 语句,返回的结果使用 field 字段来进行接收,并且将结果进行返回 arr。这是读取数据的一个方法,此处用该方法来读取数据,此方法需要两个参数,第一个参数是 sql 语句,第二个参数是 field。接下来先把这两个 sql 语句写好。“读取数据过滤规则的 sql”,下一行写 var sql=”select value from itcast_filter_rule”,其中sql=字符串,就是查询语句。现在是 itcast filter_ rule 中的表格(下图),需要将 value 列六个数据全部进行读取,from 的表即 itcast 右键复制一下重名,将名字复制,注意不要写错,此时肯定不会报错。Select value from 这个表,这样最后一句就写好了。
接下来写接收数据的字段,即 val field=”value”,其中字段名字就是 value ,因为查询 value,所以 getString 在获取的时候也用 value 获取。此时字段有了,sql 有了,接下来就是调用Query DB 中的 queryDate 来读取数据,所以代码书写为 //调用 QueryDB 读取数据,下一行即 QueryDB.queryDate(sql,field),注意读取时要先将其引入进来,()中传 sql 语句和 filed。此时就拿到数据了,然后需要定义一个变量来进行接收数据。起个变量名叫做 filterRudelist,用来接收数据,即完整代码为 Var filterRudeList=QueryDB.queryDate(sql,field)
接收完以后返回过滤数据即可,此时 filterRudeList 中的结果就是过滤数据的规则,也就是上图 value 列中的六个数据。这样数据读取完成了,将其放到最后一行把它返回来,把数据类型补全,点击 val list :ArrayBuffer[String] = filterRuleList 选择 Specify type Settings,进行复制,粘贴到 def queryFilterRule(): 中,此时数据类型补全了。
//用于实现在数据库中读取规到程序中
//本项目所有的数据库读取操作都放在这里
object AnalyzeRuleDB {
//实现数据预处理阶段数据过滤规则的读取
def queryFilterRule():ArrayBuffer[string]={
//读物数据过滤规则的 sgl
val sql="select value from itcast filter_rule"
//接受数据的字段
val field="value"
//调用 OueryDB 读取数据
val filterRuleList=QueryDBqueryData(sqlfield)
//返回过滤数据
filterRuleList
这个方法是按照 AnalyzeRuleDB.queryFilterRule()点击后就是读到的数据,并且返回以后,接下来再来做接收,前面已经读过来了,需要接收,依然是 filterRuleList 进行接收,它的类型还是刚刚的类型,将其复制粘贴,filterRuleList 就是从数据库当中读取出来的过滤规则。现在已经读取完毕,下一步是将过滤规则添加到广播变量,让所有的 executer 能够访问到。接下来写入“将过滤规则添加到广播变量”,需要用到 sc.broadcast(filterRuleList),也就是将前面刚读取过来的数据库过滤规则添加到广播变量,然后前面还需要接收,因为后面需要用,即val broadcastFilterRuleList= sc.broadcast(filterRuleList) ,其中要将 f 改成大写的F。添加进去以后,broadcastFeltrRuledlist 就将这个数据添加到了广播变量里面。加完以后,还需要给它做修饰,在其前面加@volatile ,将这个变量把它设定成不稳定的,这样效率可能会更高一些,比较好一些。以上就是读取数据库的过滤规则全部内容。
//数据预处理的程序
def
setupSsc(sc:SparkContext,kafkaParams:Map[String,String]
,topics:Set[String]):streamingCo
//程序初始化阶段
//3、创建 streaming Context
val ssc=new StreamingContext(scseconds(2))
//读取数据库中的数据过滤规则
val
filterRuleList:ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()
//将过滤规则添加到广播变量
@volatile val broadcastFilterRuleList=sc.broadcast(filterRuleList)
//4、读取 kafka 内的数据 ssc,kafkaParams,topics)
valkafkaData=KafkaUtils.createDirectstream[string,string,
stringDecoder,StringDecoder](ssc,kafk
//真正的数据
val kafkaValue=kafkaData.map(_._2)