数据预处理-数据清洗-读取过滤规则到程序代码|学习笔记

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 快速学习数据预处理-数据清洗-读取过滤规则到程序代码

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理-数据清洗-读取过滤规则到程序代码】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11643


数据预处理-数据清洗-读取过滤规则到程序代码


内容介绍:

一、数据清理

二、实际操作展示


一、数据清理

前面将数据清理的思路介绍了一遍,接下来根据此流程写代码。

目标:数据中含有 html、cssjs、jpg、png 等无用的数据,为使预处理更高效需将无用的数据过滤掉,只保留有效的数据。

1、思路与关键代码:

(1)读取 Kafka 中的数据到预处理程序中。

(2)读取 mysgl 数据库中的数据过滤规则到预处理程序。

(3)将数据过滤规则加载到广播变量,可以让所有的 executer 都能访问到。

(4)在 redis 内添加是否需要更新数据过滤规则的标识,每批次都从 redis 中读取这个标识,判断是否需要更新数据过滤规则。

(5)若不需要更新,那么直接过滤数据。

(6) 若需要更新,那么在数据库中重新读取新的过滤规则到程序中。

①将广播变量清空。

②将新的规则重新加载到广播变量。

③将 redis 内是否需要更新规则的标识改为“false”。

(7)进行数据的过滤。

①定义方法,参数为一条数据和广播变量。

②截取出数据中的 url。

③遍历数据过滤的规则,拿 url 与过滤规则进行匹配。

④若匹配成功表示,要要删除,返回 false。


二、实际操作展示

打开项目,实际上第一步是读取 Kafka 数据到程序当中,createDirectStream 即读取卡 Kafka 里的数据,之后进行处理,每隔两秒钟读过来的一批数据就是 rdd,即第一批数据已经读过来了。

