RabbitMQ 核心部分之简单模式和工作模式

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【1月更文挑战第9天】1.消息属性RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体2.消息投递Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。Producer只能将自己的消息投递到Exc

文章目录

前言

一、Hello World(简单)模式

1.导入依赖

2.消息生产者

3.消息消费者

二、Work Queues(工作)模式

1.抽取工具类

2.启动两个工作线程

3.启动一个发送线程

4.结果

总结


前言

1.消息属性

RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。

Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体

2.消息投递

Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。

Producer只能将自己的消息投递到Exchange中,由Exchange按照routing_key投递到对应的Queue中。

3.消息可靠性

不同于HTTP的同步访问,RabbitMQ中,Producer并不知道消息是否被可靠地投递到了Consumer中处理。那么,RabbitMQ是如何保证消息的可靠投递?

主要是两点:第一,消息确认机制。Consumer处理完消息后,需要发送确认消息给Broker Server,可以选择“确认接收”、“丢弃”、“重新投递”三种方式。如果Consumer在Broker Server收到确认消息之前挂了,Broker Server便会重新投递该消息。

第二,可以选择数据持久化,这样即使RabbitMQ重启,也不会丢失消息

一、Hello World(简单)模式

在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代

表使用者保留的消息缓冲区

博主这里使用JAVA实现。

1.导入依赖

<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>

2.消息生产者

publicclassProducer {
privatefinalstaticStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
//创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()) {
/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Stringmessage="hello world";
/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
        }
    }
}

3.消息消费者

publicclassConsumer {
privatefinalstaticStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
Connectionconnection=factory.newConnection();
Channelchannel=connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调DeliverCallbackdeliverCallback=(consumerTag,delivery)->{
Stringmessage=newString(delivery.getBody());
System.out.println(message);
        };
//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
        };
/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
 }

二、Work Queues(工作)模式

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进

程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

1.抽取工具类

publicclassRabbitMqUtils {
//得到一个连接的 channelpublicstaticChannelgetChannel() throwsException{
//创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("192.168.10.130");
factory.setUsername("guest");
factory.setPassword("guest");
Connectionconnection=factory.newConnection();
Channelchannel=connection.createChannel();
returnchannel;
    }
}

2.启动两个工作线程

publicclassWorker01 {
privatestaticfinalStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
Channelchannel=RabbitMqUtils.getChannel();
DeliverCallbackdeliverCallback=(consumerTag,delivery)->{
StringreceivedMessage=newString(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
        };
CancelCallbackcancelCallback=(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

3.启动一个发送线程

publicclassTask01 {
privatestaticfinalStringQUEUE_NAME="hello";
publicstaticvoidmain(String[] args) throwsException {
try(Channelchannel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受信息Scannerscanner=newScanner(System.in);
while (scanner.hasNext()){
Stringmessage=scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
            }
        }
    }
}

4.结果

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且

是按照有序的一个接收一次消息


总结

以上就是RabbitMQ 核心部分之简单模式和工作模式的相关知识,希望对你有所帮助。

积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
115 0
|
6月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
5月前
|
Java Maven
SpringBoot集成RabbitMQ-三种模式的实现
SpringBoot集成RabbitMQ-三种模式的实现
93 0
|
4月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
113 1
|
4月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
32 0
|
4月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
40 0
|
5月前
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
83 1
|
2月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
14 1
|
2月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
2月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1

相关产品

  • 云消息队列 MQ