数据预处理-数据推送-代码实现|学习笔记

简介: 快速学习数据预处理-数据推送-代码实现

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)第四阶段数据预处理-数据推送-代码实现】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/672/detail/11671


数据预处理-数据推送-代码实现


数据推送-代码实现

已经过滤出纯查询的数据,将查询的数据推送到查询的 topic 中,数据已具备,要将数据推送到 Kafka 中,先读取配置文件中的配置,查询的 topic 已经配置好,在提供的 Kafka 文件中,打开文件

#消费者

#来自采集服务的原始数据

source.nginx.topic = B2CDATA_COLLECTION3

#处理后的查询数据

source.query .topic = processedouery

#处理后的预订数据

source.book .topic = processedBook

#生产者

#推送查询数据

target.query.topic = processedouery

#推送预订数据

target.book.topic = processedBook

采集完数据推送到 Kafka,即流程中的第二步

image.png

要完成第四步,即拿到 topic

生产者、推送查询数据中的 topic,将查询的数据推送到查询的 topic,即target.query.topic = processedouery,将预定的数据推送到预定的 topic,即 target.book.topic = processedBook

读取数据,使用 PropertiesUtil 调用,key有两个值,一个是推送查询数据的 topic,第二个参数是配置文件名称,即kafkaConfig.properties,推送查询数据的 topic 拿到,定义查询变量 queryTopic

创建Kafka生产者,首先拿到数据,遍历数据分区,需要遍历多个 partition,foreachPartition 效率比 foreach 快,先遍历分区,在一个分区创建生产者,一个分区创建一个生产者,多个分区有多个生产者,多个生产者同时写出速度更快,效率更高,创建一个 KafkaProducer 变量等于新的 KafkaProducer,范型是 string,需要一个Kafka参数,实例Kafka参数,val props=new,用map类型进行封装,定义java类型的 util.HashMap,map 有k和v,k是 string类型,v是 object 类型,实现参数往里面添加数据put,k指定 Kafka 集群

default.brokers = 192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9092

作为v添加

k 调用 producerConfig,引用 org.apache,

image.png

BOOTSTRAP_SERVERS 属性,将集群配置文件的值加入,k 是 default.brokers,v 是 kafkaConfig.properties,Kafka 配置文件名称

Key 的序列化、value 的序列化以及一个批次提交数据大小或间隔的时间都要进行配置

依次将配置文件名称修改,key发生变化,v不需要改变,就是 Kafka 配置文件名称,更改 ProducerConfig 的配置,使得前后一致

引用 org.apache,BOOTSTRAP_SERVERS 属性是因为直接设置好,可以直接使用

配置文件引用完成后,直接上传到生产者的参数中,流程与写Kafka的流程是一样的,数据生产者引用完成,下一步数据的载体

Partition 是分区,载体要拿到一条条数据,遍历分区数据,Partition 直接调用 foreach 或 map 数据就能拿到一个个的结果,需要返回值用 map,不需要返回值用 foreach,这里不需要返回值直接使用 foreach,foreach 拿到每一个数据 message,遍历出某一条数据,一个数据一个载体,进行下一步数据的载体,定义变量 record 等于新的ProducerRecord,需要一个 string 类型的范型,传入 queryTopic 中,将数据 message 写入到 Topic 中,就是数据的载体,数据载体具备后,发送数据,用生产者 KafkaProducer,send(record),数据发送完关闭生产者,

//将数据推送到 kafka

// 1在配置文件中读取查询类的Topic到程序中

val queryTopic= propertiesutil.getstringByKey( key = "target. query.topic" , propName ="kafkaConfig.properties")

//实例 kafka 参数

