RabbitMQ 核心部分之简单模式和工作模式

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【1月更文挑战第9天】1.消息属性RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体2.消息投递Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。Producer只能将自己的消息投递到Exc

文章目录

前言

一、Hello World(简单)模式

1.导入依赖

2.消息生产者

3.消息消费者

二、Work Queues(工作)模式

1.抽取工具类

2.启动两个工作线程

3.启动一个发送线程

4.结果

总结


前言

1.消息属性

RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。

Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体

2.消息投递

Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。

Producer只能将自己的消息投递到Exchange中,由Exchange按照routing_key投递到对应的Queue中。

3.消息可靠性

不同于HTTP的同步访问,RabbitMQ中,Producer并不知道消息是否被可靠地投递到了Consumer中处理。那么,RabbitMQ是如何保证消息的可靠投递?

主要是两点:第一,消息确认机制。Consumer处理完消息后,需要发送确认消息给Broker Server,可以选择“确认接收”、“丢弃”、“重新投递”三种方式。如果Consumer在Broker Server收到确认消息之前挂了,Broker Server便会重新投递该消息。

第二,可以选择数据持久化,这样即使RabbitMQ重启,也不会丢失消息

一、Hello World(简单)模式

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代

表使用者保留的消息缓冲区

博主这里使用JAVA实现。

1.导入依赖

<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>

2.消息生产者

publicclassProducer {
privatefinalstaticStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
//创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()) {
/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Stringmessage="hello world";
/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
        }
    }
}

3.消息消费者

publicclassConsumer {
privatefinalstaticStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
Connectionconnection=factory.newConnection();
Channelchannel=connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调DeliverCallbackdeliverCallback=(consumerTag,delivery)->{
Stringmessage=newString(delivery.getBody());
System.out.println(message);
        };
//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
        };
/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
 }

二、Work Queues(工作)模式

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进

程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

1.抽取工具类

publicclassRabbitMqUtils {
//得到一个连接的 channelpublicstaticChannelgetChannel() throwsException{
//创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
Connectionconnection=factory.newConnection();
Channelchannel=connection.createChannel();
returnchannel;
    }
}

2.启动两个工作线程

publicclassWorker01 {
privatestaticfinalStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
Channelchannel=RabbitMqUtils.getChannel();
DeliverCallbackdeliverCallback=(consumerTag,delivery)->{
StringreceivedMessage=newString(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
        };
CancelCallbackcancelCallback=(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

3.启动一个发送线程

publicclassTask01 {
privatestaticfinalStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
try(Channelchannel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受信息Scannerscanner=newScanner(System.in);
while (scanner.hasNext()){
Stringmessage=scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
            }
        }
    }
}

4.结果

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且

是按照有序的一个接收一次消息


总结

以上就是RabbitMQ 核心部分之简单模式和工作模式的相关知识,希望对你有所帮助。

积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
8天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
87 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
76 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决

相关产品

  • 云消息队列 MQ