RabbitMQ 常用 API
Connection 和 Channel 的创建、关闭
- 创建 Connection
ConnectionFactory factory = new ConnectionFactory(); // 方式1:通过设置参数创建 factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); // 方式2:通过uri创建 factory.setUri("amqp:/userName:password@ipAddress:portNumber/virtualHost"); Connection connection = factory.newConnection();
- 创建 Channel
Channel channel = connection.createChannel();
- Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络,上出现错误的通信帧交错,同时也会影响发送方确认(publisherconfirm)机制的运行,所以多线程间共享 Channel 实例是非线程安全的。
通常情况下,在调用 createXXX 或者 newXXX 方法之后,可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其已经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException,只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。 - 关闭连接
// 关闭Connection connection.close(); // 关闭Channel channel.close();
- AMQP协议中的 Connection 和 Channel 采用同样的方式来管理网络失败、内部错误和显式地关闭连接。Connection 和Channel 所具备的生命周期如下:
- Open:开启状态,代表当前对象可以使用
- Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
- Closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。
- Connection 和 Channel 最终都是会成为 Closed 的状态,不论是程序正常调用的关闭方法,或者是客户端的异常,再或者是发生了网络异常。
拓展了解:
在 Connection 和 Channel 中,与关闭相关的方法有:
addShutdownListener(ShutdownListener listener) removeShutdownListener(ShutdownListner listener)
- 当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener。而且如果将一个 ShutdownListener 注册到一个已经处于 Closed 状态的对象(这里特指 Connection 和 Channel 对象)时,会立刻调用 ShutdownListener
- getCloseReason 方法:可以知道对象关闭的原因
- isOpen 方法:检测对象当前是否处于开启状态
- close (int closeCode, String closeMessage) 方法:显式地通知当前对象执行关闭操作
connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { //... System.out.println("shut down"); } });
- 触发 ShutdownListener 的时候,就可以获取到 ShutdownSignalException,这个 ShutdownSignalException 包含了关闭的原因,这里原因也可以通过调用前面所提及的 getCloseReason 方法获取。
- ShutdownSignalException:提供了多个方法来分析关闭的原因
- isHardError 方法:可以知道是 Connection 的还是 Channel 的错误
- getReason 方法:可以获取cause相关的信息
connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isHardError()) { Connection conn = (Connection) cause.getReference(); if (!cause.isInitiatedByApplication()) { Method reason = cause.getReason(); //... } else { Channel ch = (Channel)cause.getReference(); //... } } } });
Channel 常用 API
交换器的创建、检测、删除
- 创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; // 常用 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; // 不需要服务器任何返回值的创建交换机 void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException
- 返回值:Exchange . Declare0K,用来标识成功声明了一个交换器。
- 入参:
- exchange:交换器的名称
- type:交换器的类型,常见的如 fanout、direct、 topic
- durable:设置是否持久化。为 true 表示持久化,反之是非持久化。
持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。 - autoDelete:设置是否自动删除。true 则表示自动删除。
自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
注意:不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器”。 - internal:设置是否是内置的。
为 true 则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。 - argument:其他一些结构化参数,比如 alternate-exchange。
- 不需要服务器任何返回值的创建交换机这个 nowait 指的是 AMQP 中 Exchange.Declare 命令的参数,意思是不需要服务器返回注意:
- 这个方法的返回值是 void,而普通的 exchangeDeclare 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange. Declare-Ok 这个 AMQP 命令)
- 在声明完一个交换器之后(实际服务器还并未完成交换器的创建),那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
- 检测交换器是否存在
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
- 这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。
如果存在则正常返回;如果不存在则抛出异常:404 channel exception
,同时Channel也会被关闭。 - 删除交换器
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException; Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException; void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
- exchange:表示交换器的名称
- ifUnused:用来设置是否在交换器没有被使用的情况下删除。
为 true,则只有在此交换器没有被使用的情况下才会被删除;为 false,则无论如何这个交换器都要被删除。
队列的创建、检测、删除、清空
- 创建队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; // 默认创建一个由RabbitMQ命名的(类似这种amq.gen-LhQz1gv3GhDOv8PIDabOXA名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。 Queue.DeclareOk queueDeclare() throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- queue: 队列的名称。如果队列不存在,自动创建
- durable: 设置是否持久化。为 true 则设置队列为持久化。
持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。 - exclusive: 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。注意:
- 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列
- “首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
- 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
- autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。 - arguments:设置队列的其他一些参数,如 x-message=ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。
- **注意:**生产者和消费者都能够使用 queueDeclare 来声明一个队列,**但是如果消费者在同一个信道上订阅了另一个队列,**就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。
- 检测队列
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
- 存在,返回 Queue.DeclareOk
- 不存在,抛出异常
- 删除队列
Queue.DeleteOk queueDelete(String queue) throws IOException; Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
- 清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;
- 清空队列的内容,而不删除队列本身
队列和交换器的绑定、解除
- 绑定队列和交换器
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue:队列名称
- exchange:交换器的名称
- routingKey:用来绑定队列和交换器的路由键
- argument:定义绑定的一些参数
- 解除队列和交换器的绑定
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException; Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
绑定交换器和交换器
- API 方法
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException; Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException; void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
- 示例:
channel.exchangeDeclare("source", "direct", false, true, null); channel.exchangeDeclare("destination", "fanout", false, true, null); channel.exchangeBind("destination", "source", "exKey"); channel.queueDeclare("queue", false, false, true, null); channel.queueBind("queue", "destination“, ""); channel.basicPublish("source", "exKey", null, "exToExDemao".getBytes());
- 生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination,并把消息转发到destination中,进而存储在 destination 绑定的队列 queue 中