val props=new util.HashMap[string,object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFI6,Propertiesutil.getstringByKey(key = "default.brokers",propName = "kafkaconfig.properties"))props.put(ProducerConfig.KEY_SERIALIZER_CLASs_CONFIG,Propertiesutil.getstringByKey(key = "default.key_serializer_class_config",propName = “kafkaconfig.properties")) props.put(Producercojfig.vALUE_SERIALIZER_CLASs_CONFTG,PropertiesUtil.getstringByKey(key = "default.value_serializer_class_config", propName = "kafkaConfig.properties"))props.put(Producerconfig.BATCH_SIZE_CONFTG ,PropertiesUtil.getstringByKey(key = "default.batch_size_config",propName = "kafkaConfig.properties")) props.put(ProducerConfig.LINGER_MS_CONFIG,Propertiesutil.getstringByKey( key = "default.linger_ms_config",propName = "kafkaconfig.properties"))

//遍历数据的分区

queryDatas.foreachPartition(partition=>{

// 2创建kafka生产者

val kafkaProducer=new KafkaProducer[string,string](props)

//遍历partition 内的数据

partition.foreach(message=>{

//3数据的载体

val record=new ProducerRecord[string,string](queryTopic,message)

//4数据的发送

kafkaProducer.send(record)

})

//5关闭生成者

kafkaProducer.close()

推送数据到 Kafka 的过程已写完,回到程序调用的方法中,做一个接收,

//9数据推送

//9-1查询类数据的推送

Val Datasend.sendQueryDataToKafka(DataProcess)

相关文章
|
10月前
|
存储 安全 API
权限申请被拒?详解京东/淘宝API审核标准与申诉技巧
在对接电商 API 时,权限申请常因资质或材料问题被拒。本文详解京东、淘宝的审核标准与申诉策略,结合实战案例,教你如何提升通过率,规避风险,实现高效对接。
|
存储 安全 物联网
什么是安全密钥,它是如何工作的
安全密钥是一种物理设备,常用于双因素或多因素身份验证(2FA/MFA),以提升在线账户安全性。它通过公钥加密协议(如FIDO U2F/FIDO2)实现强大的防网络钓鱼和凭证盗窃功能。常见的类型包括USB-A、USB-C、NFC和蓝牙密钥,支持一键登录且兼容多种服务。即使凭据泄露,安全密钥也能有效保护账户。若丢失密钥,可通过备用验证码或替代验证方法恢复访问,并重新注册新密钥。工具如ADSelfService Plus可与安全密钥无缝集成,提供自适应MFA及密码管理功能,增强整体安全性。
1257 0
什么是安全密钥,它是如何工作的
|
Windows
修改Windows语言出现“我们无法获取此Windows显示语言”问题怎么办?
本文介绍在Windows 10操作系统中,修改系统的显示语言的方法;并解决在这一过程中,出现的“很抱歉,我们无法获取此Windows显示语言”报错问题~
828 2
修改Windows语言出现“我们无法获取此Windows显示语言”问题怎么办?
|
机器学习/深度学习 计算机视觉
《深度剖析:残差连接如何攻克深度卷积神经网络的梯度与退化难题》
残差连接通过引入“短路”连接,解决了深度卷积神经网络(CNN)中随层数增加而出现的梯度消失和退化问题。它使网络学习输入与输出之间的残差,而非直接映射,从而加速训练、提高性能,并允许网络学习更复杂的特征。这一设计显著提升了深度学习在图像识别等领域的应用效果。
829 13
|
网络协议 网络安全 数据安全/隐私保护
批量修改网络配置,还得让Python来!
批量修改网络配置,还得让Python来!
257 0
|
存储 人工智能 缓存
《鸿蒙安全沙箱机制——人工智能应用的安全护盾》
鸿蒙系统的安全沙箱机制为人工智能应用提供了强大的安全保障。通过独立沙箱目录和路径隔离,确保每个应用在独立环境中运行,防止数据泄露和跨应用攻击。严格的访问控制和权限管理限制了程序的访问权限,保护敏感数据。统一身份认证、生物识别技术增强了登录安全性,防止未经授权访问。安全检测和重签名机制阻止恶意软件动态加载,确保应用的真实性和完整性。透明的权限管理让用户对应用行为有更多知情权和控制权。鸿蒙系统全方位的安全防护,使人工智能应用能在安全可靠的环境中运行。
900 15
|
算法 搜索推荐
解读双编码器和交叉编码器:信息检索中的向量表示与语义匹配
在信息检索领域(即从海量数据中查找相关信息),双编码器和交叉编码器是两种至关重要的工具。它们各自拥有独特的工作机制、优势和局限性。本文将深入探讨这两种核心技术。
785 3
解读双编码器和交叉编码器:信息检索中的向量表示与语义匹配
|
算法 安全 Ubuntu
8 种 Java 内存溢出之八 -Kill process or sacrifice child
8 种 Java 内存溢出之八 -Kill process or sacrifice child
|
数据采集 缓存 前端开发
服务器端渲染(SSR)
服务器端渲染(SSR)
|
数据采集 存储 人工智能
蚂蚁集团联合上海仁济医院泌尿科发布国内首个临床专科推理数据集:RJUA-QA
详细介绍数据集的构建过程、特点及统计分析,并全面评测了行业和通用大模型在该数据集上的性能,后续团队将持续优化数据集,为人工智能在医疗领域的研究与应用提供有力支持。
蚂蚁集团联合上海仁济医院泌尿科发布国内首个临床专科推理数据集:RJUA-QA

热门文章

最新文章