数据预处理-数据推送-效果与总结|学习笔记

简介: 快速学习数据预处理-数据推送-效果与总结

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段数据预处理-数据推送-效果与总结】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/672/detail/11672


数据预处理-数据推送-效果与总结


数据推送-效果与总结

代码写完后,查看实际效果

程序运行,进入代码中,过滤出纯查询的数据,找到topic,点击 target.query.topic 是 processedQuery,拿到参数,写入 topic 中,会看到 topic 中有数据查看当前目录,在kafka目录下查看数据中有哪些 topic 只有 test、test01和推送数据的 topic

1.png

没有配置文件中的 processedQuery 运行程序,数据会增加一个 topic,名字为 processedQuery 运行反爬虫项目,预处理

1.png

结果未报错,运行爬虫,右键 run 执行爬虫

1.png

数据未报错,输出 ProcessedData

1.png

新的 ProcessedData 出现

Kafka 的topic创建出来,将 topic 名称换为 processedQuery,回车有数据

1.png

查看 processedQuery 数据,出现很多数据查看数据预处理程序

1.png

数据预处理程序刷新,左侧程序也刷新推送查询数据效果已看到,代码流程都没有问题,流程已跑通以上是推送查询类数据

预定类推送数据,推送预定类数据流程与推送查询类数据流程很像,只需要改动几个参数

Datasend.sendBookDataToKafka()

数据是 DataProcessDatasend 方法还未创建,进入方法中,复制发送查询的代码,更改参数

//将预定的数据推送到预定的 topic

defsendBookDataToKafka(DataProcess:RDD[ProcessedData]):

unit={

//过滤出纯预定的数据(过滤掉查询的数据)使用"#CS#”进行拼接

ValbookDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Book).map(messag

e-message.toKafkastring())

//将数据推送到 kafka

//1在配置文件中读取预定类的 Topic 到程序中

(#难送预订数据

target.book.topic=processedBook)

ValbookTopic=propertiesutil.getstringByKey(key=

"target.Book.topic",propName="kafkaConfig.properties")

//实例kafka参数

valprops=newutil.HashMap[string,object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFI6,Propertie

sutil.getstringByKey(key="default.brokers",propName=

"kafkaconfig.properties"))

props.put(ProducerConfig.KEY_SERIALIZER_CLASs_CONFIG,Proper

tiesutil.getstringByKey(key="default.key_serializer_cla

ss_config",propName=“kafkaconfig.properties"))

