开发者学堂课程【大数据实战项目 - 反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第六阶段:爬虫识别-HDFS数据恢复到 Redis】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/674/detail/11733
爬虫识别-HDFS数据恢复到 Redis
此时已经完成数据写入 HDFS 的代码以及实际效果展示。需要考虑的是数据写入到hdfs 中,如果数据发生丢失,就可以进行恢复。假设数据丢失,恢复也就是黑名单radius数据的恢复过程。从 hdfs 上将数据读出,将其恢复到 Redis 中。代码如下:
/*
恢复Redis黑名单数据,用于防止程序停止而产生的Redis数据丢失
*/
BlackListToRedis.blackListDataToRedis(jedis, sC, sqIContext)
此时假设数据丢失,新的任务已经存在,进行恢复需要在将下一次数据写入到Redis之前进行。如果数据已经丢失,重新运行之后,数据已经在写入,恢复要在写之前,例如在开始遍历黑名单数据之前,进行了去重准备,此时,进行恢复。恢复完成之后,再将数据写入Redis。所以恢复一定要提前操作。在遍历之前,获取到数据,还没有写入,先做恢复,代码如下:
BlackListToRedis.blackListDataToRedis(jedis, SC, sqIContext)
此时已经完成从hdfs恢复到Redis。总体流程如下:
1、 根据 redis 标记(dang) 进行是否恢复数据判断
2、如果要恢复数据,那么获取当前时间和当前时间之前的3600秒的序列化(parquet) 数据
3、找到最贴近当前时间的黑名单数据
4、恢复到redis
在写入数据的时候,Redis 存数据的周期是3600,如果数据丢失,在进行恢复时,hdfs 有时间限制。在恢复时,要在最近的一个小时之内进行数据恢复。如果丢数据,需要在最近一个小时之内进行恢复:从hdfs中找到最近一个小时的数据,也就是最后一次数据,然后找到最贴近当前时间的黑名单数据,恢复到Redis。代码传入了3个参数:jedis, SC, sqIContext,进入方法,该代码就是从hdfs恢复到Redis的过程,代码如下:
def blackListDataToRedis
(
jedis: JedisCluster ,
SC: SparkContext,
sq1Context: SQLContext
):
Unit = {
val dangKey = jedis. get("dang")
if(dangKey == "yes")
{
val arrayBuffer: ArrayBuffer[String] = ArrayBuffer()
//
获取hdfs的FileSystem
val conf = new Configuration()
//配置hdfs的路径
conf . set("fs . defaultFS", "hdfs://192. 168.56.111:8020");
val fs = FileSystem. get( conf)
//时间格式化类,用于按时间存储
val sdf = new SimpleDateFormat( pattern = "yyy/MM/dd/HH" )
//获取当前时间
val sysCurrentTime = System. currentTimeMillis()
//添加hdfs路径到arrayBuffer
for(i<-0to1)
{
//为了避免有效数据丢失,所以从前- -个小时之前进行取数据
val startTime =
sdf . format(sysCurrentTime - 1000 * 60 * 60 * i)
val path = PropertiesUtil . getStringByKey
(
key = "blackListPath",
propName = "HDFSPathConfig . properties"
)
+ startTime
if (fs.exists (new Path(path)))
{
arrayBuffer += path
}
}
首先需要在 Redis 中添加一个 Key 为 dnag 的值, Value 为 Yes。如果是 Yes 字符串,表示需要恢复,如果不是 Yes,就表示不需要恢复。假设需要恢复,首先获取hdfs 的 file System,然后配置 HDFS 的路径,时间格式化类用于按时间存储,接下来获取当前时间添加 HDFS 路径到 array buffer。最后,为了避免有效数据丢失,从前一个数据从前一个小时之前进行读取数据。在conf中,需要设置默认值,然后实例化HDFS文件系统。在写数据时,按照年月日时进行写入,恢复需要找到用来做备份的数据,备份的数据是年月日时,所以在读取时也需要按照年月日时格式来进行。 For循环从0到1,循环中,用当前时间-1000*60*60*I,其中 I表示循环个数,如果是第一次循环,I就是零,也就表示当前时间。所以第一个值就是以当前时间初始化的。在HDFS中读取值,值表示路径,写的时候按照该路径写,所以读取的时候也需要按照该路径来读取。将路径读取之后,加入了Start Time,是根据当前系统时间格式为年月日时的结果,此时是将目录拼接出来。接下来进入了判断,如果目录存在,将 path 传递到 array buffer 中。Array buffer 用于存储路径。此时,第一次循环结束。第二次循环,I=1,表示系统当前时间减一个小时,获取到一个小时前的时间点。第二次循环将时间点加路径获取,然后查看hdfs中是否存在该路径,如果存在,再将路径传递。此时,arraybuffer 中存在的两个路径。之后调用datatoRedis,当中的第一个参数 SQLcontext,是之前传入的, Arraybuffer中存储了两个路径,将其转化成array之后,传入jedis。进入到datatoredis方法中:
def dataToRedis(
sqlContext : SQLContext ,
paths : Array[string],
jedis: JedisCluster
): Unit ={
val parquet1: DataFrame = sq1Context . read. parquet (paths:_*)
//对parquet1按key去重,取最大的keyExpTime.对应的记录,这里使用表join进行去重
parquet1. registerTempTable( tableName = "blacklist")
val grouped = sqlContext. sql(
sqlText = "select max(keyExpTime) keyExpTime, key from blacklist group by key"
)
grouped . show()
grouped . registerTempTable(
tableName = "groupedlist"
)
val distincted=SQLcontext.sql
(
“select a. keyExpTime,
a.key,
a.value from blacklist a join groupedlist+
"b on a.keyExpTime=b. keyExpTime and a. key=b. key"
)
distincted . show( )
distincted.collect().
foreach( s =>
toRedis(jedis ,
s. get(1).tostring,s.get(2). tostring,s.get(0). tostring . toLong))
使用 SQLcontest 读取 packet, packet 文件就是2个路径。 写入数据的格式是packet,读出packet1之后。将packet映射成表,称为blacklist。执行SQL语句,从表中查询最大的时间和Key,以Key进行分组。但此时写入的数据加入了时间戳,意味着每个分组都分开了。所以此时不应该加入时间戳,因为如果加入了时间戳,每个节点的时间戳都不同,所以每个分组都不同。所以需要将时间戳去除。Key取决于业务需求,需要什么就给予什么。假如此时已经删除原来的分组,在2个路径下找出最大的时间,获取到最大的时间和Key之后,使用Key进行分组,也就是距离宕机恢复最近的数据,进行分组之后进行输出,再将进行分组完之后查询出的数据映射成一张表,称为grouplist,然后再次进行查询,查询语句如下:
val distincted=SQLcontext.sql(
“select a. keyExpTime, a.key, a.value from blacklist a join groupedlist+"b on a.keyExpTime=b. keyExpTime and a. key=b. key" )
条件如下:
b on a.keyExpTime=b. keyExpTime and a. key=b. key
A的时间等于B的时间。B是过滤完之后的结果。 B中的时间是最大的。在A表中将时间Key和value同时取出,取时间最大且Key相同的。通过以上操作获取到了HDSF当中与当前时间最近的数据。进行收集之后,调用方法foreach, ToRedis中,将Redis集群传入, S是collect之后for each的结果,时间是0,Key是1,value是2。 Get(1)是Key ,get,(2)是values,get(0)是时间。时间转换完成之后,传给ToRedis。 ToRedis中代码如下:
def toRedis(
jedis: JedisCluster ,
flowId:string,
flowScoreStrategyCode:string,
failureTime:Long
)
{
val time =
(
(failureTime - System. currentTimeMillis()) / 1000
). toInt
if(time>0)
{
if( !jedis.exists(flowId))
{
//time 为期过时间(1 小时)
jedis . setex(flowId, time, flowScoreStrategyCode)
}
}
}
}
首先判断时间是否大于零,然后判断 Key 是否存在,如果 Key 不存在就执行数据,FLOW ID 是 Key,Time 是经过时间与当前系统时间经过计算之后获取到的。第三个参数就是最终的value。调用该方法将HDFS中的Redis数据写入到了Redis中。以上就是代码恢复的过程。