文章目录:
1.写在前面
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。
RabbitMQ的内部接收如下:
1、Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。4、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。6、Connection
网络连接,比如一个TCP连接。7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。10、Broker
表示消息队列服务器实体。
2.案例详解
上面说了RabbitMQ中的消息发送和消息接收机制,下面我用一个案例来讲解一下。
首先,我们需要创建两个maven-java工程,一个表示消息发送者,另一个表示消息接收者。
在这两个项目的pom文件中加入下面的依赖:👇👇👇
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
2.1 编写消息发送类
package com.szh.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * */ public class Send { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //配置RabbitMQ的连接相关信息 factory.setHost("192.168.40.130"); //ip factory.setPort(5672); //端口号 factory.setUsername("root"); //用户名 factory.setPassword("root"); //密码 Connection connection=null; //定义连接 Channel channel=null;//定义通道 try { connection=factory.newConnection(); //获取连接 channel=connection.createChannel(); //获取通道 /** * 声明一个队列 * 参数1:队列名称取值 任意的 * 参数2:是否为持久化队列 * 参数3:是否排外,如果排外,则这个队列只允许一个消费者监听 * 参数4:是否自动删除队列,如果为true表示当队列中没有消息,也没有消费者连接时,会自动删除这个队列 * 参数5:队列的其他属性,通常设置为null即可 * 注意: * 1) 声明队列时,这个队列名称如果存在,则放弃声明;如果不存在,则会声明一个新的队列 * 2) 队列名可以任意设置,但是要与消息接收时的队列名一致 * 3) 这行代码可有可无,但是一定要在发送消息前确认队列名已经存在于RabbitMQ中,否则会出现问题 */ channel.queueDeclare("myQueue",true,false,false,null); String message="RabbitMQ测试发送消息"; //定义需要发送的消息 /** * 发送消息到MQ * 参数1:交换机名称,这里为空是因为不使用交换机 * 参数2:如果不指定交换机,这个值就是队列名称;如果指定了交换机,这个值就是RoutingKey * 参数3:消息属性信息,null即可 * 参数4:具体的消息数据内容、字符集格式 */ channel.basicPublish("","myQueue",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功:" + message); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2.2 编写消息接收类
package com.szh.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * */ public class Receive { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); //配置RabbitMQ的连接相关信息 factory.setHost("192.168.40.130"); //ip factory.setPort(5672); //端口号 factory.setUsername("root"); //用户名 factory.setPassword("root"); //密码 Connection connection=null; //定义连接 Channel channel=null;//定义通道 try { connection=factory.newConnection(); //获取连接 channel=connection.createChannel(); //获取通道 channel.queueDeclare("myQueue",true,false,false,null); /** * 接收消息 * 参数1:当前消费者需要监听的队列,该队列名必须要与发送时的队列名一致,否则接收不到消息 * 参数2:消息是否自动确认,true表示自动确认,接收完消息后会自动将消息从队列中移除;false则相反 * 参数3:消费者的标签,用于当多个消费者同时监听一个队列时,来确认不同的消费者,通常为空字符串即可 * 参数4:消费者加收的回调方法,这个方法中具体完成对消息的处理 * 注意:使用了basicConsume方法以后,会启动一个线程在持续监听队列,如果队列中有消息,则会自动接收消息,因此不能关闭连接和通道对象 */ channel.basicConsume("myQueue",true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取消息数据 String message=new String(body,"utf-8"); System.out.println("消息接收成功:" + message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
注意:
1、Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果以有消费者监听队列则立即消费发送到队列中的消息
2、Queue的消息可以保证每个消息都一定能被消费
2.3 测试结果1
首先确保你的RabbitMQ是开启状态,在Linux中执行 ps -ef | grep rabbitmq 命令即可查看,如果没启动,则执行 rabbitmq-server start & 命令即可。之后执行 rabbitmqctl add_user root root 添加一个root用户,再执行 rabbitmqctl set_permissions -p / root '.*' '.*' '.*' 用于设置root用户拥有对所有资源的读写配置权限。
然后在浏览器中输入 http:// RabbitMQ服务器ip:15672,访问登录(用户名root,密码root),之后执行消息发送类。
执行完消息发送类之后,等待大概5秒,可以在RabbitMQ的管理界面看到队列中接收到了一条数据。
这里不使用消息接收类也可以将数据取出。
2.4 测试结果2
先执行消息发送类。
之后再执行消息接收类。
而在RabbitMQ的管理界面中可以看到的是,执行完消息发送类之后,队列中多了一条数据,此时再执行消息接收类,那么队列中的数据就会被取出。
2.5 测试结果3
首先执行消息接收类,确保消息的接收者一直处于监听消息队列的状态,然后我们依次执行消息发送类,看看会出现什么结果。
可以看到的是,当消息接收者一直处于监听消息队列的时候,当我们的消息发送者每发送一个数据,消息接收者它就会知道,然后就取走消息队列中的数据。