数据预处理—数据清洗—规则更新流程代码|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 快速学习数据预处理—数据清洗—规则更新流程代码

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理—数据清洗—规则更新流程代码】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11644


数据预处理—数据清洗—规则更新流程代码


内容介绍:

一、目标

二、实际操作展示


一、目标

前面已经将数据过滤的规则读取到了程序里,而且也已经将过滤规则添加到广播变量,接下来开始需要在 redis 里面添加是否需要更新数据更新过滤的标识,每批次都从 redis 去读取此标识,判断其是否需要更新。

1.若不需要更新,则直接过滤数据。

2.若需要更新,则需要进行流程,在数据库中重新读取新的过滤规则到程序中。

(1)将广播变量清空。

(2)将新的规则重新加载到广播变量。

(3)将 redis 内是否需要更新规则的标识改成“false”。


二、实际操作展示

在 redis 里添加标识,既然需要操作 redis,首先就需要实例化 redis,在程序初始化阶段,实例化 redis,为后续使用。这里操作方法是输入 val redis = ,在提供的工具包当中有 jedis,其中有 jedisConnectionUtil,这里提供了 getJedisCluster 的方法,将其引入进来,这里就拿到了 redis,然后需要在 redis 里添加标记,去判断其是否需要更新。在程序初始化阶段后面添加,已经完成添加的话,也意味着其读取了一次,判断了一次。如果后面对其进行更新,规则在其中更新或者添加,如何让其快速生效?

//数据预处理的程序

def

setupSsc(sc:SparkContext,kafkaParams:Map[String,String]

,topics:Set[String]):streamingCo

//程序初始化阶段

//3、创建 streaming Context

val ssc=new StreamingContext(scseconds(2))

//读取数据库中的数据过滤规则

val filterRuleList:ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()

//将过滤规则添加到广播变量

@volatile val

broadcastFilterRuleList=sc.broadcast(filterRuleList)

