分布式消息队列Kafka之发布订阅消息系统

简介: 分布式消息队列Kafka之发布订阅消息系统

0x00 教程内容


  1. 启动Kafka
  2. 创建Topic
  3. 启动生产者与消费者
  4. 演示消息发布订阅

前提:

先安装好Zookeeper、Kakfa

版本是:

zookeeper-3.4.10kafka_2.11-1.0.0

参考教程:

D011 复制粘贴玩大数据之安装与配置Kafka集群

D003 复制粘贴玩大数据之安装与配置Zookeeper集群


0x01 启动Kafka


1. 启动Zookeeper

a. 启动Zookeeper(三台均需执行)

zkServer.sh start


2. 启动Kafka

a. 后台启动Kafka(三台均需执行)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties


image.png


0x02 创建Topic


1. 创建Topic

a. 创建topic(3个副本、5个分区、名为:topic_sny

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 5 --topic snytopic


image.png


2. 查看topic列表

a. 查看所有topic

kafka-topics.sh --zookeeper master:2181 --list

image.png


3. 查看topic详情信息

a. 查看topic详情信息(将--create换成--describe,然后去掉创建的参数)

kafka-topics.sh --describe --zookeeper master:2181 --topic snytopic


image.png


0x03 启动生产者与消费者


1. 启动生产者

a. 在master上执行:

kafka-console-producer.sh --broker-list master:9092 --topic snytopic


image.png


PS:启动之后,处于待输入状态(绿色小箭头)


2. 启动消费者

a. 在slave1上执行(也可以在其他节点执行):

kafka-console-consumer.sh --bootstrap-server master:9092 --topic snytopic --from-beginning


image.png

image.png


PS:启动之后,处于待订阅状态


0x04 演示消息发布订阅


1. 发送消息

a. 在生产者界面(即master)输入内容

hello,shaonaiyi!

image.png


2. 订阅消息

a. 会发现消费者界面(即salve1)会自动订阅到相应的内容

image.png


0xFF 总结


删除topic(如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令,否则只是标记删除,并没有删除数据,同时也不能往这个topic写入数据,想要彻底删除可以进入Zookeeper相对应的路径手动删除)

kafka-topics.sh --zookeeper master:2181 --delete --topic snytopic

zookeeper删除多级路径:rmr /brokers/topics/snytopic

zookeeper删除无子节点的节点路径:delete /brokers/topics/snytopic/0/state

思考题:

1、当前咱们是有三个节点,尝试创建一个有3个副本以上的topic看有什么效果?

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 4 --partitions 5 --topic snytopic2

2、尝试将topic标为删除状态,然后重新发布订阅,观察有何效果?

3、如何恢复topic的状态?提示:(ls /admin/delete_topics)


相关文章
|
8月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
379 8
|
11月前
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
1201 34
|
7月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
331 2
|
7月前
|
机器学习/深度学习 算法 安全
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
228 3
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
363 1
分布式新闻数据采集系统的同步效率优化实战
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
1181 7
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
492 7
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
791 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
825 4
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
585 0

相关产品

  • 云消息队列 Kafka 版