开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-数据推送-效果与总结】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11672
数据预处理-数据推送-效果与总结
数据推送-效果与总结
代码写完后,查看实际效果
程序运行,进入代码中,过滤出纯查询的数据,找到topic,点击 target.query.topic 是 processedQuery,拿到参数,写入 topic 中,会看到 topic 中有数据查看当前目录,在kafka目录下查看数据中有哪些 topic 只有 test、test01和推送数据的 topic
没有配置文件中的 processedQuery 运行程序,数据会增加一个 topic,名字为 processedQuery 运行反爬虫项目,预处理
结果未报错,运行爬虫,右键 run 执行爬虫
数据未报错,输出 ProcessedData
新的 ProcessedData 出现
Kafka 的topic创建出来,将 topic 名称换为 processedQuery,回车有数据
查看 processedQuery 数据,出现很多数据查看数据预处理程序
数据预处理程序刷新,左侧程序也刷新推送查询数据效果已看到,代码流程都没有问题,流程已跑通以上是推送查询类数据
预定类推送数据,推送预定类数据流程与推送查询类数据流程很像,只需要改动几个参数
Datasend.sendBookDataToKafka()
数据是 DataProcessDatasend 方法还未创建,进入方法中,复制发送查询的代码,更改参数
//将预定的数据推送到预定的 topic
defsendBookDataToKafka(DataProcess:RDD[ProcessedData]):
unit={
//过滤出纯预定的数据(过滤掉查询的数据)使用"#CS#”进行拼接
ValbookDatas=DataProcess.filter(message=message.
requestType.behaviorType==BehaviorTypeEnum.Book).map(messag
e-message.toKafkastring())
//将数据推送到 kafka
//1在配置文件中读取预定类的 Topic 到程序中
(#难送预订数据
target.book.topic=processedBook)
ValbookTopic=propertiesutil.getstringByKey(key=
"target.Book.topic",propName="kafkaConfig.properties")
//实例kafka参数
valprops=newutil.HashMap[string,object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFI6,Propertie
sutil.getstringByKey(key="default.brokers",propName=
"kafkaconfig.properties"))
props.put(ProducerConfig.KEY_SERIALIZER_CLASs_CONFIG,Proper
tiesutil.getstringByKey(key="default.key_serializer_cla
ss_config",propName=“kafkaconfig.properties"))
props.put(Producercojfig.vALUE_SERIALIZER_CLASs_CONFTG,Prop
ertiesUtil.getstringByKey(key="default.value_serializer
_class_config",propName="kafkaConfig.properties"))
props.put(Producerconfig.BATCH_SIZE_CONFTG,PropertiesUtil.
getstringByKey(key="default.batch_size_config",propName
="kafkaConfig.properties"))
props.put(ProducerConfig.LINGER_MS_CONFIG,Propertiesutil.ge
tstringByKey(key="default.linger_ms_config",propName
="kafkaconfig.properties"))
//遍历数据的分区
BookDatas.foreachPartition(partition=>{
//2创建kafka生产者
valkafkaProducer=newKafkaProducer[string,string](props)
//遍历partition内的数据
partition.foreach(message=>{
//3数据的载体
valrecord=new
ProducerRecord[string,string](bookTopic,message)
//4数据的发送
kafkaProducer.send(record)
})
//5关闭生成者
kafkaProducer.close()
参数不需要修改,遍历分区改为 bookData,创建生产者、遍历分区的数据没有问题,数据载体 query 改为 book,数据发送和关闭生产者都没有问题
查询类数据与预定类数据的流程是相同的没有预定的数据,所以查看不到效果运行一下,如果查询类的 topic 有数据打入,说明程序代码没有问题,发送 book 代码也没有问题,只不过没有数据运行数据预处理的代码
右键执行爬虫,开始执行
程序未报错,流程有数据流入,说明推送的代码都没有问题数据推送预定代码流程全部完成
总结:数据推送(查询类)
目标:根据业务场景,将相应的数据推送到相应的 topic 内。
查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic 内思路与关键代码:
(1)获取到结构化后数据数据,过虑掉预定的数据,只保留查询的数据过滤出纯查询的数据(过滤掉预定的数据)使用"#CS#”进行拼接
valqueryDatas=DataProcess.filter(message=message.
requestType.behaviorType==BehaviorTypeEnum.Query).map(messa
ge-message.toKafkastring())
(2)在配置文件中读取查询类的Topic到程序中
valqueryTopic=propertiesutil.getstringByKey(key="target.
query.topic",propName="kafkaConfig.properties")
(1)创建 kafka 生产者
(2)数据的载体
(3)数据的发送
(4)关闭生成者
//遍历数据的分区
queryDatas.foreachPartition(partition=>{
//2创建 kafka 生产者
valkafkaProducer=newKafkaProducer[string,string](props)
//遍历 partition 内的数据
partition.foreach(message=>{
//3数据的载体
valrecord=new
ProducerRecord[string,string](queryTopic,message)
//4数据的发送
kafkaProducer.send(record)
})
//5关闭生成者
kafkaProducer.close()
创建生产者、数据载体、数据发送、关闭生产者整体拷贝,有整体感,推送预定类的数据流程是相同的数据推送(预定类)
目标:根据业务场景,将相应的数据推送到相应的 topic 内。
查询的数据推送到查询的 Topic 内,预定的数据推送到预定的 Topic内思路与关键代码:
(1)获取到结构化后数据数据,过滤掉查询的数据,只保留预定的数据
ValbookDatas=DataProcess.filter(message=message.
requestType.behaviorType==BehaviorTypeEnum.Book).map(messag
e-message.toKafkastring())
(2)在配置文件中读取预定类的 Topic 到程序中
ValbookTopic=propertiesutil.getstringByKey(key=
"target.Book.topic",propName="kafkaConfig.properties")
(3)创建 kafka 生产者
(4)数据的载体
(5)数据的发送
(6)关闭生成者
//遍历数据的分区
BookDatas.foreachPartition(partition=>{
//2创建 kafka 生产者
valkafkaProducer=newKafkaProducer[string,string](props)
//遍历 partition 内的数据
partition.foreach(message=>{
//3数据的载体
valrecord=new
ProducerRecord[string,string](bookTopic,message)
//4数据的发送
kafkaProducer.send(record)
})
//5关闭生成者
kafkaProducer.close()
数据推送模块查询类与预定类的数据推送全部完成