RabbitMQ 常用 API(一)

简介: RabbitMQ 常用 API(一)

RabbitMQ 常用 API

Connection 和 Channel 的创建、关闭

  • 创建 Connection
ConnectionFactory factory = new ConnectionFactory();
// 方式1:通过设置参数创建
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
// 方式2:通过uri创建
factory.setUri("amqp:/userName:password@ipAddress:portNumber/virtualHost");
Connection connection = factory.newConnection();
  • 创建 Channel
Channel channel = connection.createChannel();
  • Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络,上出现错误的通信帧交错,同时也会影响发送方确认(publisherconfirm)机制的运行,所以多线程间共享 Channel 实例是非线程安全的
    通常情况下,在调用 createXXX 或者 newXXX 方法之后,可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其已经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException,只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。
  • 关闭连接
// 关闭Connection
connection.close();
// 关闭Channel
channel.close();
  • AMQP协议中的 Connection 和 Channel 采用同样的方式来管理网络失败、内部错误和显式地关闭连接。Connection 和Channel 所具备的生命周期如下:
  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
  • Closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。
  • Connection 和 Channel 最终都是会成为 Closed 的状态,不论是程序正常调用的关闭方法,或者是客户端的异常,再或者是发生了网络异常。
    拓展了解:
    在 Connection 和 Channel 中,与关闭相关的方法有:
addShutdownListener(ShutdownListener listener) 
removeShutdownListener(ShutdownListner listener)
  • 当 Connection 或者 Channel 的状态转变为 Closed 的时候会调用 ShutdownListener。而且如果将一个 ShutdownListener 注册到一个已经处于 Closed 状态的对象(这里特指 Connection 和 Channel 对象)时,会立刻调用 ShutdownListener
  • getCloseReason 方法:可以知道对象关闭的原因
  • isOpen 方法:检测对象当前是否处于开启状态
  • close (int closeCode, String closeMessage) 方法:显式地通知当前对象执行关闭操作
connection.addShutdownListener(new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        //...
        System.out.println("shut down");
    }
});
  • 触发 ShutdownListener 的时候,就可以获取到 ShutdownSignalException,这个 ShutdownSignalException 包含了关闭的原因,这里原因也可以通过调用前面所提及的 getCloseReason 方法获取。
  • ShutdownSignalException:提供了多个方法来分析关闭的原因
  • isHardError 方法:可以知道是 Connection 的还是 Channel 的错误
  • getReason 方法:可以获取cause相关的信息
connection.addShutdownListener(new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        if (cause.isHardError()) {
            Connection conn = (Connection) cause.getReference();
            if (!cause.isInitiatedByApplication()) {
                Method reason = cause.getReason();
                //...
            } else {
                Channel ch = (Channel)cause.getReference();
                //...
            }
        }
    }
});


Channel 常用 API

交换器的创建、检测、删除

  • 创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;  // 常用
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, 
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, 
                                   boolean autoDelete, boolean internal, 
                                   Map<String, Object> arguments) throws IOException;
// 不需要服务器任何返回值的创建交换机
void exchangeDeclareNoWait(String exchange, String type, boolean durable, 
                           boolean autoDelete, boolean internal,
                           Map<String, Object> arguments) throws IOException
  • 返回值:Exchange . Declare0K,用来标识成功声明了一个交换器。
  • 入参:
  • exchange:交换器的名称
  • type:交换器的类型,常见的如 fanout、direct、 topic
  • durable:设置是否持久化。为 true 表示持久化,反之是非持久化。
    持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除。true 则表示自动删除。
    自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
    注意:不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器”。
  • internal:设置是否是内置的。
    为 true 则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如 alternate-exchange。
  • 不需要服务器任何返回值的创建交换机这个 nowait 指的是 AMQP 中 Exchange.Declare 命令的参数,意思是不需要服务器返回注意:
  • 这个方法的返回值是 void,而普通的 exchangeDeclare 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange. Declare-Ok 这个 AMQP 命令)
  • 在声明完一个交换器之后(实际服务器还并未完成交换器的创建),那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
  • 检测交换器是否存在
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
  • 这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。
    如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。
  • 删除交换器
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
  • exchange:表示交换器的名称
  • ifUnused:用来设置是否在交换器没有被使用的情况下删除。
    为 true,则只有在此交换器没有被使用的情况下才会被删除;为 false,则无论如何这个交换器都要被删除。


