开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段):数据预处理-数据清洗-实现思路】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/671/detail/11642
数据预处理-数据清洗-实现思路
主要内容:
一、目标
二、思路与关键代码
一、目标
前面把数据预处理的目标,需求设计的思路学习了,接下来,如果要实现这个功能,要按照的思路,前面已经大概理了一遍,现在再详细的领一遍,这个思路其实在随堂笔记里面已经理了一遍,在知识点26数据清洗我们的目标是把数据库中含有 HTML ,CSS ,js, JPg,png 这些无用的数据给过滤掉,只保留有效的数据,首先,我们要读取 kafka中的数据到预处理的数据中,数据预处理的阶段。
二、思路与关键代码
这里已经把数据读过来了,rdd 就是数据,
//5、消费数据
KafKaValue.foreachRDD(rdd=>{
接着就是读取 MySQL 数据中的数据过滤规则到预处理程序,将数据过滤规则加载到广播变量,为了让所有的节点都能访问到,因为生产的条件是一个集群,所以要加载到广播变量,第四部要在 reids 内添加是否需要更新数据过滤规则的标识,这个很重要,redis 已经有了,这是前面的数据,
需要在 redis 面前添加一个标识,看他是否需要更新,因为在生产过程中它会发生变化,如果变了,就要加一个标识,每批次都从 redis 中读取这个标识,这里面的批次的周期是两秒中,每两秒钟就执行一下数据的 foreach rdd,
//5、消费数据
KafKaValue.foreachRDD(rdd=>{
这里的代码是每隔两秒钟执行一次,从代码中的57行道91行都是每两秒钟执行一次,
代码如下:
katkaValue.foreachrdd(rdd=>
//1 链路统计功能
LinkCount.linkCount(rdd)
//2 数据清洗功能
//3 数据脱敏功能
//3-1 手机号码脱敏//3-2 身份证号码脱敏
//4 数据拆分功能
//5 数据分类功能
//5-1 单程/往返
//5-2 飞行类型与操作类型
//6 数据的解析
//6-1 查询类数据的解析//6-2 预定类数据的解析
//7 历史爬虫判断
//8 数据结构化
//9 数据推送
//9-1查询类数据的推送1/9-2 预定类数据的推送
//10 系统监控功能(数据预处理的监控功能)
也就是一个批次,一个批次的处理,这里每个批次都要从 redis 中读取这个标识,为了能更快的识别这种变化,就要在每两秒钟之间读取一次,判断是否需要更新过滤规则,如果不需要更新,那么直接过滤数据,如果需要更新,那么在数据库中重新读取新的过滤规则到程序中,然后在清空广播变量,清空完之后,再将新的规则重新加载到广播变量,加载完之后再把新的规则标识改为 false,表示是否需要更新数据过滤的标识,是就是 true 否就是 false 需要更新更新,完了就改为 false,如果不改的话就每次都要更新,每次修改都要更新,那数据库的压力就更大了。接下来要做的事情就是对数据进行过滤,前面数据有了规则,也有了接下来就要写一个方法,拿一条数据放到广播变量里面,然后塞到方法里面,这个方法来进行实际的过滤,规则如下:
它的格式是 URL 的格式,如果直接拿一条数据去拼接就太长了,URL,在访问的路径里面,此时需要把 request 截取出来,这个 request 包含 URL,接下来就是便利数据过滤的规则,拿 URL 与过滤规则进行匹配,如果匹配成功,则表示需要删除返回 false,因为这个规则表示是黑名单,黑名单就表示是需要过滤掉的数据,
如果匹配上了,就是要过滤掉,再去调用 RDD 的 filter 进行数据的过滤,RDD 的 filter 过滤就是通过返回的 true 或者 false 进行的,如果没有匹配成功,就表示不需要删除就返回 TRUE,一种情况返回 TRUE,一种情况返回 false,然后通过调用 RDD 的 filter 进行数据的过滤,这个就是数据清洗的实现思路。