分布式消息队列Kafka之发布订阅消息系统

简介: 分布式消息队列Kafka之发布订阅消息系统

0x00 教程内容


  1. 启动Kafka
  2. 创建Topic
  3. 启动生产者与消费者
  4. 演示消息发布订阅

前提:

先安装好Zookeeper、Kakfa

版本是:

zookeeper-3.4.10kafka_2.11-1.0.0

参考教程:

D011 复制粘贴玩大数据之安装与配置Kafka集群

D003 复制粘贴玩大数据之安装与配置Zookeeper集群


0x01 启动Kafka


1. 启动Zookeeper

a. 启动Zookeeper(三台均需执行)

zkServer.sh start


2. 启动Kafka

a. 后台启动Kafka(三台均需执行)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties


image.png


0x02 创建Topic


1. 创建Topic

a. 创建topic(3个副本、5个分区、名为:topic_sny

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 5 --topic snytopic


image.png


2. 查看topic列表

a. 查看所有topic

kafka-topics.sh --zookeeper master:2181 --list

image.png


3. 查看topic详情信息

a. 查看topic详情信息(将--create换成--describe,然后去掉创建的参数)

kafka-topics.sh --describe --zookeeper master:2181 --topic snytopic


image.png


0x03 启动生产者与消费者


1. 启动生产者

a. 在master上执行:

kafka-console-producer.sh --broker-list master:9092 --topic snytopic


image.png


PS:启动之后,处于待输入状态(绿色小箭头)


2. 启动消费者

a. 在slave1上执行(也可以在其他节点执行):

kafka-console-consumer.sh --bootstrap-server master:9092 --topic snytopic --from-beginning


image.png

image.png


PS:启动之后,处于待订阅状态


0x04 演示消息发布订阅


1. 发送消息

a. 在生产者界面(即master)输入内容

hello,shaonaiyi!

image.png


2. 订阅消息

a. 会发现消费者界面(即salve1)会自动订阅到相应的内容

image.png


0xFF 总结


删除topic(如果kafka配置delete.topic.enable=true,那么可以直接删除topic,执行删除topic命令,否则只是标记删除,并没有删除数据,同时也不能往这个topic写入数据,想要彻底删除可以进入Zookeeper相对应的路径手动删除)

kafka-topics.sh --zookeeper master:2181 --delete --topic snytopic

zookeeper删除多级路径:rmr /brokers/topics/snytopic

zookeeper删除无子节点的节点路径:delete /brokers/topics/snytopic/0/state

思考题:

1、当前咱们是有三个节点,尝试创建一个有3个副本以上的topic看有什么效果?

kafka-topics.sh --create --zookeeper master:2181 --replication-factor 4 --partitions 5 --topic snytopic2

2、尝试将topic标为删除状态,然后重新发布订阅,观察有何效果?

3、如何恢复topic的状态?提示:(ls /admin/delete_topics)


相关文章
|
5月前
|
Kubernetes 大数据 调度
Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”
本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。
653 34
|
25天前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
157 2
|
1月前
|
机器学习/深度学习 算法 安全
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
新型电力系统下多分布式电源接入配电网承载力评估方法研究(Matlab代码实现)
|
3月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
分布式新闻数据采集系统的同步效率优化实战
|
9月前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
680 7
|
9月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
293 7
|
10月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
569 4
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
12月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
417 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
283 1

相关产品

  • 云消息队列 Kafka 版
  • 下一篇
    oss教程