数据预处理—数据清洗—规则更新流程代码|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 快速学习数据预处理—数据清洗—规则更新流程代码

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理—数据清洗—规则更新流程代码】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11644


数据预处理—数据清洗—规则更新流程代码


内容介绍:

一、目标

二、实际操作展示


一、目标

前面已经将数据过滤的规则读取到了程序里,而且也已经将过滤规则添加到广播变量,接下来开始需要在 redis 里面添加是否需要更新数据更新过滤的标识,每批次都从 redis 去读取此标识,判断其是否需要更新。

1.若不需要更新,则直接过滤数据。

2.若需要更新,则需要进行流程,在数据库中重新读取新的过滤规则到程序中。

(1)将广播变量清空。

(2)将新的规则重新加载到广播变量。

(3)将 redis 内是否需要更新规则的标识改成“false”。


二、实际操作展示

在 redis 里添加标识,既然需要操作 redis,首先就需要实例化 redis,在程序初始化阶段,实例化 redis,为后续使用。这里操作方法是输入 val redis = ,在提供的工具包当中有 jedis,其中有 jedisConnectionUtil,这里提供了 getJedisCluster 的方法,将其引入进来,这里就拿到了 redis,然后需要在 redis 里添加标记,去判断其是否需要更新。在程序初始化阶段后面添加,已经完成添加的话,也意味着其读取了一次,判断了一次。如果后面对其进行更新,规则在其中更新或者添加,如何让其快速生效?

//数据预处理的程序

def

setupSsc(sc:SparkContext,kafkaParams:Map[String,String]

,topics:Set[String]):streamingCo

//程序初始化阶段

//3、创建 streaming Context

val ssc=new StreamingContext(scseconds(2))

//读取数据库中的数据过滤规则

val filterRuleList:ArrayBuffer[string]=AnalyzeRuleDB.queryFilterRule()

//将过滤规则添加到广播变量

@volatile val

broadcastFilterRuleList=sc.broadcast(filterRuleList)

