开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第五阶段:爬虫识别-封装数据成 processedData】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/673/detail/11689
爬虫识别-封装数据成 processedData
内容介绍:
一、目的
二、需求与设计
三、代码编写
一、目的
本章节要对数据进行封装,现在的数据都是用#CS#分开的数据,如果以后需要调用的时候再来一个一个进行拆分,就显得麻烦,所以再次统一进行拆分,然后封装,在调用对的点,去拿取数据,更为方便。
即经过预处理以后的数据存储到 kafka 里面,是要进行分割的,后续计算的时候会用到各个数据,临时切分不太方便,为了方便后续使用,需要先在这里面先把它进行拆分,然后封装成 process date。
二、需求与设计
需求
从 kafka 中读取数据,用#CS#分割的字符,需要重新解封到对应的 process data 中,后期方便使用。
设计
1、拿到 kafkaValue 进行 mapPartitions,减少创建类的开销,这样速度会更快。
2、对 RDD 进行 map,循环数据
3、对一行数据用#CS#分割,取出所有的值封装到 ProcessedData 中,存在 bin 当中
4、返回数据
三、代码编写
具体代码如下:
//数据加载封装
//将 kafka 内的数据使用“#CS#”拆分,拆分后封装成 process data
val processedData =
QueryDataPackage.queryDataLoadAndPackage(kafkaValue)
将上述代码复制到开发工具中,将 kafka 数据拆封,分装成 process data。
在新建一个 scala 的 object,然后在进行打包,目的是实现数据切分。具体代码如下:
package com.air.antispider.stream.rulecompute.businessprocess
import com.air.antispider.stream.common.bean.
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum importcCm.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum,FlightTypeEnum, TravelTypeEnum)
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.streaming.dstream.DStream
//用于实现数据分割封装
object QueryDataPackage(
object QueryDataPackage(
//分割封装
def queryDataLoadAndPackage(kafkaValue:
DStream[String])=(
//使用 mapPartitions 减少包装类的创建开销
kafkaValue.mapPartitions (partitionslterator=>
//创建 json 解析
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
//将数据进行 map,一条条处理
partitionslterator.map(sourceLine=>
//分割数据
val dataArray= sourceLine.split("#CS#", -1)
会发现有报错。因为里面有而很多的包没有引用,引入进来之后,就不报错了。传递过去的数据 values,里面是一个字符串,用#cs#进行拼接。
用#cs#进行 split 操作,拿到了一个个结果,到最后将结果进行封装,封装完后就返回。最后的结果就是封装后的process data,最后再经过一串代码进行处理之后,输出的结果就不是#cs#分割的,而是 process data。
运行程序的话,还需要用到爬虫预处理的数据,俩个都执行起来后,爬虫预处理运行结果如下:
数据预处理是 process data 爬虫也是一样就代表将一条数据塞到方法里进行拆分,拆分完封装成 process data,返回能拿到,就是数据的封装,将原始“#CS#”一条数据封装成 processedData 就做完了。
//将 kafka 内的数据使用“#CS#”拆分,拆分后封装成 process data
val processedData =
QueryDataPackage.queryDataLoadAndPackage(kafkaValue)
processedData.foreachRDD(rdd=>rdd.foreach(println)