本篇翻译的是RabbitMQ官方文档关于API的内容,原文链接:http://www.rabbitmq.com/api-guide.html。博主对其内容进行大体上的翻译,有些许部分会保留英文,个人觉得这样更加有韵味,如果全部翻译成中文,会存在偏差,文不达意(主要是功力浅薄~~)。文章也对部分内容进行一定的解释,增强对相关知识点的理解。
Overview
RabbitMQ java client uses com.rabbitmq.client as its top-level package, 关键的classes和interface如下:
- Channel
- Connection
- ConnectionFactory
- Consumer
AMQP协议层面的操作通过Channel接口实现。Connection是用来open Channels的,可以注册event handlers,也可以在结束是close connections. Connection是通过ConnectionFactory来进行初始化操作的,当然也需要配置不同的connection设置,比如vhost或者username等。
Connections and Channels
关键的API如Connection和Channel,分别代表了AMQP-0-9-1的connection和channel。典型的包导入如下:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
Connecting to a broker
下面的代码用来在给定的参数(hostname, port number等)下连接一个AMQP broker:
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
也可以选择使用URI来实现,示例如下:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection接口被用来open一个channel:
Channel channel = conn.createChannel();
这样在创建之后,Channel可以用来发送或者接受消息了。
在使用完之后,关闭连接:
channel.close();
conn.close();
显示的关闭channel是一个很好的习惯,但这不是必须的,在基本的connection关闭的时候channel也会自动的关闭。
Using Exchanges and Queues
AMQP的high-level构建模块exchanges和queues是Client端应用所必须的。在使用之前必须先“declared”(声明),确保在使用之前已经存在,如果不存在则创建它,这些操作都包含在declare里。
下面的代码是演示如何declare一个exchange和queue:
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
上面创建了一个durable, non-autodelete并且绑定类型为direct的exchange以及一个non-durable, exclusive,autodelete的queue(此queue的名称由broker端自动生成)。这里的exchange和queue也都没有设置特殊的arguments。
上面的代码也展示了如果使用routing key将queue和exchange绑定起来。上面声明的queue具备如下特性:排他的(只对当前client同一个Connection可用, 同一个Connection的不同的Channel可共用),并且也会在client连接断开时自动删除。
如果要在client共享一个queue,可以做如下声明:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
这里的queue是durable的,非排他的,non-autodelete, 而且也有一个确定的已知的名称(又Client指定而非broker端自动生成)。
注意:Channel的API方法都是可以重载的,比如exchangeDeclare,queueDeclare根据参数的不同,可以有不同的重载形式,根据自身的需要去进行调用。
Publish messages
如果要发送一个消息可以采用Channel.basicPublish的方式:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
为了更好的控制,你也可以使用mandatory这个属性,或者可以发送一些特定属性的消息:
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
这个方法发送了一条消息,这条消息的delivery mode为2,即消息需要被持久化在broker中,同时priority优先级为1,content-type为text/plain。你可以可以自己设定消息的属性:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
你也可以发送一条带有header的消息:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
你也可以发送一条带有超时时间expiration的消息:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
以上只是举例,由于篇幅关系,这里就不一一列举所有的可能情形了。
Channel#basicPublish方法在以下两种情形下会被阻塞,具体可以参考http://www.rabbitmq.com/alarms.html:
- When memory use goes above the configured limit.(内存不够)
- When disk space drops below the configured limit.(磁盘空间不足)
Channles and Concurrency Consideration(Thread Safaty)
Channel实例不能在线程建共享,应用程序应该为每一个线程开辟一个Channel, 而不是在多线程建共享Channel。某些情况下Channel的操作可以并发运行,但是某些情况下并发会导致在网络上错误的帧交叉,同时也会影响publisher confirm, 故多线程共享Channel是非线程安全的。
Receiving messages by subscription
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
接受消息一般是通过实现Consumer接口或者继承DefaultConsumer来实现。当调用与Consumer相关的API方法时,不同的订阅采用consumer tags以作彼此的区分,在同一个Channel中的Consumer也需要通过唯一的consumer tags以作区分。。
消费消息demo如下:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
注意到上面代码我们显示的设置autoAck=false, 对于Consumer来说这个设置是非常必要的。(译者注:具体可以参考RabbitMQ之消息确认机制(事务+Confirm)中Consumer确认那一章节。)
同时对于Consumer来说重写handleDelivery方法也是十分方便的。更复杂的Consumer会重写(override)更多的方法,比如handleShutdownSignal当channels和connections close的时候会调用,handleConsumeOk在其他callback方法之前调用,返回consumer tags.
Consumer同样可以override handleCancelOk和handleCancel方法,这样在显示的或者隐式的取消的时候调用。
你可以通过Channel.basicCancel方法显示的cancel一个指定的Consumer:
channel.basicCancel(consumerTag);
(译者注:这句代码首先触发handleConsumerOk,之后触发handleDelivery方法,最后触发handleCancelOk方法。)
单个Consumer在Connection上都分配单个的线程来调用这些callback的方法,也就是说Consumer这里安全的调用阻塞式的方法,比如queueDeclare, txCommit, basicCancel或者basicPublish。
每个Channel都有自己的独立的线程。最常用的用法是一个Channel对应一个Consumer, 也就是意味着Consumers彼此间没有任何关联。当然你也可以在一个Channel中维持多个Consumers, 但是要注意一个问题,如果在Channel的一个Consumer一直在运行,那么对于其他Consumer的callbacks而言会被hold up(耽搁)。
Retrieving individual messages
通过Channel.basicGet可以一个一个的获取消息,其返回值是GetResponse(from which the header information(properties) and message body can be extracted)。
示例Demo如下:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
如果设置autoAck为false,那么你同样需要显示的调用Channel.basicAck来确认消息已经被成功的接受了:
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
(译者注:有关RabbitMQ的消费端的更多信息可以参考:RabbitMQ之Consumer消费模式(Push & Pull))
Handing unroutable messages
如果一个消息在publish的时候设置了mandatory标记,如果消息没有成功的路由到某个队列的时候,broker端会通过Basic.Return返回回来。
这时候客户端需要实现ReturnListener这个接口,并且调用Channel.setReturnListener。 如果client没有配置相关的return listener那么相应的需要被returned的消息就会被drop掉。
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});
(译者注:有关mandatory的更多内容可以参考:RabbitMQ之mandatory和immediate。)
Shutdown Protocol
Overview of the AMQP client shutdown
AMQP-0-9-1的connection和channel采用同样的方式来管理网络失败,内部错误以及显示的local shutdown。
AMQP-0-9-1的connection和channel具备如下的生命周期状态(lifecycle states):
- open: the object is ready to use.
- closing:当前对象被显示的通知调用shutdown,这样就产生了一个shutdown的请求至lower-layer的对象进行相应的操作,并等待这些shutdown操作的完成。
- closed:当前对象已经接受到所有的shutdown完成的通知,并且也shutdown了自身。
这些对象最终成closed的状态,而不管是由于什么原因引起的,或者是一个applicatin request,或者是内部client library的失败,或者是a remote network request, 亦或者是network failure。
AMQP的connecton和channel对象控制(possess)了shutdown-related的方法:addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。当connection和channel转向closed状态时会调用ShutdownListener, 而且如果将一个ShutdownListener注册到一个已经处于closed状态的object(特指connection或者channel的对象)时,会立刻调用ShutdownListener。
- getCloseReason():可以让你知道the object’s shutdown的原因。
- isOpen():检测the objects当前的是否处于open状态。
- close(int closeCode, String closeMessage):显示的通知the object执行shutdown。
示例代码:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});
Information about the ircumstances of a shutdown
当触发ShutdownListener的时候,就可以获取到ShutdownSignalException,这个ShutdownSignalException包含了close的原因,这个原因也可以通过getCloseReason()方法获取。
ShutdownSignalException提供了多个方法用来分析shutdown的原因。isHardError()方法可以知道是connection还是channel的error,getReason()方法可以获取cause相关的信息(以AMQP method的形式,com.rabbitmq.client.Method:AMQP.Channel.Close or AMQP.Connection.Close):
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}
Atomicity and use of the isOpen() method
我们并不推荐在生产环境的代码上使用channel或者connection的isOpen()方法,这个isOpen()方法的返回值依赖于shutdown cause的存在,有可能会产生竞争。
(译者添加:关于isOpen依赖于shutdown cause, isOpen的实现代码如下:)
public boolean isOpen() {
synchronized(this.monitor) {
return this.shutdownCause == null;
}
}
错误的使用方式如下:
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}
正确的使用方式:
public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}
Advanced Connection options
Consumer thread pool
默认情况下客户端会自动分配一个ExecutorService给Consumer线程,同样你也可以使用自定义的线程池,比如:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
当connection关闭的时候,默认的ExecutorService会被shutdown,但是如果是自定义的ExecutorService将不会被自动的shutdown,所以Clients程序需要在最终关闭的时候手动的去执行shutdown(),否则将会阻止JVM的正常关闭。
同一个executor service可以被多个connections共用。除非有明显的证据证明默认的ExecutorService不能满足当前Consumer callbacks的需要,否则不建议使用自定义的ExecutorService.
Using Lists of Hosts
可以通过使用Address来执行newConnection(). com.rabbitmq.client.Address的使用是比较方便的,例如:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
如果hostname1:portnumber1成功了连接,而hostname2:portnumber2连接失败了,connection照样会成功returned, 也不会跑出IOException。这个和你重复设置host,port然后调用factory.newConnection()直到有一组成功为止一个效果。
同样可以指定自定义的ExecutorService, 比如:factory.newConnection(es, addrArr)。
If you want moew control over the host to connect to, see the support for service discovery.
Service discovery with the AddressResolver interface
在版本3.6.6开始,可以通过AddressResolver接口的实现来创建connection:
Connection conn = factory.newConnection(addressResolver);
AddressResolver接口如下:
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
使用AddressResolver可以更好的实现custom service discovery逻辑,和“automatic recovery”组合使用,客户端可以自动的和broker nodes连接.AddressResolver也可以有效的配合负载均衡策略。
AddressResolver有两个实现:DnsRecordIpAddressResolver和DnsSrvRecordAddressResolver.(博主没用过AddressResolver,这里就不多做解释了)
Heartbeat Timeout
有关Heartbeat的内容请参考Heatbeats guide。(原文就是这么说的。)
Custom Thread Factories
略。和Google App Engine有关。
Support for Java non-blocking IO
4.0版本开始客户端引入了java的NIO,这里引入NIO的目的不是为了比BIO的更快,而是是的更加容易的控制资源。
对于默认的BIO模式,每个connection都需要一个独立的线程来进行网络通讯。但在NIO模式下,你可以控制网络通讯读写线程的数量。
如果你的java程序需要许多的connections(几十个或者几百个),那么使用NIO模式是一个很好的选择。相比BIO而言,你所使用的线程数很少,通过设置合理的线程数,你可以不必担心性能的损耗,尤其是在connections不怎么busy的时候。
NIO必须被显示的设置:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
你也可以设置NIO的参数:
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));
NIO模式下使用合理的默认值,同时你也可以根据自身的负载情况来进行合理的变换。
Automatic Recovery From Network Failures
Connection Recovery
客户端和broker之间的网络通讯可能会失败。RabbitMQ java client支持connections和拓扑topology(指queues, exchanges, bindings and consumers)的自动回复。自动恢复过程有如下几个步骤:
- Reconnect
- Restore connection listeners
- Re-open channels
- Restore channel listeners
- Restore channel basic.qos setting, publisher confirms and transaction settings
topology的恢复包括如下行为,performed for every channel:
- Re-declare exchange (exception for predefined ones)
- Re-declare queues
- Recover all bindings
- Recover all consumers
在版本4.0.0开始,自动回复默认是开启的。你也通过factory.setAutomaticRecoveryEnabled(boolean)可以手动的设置automatic onnection recovery.
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();
如果由于某些异常(比如RabbitMQ节点始终连接不上)而导致的恢复失败。那么会在某个特定的时间间隔内重试,默认此间隔为5s,当然此值可配:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds
factory.setNetworkRecoveryInterval(10000);
Recovery Listeners
It is possible to register one or more recovery listeners on recoverable connections and channels. 当connection recovery启用的时候,通过调用ConnectionFactory#newConnection和Connection#createChannel返回的connections实现com.rabbitmq.client.Recoverable. 这里提供了两个方法:addRecoveryListener和removerRecoveryListener.
/**
* Provides a way to register (network, AMQP 0-9-1) connection recovery
* callbacks.
*
* When connection recovery is enabled via {@link ConnectionFactory},
* {@link ConnectionFactory#newConnection()} and {@link Connection#createChannel()}
* return {@link Recoverable} connections and channels.
*
* @see com.rabbitmq.client.impl.recovery.AutorecoveringConnection
* @see com.rabbitmq.client.impl.recovery.AutorecoveringChannel
*/
public interface Recoverable {
/**
* Registers a connection recovery callback.
*
* @param listener Callback function
*/
void addRecoveryListener(RecoveryListener listener);
void removeRecoveryListener(RecoveryListener listener);
}
当然你必须将connections和channels强制转换为Recoverable的才能使用这些方法。
Effects on Publishing
消息通过Channel,basicPublish发布,如果connection down了那么消息就会丢失。客户端不会在connection恢复之后重新delivery这些消息。为了确保消息的可靠性,可以参考Publisher Confims.(或者可以参考博主的博文:RabbitMQ之消息确认机制(事务+Confirm))。
Topology Recovery
Topology recovery涉及到exchanges, queues, bindings and consumer.当automatic recovery可用时topology recovery默认也可用。当然topology也可显示的设置为disabled:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
factory.setTopologyRecoveryEnabled(false);
Manual Acknowledgements and Automatic Recovery
当autoAck设置为false的时候,在消息delivery和ack的时候有可能会由于网络原因故障,在connection recovery之后,RabbitMQ会将所有的channels的delivery tags进行重置。这就意味着basic.ack, basic,nack以及basic.reject带有old delivery tags的将会引起channel exception。为了解决这个为题,RabbitMQ java client会记录和更新相应的delivery tags来确保在恢复期间保持单调递增。带有过时的delivery tags的ack将不会被发送。采用manual ack和automatic recovery的应用必须具备处理redeliveries的能力。
Unhandled Exceptions
在connection, channel, recovery, consumer生命周期内涉及的未被处理的异常可以委托给exception handler. Exception handler实现了ExceptionHandler这个接口,默认情况下使用的是DefaultExceptionHandler, 只是在标准输出流中打印一些exception的细节。
你可以使用ConnectionFactory#setExceptionHandler来override这个handler,这个handler可以被ConnectionFactory创建的所有的Connections所使用:
ConnectionFactory factory = new ConnectionFactory();
factory.setExceptionHandler(customHandler);