- RabbitMQ 支持 AMQP 协议的事务机制,可以在发送和接收消息时开启事务,以保证消息的原子性和一致性。
- 下面是事务性消息的发送和接收示例:
1.发送事务性消息:
Channel channel = connection.createChannel();
try {
channel.txSelect(); // 开启事务
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
channel.txCommit(); // 提交事务
} catch (IOException e) {
channel.txRollback(); // 回滚事务
throw e;
} finally {
channel.close();
}
2.接收事务性消息:
Channel channel = connection.createChannel();
try {
channel.txSelect(); // 开启事务
GetResponse response = channel.basicGet(queueName, false);
if (response != null) {
// 处理消息
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
channel.txCommit(); // 提交事务
} catch (IOException e) {
channel.txRollback(); // 回滚事务
throw e;
} finally {
channel.close();
}
在发送和接收事务性消息时,需要注意以下几点:
事务性消息的发送和接收必须在同一个 Channel 中进行。
开启事务后,所有的消息发送和接收操作都将在事务中执行。
提交事务后,所有发送的消息才会被确认,接收的消息才会被消费。
回滚事务后,所有发送的消息都会被取消,接收的消息会重新放回队列中。