分布式消息队列Kafka之发布订阅消息系统

简介: 分布式消息队列Kafka之发布订阅消息系统

0x00 教程内容


  1. 启动Kafka
  2. 创建Topic
  3. 启动生产者与消费者
  4. 演示消息发布订阅

前提:

先安装好Zookeeper、Kakfa

版本是:

zookeeper-3.4.10kafka_2.11-1.0.0

参考教程:

D011 复制粘贴玩大数据之安装与配置Kafka集群

D003 复制粘贴玩大数据之安装与配置Zookeeper集群


0x01 启动Kafka


1. 启动Zookeeper

a. 启动Zookeeper(三台均需执行)

zkServer.sh start


2. 启动Kafka

a. 后台启动Kafka(三台均需执行)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties


image.png


0x02 创建Topic


1. 创建Topic

a. 创建topic(3个副本、5个分区、名为:topic_sny

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 5 --topic snytopic


image.png


2. 查看topic列表

a. 查看所有topic

kafka-topics.sh --zookeeper master:2181 --list

image.png


3. 查看topic详情信息

a. 查看topic详情信息(将--create换成--describe,然后去掉创建的参数)

kafka-topics.sh --describe --zookeeper master:2181 --topic snytopic


image.png


0x03 启动生产者与消费者


1. 启动生产者

a. 在master上执行:

kafka-console-producer.sh --broker-list master:9092 --topic snytopic


image.png


PS:启动之后,处于待输入状态(绿色小箭头)


2. 启动消费者

a. 在slave1上执行(也可以在其他节点执行):

kafka-console-consumer.sh --bootstrap-server master:9092 --topic snytopic --from-beginning


image.png

image.png


PS:启动之后,处于待订阅状态


0x04 演示消息发布订阅


1. 发送消息

a. 在生产者界面(即master)输入内容

hello,shaonaiyi!

image.png


2. 订阅消息

a. 会发现消费者界面(即salve1)会自动订阅到相应的内容

image.png


0xFF 总结


删除topic(如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令,否则只是标记删除,并没有删除数据,同时也不能往这个topic写入数据,想要彻底删除可以进入Zookeeper相对应的路径手动删除)

kafka-topics.sh --zookeeper master:2181 --delete --topic snytopic

zookeeper删除多级路径:rmr /brokers/topics/snytopic

zookeeper删除无子节点的节点路径:delete /brokers/topics/snytopic/0/state

思考题:

1、当前咱们是有三个节点,尝试创建一个有3个副本以上的topic看有什么效果?

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 4 --partitions 5 --topic snytopic2

2、尝试将topic标为删除状态,然后重新发布订阅,观察有何效果?

3、如何恢复topic的状态?提示:(ls /admin/delete_topics)


相关文章
|
21天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
48 7
|
17天前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
69 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
21天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
44 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
1月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
48 3
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
28 2
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
34 1
|
19天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
1月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
70 2
|
1月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 Kafka 版