Kafka修炼日志(三):Streams简明使用教程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:  Streams是Kafka 10版本新增的功能,用于实时处理存储与Kafka服务器的数据,并将处理后的结果推送至指定的Topic中,供后续使用者使用。

Streams是Kafka 10版本新增的功能,用于实时处理存储与Kafka服务器的数据,并将处理后的结果推送至指定的Topic中,供后续使用者使用。

     下面结合官方教程详述如何使用Streams实时分析处理数据,教程的Demo是一个单词计数器:

(1)首先使用Kafka Topic创建命令创建一个用于生产消息的Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 3 --partitions 1 --topic streams-file-input

image.gifimage.gif

编写一个消息测试文件file-input.txt,作为消息生产的输入。

[root@localhost kafka_2.12-0.10.2.0]# echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

image.gifimage.gif

(2)在Console上启动一个消息生产者,将file-input的消息生产到Topic:streams-file-input。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt &

image.gifimage.gif

(3)使用示例Demo:Word Count,分析处理消息,并将处理后的消息推送至Topic:streams-wordcount-output。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

image.gifimage.gif

(4)在Console中启动一个消费者,验证消息的处理结果,输出为所有单词的计数列表。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

image.gifimage.gif

(5)消费Topic:streams-wordcount-output中的消息,输出结果,这里计数是双倍,是因为我将消息的生产、处理分别运行了两次,这里是累计结果。

[root@localhost kafka_2.12-0.10.2.0]# all       2
lead    2
to      2
hello   2
streams 4
join    2
kafka   6
summit  2

image.gifimage.gif

Kafka Streams处理原理

   说明下Kafka Streams的处理过程,以上面的Word Count为例,首先看下图(图片来自Kafka官方网站),KTable为单词计数的实时状态,KStream为每次更新的单词,将更新状态输入到KTable,并推送至消息处理结果指定的Topic。

22134923_PUf2.png

   下图(图片来自Kafka官方网站)展示了每输入一句单词,已经存在的单词会累计计数,同时KTable会更新不存在的单词。

22134926_uLeU.png

本文属作者原创,转贴请声明!

相关文章
|
17天前
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
3月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
5月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
125 4
|
5月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
84 1
|
5月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
59 1
|
5月前
|
数据采集 监控 Java
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
本文是关于SpringBoot日志的详细教程,涵盖日志的定义、用途、SLF4J框架的使用、日志级别、持久化、文件分割及格式配置等内容。
363 0
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
|
6月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
68 3
|
5月前
|
消息中间件 Kafka API
kafka使用教程
kafka使用教程
|
2月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
5月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
224 1

热门文章

最新文章