开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-数据推送-过滤纯查询数据】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11670
数据预处理-数据推送-过滤纯查询数据
过滤纯查询数据
括号扩到数据过滤后,表示在括号内部,都是针对数据进行的操作
即流程中数据预处理阶段,链路统计未对数据进行操作,数据清洗、数据脱敏一直到数据结构化都是对数据进行操作,数据处理完后要对数据进行推送操作,不对数据进行任何操作,数据获取后写入数据库中,因此数据推送规划到预处理后数据推送分两种情况,一种是查询类数据的推送,一种是预定类数据的推送
1.目标
根据业务场疑,将相应的数据推送到相应的 topic 内。
推送到 Kafka 中,Kafka 会划分成 topic
2.需求
前面我们已经将数据进行了分类、解析、加工(识别,爬虫判断),并封装到了 ProcessedData 中,现在我们需要将加工完的数据推送到 kafka 中,供后续的反爬虫计算或者其他业务使用。
由于 ProcessedData 中封装着两种数据类型,query 数据或者 book 数据,我们需要在推送的时候判断是属于query 还是 book,分别推送到不同的 topic 中。
query 查询类数据,book 预定类数据
根据业务场景,即数据分为查询类和预定类数据,国内国际不考虑,查询、预定都有国内国际,大的方向为查询和预定,在推送的时候判断是属于 query 还是 book,分别推送到不同的 topic 中,即将查询的数据推送到查询的 topic中,预定的数据推送到预定的 topic 中
3.设计
(1)通过操作类型 behaviorType=0过滤 ProcessedData 数据中的 query 数据
(2)如果 query 数据不为笙,推送到 kafka 的 processedQuery 这个 topic 中
(3)通过操作类型behaviorType=1过滤 ProcessedData 数据中的 book 数据
(4)如果 book 数据不为空,推送到 kafka 的 processedBook 这个 topic 中
过滤出纯查询数据和纯预定数据后,单独推送到相应的数据中,即需求
在 object 实现查询类数据的操作,后面实现预定
推送 query 数据:DataSend.sendQueryDataToKafka
DataSend 数据发送,sendQueryDataToKafka 发送查询数据到 Kafka 中,DataSend 没有需要创建,DataSend 用于实现数据发送功能,根据需求即查询的数据推送到查询是 topic,预定的数据推送到预定的topic
将结构化以后的数据 processedData 推送到 Kafka 中,toKafkaString 效果用#CS#进行拼接,直接将String类型传入
ProcessedData.toKafkaString()
中既有查询又有预定,要判断是查询还是预定,toKafkaString 无法实现,在processedData 中有一个 RequestType 包括国内查询和国际查询,国内预定和国际预定
打入 Kafka 之前,需要用 processedData
未判断出纯查询数据,返回到 DataProcess 中,将 DataProcess 放入,即 rdd 传入,DataProcess 包括既查询数据,也有预定数据,在 DataProcess 中过滤纯查询数据,
创建方法,在方法中实现将查询的数据推送到查询是 topic,要过滤出纯查询的数据,即过滤调预定的数据,调用filter,调用 filter 后拿到的结果是一条条数据,即 message,找到 message 中的 requestType,现在调的 requestType 表示数据是查询,国内查询还是国际查询,国内预定还是国际预定,不关心国内还是国际,只关心过滤掉预定的数据保留查询的数据,requestType 是一个样例类,有 flightType 和 behaviorType,behaviorType 有查询、预定,调用 behaviorType,filter 返回值(双等于)引入 BehaviorTypeEnum.Query,数据的类型只需要查询和预定,所有的数据都是查询类,即保留纯查询类等于 BehaviorTypeEnum 中的 Query,经过此方法处理,过滤掉所有的预定的数据,预定的数据是 book,不等于 Query,返回 false,被过滤掉
数据类型是 processedData,数据拿到纯查询数据,需要将纯查询数据写入到 Kafka 中,processedData 是样例类需要转成字符串,调用 map,拿到里面的每一条数据,调用 message.toKafkaString
.filter(message=message.requestType.behaviorType==BehaviorTypeEnum.Query)
过滤出纯查询的数据,.map(message-message.tokafkastring())
将纯查询数据转成 Kafkastring,拿到字符串数据,用#CS#拼接的数据
以上过滤出纯查询数据,同时转为字符串,用#CS#拼接的数据
//过滤出纯查询的数据(过滤掉预定的数据)DataProcess.filter(message=message.requestType.behaviorType==BehaviorTypeEnum.Query).map(message-message.toKafkastring())
将数据推送到 Kafka 中,即数据推送过程
4.数据推送
查询的数据推送到查询的 Topic 内,预定的数据推送到预定的Topic内思路与关键代码:
获取到结构化后数据数据,过滤掉预定的数据,只保留查询的数据
在配置文件中读取查询类的 Topic 到程序中
创建 kafka 生产者
数据的载体
数据的发送
关闭生成者
//用于实现数据发送功能
//查询的数据推送到查询的 topic
//预定的数据推送到预定的 topicob
Object Datasend {
//将查询的数据推送到查询的 topic
def sendQueryDataToKafka(DataProcess: RDD[ ProcessedData]): unit = {
//过滤出纯查询的数据(过滤掉预定的数据)使用"#CS#”进行拼接
vlqueryDatas=DataProcess.filter(message=message.
requestType.behaviorType==BehaviorTypeEnum.Query).map(message-message.toKafkastring())
//将数据推送到 Kafka 中
//1在配置文件中读取查询类的 Topic 到程序中
//2创建 kafka 生产者
//3数据的载体
//4数据的发送
//5关闭生成者
将数据推送到 Kafka 按照上面5步流程进行