队列的创建、检测、删除、清空

  • 创建队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, 
                             Map<String, Object> arguments) throws IOException;
// 默认创建一个由RabbitMQ命名的(类似这种amq.gen-LhQz1gv3GhDOv8PIDabOXA名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
Queue.DeclareOk queueDeclare() throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
  • queue: 队列的名称。如果队列不存在,自动创建
  • durable: 设置是否持久化。为 true 则设置队列为持久化。
    持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive: 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。注意:
  • 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列
  • “首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
  • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。
    自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
    不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
  • arguments:设置队列的其他一些参数,如 x-message=ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。
  • **注意:**生产者和消费者都能够使用 queueDeclare 来声明一个队列,**但是如果消费者在同一个信道上订阅了另一个队列,**就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。
  • 检测队列
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
  • 存在,返回 Queue.DeclareOk
  • 不存在,抛出异常
  • 删除队列
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
  • 清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;
  • 清空队列的内容,而不删除队列本身


队列和交换器的绑定、解除

  • 绑定队列和交换器
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, 
                       Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey, 
                     Map<String, Object> arguments) throws IOException;
  • queue:队列名称
  • exchange:交换器的名称
  • routingKey:用来绑定队列和交换器的路由键
  • argument:定义绑定的一些参数
  • 解除队列和交换器的绑定
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, 
                           Map<String, Object> arguments) throws IOException;


绑定交换器和交换器

  • API 方法
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, 
                             Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, 
                        Map<String, Object> arguments) throws IOException;
  • 示例:
channel.exchangeDeclare("source", "direct", false, true, null);
channel.exchangeDeclare("destination", "fanout", false, true, null);
channel.exchangeBind("destination", "source", "exKey");
channel.queueDeclare("queue", false, false, true, null);
channel.queueBind("queue", "destination“, "");
channel.basicPublish("source", "exKey", null, "exToExDemao".getBytes());
  • 生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination,并把消息转发到destination中,进而存储在 destination 绑定的队列 queue 中
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 存储 安全
RabbitMQ 常用 API(二)
RabbitMQ 常用 API(二)
|
8月前
|
消息中间件 Java 测试技术
SpringBoot整合RabbitMQ图文过程以及RabbitTemplate常用API介绍
SpringBoot整合RabbitMQ图文过程以及RabbitTemplate常用API介绍
106 0
|
4月前
|
消息中间件 API RocketMQ
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
30 0
|
4月前
|
消息中间件 存储 Java
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
176 0
|
6月前
|
消息中间件 API 数据安全/隐私保护
使用 REST API 操作 RabbitMQ(二)
使用 REST API 操作 RabbitMQ
|
6月前
|
消息中间件 JSON API
使用 REST API 操作 RabbitMQ(一)
使用 REST API 操作 RabbitMQ
176 0
|
消息中间件 运维 Cloud Native
RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。
366 0
RocketMQ 5.0 API 与  SDK 的演进
|
消息中间件 缓存 运维
RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。
RocketMQ 5.0 API 与 SDK 的演进
EMQ
|
消息中间件 缓存 运维
NanoMQ Newsletter 2022-08|v0.11:MQTT 5.0 + MQTT over QUIC 桥接,新增 HTTP API 监控客户端状态
八月,0.11.0版本发布:增加了MQTT 5.0+MQTT over QUIC桥接模式,新增和修复了对已连接客户端状态进行监控和查询的HTTP API。
EMQ
272 0
NanoMQ Newsletter 2022-08|v0.11:MQTT 5.0 + MQTT over QUIC 桥接,新增 HTTP API 监控客户端状态
EMQ
|
边缘计算 监控 物联网
NanoMQ Newsletter 2022-07|v0.10:多路桥接、HTTP 发布 MQTT 消息 API、NanoSDK 支持 MQTT 5.0
v0.10.0已于8月初正式发布,此版本主要增强了桥接功能,新增了发布消息的HTTP API,同时还为NanoSDK增加了MQTT 5.0支持。
EMQ
267 0
NanoMQ Newsletter 2022-07|v0.10:多路桥接、HTTP 发布 MQTT 消息 API、NanoSDK 支持 MQTT 5.0

热门文章

最新文章