实际这就是上文提到的,在循环迭代中每两秒钟运行一次,其代码为:“kafkavalue.foreachRDD(rdd=>{”,在这段代码里进行添加,每添加一次,就去 redis 里读取,那 redis 的压力也是可以接受的,所以添加 redis 的标记,添加完成之后,读取的时候也需要在循环里进行读取,这样不断的循环查看数据。数据处理程序如果需要更新,快速也可以生效。

第一次程序的初始化,即第一次把程序过滤规则读取出来,只需要第一次程序启动时读取,之后仅仅需要判断是否需要更新,如果更新,这里就选择更新程序。如果不更新,则选择进行过滤程序。第一次相当于程序的初始化,而第二次以及后面的每一次,都在迭代里面进行处理。

接下来添加广播变量到程序中,观察到代码中已经存在 redis,直接读取 redis 就可以达成目标。

输入://到 redis 读取是否需要更新的标记。

redis.get.(“needupdatefilterrule”)

代码中需要给到 key,也就是标记是否需要更新的 key,命名为讲义里面的 NeedUpDateFilterRule 含义为:是否需要更新过滤规则。 NeedUpDateFilterRule 读取过来也就是key,而 redis 应该也存在与 key 对应的值。

现在对此值进行添加,选择 redis 右键点击任意选项。image.png

添加 key, NeedUpDateFilterRule 这就是 key 的前缀,在 value 输入 false,默认情况下 false 表示不需要更新,这就是字符串,字符串不需要进行更新。

image.png

然后点击添加,添加完成之后,相当于在 redis 里面就可以读取到。现在通过可以读取到 flash 的字符串,需要接收一下,还是将其命名为 NeedUpDateFilterRule ,这就是刚才读取过来的 false 值。数据取读成功了,接下来就可以判断是否需要更新。若需要更新,在数据库中重新读取新的过滤规则到程序中,然后将广播变量清空,将新的规则重新加载到广播变量。加载完全以后再来将 redis 里的值改成 false。这里存在问题,因为现在就是 false,其实如果数据库现在更新,更新完以后需要立刻跑到 redis 里将其改成 true 然后进行保存,现在其读取过来就是 true。判断其是否需要更新,输入if(!NeedUpDateFilterRule.isempty  &&

NeedUpDateFilterRule.toBoolean)这里如果值是空的肯定不行,所以前提要是其不为空,在前面加!。那如果其不为空的情况下,并且这个值是 true 的这样的字符串,并且它把其转化成 toBoolean 类型,则表示需要更新。

val NeedUpDateFilterRule=redis.get(NeedUpDateFilterRule")

//判断是否需要更新一若数据不为容并目数据转成 Boolean 为 true  表示需要更新

if(!NeedUpDateFilterRule.isEmpty&&NeedUpDateFilterRuletoBoolean){

表示需要更新以后,接下来详细介绍整个数据的清洗工作。(因为这是第一次,后面还有许多地方需要此操作,所以这里详细的进行讲解。)接下来按照流程进行。

若需要更新,那么在数据库当中重新读取新的规则到程序中,即需要把前面读取数据库的规则的方法重新读取一遍,这里选择直接调用就可以完成。重新读取需要将其类型关掉,将类型删掉以后系统提示报错,这里报错的原因是上方 val 是不可变类型,改成 var 可变类型便不进行报错,这就是重新读取数据库规则。读取完成之后,接下来清空广播变量(broadcastFilterRuleList),将广播变量(broadcastFilterRuleList)复制粘贴到下方并添加.unpersist,这就是清空广播变量,清空完之后,接下来重新加载广播变量,将新的规则重新加载到广播变量,var broadcastFilterRuleList= sc.broadcast(FilterRuleList)这是将数据放到广播里的过程,将过滤规则添加到广播变量。这里也出现了报错,原因也是因为 val,将其改成 var并保存,就不会进行报错。这个就是将最新的规则加载到广播变量,加载到广播变量以后,这里使用 true 读取过来进行重新规则,如果不将其更改回去,默认情况下,其下次还是 true 并且后面的所有词都是 true,为了防止这种方法,需要将 redis 内是否需要更新规则的标识改成“false”,将上方的 redis.get(“needupdatefilterrule”),将 get 改成 set,在后面添加上“false”,也就是redis.set(“needupdatefilterrule”,“false”)这就是数据预处理阶段。

如果此程序需要更新规则,还有更新规则的逻辑:

是否需要更新,如果是 true 就更新,更新就重新读取规则,清空广播变量,然后再重新加载广播变量,然后将其改成 false,这样新规则就可以读取过来,以上就是数据清洗阶段,广播更新规则的流程。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
数据采集 数据挖掘 数据处理
进行数据清洗的过程通常包括以下步骤
【4月更文挑战第3天】进行数据清洗的过程通常包括以下步骤
183 3
|
6月前
|
数据采集 SQL 分布式计算
在数据清洗过程中,处理大量重复数据通常涉及以下步骤
【4月更文挑战第2天】在数据清洗过程中,处理大量重复数据通常涉及以下步骤
170 2
|
6月前
|
数据采集 数据可视化 数据挖掘
数据清洗有什么方式
数据清洗有什么方式
|
22小时前
|
存储 机器学习/深度学习 数据可视化
数据集中存在大量的重复值,会对后续的数据分析和处理产生什么影响?
数据集中存在大量重复值可能会对后续的数据分析和处理产生多方面的负面影响
21 10
|
2月前
|
数据采集 机器学习/深度学习 算法
数据清洗过程中,如何确定哪些数据是异常
数据清洗过程中,如何确定哪些数据是异常
|
5月前
|
数据采集 机器学习/深度学习 数据挖掘
利用Python实现高效的数据清洗与预处理流程
本文旨在探讨如何使用Python编程语言及其强大的数据处理库(如pandas、numpy等)来构建一个高效且灵活的数据清洗与预处理流程。与常规的数据清洗方法不同,本文不仅关注于传统的缺失值填充、异常值处理、数据类型转换等步骤,还引入了数据质量评估、数据特征选择以及自动化处理流程的设计等高级主题。通过实际案例和代码演示,本文将为读者提供一套完整的数据清洗与预处理解决方案,助力数据分析师和数据科学家在数据探索阶段更加高效、准确地处理数据。
|
6月前
|
数据采集 JSON 数据挖掘
利用Python实现自动化数据清洗和转换
数据清洗和转换是数据分析的重要步骤。在数据分析工作中,我们常常需要处理不规范、重复、缺失或错误的数据。本文介绍如何使用Python编程语言实现自动化数据清洗和转换,为数据分析工作提供高效的支持。
|
6月前
|
数据采集 Python
数据清洗是数据预处理的重要步骤
数据清洗是数据预处理的重要步骤
77 0
|
数据采集 机器学习/深度学习 算法
②数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
803 0
②数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
|
存储 数据可视化 Python
数据的预处理基础:如何处理缺失值(一)
数据的预处理基础:如何处理缺失值(一)
274 0
数据的预处理基础:如何处理缺失值(一)