本文重点讲发送方确认机制
RabbitMQ的消息确认机制有两种
- 事务机制
- 发送方确认机制
所谓的确认机制就是让消息能够被明确的知道是否成功投递和消费
怎么使用消息确认模式
开启消息确认模式比较简单,只需要做两件事
- 第一个,在我们生产者投递消息的时候生产者将信道设置成 confirm 确认)模式
- 消费者消费的时候确认该消息已被消费即可
confirm应用实例
// 生产者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author echo
* @date 2021-01-14 14:35
*/
public class TopicConfirmProductTest {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String ROUTING_KEY = "com.echo.level2";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = createConnection();
// 创建一个频道
Channel channel = connection.createChannel();
sendMsg(channel);
closeConnection(connection, channel);
}
private static void sendMsg(Channel channel) throws IOException, InterruptedException {
// 将信道设置为publisher confirm模式
channel.confirmSelect();
// 创建一个 type="direct" 、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
// 发送一条持久化的消息: topic hello world !
String message = "topic hello world !";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("投递成功");
} else {
System.out.println("投递失败");
}
Thread.sleep(5000);
}
private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
// 关闭资源
channel.close();
connection.close();
}
private static Connection createConnection() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("echo");
factory.setPassword("123456");
// 和RabbitMQ建立一个链接
return factory.newConnection();
}
}
// 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tang.sl
* @date 2021-01-14 15:05
*/
public class TopicConfirmConsumerTest {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String QUEUE_NAME = "queue_topic2";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
try {
consumerMsg(channel);
Thread.sleep(5000);
closeConnection(connection, channel);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void consumerMsg(Channel channel) throws IOException {
//声明交换机 Fanout模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
//进行绑定,指定消费那个队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("recv message: " + new String(body));
try {
// TODO: 真实业务逻辑
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("手动确认失败,错误信息:" + e.getMessage());
// 假若在真实业务逻辑中做了重复校验,我们可以对重复交易做拒绝,同时也可以将消息重新放回队列
if (envelope.isRedeliver()) {
// 拒绝消息
channel.basicReject(envelope.getDeliveryTag(), false);
} else {
// 重新放回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
private static void closeConnection(Connection connection, Channel channel) throws
IOException, TimeoutException {
channel.close();
connection.close();
}
private static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setUsername("echo");
factory.setPassword("123456");
factory.setPort(PORT);
factory.setHost(IP_ADDRESS);
// 和RabbitMQ建立一个链接
return factory.newConnection();
}
}
确认投递确认了啥?
确认投递是生产者发送之后,能直接感知到消费者消费了吗?显然不是的,我们可以从我们的实例中看到明显的现状。当我们投递消息之后,我们是否消费成功是不影响这个消息的投递的。所以,确认消息机制是分成两块完成的。
- 第一块就是生产者确认投递,它通过tagId的形式确认了是否真正到达了需要投递的队列。
- 第二块就是消费者确认是否消息,它消费成功与否会决定该消息是否还继续停留在队列里面,也可以做是否重新投递的操作。
注意
消费者中消费消息是异步的,在关闭资源前可能消息还没能确认,所以一定在关系前要休眠