爬虫识别-HDFS数据恢复到 Redis| 学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 快速学习爬虫识别-HDFS数据恢复到 Redis

开发者学堂课程【大数据实战项目 - 反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第六阶段爬虫识别-HDFS数据恢复到 Redis】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/674/detail/11733


爬虫识别-HDFS数据恢复到 Redis

此时已经完成数据写入 HDFS 的代码以及实际效果展示。需要考虑的是数据写入到hdfs 中,如果数据发生丢失,就可以进行恢复。假设数据丢失,恢复也就是黑名单radius数据的恢复过程。从 hdfs 上将数据读出,将其恢复到 Redis 中。代码如下:

/*

恢复Redis黑名单数据,用于防止程序停止而产生的Redis数据丢失

*/

BlackListToRedis.blackListDataToRedis(jedis, sC, sqIContext)

此时假设数据丢失,新的任务已经存在,进行恢复需要在将下一次数据写入到Redis之前进行。如果数据已经丢失,重新运行之后,数据已经在写入,恢复要在写之前,例如在开始遍历黑名单数据之前,进行了去重准备,此时,进行恢复。恢复完成之后,再将数据写入Redis。所以恢复一定要提前操作。在遍历之前,获取到数据,还没有写入,先做恢复,代码如下:

BlackListToRedis.blackListDataToRedis(jedis, SC, sqIContext)

此时已经完成从hdfs恢复到Redis。总体流程如下:

1、 根据 redis 标记(dang) 进行是否恢复数据判断

2、如果要恢复数据,那么获取当前时间和当前时间之前的3600秒的序列化(parquet) 数据

3、找到最贴近当前时间的黑名单数据

4、恢复到redis

在写入数据的时候,Redis 存数据的周期是3600,如果数据丢失,在进行恢复时,hdfs 有时间限制。在恢复时,要在最近的一个小时之内进行数据恢复。如果丢数据,需要在最近一个小时之内进行恢复:从hdfs中找到最近一个小时的数据,也就是最后一次数据,然后找到最贴近当前时间的黑名单数据,恢复到Redis。代码传入了3个参数:jedis, SC, sqIContext,进入方法,该代码就是从hdfs恢复到Redis的过程,代码如下:

def blackListDataToRedis

(

jedis: JedisCluster ,

SC: SparkContext,

sq1Context: SQLContext

):

Unit = {

val dangKey = jedis. get("dang")

if(dangKey == "yes")

{

val arrayBuffer: ArrayBuffer[String] = ArrayBuffer()

//获取hdfsFileSystem

val conf = new Configuration()

//配置hdfs的路径

conf . set("fs . defaultFS", "hdfs://192. 168.56.111:8020");

val fs = FileSystem. get( conf)

//时间格式化类,用于按时间存储

val sdf = new SimpleDateFormat( pattern = "yyy/MM/dd/HH" )

//获取当前时间

val sysCurrentTime = System. currentTimeMillis()

//添加hdfs路径到arrayBuffer

for(i<-0to1)

{

//为了避免有效数据丢失,所以从前- -个小时之前进行取数据

val startTime =

sdf . format(sysCurrentTime - 1000 * 60 * 60 * i)

val path = PropertiesUtil . getStringByKey

(

key = "blackListPath",

propName = "HDFSPathConfig . properties"

)

+ startTime

if (fs.exists (new Path(path)))

{

arrayBuffer += path

}

}

首先需要在 Redis 中添加一个 Key dnag 的值, Value Yes。如果是 Yes 字符串,表示需要恢复,如果不是 Yes,就表示不需要恢复。假设需要恢复,首先获取hdfs file System,然后配置 HDFS 的路径,时间格式化类用于按时间存储,接下来获取当前时间添加 HDFS 路径到 array buffer。最后,为了避免有效数据丢失,从前一个数据从前一个小时之前进行读取数据。在conf中,需要设置默认值,然后实例化HDFS文件系统。在写数据时,按照年月日时进行写入,恢复需要找到用来做备份的数据,备份的数据是年月日时,所以在读取时也需要按照年月日时格式来进行。 For循环从01,循环中,用当前时间-1000*60*60*I,其中 I表示循环个数,如果是第一次循环,I就是零,也就表示当前时间。所以第一个值就是以当前时间初始化的。在HDFS中读取值,值表示路径,写的时候按照该路径写,所以读取的时候也需要按照该路径来读取。将路径读取之后,加入了Start Time,是根据当前系统时间格式为年月日时的结果,此时是将目录拼接出来。接下来进入了判断,如果目录存在,将 path 传递到 array buffer 中。Array buffer 用于存储路径。此时,第一次循环结束。第二次循环,I=1,表示系统当前时间减一个小时,获取到一个小时前的时间点。第二次循环将时间点加路径获取,然后查看hdfs中是否存在该路径,如果存在,再将路径传递。此时,arraybuffer 中存在的两个路径。之后调用datatoRedis,当中的第一个参数 SQLcontext,是之前传入的, Arraybuffer中存储了两个路径,将其转化成array之后,传入jedis。进入到datatoredis方法中:

def dataToRedis(

sqlContext : SQLContext ,

paths : Array[string],

jedis: JedisCluster

): Unit ={

val parquet1: DataFrame = sq1Context . read. parquet (paths:_*)

//parquet1key去重,取最大的keyExpTime.对应的记录,这里使用表join进行去重

parquet1. registerTempTable( tableName = "blacklist")

val grouped = sqlContext. sql(

sqlText = "select max(keyExpTime) keyExpTime, key from blacklist group by key"

)

grouped . show()

grouped . registerTempTable(

tableName = "groupedlist"

)

val distincted=SQLcontext.sql

(

select a. keyExpTime,

a.key,

a.value from blacklist a join groupedlist+

"b on a.keyExpTime=b. keyExpTime and a. key=b. key"

)

distincted . show( )

distincted.collect().

foreach( s =>

toRedis(jedis ,

s. get(1).tostring,s.get(2). tostring,s.get(0). tostring . toLong))

使用 SQLcontest 读取 packet packet 文件就是2个路径。 写入数据的格式是packet,读出packet1之后。将packet映射成表,称为blacklist。执行SQL语句,从表中查询最大的时间和Key,以Key进行分组。但此时写入的数据加入了时间戳,意味着每个分组都分开了。所以此时不应该加入时间戳,因为如果加入了时间戳,每个节点的时间戳都不同,所以每个分组都不同。所以需要将时间戳去除。Key取决于业务需求,需要什么就给予什么。假如此时已经删除原来的分组,在2个路径下找出最大的时间,获取到最大的时间和Key之后,使用Key进行分组,也就是距离宕机恢复最近的数据,进行分组之后进行输出,再将进行分组完之后查询出的数据映射成一张表,称为grouplist,然后再次进行查询,查询语句如下:

val distincted=SQLcontext.sql(

select a. keyExpTime, a.key, a.value from blacklist a join groupedlist+"b on a.keyExpTime=b. keyExpTime and a. key=b. key" )

条件如下:

b on a.keyExpTime=b. keyExpTime and a. key=b. key

A的时间等于B的时间。B是过滤完之后的结果。 B中的时间是最大的。在A表中将时间Keyvalue同时取出,取时间最大且Key相同的。通过以上操作获取到了HDSF当中与当前时间最近的数据。进行收集之后,调用方法foreach ToRedis中,将Redis集群传入, Scollect之后for each的结果,时间是0Key1value2 Get1)是Key get,(2)是valuesget0)是时间。时间转换完成之后,传给ToRedis ToRedis中代码如下:

def toRedis(

jedis: JedisCluster ,

flowId:string,

flowScoreStrategyCode:string,

failureTime:Long

)

{

val time =

(

(failureTime - System. currentTimeMillis()) / 1000

). toInt

if(time>0)

{

if( !jedis.exists(flowId))

{

//time 为期过时间(1 小时)

jedis . setex(flowId, time, flowScoreStrategyCode)

}

}

}

}

