开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理总结﹣代码总结】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11683
数据预处理总结﹣代码总结
将从代码级别回顾,下面来打开爬虫,从头开始看,在预处理阶段首先写了一个 main 方法,整个程序的入口:程序的入口主要就是用来趋动程序的运行,这里面首先在执行的时候设置了日志的级别,也就是运行的时候不要打错日志,设置了任务如果被 Q 掉的时候,执行完最后一个批次再停止。然后又设置了 spark.conf,要注意一定不要忘了设置系统监控功能,如果不设,后边的监控功能是看不到的。接下来是 SparkContent,然后 Kafka的Params 以及 topics,这几个参数传到我的 setupssc 里面,实例出来了读取数据,然后返回了一个 ssc,SparkStreamingContext ,开启任务和执行循环。在我的 setupssc 里面,做的是数据预处理的程序,预处理程序当中,做的第一个事情是:实例了streaming Context ,每两秒钟迭代一次,这个要根据企业的需求,企业里面需要多长时间,我们就用多长时间,比如说企业当中需要五秒钟跑一次,那我就设置成五秒钟。
然后下面是初始化系列所要做的事情,去 Kafka 当中读取数据,也就是流程中的3-1,
读取数据到程序里面,读过来以后,这里面的数据是分为 key 和 value 的,只要 value 只要数据就行,最终的数据拿过来以后,去迭代遍历,这里面要强调的一点是,数据 foreachRDD,从开始直到结束,这里面所有的代码都是两秒钟执行一遍,所有数据预处理的操作都包括在这个里面,都包括在 foreachRDD 里边,下边再往后是读取这个程序,读取是否需要更新。
1、链路统计
数据都过来了,然后遍历,链路统计功能模块,也就是流程当中的第一个模块链路统计,链路统计 RDD 传过来之后,做了两个工作,一个是计算每个服务器本批次数据访问次数,相当于每一行闯过来一个服务器,传过来一个,这里面要统计服务器传过来的次数,实验思路类似于 word count,把IP截取出来,使用#cs#拆分,拆分完以后把我的IP找到,找到以后我把 IP 和1作为一个输出,然后调用 reduceByKey,做一个++的操作,就算出来的总数,这就是每个服务器访问多少次。
第二个是当前活跃用户联系数的计算,在数据当中,已经采集好的数据,直接截取就行,他的位置是第11个角标,也就是第12个数据,将我的IP同样也截取出来,然后把它输出,输出以后,调用 Reducebykey(k,v),把v数出来,这里面输出的就是很多 key value list 当中相通 key 的数出来,以上就是每一个节点,当前活跃用户数的一个计算。这两个计算出来以后,后面是要把它写到Redis里面,首先要在它俩不为空的情况下转化成,两个小的 map,然后为了把数据写到 Redis 里面,形成一个整个的数据,将两个小的 map,封装成一个大的 map,封装的时候 map 的k不是随便写的,需要和前端工程师做一个预订,我们按照这个写,他按照这个读,数据这就准备好了,读取链路统计的前缀,然后加一个时间戳,然后作为key,这个数据在 Redis 里面读取的存储周期读取出来以后,时间就有了,k有了,时间也有了,前面数据也有了,数据把 map 转化成 jsonVO,写到 Redis 里面,这个就是整个链路统计的实验过程,只要把它写到 Redis 里面,在前边数据管理就能看到效果。
2.数据的清洗,把过滤掉的规则读过来,读过来以后,数据有了,规则有了,这个就是拿规则去匹配数据,读取数据库规则,程序在初始化阶段先读过一次,把过滤规则读过来,写这个 field =" value ,用 QueryDB . queryData 读取数据,然后返回数据,返回数据然后就拿到了过滤规则,过滤规则添加到广播变量,因为运行的时候有多个集群,集群的话会有多个节点,要让多个节点里面的多个 filter,都能访问到数据,所以要把它添加到广播变量当中,而这个是第一次录取,所以要把它放到程序刚刚启动的时候,相当于程序初始化的时候,就把它读过来,这个是第一次读取,后面如果发生更新,因为两秒钟迭代一次,如果规则发生变化的话,为了让他更快的生效,所以每次迭代的时候,都需要去判断它的规则是否发生了变化,如果发生了变化,就要重新读取,
所以,这里面没有直接去读取,那样会被造成比较大的压力,所以在Redis里面添加了一个标记,如果这个标记是TRUE,那么表示需要更新,就需要重新读取规则,然后清空广播变量,将新的规则添加到广播变量里面,然后把标记改成 FALSE,然后我的新规则就有了,然后后边再用,在程序每个程序迭代的最前面来判断你是否是,如果需要更新,我就从重新读取,把新的读完,如果不需要更新,我就跳过,以上就是规则更新。
规则更新完之后,去调用我的 filter 的方法,filter url 把我数据当中的url截取出来,和规则去匹配,过滤掉的规则匹配,如果规则匹配上了数据,也就是匹配上的数据,那么表示需要删除,如果没有匹配上,那就不需要删除返回TRUE我们调用的是 file 的方法返回TRUE匹配上的就选择 FALSE,也就是删除过滤掉了。这就是数据清洗的实验思路。
3.数据的脱敏,脱敏比较简单,经过清洗以后的数据,再强调一点,清洗后的数据直接做的是,清洗后的数据直接做了一个 map,拿到的是一条数据,脱敏拆分还有打标签解析以及历史爬虫判断,都是针对一条数据操作,所以从第三到第八步,都是针对的一条数据,所以都放在过滤掉的数据方法 map 里边。
先拿到一条数据去脱敏,脱敏首先进行手机号码的脱敏,手机号码脱敏先拿到的是手机号码的正则表达式正则去匹配数据,匹配 massage,接受过来的数据会拿到有可能是手机号,也有可能不是手机号的数据,去遍历每一个。判断是不是手机号有以下两种情况:
手植号码前一个位置不是数字,并且手机号码是一条数据中的最后一个数据,那么表示这个一定是手机号,若手机号的最后一位后面的角标大于数据的总长度那么表示这个手机号是数据的最后一位。
手机号码前一个位置不是数字,并且后一位也不是数字,那么表示这个一定是手机号。确定出是手机号后,将手机号加密,替换原始的数据
身份证号脱敏和手机号都是一样的思路,身份证脱敏的时候要从要从手机脱敏后的数据去进行脱敏,这样才能将两个数据同时脱敏。
4.数据拆分,原本是使用#cs#进行的拼接,这里面把他拆开拆完后返回一个 Qpout,
包括了所有的数据:equestUrl , requestMethod , contentType , requestBodyphttpReferrer , remoteAddr , httpUserAgent ,timeIso8601, serverAddr , cookiesSt
5.打标签,这里面打了两个,进行国内查询,国际查询,国内预订,国际预订的判断。首先数据再传递过来的时候,也就是上面解析出来的数据是没有任何标记,表示究竟是哪一个的。用 URL 判断,这里边的数据已经明确指出,国内查询,国际查询,国内预订,国际预订的正则表达式是什么,完全用 URL 来匹配,匹配这四种业务场景的规则,匹配上哪个规则就是哪个规则。所以这个配置配置在数据库里面,所以又涉及到了一个正则表达式的读取,四种规则我们分为四种情况来读取规则,写好思路,然后把四种情况的规则分别读取,数据读取过来以后,分装到一个map里面,一次性返回,返回的结果当中,就有了四个业务场景的数据,就把它添加到广播变量,又涉及到了什么时候更新。
规则有了然后将 URL,传到这个方法里面,先判断取出每一个规则然后用 URL 去匹配每一个规则。四种情况分别循环匹配上哪个就返回哪个对应的结构业务逻辑。国内查询,国际查询,国内预订,国际预订,或者其他。这里就把国内查询国际查询的标签打完了。
第2个是单程还是往返,是根据日期类型出现的次数,如果出现一个就是单程,两个就是往返,没有就是其他,里面数据如下:
//t=R&
//c1=CAN&
//c2=PEK&
//d1=2019-05-16&
//d2=2019-05-16&
//at=1&
//ct=0&
It=0
先用?切分,再用&切分,再用=切分,分完以后的第二个值,去和日期类型正则表达式去匹配,匹配上了用计数器,记录日期类型出现的次数,出现一次加一个,一出现两次,再加一个,一变成二,然后根据数量去进行判断,零就是其他,一是单程,二就是往返。
6.数据的解析,解析的工作业务逻辑比较复杂,解析工作最终的目的就是解析数据目的地起飞时间,成人人数,儿童人数数据,从原始数据中,也就是说,从解析出的这些数据当中,是没有办法直接拿出来的,因为这里面不同的业务场景,国内查询用这个来解决国际查询,
用这些来不同的业务场景,解析的规则是不一样的,所以我们这里面要做解析,解析有多种选择的情况。经过一系列的判断,从多个选择情况,就像国内查询的话会有三个,经过下图这个代码,就能知道用哪一个
从三个变成了一个,我就知道该用哪一个去解析,后面的代码来帮我们解析数据,解析出来就有了什么结果。查询结果出来。预订的也需要解析出来,逻辑是一模一样的。
7.历史爬虫判断在数据库中,已经明确指出历史出现过的爬虫,把他读过来之后放到广播变量,然后再判断是否需要更新,然后再回到判断当中,历史的爬虫就有了,当前的 IP 也就有了,如果我遍历历史的IP和我的数据去匹配,如果任何一个历史的 IP 和我当前的IP相等,那么就表示,任何一个历史的 IP 在当前的数据出现过,出现过我就可以返回一个 TRUE,没有出现过就默认是 FALSE,表示没有出现过,这样就拿到了一个结果。
8.数据的结构化
前面经过拆分后的这么多数据,以及后面的国内查询,国际查询,国内预定,国际预订以及单程还是往返解析,查询的解析,预订的解析以及历史爬虫判断,这些数据都是零散的。要把它封装成 processed.Date,调用里面有一个toKafkastring 方法,调用里面有一个方法,去封装的时候最终会形成一个数据,形成一个用#CS#拼接的数据,但是这里面有一个参数是没有的。CorreRequestParams 核心请求参数是前面没有的。需要用目的地,出发地和起飞时间来进行封装,而出发地目的地前边已经通过数据解析解析出来了,根据解析出来的出发地,目的地和起飞的时间,解析出来以后,用这三个数据封装核心请求参数,也就是 CorreRequestParams。这个封装之后,我封装 processed.Date 所有数据就都有了,封装完之后再进行返回,返回之后再去调用它的 dr,toKafkastring 就能拿到这个数据,但是在这里返回的时候还不能返回 toKafkastring,因为后面还需要过滤,以上就是数据的结构化。
数据的推送,推送的工作,查询的要推送到查询的里边,预订的要推送到预订的里边,在查询的当中,里面既有查询的,又有预订的,需要把requestType . behaviorType == BehaviorTypeEnum . puery ).
过滤出来,然后再把它调用toKafkastring,拿到所有的查询的数据转化成的string然后接下来调用Kafka的API,生产者,数据的载体,发送数据关闭程序,然后这里边创造生产者需要一些参数,把他打入到查询的topic里边当中。
预订的数据推送也是一样的,过滤出只是book的预订的的数据,然后转化成 toKafkastring,数据类型,把它推送到预订的 topic 里面,这里边推送也是Kafka生产者,数据的载体,数据的发送,列生产者按照这个程序来走,我们把数据的推送也就推送完了。
最后一个是性能的监控,首先第一步,开启在最前面,在最前面要开启任务监控功能,第二步来到程序当中,开启之后,通过 url,可以拿到 json 数据,以这个为节点的数据,这个结点的数据拿到以后,拼接结束时间的路径和开始时间的路径,拼接这两个时间的路径需要用到任务的IP任务名称,拼接昨晚在拼接名称,在拼接结束的这个路径,结束的路径就有了,开始时间前面是一样的,后面变成了 start time,两个路径就有了。
然后根据这两个路径去获取开始时间和结束时间,两个时间拿到以后,我们做一个时间差就得到了运行时间,运行时间这样就有了,然后再拿到数据的总量,数据总量除以运行时间就是运行的速度,然后再根据前端跟工程师做好的约定,要以下约数据作为 key
Val Maps = Map(
“costTime: -> runTime.toString,
“serverCountMap” -> serverCount,
“appLicationId” -> appId.toString,
“countPerMillis” ->runSpeed.toString,
“appLicationUniqueName” -> appName.toString,
“endTime” -> endTimeS,
“sourceCount” -> dataCount.toString
数据作为key添加上以后,再把数据写入到瑞丽斯里面,监控功能就结束了,只要把监控的功能写到 Redis 里面,在前端就能看到实时流量转发,运行情况以及各链路流量转发情况,写进去数据就能自动展示出来。
本次数据预处理工作的全部内容结束。