接下来就是读取 MYSQL 数据库当中的过滤规则到数据预处理程序当中,实现第二个功能,读取数据清洗功能。读取数据的过滤规律,过滤规则在哪里读取这个问题一定要搞明白,非常关键。刚刚已经学习过,KafkaVlue 数据刚刚读过来,经过处理后到 kafkaValue.foreachRDD(rdd=>{ 这是数据的一个便利 foreachRDD,从此处开始,一个 foreachRDD 是 streaming 的一个批处理,每两秒钟执行一段代码。而执行哪一段必须要清楚,即两秒钟执行一次,哪一段代码是两秒钟跑几遍,而哪一段代码只跑一遍。即从 kafkaValue.foreachRDD(rdd=>{ 到下面系统监控功能(数据处理的监控功能)这段代码每两秒钟跑一次。而前面的部分,数据预处理程序 setupSsc 行开始到 val kafkaValue=kafkaData.map(_.2)这段代码肯定是只跑一遍。即上面的这段代码只跑一遍,而下面的这两个代码会连续不断的迭代跑。而现在要做的第二件事情,就是读取 mysql 数据库的过滤规则到预处理程序当中。

思考在哪里读取,只读一遍还是在迭代当中一遍一遍的执行。消费数据是迭代运行,每次都跑一遍,每两秒钟运行一次。即 kafkaValue.foreachRDD(rdd=>{ //迭代运行(每2秒运行一次)。现在应该易懂读取数据库的规则程序,到底是写在每次迭代还是写在上面代码中。如果放在每次迭代里,就意味着每隔两秒钟要读取一次马克斯库数据库,此时其压力比较大,现在是两秒钟,若更快马斯克数据库压力会非常大。所以,在程序初始化的时候,先把规则读起来,即下面代码“程序初始化阶段”至 val

kafkaValue=kafkaData.map(_.2)就是程序初始化阶段的时候。初始化时只跑一遍,在 val ssc=new

StreamingContext(scseconds(2))里将数据库的数据过滤规则读取后,刚才讲了有一个需要加载广播到广播变量里面,还需要运行 radis,每隔两秒钟在“消费数据”里去判断 radis,从其中判断它是否需要更新,如果需要更新,再重新读取数据库。在数据初始化阶段读取一次数据库的规则,然后在每一个迭代的时候去 radis 里判断是否需要更新,如果需要更新,再重新读即可。此时 Radis 高效的特点运用上了且压力也小了,这就是全部过程。

//数据预处理的程序

Def setupSsc(sc:SparkContext,

kafkaParams:Map[String,String],topics:Set[String]):StreamingCon

//程序初始化阶段

//3、创建 streaming Context

val ssc=new StreamingContext(scseconds(2))

//读取数据库中的数据过滤规则

//4、读取 kafka 内的数据 ssc,kafkaParam stopics)

//jssc:JavastreamingContext

//接收 kafka 的数据(key,value)

val

kafkaData=KafkaUtils.createDirectstream[String,string,stringDecoder,stringDecoder](ssc,kafkal

//真正的数据

val kafkaValue=kafkaData.map(_.2)

//5、消费数据

kafkaValue.foreachRDD(rdd=>{ //迭代运行(每2秒运行一次)

//1 链路统计功能

LinkCount.linkCount(rdd)

//2 数据清洗功能

//3 数据脱敏功能

//3-1手机号码脱敏/3-2身份证号码脱敏

//4 数据拆分功能

接下来实现一下这个功能,在实例化 streaming Context 后面实现读取功能,在讲义中,即下图“读取数据库中的过滤规则”在“实例化 ssc”之后,“读取数据之前”把数据加载到广播变量中。即//3、创建 streaming Context val ssc=new

StreamingContext(scseconds(2))是实例化读取 Context,在下方“//读取数据库中的数据过滤规则”。需要写一个单独的方法来读取此过滤规则,实际上定义好的这个数据过滤规则,即

“AnalyzeRuleDB.queryFilterRule()”里面已经定义好了方法,直接 CTRLC 复制粘贴。在写代码的时候也可以直接复制过来。

//数据预处理的程序

def setupssc(sc:SparkContext,kafkaParams:Map[stringstring],topics:set[st

/3、创建 istreaming  Context

val ssc=new StreamingContext(sc,seconds(2))

//读取数据库中的过滤规则

val filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将数据添加到广播变量

val broadcastFilterRuleListsc=sc.broadcast(filterRuleList)

//4、读取 kafka 内的数据 ssc,kafkaParams,topics)

val kafkaData=KafkaUtils.createDirectstream[string,string,stringDecoder,strir

//初始化的时候

读取数据库中的过滤规则

val filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将数据添加到广播变量

val broadcastFilterRuleListsc=sc.broadcast(filterRuleList)

image.png

AnalyzeRuleDB 里面有一个 queryFilterRule(),这个方法就是来读取数据库中的过滤规则,而现在 AnalyzeRuleDB.queryFilterRule()报红说明现在还没有这个方法。没有这个 object 就创建即可, CTRLC 复制方法的名称,然后在数据预处理 dataprocess 里面有 businessprocess 业务,右键新建一个 scala object 并粘贴叫做 AnalyzeRuleDB,这个 object 就是用于实现在数据库中读取规则到程序中。而下面是本项目,即反爬虫项目,“本项目所有的数据库读取操作都放在这里”AnalyzeRuleDB 都放在其中,而这里面就包括数据过滤规则。Object 创建好了引入,而这个方法现在还没有,需要创建,让程序自动进行创建,选中然后创建。现在将大括号补全,queryFilterRule()主要实现数据预处理阶段数据过滤规则的读取。这就是queryFilterRule()实现代码的目标所需要实现过程的代码。即实现数据预处理阶段,数据过滤规则的读取就在此代码中实现。

//用于实现在数据库中读取规则到程序中

//本项目所有的数据库读取操作都放在这里

object AnalyzeRuleDB {

//实现数据预处理阶段 数据过滤规则的读取

def queryFilterRule():Unit={

}

此时不报错,接下来写代码,需要读数据库,在提供的方法当中,util 工具类里面有 database,当中有 Query DB,实际上 Query DB 在前面介绍项目时,有两个是需要掌握的,一个是读取配置文件,另外一个就是读取数据库的规则,也就是 Query DB。Object 里 querydate 方法里面就提供了一个参数是 sql,另一个参数是 field,读取哪个数据的 sql 语句以及用谁来进行接收,它会去执行 sql 语句,返回的结果使用 field 字段来进行接收,并且将结果进行返回 arr。这是读取数据的一个方法,此处用该方法来读取数据,此方法需要两个参数,第一个参数是 sql 语句,第二个参数是 field。接下来先把这两个 sql 语句写好。“读取数据过滤规则的 sql”,下一行写 var sql=”select value from itcast_filter_rule”,其中sql=字符串,就是查询语句。现在是 itcast filter_ rule 中的表格(下图),需要将 value 列六个数据全部进行读取,from 的表即 itcast 右键复制一下重名,将名字复制,注意不要写错,此时肯定不会报错。Select value from 这个表,这样最后一句就写好了。

image.png

接下来写接收数据的字段,即 val field=”value”,其中字段名字就是 value ,因为查询 value,所以 getString 在获取的时候也用 value 获取。此时字段有了,sql 有了,接下来就是调用Query DB 中的 queryDate 来读取数据,所以代码书写为 //调用 QueryDB 读取数据,下一行即 QueryDB.queryDate(sql,field),注意读取时要先将其引入进来,()中传 sql 语句和 filed。此时就拿到数据了,然后需要定义一个变量来进行接收数据。起个变量名叫做 filterRudelist,用来接收数据,即完整代码为 Var filterRudeList=QueryDB.queryDate(sql,field)

接收完以后返回过滤数据即可,此时 filterRudeList 中的结果就是过滤数据的规则,也就是上图 value 列中的六个数据。这样数据读取完成了,将其放到最后一行把它返回来,把数据类型补全,点击 val list :ArrayBuffer[String] = filterRuleList 选择 Specify type Settings,进行复制,粘贴到 def queryFilterRule(): 中,此时数据类型补全了。

//用于实现在数据库中读取规到程序中

//本项目所有的数据库读取操作都放在这里

object AnalyzeRuleDB {

//实现数据预处理阶段数据过滤规则的读取

def queryFilterRule():ArrayBuffer[string]={

//读物数据过滤规则的 sgl

val sql="select value from itcast filter_rule"

//接受数据的字段

val field="value"

//调用 OueryDB 读取数据

val filterRuleList=QueryDBqueryData(sqlfield)

//返回过滤数据

filterRuleList

这个方法是按照 AnalyzeRuleDB.queryFilterRule()点击后就是读到的数据,并且返回以后,接下来再来做接收,前面已经读过来了,需要接收,依然是 filterRuleList 进行接收,它的类型还是刚刚的类型,将其复制粘贴,filterRuleList 就是从数据库当中读取出来的过滤规则。现在已经读取完毕,下一步是将过滤规则添加到广播变量,让所有的 executer 能够访问到。接下来写入“将过滤规则添加到广播变量”,需要用到 sc.broadcast(filterRuleList),也就是将前面刚读取过来的数据库过滤规则添加到广播变量,然后前面还需要接收,因为后面需要用,即val broadcastFilterRuleList= sc.broadcast(filterRuleList) ,其中要将 f 改成大写的F。添加进去以后,broadcastFeltrRuledlist 就将这个数据添加到了广播变量里面。加完以后,还需要给它做修饰,在其前面加@volatile ,将这个变量把它设定成不稳定的,这样效率可能会更高一些,比较好一些。以上就是读取数据库的过滤规则全部内容。

//数据预处理的程序

def

setupSsc(sc:SparkContext,kafkaParams:Map[String,String]

,topics:Set[String]):streamingCo

//程序初始化阶段

//3、创建 streaming Context

val ssc=new StreamingContext(scseconds(2))

//读取数据库中的数据过滤规则

val

filterRuleList:ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()

//将过滤规则添加到广播变量

@volatile val broadcastFilterRuleList=sc.broadcast(filterRuleList)

//4、读取 kafka 内的数据 ssc,kafkaParams,topics)

valkafkaData=KafkaUtils.createDirectstream[string,string,

stringDecoder,StringDecoder](ssc,kafk

//真正的数据

val kafkaValue=kafkaData.map(_._2)

相关文章
|
8月前
|
数据采集 SQL 分布式计算
在数据清洗过程中,处理大量重复数据通常涉及以下步骤
【4月更文挑战第2天】在数据清洗过程中,处理大量重复数据通常涉及以下步骤
217 2
|
8月前
|
数据采集 数据挖掘 数据处理
进行数据清洗的过程通常包括以下步骤
【4月更文挑战第3天】进行数据清洗的过程通常包括以下步骤
245 3
|
8月前
|
Python
选择和过滤数据DataFrame信息案例解析
该文介绍了如何使用pandas处理DataFrame数据。首先,通过创建字典并调用pd.DataFrame()函数转换为DataFrame。接着展示了使用loc[]选择年龄大于30的记录,iloc[]选取特定位置行,以及query()根据字符串表达式筛选(年龄大于30且城市为北京)数据的方法。
136 1
|
8月前
|
数据采集 数据可视化 数据挖掘
数据清洗有什么方式
数据清洗有什么方式
|
2月前
|
数据采集 机器学习/深度学习 测试技术
数据清洗与过滤中,如何确定哪些数据是高质量的?
数据清洗与过滤中,如何确定哪些数据是高质量的?
|
2月前
|
数据采集 机器学习/深度学习 自然语言处理
数据清洗与过滤
数据清洗与过滤
|
4月前
|
数据采集 机器学习/深度学习 算法
数据清洗过程中,如何确定哪些数据是异常
数据清洗过程中,如何确定哪些数据是异常
|
5月前
|
数据采集 机器学习/深度学习 前端开发
Java爬虫中的数据清洗:去除无效信息的技巧
Java爬虫中的数据清洗:去除无效信息的技巧
|
8月前
|
数据采集 数据挖掘 数据格式
探讨 DataFrame 的高级功能,如数据清洗、转换和分组操作
【5月更文挑战第19天】本文探讨了DataFrame的高级功能,包括数据清洗、转换和分组操作。在数据清洗时,使用fillna处理缺失值,设定阈值或统计方法处理异常值。数据转换涉及标准化和编码,如将分类数据转为数值。分组操作用于按特定列聚合计算,支持多级分组和自定义聚合函数。掌握这些技能能有效处理和分析数据,为决策提供支持。
97 2
|
存储 数据可视化 Python
数据的预处理基础:如何处理缺失值(一)
数据的预处理基础:如何处理缺失值(一)
296 0
数据的预处理基础:如何处理缺失值(一)

相关实验场景

更多