开发者学堂课程【大数据实战项目 - 反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第六阶段:爬虫识别- Redis 爬虫数据备份-代码解读】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/674/detail/11731
爬虫识别- Redis 爬虫数据备份-代码解读
内容介绍:
一、APP介绍
二、总结
一、APP 介绍
已经完成备份 Redis 数据到 HDFS,采集数据,封装数据,调用方法。调用存储方法,代码如下:
BlackListToHDFS . saveAntiBlackList
(sc. parallelize(antiBlackListDFR), sqlContext))
第1个参数是之前封装的数据,第2个参数是SQLcontext。方法内部如下:
def saveAntiBlacklist(
antiBlackListRDD: RDD[ Row], sqlContext: SQLContext) :
Unit ={
//
构建DataFrame
val tableCols = List("keyExpTime", "key", "value")
val schemastring=tableCols . mkString("-")
val schema = structType(
schemaString .
split( regex=
“-")
.map(fieldName => structField(fieldName, stringType, true)))
val dataFrame: DataFrame =
sqlContext . createDataFr ame ( antiBlackListRDD, schema)
val path: String = PropertiesUtil. getstringByKey(
key = "blackListPath" ,
propName = "HDFSPathconfig. properties")
HdfssaveUtil. save( dataFrame, time = nu1l, path)
def saveAntiOcpBlackList(): Unit ={
}
}
方法中第1个参数是之前封装好的时间、Key 和 value,此时将其转化为 Rdd 类型。方法中首先实例了 list。List 的值是时间、Key、Value。将 list 用横线进行拼接为一个字符串之后,用横线进行拆分,获取到每一个结果之后,调用map获取到每一个结果,也就是 filedName,第一次是 KeyExpTime,第二次是 Key,第三次是value。获取到时间之后,调用了struct field获取到了文件名称,也就是KeyExpTime。调用了struct field+文件名称+类型,封装 schema,使用的是spark SQL,所以需要有一个表,表结构就是 schema,第一次是 KeyExpTime,第二次是Key,第三次是value。也就是将3个参数转化为了一个schema。获取到schema、数据之后,数据的结构是时间、Key和value,与之前所写代码一一对应。将schema和数据转化为了data Fream,此时获取到了表结构。之后是配置路径,配置路径就是读取配置文件中的值,配置文件就是HDFSpathconfig。配置文件如下:
# HDFS数据路径
#黑名单提交到HDFS的路径
blackListPath=hdfs://192.168.100.100: 8020/csair/data/rule-black-list/
#blackListPath=hdfs://10.108.151.101 :8020/csair/data/rule-black-list/
#AntiCalculateResult
数据提交到HDFS的路径
#anticalculateResultPath=hdfs://192.168.30.17:8020/csair/data/rule - computed/
#antiCalculateResultPath=hdfs://10.108.151.101:8020/csair/data/rule - computed/
配置文件中是集群当中的配置路径。获取到路径之后,将路径和封装好的datafream以及时间传给了hdfs中的save方法。 Save方法如下:
object HdfsSaveUtil {
/**
*
基于时间生成文件夹保存DataFrame数据到HDFS
* @param dataFrame
数据
*@param time
数据时间
* @param paths
保存路径
*/
def save( dataFrame :DataFrame, time : Time ,paths:string): Unit ={
val date
= if(time == null){
new SimpleDateFormat( pattern = "yyyy/MM/ dd/HH") . format(System. currentTimeMillis())
}else{
new SimpleDateFormat( pattern = "yyyy/MM/dd/HH" ). format (new Date(time . milliseconds ) )
val path = paths+date
dataFrame .write . mode(SaveMode .Append) . format( source = "parquet") . save(path)
}
}
方法中首先进行了判断,判断时间是否为空,如果时间为空,就将当成的系统时间转化为年月日时的时间格式,进行返回,返回给date。如果不空,就将传过来的数据转化为年月日时传给date。将时间添加到path后,一层层嵌套。 Path和data Frame获取到之后,就将数据写入到路径下:
dataFrame .write . mode(SaveMode .Append) . format( source = "parquet") . save(path)
将数据按照parquet格式写入到配置文件中的配置路径,加上年月日时,将data Frame数据写到路径下。以上就是爬虫数据备份到hdfs的过程。
二、总结
第一步,准备数据,写入到 Redis 中同时收集数据,收集完数据之后,调用写入方法。写入方法中首先将数据和schema进行关联,创建data fream,获取到数据之后将其封装为data fream,然后将data Frame和path传到save方法中,将数据写入到路径中,以上就是数据分到HDFS的过程。
目标:由于Redis是基于内存的,爬虫数据写入Redis有数据丢失的风险,所以需要将爬虫数据写入redis的同时也写入HDFS-份,用于当redis丢失数据时的恢复。
1、在kv存储到redis的过程中, 向 ArrayBuffer[填充数据, 封装的是(time、 key、 value)
2、调用BlackListToHDFS.saveAntiBlackList
存储数据
3、在 saveAntiBlackList 创建 schema、 dataFrame, 然后调用 HdfsSaveUtil.save存储
4、在 save 方法里,先将路径加上时间年月日时,然后将数据存储为序列化(parquet) 的文件
以上就是实现数据备份到 HDFS 过程的代码解读。