开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):Kafka 生产者 API 回顾】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11613
Kafka 生产者 API 回顾
内容介绍:
一、创建新的项目
二、创建Kafka生产者
三、创建数据的载体
四、发送数据
五、关闭生产者
一、创建新的项目
1.在Pom 文件已给好的前提下,创建一个 Maven 的项目,来测试我Kafka 的生产者 API :
File-文件-新建 project -创建 Maven 项目,点击下一步。
创建如下内容:
并完善 Pom 文件, Pom 文件为 Java 提供测试环境。
2. 将讲义中相关内容粘贴保存。出现以下问题时的解决办法有
(1)当环境中缺少两个目录时
原因在于 src 、 main 和 txt中没有 scala ,需要分别手动创建 scala 。
创建完成后,此时的 Pom 文件就不存在报错问题。
(2)其他问题
找到 File - setting - Maven ,并将 Maven 的地址和 setting file 配置到你自己的环境当中。
3.写生产者的 API生成
在test - Kafka - Java 中新建一个 Java Class,起名叫 test producer ,创建完成后,进行测试。
首先采用 main 方法:创建 main、 ,准备实现 Kafka 生产者的API。
二、创建 Kafka 的生产者
1.要有数据类型叫KafkaProducer 然后这里面有一个泛型[K,V]填入为 String类型
//设置集群属性
props.put("bootstrap.servers","192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9092,");
//设置keyvalue序列化方式
props.put("key.serializer","org.apache.kafka.common.serialization.
StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 1、创建kafka 生产者
KafkaProducer<String,String>kafkaProducer=newKafkaProducer<String,String>(props);
2.创建一个硬件配置文件,并往填充数据
KafkaProducer<String,String>kafkaProducer=new KafkaProducer<StringString>(props);
这个就是 Key 的一个序列化的方式。
3. 处理好配置文件后,把配置文件添加到这里面来。
三、创建数据的载体。
1.简单准备几个数据,把这个流程跑通,并把数据打到 Kafka 里面即可。并循环5次。
2. 准备这个数据的载体,命名为producer record。
3. 修改一个String数据类型,然后再编辑名字。
叫做record即一个新的 record;数据类型依然是string;输入
创建数据的载体
ProducerRecord<StringString)record=new ProducerRecord(StringString>(topic:"test",value:“123456”);
四、发送数据
1.复制 Kafka 的生产者。
2. 输入.send,即message消息的载体。完成发送。
kafkaProducer.send(record);
五、关闭生产者
输入 KafkaProducer,close();
执行数据,检查是否能将其打入 Kafka 中:
确认此 Kafka 中不存在topic。
跑一下这个程序:右键执行。
执行完成后,检查 topic是否产生变化,是否产生多个topic。
确认数据是否是填入的[123456]吗。
查询数据From--from-beginning,看一下是否能产生相关数据并循环执行,把程序再跑一遍,若不存在差错,即已将数据打入 Kafka 生产者中。
附加:消费者的相关代码