props.put(Producercojfig.vALUE_SERIALIZER_CLASs_CONFTG,Prop

ertiesUtil.getstringByKey(key="default.value_serializer

_class_config",propName="kafkaConfig.properties"))

props.put(Producerconfig.BATCH_SIZE_CONFTG,PropertiesUtil.

getstringByKey(key="default.batch_size_config",propName

="kafkaConfig.properties"))

props.put(ProducerConfig.LINGER_MS_CONFIG,Propertiesutil.ge

tstringByKey(key="default.linger_ms_config",propName

="kafkaconfig.properties"))

//遍历数据的分区

BookDatas.foreachPartition(partition=>{

//2创建kafka生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历partition内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](bookTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

参数不需要修改,遍历分区改为 bookData,创建生产者、遍历分区的数据没有问题,数据载体 query 改为 book,数据发送和关闭生产者都没有问题

查询类数据与预定类数据的流程是相同的没有预定的数据,所以查看不到效果运行一下,如果查询类的 topic 有数据打入,说明程序代码没有问题,发送 book 代码也没有问题,只不过没有数据运行数据预处理的代码

1.png

右键执行爬虫,开始执行

1.png

程序未报错,流程有数据流入,说明推送的代码都没有问题数据推送预定代码流程全部完成

总结:数据推送(查询类)

目标:根据业务场景,将相应的数据推送到相应的 topic 内。

查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic 内思路与关键代码:

(1)获取到结构化后数据数据,过虑掉预定的数据,只保留查询的数据过滤出纯查询的数据(过滤掉预定的数据)使用"#CS#”进行拼接

valqueryDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Query).map(messa

ge-message.toKafkastring())

(2)在配置文件中读取查询类的Topic到程序中

valqueryTopic=propertiesutil.getstringByKey(key="target.

query.topic",propName="kafkaConfig.properties")

(1)创建 kafka 生产者

(2)数据的载体

(3)数据的发送

(4)关闭生成者

//遍历数据的分区

queryDatas.foreachPartition(partition=>{

//2创建 kafka 生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历 partition 内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](queryTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

创建生产者、数据载体、数据发送、关闭生产者整体拷贝,有整体感,推送预定类的数据流程是相同的数据推送(预定类)

目标:根据业务场景,将相应的数据推送到相应的 topic 内。

查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic内思路与关键代码:

(1)获取到结构化后数据数据,过滤掉查询的数据,只保留预定的数据

ValbookDatas=DataProcess.filter(message=message.

requestType.behaviorType==BehaviorTypeEnum.Book).map(messag

e-message.toKafkastring())

(2)在配置文件中读取预定类的 Topic 到程序中

ValbookTopic=propertiesutil.getstringByKey(key=

"target.Book.topic",propName="kafkaConfig.properties")

(3)创建 kafka 生产者

(4)数据的载体

(5)数据的发送

(6)关闭生成者

//遍历数据的分区

BookDatas.foreachPartition(partition=>{

//2创建 kafka 生产者

valkafkaProducer=newKafkaProducer[string,string](props)

//遍历 partition 内的数据

partition.foreach(message=>{

//3数据的载体

valrecord=new

ProducerRecord[string,string](bookTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

数据推送模块查询类与预定类的数据推送全部完成

相关文章
|
6月前
|
数据采集 数据可视化 数据挖掘
数据清洗有什么方式
数据清洗有什么方式
|
5月前
|
数据采集 监控 安全
数据预处理几种常见问题
【6月更文挑战第12天】数据处理中常见的问题:数据缺失、数据重复、数据异常和数据样本差异大。对于数据缺失,处理方法包括定位、不处理、删除和填补,其中填补可使用业务知识、其他属性或统计方法。
|
6月前
|
数据采集 Python
数据清洗是数据预处理的重要步骤
数据清洗是数据预处理的重要步骤
78 0
|
数据采集 SQL 分布式计算
81 网站点击流数据分析案例(数据预处理功能)
81 网站点击流数据分析案例(数据预处理功能)
87 0
|
消息中间件 数据采集 Java
数据预处理-数据推送-代码实现|学习笔记
快速学习数据预处理-数据推送-代码实现
127 0
数据预处理-数据推送-代码实现|学习笔记
|
消息中间件 数据采集 分布式计算
数据预处理-数据推送-过滤纯查询数据|学习笔记
快速学习数据预处理-数据推送-过滤纯查询数据
128 0
数据预处理-数据推送-过滤纯查询数据|学习笔记
|
数据采集 分布式计算 NoSQL
数据预处理—数据清洗—规则更新流程代码|学习笔记
快速学习数据预处理—数据清洗—规则更新流程代码
243 0
数据预处理—数据清洗—规则更新流程代码|学习笔记
|
数据采集 消息中间件 前端开发
数据预处理流程详解|学习笔记
快速学习数据预处理流程详解
数据预处理流程详解|学习笔记
|
数据采集 存储 监控
数据预处理-数据解析-需求与思路|学习笔记
快速学习数据预处理-数据解析-需求与思路
218 0
数据预处理-数据解析-需求与思路|学习笔记
|
数据采集 大数据 开发者
数据预处理—数据清洗—数据过滤功能代码|学习笔记
快速学习数据预处理—数据清洗—数据过滤功能代码
346 0
数据预处理—数据清洗—数据过滤功能代码|学习笔记