引言
代码已上传到Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/RabbitMQ-Demo
场景:生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
这个熟悉的场景容易的让我们想到了“事务”,其实RabbitMQ也是有事务机制的。
解决方案:
- AMQP 事务机制
- Confirm 模式
事务模式:
- 「txSelect」 :将当前channel设置为transaction模式
- 「txCommit」 :提交当前事务
- 「txRollback」 :事务回滚
1. AMQP 事务机制案例
1.新建Maven项目RabbitMQ-Demo
2.添加Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
3.连接工具类
package com.ylw.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQConnecUtils { public static Connection newConnection() throws IOException, TimeoutException { // 1.定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置服务器地址 factory.setHost("127.0.0.1"); // 3.设置协议端口号 factory.setPort(5672); // 4.设置vhost factory.setVirtualHost("OrderHost"); // 5.设置用户名称 factory.setUsername("OrderAdmin"); // 6.设置用户密码 factory.setPassword("123456"); // 7.创建新的连接 Connection newConnection = factory.newConnection(); return newConnection; } }
4.生产者
public class Producer { private static final String QUEUE_NAME = "test_trans_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务 channel.txSelect(); String msg = "test transaction msg ..."; try { // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); // int i = 1 / 0; channel.txCommit();// 提交事务 System.out.println("生产者发送消息:" + msg); } catch (Exception e) { System.out.println("消息进行回滚操作"); channel.txRollback();// 回滚事务 } finally { channel.close(); newConnection.close(); } } }
5.消费者
public class Consumer { private static final String QUEUE_NAME = "test_trans_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.获取通道 Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息->" + msgString); } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
6.依次启动消费者和生产者,可以看到消费者能获取到消息:
7.现在模拟异常,把生产者的异常代码打开:
8.启动生产者,发现消费者没有获取到消息:
2. Confirm机制
和上面的代码一样,需要修改一下生产者,我们重新新建一个类ConfirmProducer
:
public class ConfirmProducer { private static final String QUEUE_NAME = "test_trans_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // confirm机制 channel.confirmSelect(); String msg = "test confirm msg ..."; // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("生产者发送消息:" + msg); if (!channel.waitForConfirms()) { System.out.println("消息发送失败!"); } else { System.out.println("消息发送成功!"); } channel.close(); newConnection.close(); } }
依次启动消费者和新建的生产者,可以看到生产者发送消息成功,消费者消费消息也成功:
本文完!