开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-数据推送-过滤纯查询数据】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11670
数据预处理-数据推送-过滤纯查询数据
过滤纯查询数据
括号扩到数据过滤后,表示在括号内部,都是针对数据进行的操作
即流程中数据预处理阶段,链路统计未对数据进行操作,数据清洗、数据脱敏一直到数据结构化都是对数据进行操作,数据处理完后要对数据进行推送操作,不对数据进行任何操作,数据获取后写入数据库中,因此数据推送规划到预处理后数据推送分两种情况,一种是查询类数据的推送,一种是预定类数据的推送
1.目标
根据业务场疑,将相应的数据推送到相应的 topic 内。
推送到 Kafka 中,Kafka 会划分成 topic
2.需求
前面我们已经将数据进行了分类、解析、加工(识别,爬虫判断),并封装到了 ProcessedData 中,现在我们需要将加工完的数据推送到 kafka 中,供后续的反爬虫计算或者其他业务使用。
由于 ProcessedData 中封装着两种数据类型,query 数据或者 book 数据,我们需要在推送的时候判断是属于query 还是 book,分别推送到不同的 topic 中。
query 查询类数据,book 预定类数据
根据业务场景,即数据分为查询类和预定类数据,国内国际不考虑,查询、预定都有国内国际,大的方向为查询和预定,在推送的时候判断是属于 query 还是 book,分别推送到不同的 topic 中,即将查询的数据推送到查询的 topic中,预定的数据推送到预定的 topic 中
3.设计
(1)通过操作类型 behaviorType=0过滤 ProcessedData 数据中的 query 数据
(2)如果 query 数据不为笙,推送到 kafka 的 processedQuery 这个 topic 中
(3)通过操作类型behaviorType=1过滤 ProcessedData 数据中的 book 数据
(4)如果 book 数据不为空,推送到 kafka 的 processedBook 这个 topic 中
过滤出纯查询数据和纯预定数据后,单独推送到相应的数据中,即需求
在 object 实现查询类数据的操作,后面实现预定
推送 query 数据:DataSend.sendQueryDataToKafka
DataSend 数据发送,sendQueryDataToKafka 发送查询数据到 Kafka 中,DataSend 没有需要创建,DataSend 用于实现数据发送功能,根据需求即查询的数据推送到查询是 topic,预定的数据推送到预定的topic
将结构化以后的数据 processedData 推送到 Kafka 中,toKafkaString 效果用#CS#进行拼接,直接将String类型传入
ProcessedData.toKafkaString()中既有查询又有预定,要判断是查询还是预定,toKafkaString 无法实现,在processedData 中有一个 RequestType 包括国内查询和国际查询,国内预定和国际预定
打入 Kafka 之前,需要用 processedData
未判断出纯查询数据,返回到 DataProcess 中,将 DataProcess 放入,即 rdd 传入,DataProcess 包括既查询数据,也有预定数据,在 DataProcess 中过滤纯查询数据,
创建方法,在方法中实现将查询的数据推送到查询是 topic,要过滤出纯查询的数据,即过滤调预定的数据,调用filter,调用 filter 后拿到的结果是一条条数据,即 message,找到 message 中的 requestType,现在调的 requestType 表示数据是查询,国内查询还是国际查询,国内预定还是国际预定,不关心国内还是国际,只关心过滤掉预定的数据保留查询的数据,requestType 是一个样例类,有 flightType 和 behaviorType,behaviorType 有查询、预定,调用 behaviorType,filter 返回值(双等于)引入 BehaviorTypeEnum.Query,数据的类型只需要查询和预定,所有的数据都是查询类,即保留纯查询类等于 BehaviorTypeEnum 中的 Query,经过此方法处理,过滤掉所有的预定的数据,预定的数据是 book,不等于 Query,返回 false,被过滤掉
数据类型是 processedData,数据拿到纯查询数据,需要将纯查询数据写入到 Kafka 中,processedData 是样例类需要转成字符串,调用 map,拿到里面的每一条数据,调用 message.toKafkaString.filter(message=message.requestType.behaviorType==BehaviorTypeEnum.Query)过滤出纯查询的数据,.map(message-message.tokafkastring())将纯查询数据转成 Kafkastring,拿到字符串数据,用#CS#拼接的数据
以上过滤出纯查询数据,同时转为字符串,用#CS#拼接的数据
//过滤出纯查询的数据(过滤掉预定的数据)DataProcess.filter(message=message.requestType.behaviorType==BehaviorTypeEnum.Query).map(message-message.toKafkastring())
将数据推送到 Kafka 中,即数据推送过程
4.数据推送
查询的数据推送到查询的 Topic 内,预定的数据推送到预定的Topic内思路与关键代码:
获取到结构化后数据数据,过滤掉预定的数据,只保留查询的数据
在配置文件中读取查询类的 Topic 到程序中
创建 kafka 生产者
数据的载体
数据的发送
关闭生成者
//用于实现数据发送功能
//查询的数据推送到查询的 topic
//预定的数据推送到预定的 topicob
Object Datasend {
//将查询的数据推送到查询的 topic
def sendQueryDataToKafka(DataProcess: RDD[ ProcessedData]): unit = {
//过滤出纯查询的数据(过滤掉预定的数据)使用"#CS#”进行拼接
vlqueryDatas=DataProcess.filter(message=message.
requestType.behaviorType==BehaviorTypeEnum.Query).map(message-message.toKafkastring())
//将数据推送到 Kafka 中
//1在配置文件中读取查询类的 Topic 到程序中
//2创建 kafka 生产者
//3数据的载体
//4数据的发送
//5关闭生成者
将数据推送到 Kafka 按照上面5步流程进行
