开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段:数据预处理-数据推送-代码实现】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/672/detail/11671
数据预处理-数据推送-代码实现
数据推送-代码实现
已经过滤出纯查询的数据,将查询的数据推送到查询的 topic 中,数据已具备,要将数据推送到 Kafka 中,先读取配置文件中的配置,查询的 topic 已经配置好,在提供的 Kafka 文件中,打开文件
#消费者
#来自采集服务的原始数据
source.nginx.topic = B2CDATA_COLLECTION3
#处理后的查询数据
source.query .topic = processedouery
#处理后的预订数据
source.book .topic = processedBook
#生产者
#推送查询数据
target.query.topic = processedouery
#推送预订数据
target.book.topic = processedBook
采集完数据推送到 Kafka,即流程中的第二步
要完成第四步,即拿到 topic
生产者、推送查询数据中的 topic,将查询的数据推送到查询的 topic,即target.query.topic = processedouery
,将预定的数据推送到预定的 topic,即 target.book.topic = processedBook
读取数据,使用 PropertiesUtil 调用,key有两个值,一个是推送查询数据的 topic,第二个参数是配置文件名称,即kafkaConfig.properties,推送查询数据的 topic 拿到,定义查询变量 queryTopic
创建Kafka生产者,首先拿到数据,遍历数据分区,需要遍历多个 partition,foreachPartition 效率比 foreach 快,先遍历分区,在一个分区创建生产者,一个分区创建一个生产者,多个分区有多个生产者,多个生产者同时写出速度更快,效率更高,创建一个 KafkaProducer 变量等于新的 KafkaProducer,范型是 string,需要一个Kafka参数,实例Kafka参数,val props=new,用map类型进行封装,定义java类型的 util.HashMap,map 有k和v,k是 string类型,v是 object 类型,实现参数往里面添加数据put,k指定 Kafka 集群
default.brokers = 192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9092
作为v添加
k 调用 producerConfig,引用 org.apache,
BOOTSTRAP_SERVERS 属性,将集群配置文件的值加入,k 是 default.brokers,v 是 kafkaConfig.properties,Kafka 配置文件名称
Key 的序列化、value 的序列化以及一个批次提交数据大小或间隔的时间都要进行配置
依次将配置文件名称修改,key发生变化,v不需要改变,就是 Kafka 配置文件名称,更改 ProducerConfig 的配置,使得前后一致
引用 org.apache,BOOTSTRAP_SERVERS 属性是因为直接设置好,可以直接使用
配置文件引用完成后,直接上传到生产者的参数中,流程与写Kafka的流程是一样的,数据生产者引用完成,下一步数据的载体
Partition 是分区,载体要拿到一条条数据,遍历分区数据,Partition 直接调用 foreach 或 map 数据就能拿到一个个的结果,需要返回值用 map,不需要返回值用 foreach,这里不需要返回值直接使用 foreach,foreach 拿到每一个数据 message,遍历出某一条数据,一个数据一个载体,进行下一步数据的载体,定义变量 record 等于新的ProducerRecord,需要一个 string 类型的范型,传入 queryTopic 中,将数据 message 写入到 Topic 中,就是数据的载体,数据载体具备后,发送数据,用生产者 KafkaProducer,send(record),数据发送完关闭生产者,
//将数据推送到 kafka
// 1在配置文件中读取查询类的Topic到程序中
val queryTopic= propertiesutil.getstringByKey( key = "target. query.topic" , propName ="kafkaConfig.properties")
//实例 kafka 参数
val props=new util.HashMap[string,object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFI6,Propertiesutil.getstringByKey(key = "default.brokers",propName = "kafkaconfig.properties"))
props.put(ProducerConfig.KEY_SERIALIZER_CLASs_CONFIG,Propertiesutil.getstringByKey(key = "default.key_serializer_class_config",propName = “kafkaconfig.properties")) props.put(Producercojfig.vALUE_SERIALIZER_CLASs_CONFTG,PropertiesUtil.getstringByKey(key = "default.value_serializer_class_config", propName = "kafkaConfig.properties"))
props.put(Producerconfig.BATCH_SIZE_CONFTG ,PropertiesUtil.getstringByKey(key = "default.batch_size_config",propName = "kafkaConfig.properties")) props.put(ProducerConfig.LINGER_MS_CONFIG,Propertiesutil.getstringByKey( key = "default.linger_ms_config",propName = "kafkaconfig.properties"))
//遍历数据的分区
queryDatas.foreachPartition(partition=>{
// 2创建kafka生产者
val kafkaProducer=new KafkaProducer[string,string](props)
//遍历partition 内的数据
partition.foreach(message=>{
//3数据的载体
val record=new ProducerRecord[string,string](queryTopic,message)
//4数据的发送
kafkaProducer.send(record)
})
//5关闭生成者
kafkaProducer.close()
推送数据到 Kafka 的过程已写完,回到程序调用的方法中,做一个接收,
//9数据推送
//9-1查询类数据的推送
Val Datasend.sendQueryDataToKafka(DataProcess)