首先判断时间是否大于零,然后判断 Key 是否存在,如果 Key 不存在就执行数据,FLOW ID KeyTime 是经过时间与当前系统时间经过计算之后获取到的。第三个参数就是最终的value。调用该方法将HDFS中的Redis数据写入到了Redis中。以上就是代码恢复的过程。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
317
分享
相关文章
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
分布式爬虫框架Scrapy-Redis实战指南
Redis系列学习文章分享---第十六篇(Redis原理1篇--Redis数据结构-动态字符串,insert,Dict,ZipList,QuickList,SkipList,RedisObject)
Redis系列学习文章分享---第十六篇(Redis原理1篇--Redis数据结构-动态字符串,insert,Dict,ZipList,QuickList,SkipList,RedisObject)
111 1
redis学习四、可视化操作工具链接 centos redis,付费Redis Desktop Manager和免费Another Redis DeskTop Manager下载、安装
本文介绍了Redis的两个可视化管理工具:付费的Redis Desktop Manager和免费的Another Redis DeskTop Manager,包括它们的下载、安装和使用方法,以及在使用Another Redis DeskTop Manager连接Redis时可能遇到的问题和解决方案。
232 1
redis学习四、可视化操作工具链接 centos redis,付费Redis Desktop Manager和免费Another Redis DeskTop Manager下载、安装
|
10月前
|
Redis系列学习文章分享---第十八篇(Redis原理篇--网络模型,通讯协议,内存回收)
Redis系列学习文章分享---第十八篇(Redis原理篇--网络模型,通讯协议,内存回收)
126 0
Redis系列学习文章分享---第十七篇(Redis原理篇--数据结构,网络模型)
Redis系列学习文章分享---第十七篇(Redis原理篇--数据结构,网络模型)
141 0
Redis系列学习文章分享---第十篇(Redis快速入门之附近商铺+用户签到+UV统计)
Redis系列学习文章分享---第十篇(Redis快速入门之附近商铺+用户签到+UV统计)
76 0
Redis系列学习文章分享---第九篇(Redis快速入门之好友关注--关注和取关 -共同关注 -Feed流实现方案分析 -推送到粉丝收件箱 -滚动分页查询)
Redis系列学习文章分享---第九篇(Redis快速入门之好友关注--关注和取关 -共同关注 -Feed流实现方案分析 -推送到粉丝收件箱 -滚动分页查询)
97 0
Docker学习二(Centos):Docker安装并运行redis(成功运行)
这篇文章介绍了在CentOS系统上使用Docker安装并运行Redis数据库的详细步骤,包括拉取Redis镜像、创建挂载目录、下载配置文件、修改配置以及使用Docker命令运行Redis容器,并检查运行状态和使用Navicat连接Redis。
679 3
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
104 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
大数据-44 Redis 慢查询日志 监视器 慢查询测试学习
大数据-44 Redis 慢查询日志 监视器 慢查询测试学习
78 3
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等