开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段):数据预处理—数据清洗—规则更新流程代码】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/671/detail/11644
数据预处理—数据清洗—规则更新流程代码
内容介绍:
一、目标
二、实际操作展示
一、目标
前面已经将数据过滤的规则读取到了程序里,而且也已经将过滤规则添加到广播变量,接下来开始需要在 redis 里面添加是否需要更新数据更新过滤的标识,每批次都从 redis 去读取此标识,判断其是否需要更新。
1.若不需要更新,则直接过滤数据。
2.若需要更新,则需要进行流程,在数据库中重新读取新的过滤规则到程序中。
(1)将广播变量清空。
(2)将新的规则重新加载到广播变量。
(3)将 redis 内是否需要更新规则的标识改成“false”。
二、实际操作展示
在 redis 里添加标识,既然需要操作 redis,首先就需要实例化 redis,在程序初始化阶段,实例化 redis,为后续使用。这里操作方法是输入 val redis = ,在提供的工具包当中有 jedis,其中有 jedisConnectionUtil,这里提供了 getJedisCluster 的方法,将其引入进来,这里就拿到了 redis,然后需要在 redis 里添加标记,去判断其是否需要更新。在程序初始化阶段后面添加,已经完成添加的话,也意味着其读取了一次,判断了一次。如果后面对其进行更新,规则在其中更新或者添加,如何让其快速生效?
//数据预处理的程序
def
setupSsc(sc:SparkContext,kafkaParams:Map[String,String]
,topics:Set[String]):streamingCo
//程序初始化阶段
//3、创建 streaming Context
val ssc=new StreamingContext(scseconds(2))
//读取数据库中的数据过滤规则
val filterRuleList:ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()
//将过滤规则添加到广播变量
@volatile val
broadcastFilterRuleList=sc.broadcast(filterRuleList)
实际这就是上文提到的,在循环迭代中每两秒钟运行一次,其代码为:“kafkavalue.foreachRDD(rdd=>{”,在这段代码里进行添加,每添加一次,就去 redis 里读取,那 redis 的压力也是可以接受的,所以添加 redis 的标记,添加完成之后,读取的时候也需要在循环里进行读取,这样不断的循环查看数据。数据处理程序如果需要更新,快速也可以生效。
第一次程序的初始化,即第一次把程序过滤规则读取出来,只需要第一次程序启动时读取,之后仅仅需要判断是否需要更新,如果更新,这里就选择更新程序。如果不更新,则选择进行过滤程序。第一次相当于程序的初始化,而第二次以及后面的每一次,都在迭代里面进行处理。
接下来添加广播变量到程序中,观察到代码中已经存在 redis,直接读取 redis 就可以达成目标。
输入://到 redis 读取是否需要更新的标记。
redis.get.(“needupdatefilterrule”)
代码中需要给到 key,也就是标记是否需要更新的 key,命名为讲义里面的 NeedUpDateFilterRule 含义为:是否需要更新过滤规则。 NeedUpDateFilterRule 读取过来也就是key,而 redis 应该也存在与 key 对应的值。
现在对此值进行添加,选择 redis 右键点击任意选项。
添加 key, NeedUpDateFilterRule 这就是 key 的前缀,在 value 输入 false,默认情况下 false 表示不需要更新,这就是字符串,字符串不需要进行更新。
然后点击添加,添加完成之后,相当于在 redis 里面就可以读取到。现在通过可以读取到 flash 的字符串,需要接收一下,还是将其命名为 NeedUpDateFilterRule ,这就是刚才读取过来的 false 值。数据取读成功了,接下来就可以判断是否需要更新。若需要更新,在数据库中重新读取新的过滤规则到程序中,然后将广播变量清空,将新的规则重新加载到广播变量。加载完全以后再来将 redis 里的值改成 false。这里存在问题,因为现在就是 false,其实如果数据库现在更新,更新完以后需要立刻跑到 redis 里将其改成 true 然后进行保存,现在其读取过来就是 true。判断其是否需要更新,输入if(!NeedUpDateFilterRule.isempty &&
NeedUpDateFilterRule.toBoolean)这里如果值是空的肯定不行,所以前提要是其不为空,在前面加!。那如果其不为空的情况下,并且这个值是 true 的这样的字符串,并且它把其转化成 toBoolean 类型,则表示需要更新。
val NeedUpDateFilterRule=redis.get(NeedUpDateFilterRule")
//判断是否需要更新一若数据不为容并目数据转成 Boolean 为 true 表示需要更新
if(!NeedUpDateFilterRule.isEmpty&&NeedUpDateFilterRuletoBoolean){
表示需要更新以后,接下来详细介绍整个数据的清洗工作。(因为这是第一次,后面还有许多地方需要此操作,所以这里详细的进行讲解。)接下来按照流程进行。
若需要更新,那么在数据库当中重新读取新的规则到程序中,即需要把前面读取数据库的规则的方法重新读取一遍,这里选择直接调用就可以完成。重新读取需要将其类型关掉,将类型删掉以后系统提示报错,这里报错的原因是上方 val 是不可变类型,改成 var 可变类型便不进行报错,这就是重新读取数据库规则。读取完成之后,接下来清空广播变量(broadcastFilterRuleList),将广播变量(broadcastFilterRuleList)复制粘贴到下方并添加.unpersist,这就是清空广播变量,清空完之后,接下来重新加载广播变量,将新的规则重新加载到广播变量,var broadcastFilterRuleList= sc.broadcast(FilterRuleList)这是将数据放到广播里的过程,将过滤规则添加到广播变量。这里也出现了报错,原因也是因为 val,将其改成 var并保存,就不会进行报错。这个就是将最新的规则加载到广播变量,加载到广播变量以后,这里使用 true 读取过来进行重新规则,如果不将其更改回去,默认情况下,其下次还是 true 并且后面的所有词都是 true,为了防止这种方法,需要将 redis 内是否需要更新规则的标识改成“false”,将上方的 redis.get(“needupdatefilterrule”),将 get 改成 set,在后面添加上“false”,也就是redis.set(“needupdatefilterrule”,“false”)这就是数据预处理阶段。
如果此程序需要更新规则,还有更新规则的逻辑:
是否需要更新,如果是 true 就更新,更新就重新读取规则,清空广播变量,然后再重新加载广播变量,然后将其改成 false,这样新规则就可以读取过来,以上就是数据清洗阶段,广播更新规则的流程。