开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):Kafka 生产者案例】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12553
Kafka 生产者案例
内容介绍:
一、Producer 生产
二、实际操作
一、Producer 生产
1.创建处理器
创建处理器组 kafka,进入组后分别创建 GenerateFlowFile(为了测试使用,创建随机字符串)和 PublishKafka_0_10(随机字符串发送到 Kafka 中,查看能否正常使用)处理器。
2.负载均衡生产消息
(1)连接 GenerateFlowFile和PublishKafka_0_10
(2)负载均衡并发
3.配置 GenerateFlowFile
(1)调度配置
每1秒生产一次数据
(2)属性配置
文件大小100b;每次生成10个相同文件;每次生成的流文件内容唯一。一次生成10个相同的文件,每个批次的文件内容都是唯一的,不会出现两个批次的内容是一致的情况。
4.配置 PublishKafka_0_10
(1)属性配置
Brokers 设置为192.168.52.100:9092,192.168.52.110:9092,192.168.52.120:9092topic 设置为 nifi-topic,其它 topic名字也可以,保证后面消费的时候是统一的。如果 topic 不存在,会自动创建;
Delivery Guarantee,对应 kafka 的 acks 机制,选择最为保险的 Guarantee Replicated Delivery,不会产生数据丢失问题,选择性能最快的就可能产生数据丢失,相当于 acks=all。
(2)关系配置
进行自连接
5.启动数据并监听数据
(1)启动流程
(2)监听 kafka 消费数据
在 kafka 所在服务器执行监听命令:
/export/servers/kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh--bootstrap-server192.168.52.110:9092 --topic nifi-topic
二、实际操作
首先,创建一个处理器组,名字为 kafka
创建完成以后进入处理器组,首先,创建一个处理器,处理器包含了两个,一个为 GenerateFlowFile,用来生成随机的数据,另外一个是 publishkafka
它会出现很多组件,使用和服务器所对应的版本(0.10版本),把两个处理器进行连接。
连接完以后下来设置 Generateflowfile,设置生产的速率不为零,设置 run schedule 为1 sec,设置每次生产数据10B,设置一次创建的文件个数为10个,所以说一次是100B,还要设置它为唯一,也就是说每一批次生成的数据都是不一样的,可以说如果第一次生成的十个文件是一致的,但是在第二次生成的十个文件和第一次就不一样,便于进行测试。
接下来配置 publishkafka,首先设置 kafka brokers 地址,因为 kafka 是一个集群,所以可以配置一个集群模式,这里设置三台的数据为192.168.52.100: 9092,192.168.52.110: 9092 ,192.168.52.120: 9092 。接下来设置topic name,为 nifi-topic,如果 topic 不存在的话,会自动创建一个新的 topic 信息。接下来设置 delivery 是Guarantee,它就是 ask 机制,选用最保险的机制 Guarantee Replicated Delivery,避免数据丢失,方便进行测试。在很多生产环境下,这是最保险的形式也是最常用的形式,保证数据不丢失是系统的底线。如果数据丢失,计算结果将不正确。配置完以后发现数据报错
是因为它没有做自连接,进行自连接勾选 failure 和 success 信息,共享成功以后进行测试,先启动,启动查看是否有报错信息,可以到kafka服务器上,通过 kafka 监听命令,来监听数据。
监听这三台任何一台都可以,因为它们是同一个集群,Topic 要保持一致,产生的数据是十条十条进来而且每秒都会产生数据变化,这证明 kafka 生产已经生效了。