实际这就是上文提到的,在循环迭代中每两秒钟运行一次,其代码为:“kafkavalue.foreachRDD(rdd=>{”,在这段代码里进行添加,每添加一次,就去 redis 里读取,那 redis 的压力也是可以接受的,所以添加 redis 的标记,添加完成之后,读取的时候也需要在循环里进行读取,这样不断的循环查看数据。数据处理程序如果需要更新,快速也可以生效。

第一次程序的初始化,即第一次把程序过滤规则读取出来,只需要第一次程序启动时读取,之后仅仅需要判断是否需要更新,如果更新,这里就选择更新程序。如果不更新,则选择进行过滤程序。第一次相当于程序的初始化,而第二次以及后面的每一次,都在迭代里面进行处理。

接下来添加广播变量到程序中,观察到代码中已经存在 redis,直接读取 redis 就可以达成目标。

输入://到 redis 读取是否需要更新的标记。

redis.get.(“needupdatefilterrule”)

代码中需要给到 key,也就是标记是否需要更新的 key,命名为讲义里面的 NeedUpDateFilterRule 含义为:是否需要更新过滤规则。 NeedUpDateFilterRule 读取过来也就是key,而 redis 应该也存在与 key 对应的值。

现在对此值进行添加,选择 redis 右键点击任意选项。image.png

添加 key, NeedUpDateFilterRule 这就是 key 的前缀,在 value 输入 false,默认情况下 false 表示不需要更新,这就是字符串,字符串不需要进行更新。

image.png

然后点击添加,添加完成之后,相当于在 redis 里面就可以读取到。现在通过可以读取到 flash 的字符串,需要接收一下,还是将其命名为 NeedUpDateFilterRule ,这就是刚才读取过来的 false 值。数据取读成功了,接下来就可以判断是否需要更新。若需要更新,在数据库中重新读取新的过滤规则到程序中,然后将广播变量清空,将新的规则重新加载到广播变量。加载完全以后再来将 redis 里的值改成 false。这里存在问题,因为现在就是 false,其实如果数据库现在更新,更新完以后需要立刻跑到 redis 里将其改成 true 然后进行保存,现在其读取过来就是 true。判断其是否需要更新,输入if(!NeedUpDateFilterRule.isempty  &&

NeedUpDateFilterRule.toBoolean)这里如果值是空的肯定不行,所以前提要是其不为空,在前面加!。那如果其不为空的情况下,并且这个值是 true 的这样的字符串,并且它把其转化成 toBoolean 类型,则表示需要更新。

val NeedUpDateFilterRule=redis.get(NeedUpDateFilterRule")

//判断是否需要更新一若数据不为容并目数据转成 Boolean 为 true  表示需要更新

if(!NeedUpDateFilterRule.isEmpty&&NeedUpDateFilterRuletoBoolean){

表示需要更新以后,接下来详细介绍整个数据的清洗工作。(因为这是第一次,后面还有许多地方需要此操作,所以这里详细的进行讲解。)接下来按照流程进行。

若需要更新,那么在数据库当中重新读取新的规则到程序中,即需要把前面读取数据库的规则的方法重新读取一遍,这里选择直接调用就可以完成。重新读取需要将其类型关掉,将类型删掉以后系统提示报错,这里报错的原因是上方 val 是不可变类型,改成 var 可变类型便不进行报错,这就是重新读取数据库规则。读取完成之后,接下来清空广播变量(broadcastFilterRuleList),将广播变量(broadcastFilterRuleList)复制粘贴到下方并添加.unpersist,这就是清空广播变量,清空完之后,接下来重新加载广播变量,将新的规则重新加载到广播变量,var broadcastFilterRuleList= sc.broadcast(FilterRuleList)这是将数据放到广播里的过程,将过滤规则添加到广播变量。这里也出现了报错,原因也是因为 val,将其改成 var并保存,就不会进行报错。这个就是将最新的规则加载到广播变量,加载到广播变量以后,这里使用 true 读取过来进行重新规则,如果不将其更改回去,默认情况下,其下次还是 true 并且后面的所有词都是 true,为了防止这种方法,需要将 redis 内是否需要更新规则的标识改成“false”,将上方的 redis.get(“needupdatefilterrule”),将 get 改成 set,在后面添加上“false”,也就是redis.set(“needupdatefilterrule”,“false”)这就是数据预处理阶段。

如果此程序需要更新规则,还有更新规则的逻辑:

是否需要更新,如果是 true 就更新,更新就重新读取规则,清空广播变量,然后再重新加载广播变量,然后将其改成 false,这样新规则就可以读取过来,以上就是数据清洗阶段,广播更新规则的流程。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
数据采集 数据挖掘 数据处理
进行数据清洗的过程通常包括以下步骤
【4月更文挑战第3天】进行数据清洗的过程通常包括以下步骤
180 3
|
6月前
|
数据采集 SQL 分布式计算
在数据清洗过程中,处理大量重复数据通常涉及以下步骤
【4月更文挑战第2天】在数据清洗过程中,处理大量重复数据通常涉及以下步骤
163 2
|
6月前
|
数据采集 数据可视化 数据挖掘
数据清洗有什么方式
数据清洗有什么方式
|
2月前
|
数据采集 机器学习/深度学习 算法
数据清洗过程中,如何确定哪些数据是异常
数据清洗过程中,如何确定哪些数据是异常
|
5月前
|
数据采集 机器学习/深度学习 数据挖掘
利用Python实现高效的数据清洗与预处理流程
本文旨在探讨如何使用Python编程语言及其强大的数据处理库(如pandas、numpy等)来构建一个高效且灵活的数据清洗与预处理流程。与常规的数据清洗方法不同,本文不仅关注于传统的缺失值填充、异常值处理、数据类型转换等步骤,还引入了数据质量评估、数据特征选择以及自动化处理流程的设计等高级主题。通过实际案例和代码演示,本文将为读者提供一套完整的数据清洗与预处理解决方案,助力数据分析师和数据科学家在数据探索阶段更加高效、准确地处理数据。
|
5月前
|
数据采集 监控 安全
数据预处理几种常见问题
【6月更文挑战第12天】数据处理中常见的问题:数据缺失、数据重复、数据异常和数据样本差异大。对于数据缺失,处理方法包括定位、不处理、删除和填补,其中填补可使用业务知识、其他属性或统计方法。
|
6月前
|
数据采集 JSON 数据挖掘
利用Python实现自动化数据清洗和转换
数据清洗和转换是数据分析的重要步骤。在数据分析工作中,我们常常需要处理不规范、重复、缺失或错误的数据。本文介绍如何使用Python编程语言实现自动化数据清洗和转换,为数据分析工作提供高效的支持。
|
6月前
|
数据采集 Python
数据清洗是数据预处理的重要步骤
数据清洗是数据预处理的重要步骤
71 0
|
数据采集 机器学习/深度学习 算法
②数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
793 0
②数据预处理之数据清理,数据集成,数据规约,数据变化和离散化
|
存储 数据可视化 Python
数据的预处理基础:如何处理缺失值(一)
数据的预处理基础:如何处理缺失值(一)
272 0
数据的预处理基础:如何处理缺失值(一)