Kafka 生产者案例 | 学习笔记

简介: 快速学习 Kafka 生产者案例

开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):Kafka 生产者案例学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/707/detail/12553


Kafka 生产者案例

内容介绍:

一、Producer 生产

二、实际操作


一、Producer 生产

1.创建处理器

创建处理器组 kafka,进入组后分别创建 GenerateFlowFile(为了测试使用,创建随机字符串)和 PublishKafka_0_10(随机字符串发送到 Kafka 中,查看能否正常使用)处理器。

2.负载均衡生产消息

(1)连接 GenerateFlowFile和PublishKafka_0_10

image.png

(2)负载均衡并发

image.png

3.配置 GenerateFlowFile

(1)调度配置

每1秒生产一次数据

image.png

(2)属性配置

文件大小100b;每次生成10个相同文件;每次生成的流文件内容唯一。一次生成10个相同的文件,每个批次的文件内容都是唯一的,不会出现两个批次的内容是一致的情况。

image.png

4.配置 PublishKafka_0_10

(1)属性配置

Brokers 设置为192.168.52.100:9092,192.168.52.110:9092,192.168.52.120:9092topic 设置为 nifi-topic,其它 topic名字也可以,保证后面消费的时候是统一的。如果 topic 不存在,会自动创建;

Delivery Guarantee,对应 kafka 的 acks 机制,选择最为保险的 Guarantee Replicated Delivery,不会产生数据丢失问题,选择性能最快的就可能产生数据丢失,相当于 acks=all。

(2)关系配置

进行自连接

image.png

5.启动数据并监听数据

(1)启动流程

(2)监听 kafka 消费数据

在 kafka 所在服务器执行监听命令:

/export/servers/kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh--bootstrap-server192.168.52.110:9092 --topic nifi-topic


二、实际操作

首先,创建一个处理器组,名字为 kafka

image.png

创建完成以后进入处理器组,首先,创建一个处理器,处理器包含了两个,一个为 GenerateFlowFile,用来生成随机的数据,另外一个是 publishkafka

image.png

它会出现很多组件,使用和服务器所对应的版本(0.10版本),把两个处理器进行连接。

连接完以后下来设置 Generateflowfile,设置生产的速率不为零,设置 run schedule 为1 sec,设置每次生产数据10B,设置一次创建的文件个数为10个,所以说一次是100B,还要设置为唯一,也就是说每一批次生成的数据都是不一样的,可以说如果第一次生成的十个文件是一致的,但是在第二次生成的十个文件和第一次就不一样,便于进行测试。

接下来配置 publishkafka,首先设置 kafka brokers 地址,因为 kafka 是一个集群,所以可以配置一个集群模式,这里设置三台的数据为192.168.52.100: 9092,192.168.52.110: 9092 ,192.168.52.120: 9092 。接下来设置topic name,为 nifi-topic,如果 topic 不存在的话,会自动创建一个新的 topic 信息。接下来设置 delivery 是Guarantee,就是 ask 机制,选用最保险的机制 Guarantee Replicated Delivery,避免数据丢失,方便进行测试。在很多生产环境下,这是最保险的形式也是最常用的形式,保证数据不丢失是系统的底线。如果数据丢失,计算结果将不正确。配置完以后发现数据报错

image.png

是因为没有做自连接,进行自连接勾选 failure 和 success 信息,共享成功以后进行测试,先启动,启动查看是否有报错信息,可以到kafka服务器上,通过 kafka 监听命令,来监听数据。

监听这三台任何一台都可以,因为们是同一个集群,Topic 要保持一致,产生的数据是十条十条进来而且每秒都会产生数据变化,这证明 kafka 生产已经生效了。

相关文章
|
8天前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
2月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
2月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
87 4
|
3月前
|
消息中间件 存储 缓存
深入理解Kafka核心设计及原理(二):生产者
深入理解Kafka核心设计及原理(二):生产者
68 8
|
2月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
|
2月前
|
消息中间件 Java Kafka
Kafka生产者同步和异步的JavaAPI代码演示
Kafka生产者同步和异步的JavaAPI代码演示
26 0
|
3月前
|
消息中间件 监控 Java
查询Kafka生产者是否连接到Kafka服务
查询Kafka生产者是否连接到Kafka服务
136 2
|
3月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
49 0
|
4月前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
79 1
|
4月前
|
消息中间件 存储 Kafka
Kafka(二)【文件存储机制 & 生产者】(2)
Kafka(二)【文件存储机制 & 生产者】
下一篇
无影云桌面