开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):Kafka 消费者案例】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12554
Kafka 消费者案例
内容介绍:
一、Consumer 消费
二、实际操作
一、Consumer 消费
1.创建处理器并连接
创建 ConsumeKafka_0_10和 LogAttribute(打印日志,观察是否真正消费 kafka 数据)处理器,并连接。
2.配置 ConsumeKafka_0_10
Brokers 地址要和 Producer 的设置一样:192.168.52.100:9092,192.168.52.110:9092,192.168.52.120:9092
Topic 设置和 Producer 一致: nifi-topic(不一致就接收不到信息)
Group ID 随意设置:nifi,相当于分组去消费
offset Reset 设置为: latest,从最新的消息开始消费,;也可以从最早的进行消费。
3.设置 LogAttribute
设置为自连接,其它都是用默认值。
4.启动流程并查看日志
5.增加生产频率
注意:如果服务器资源有限,不要进行此操作。因为不仅要启动 nifi 服务器集群,还要启动 kafka 集群,如果电脑配置不够,就让消费的速度慢一些。
GenerateFlowFile 的调度频率加快: 20ms
二、实际操作
首先创建处理器 consumekafka,还是选择0.10版本的处理器,再创建 logattribute,处理器创建好以后进行连接,连接以后,对 kafka 的消费数据,会推送到日志里面,通过 nifi 日志就可以查看是否被消费以及消费的数据,接下来配置 kafkaconsumer,更改一下 broker 地址,可以改成和生产一样的地址,不一样的地址就接收不到消息,topic 还是一样为 nifi-topic,Group ID 可以随意填写,为了方便查看最新的消费情况 offset 可以选择 latest 最新的数据,然后点击 apply。
此时发现 logattribute 有黄色提示信息,设置自连接,接下来测试是否能够正常消费,查看 nifi 日志。All nodes 消费以后,日志可以改为 primary nodes。让它只在主节点里面运行,只要查看主节点的数据即可,点击 cluster
主节点是第三个,进入第三个去查看日志即可
通过这些来监听日志,这些是JK心跳日志,稍后会启用 kafka 日志。kafka 服务器上面也在继续监听,它没有新消息,需要启动一下所有服务,带马蓉显示已经开始消费 kafka 数据
消费端也有数据在不断生成,每一次十个一秒发一次,这时候的服务器负载已经超高,可以先进行停止,自己在测试的时候,一定要根据硬件配置来进行,最后的案例可以把速率调小一点,查看 kafka 消费情况和 nifi 日志打印,速率调小那么日志发送的速度就会变慢。