既然我们有了消息确认机制,我们可以用来解决很多问题,比如:我们用RabbitMQ的在项目之间消失丢失的问题,但是越多的技术的应用,往往会带来新的更多的问题。
防止消息丢失,真实企业这么应用
消息丢失,我们可以用确认机制来解决,实际项目的应用中我们可以这么用,实例如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author echo
* @date 2021-01-14 14:35
*/
public class TopicConfirmProductTest {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String ROUTING_KEY = "com.echo.level2";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
<html>
<!--在这里插入内容-->
</html>
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = createConnection();
// 创建一个频道
Channel channel = connection.createChannel();
sendMsg(channel);
closeConnection(connection, channel);
}
private static void sendMsg(Channel channel) throws IOException, InterruptedException {
// 将信道设置为publisher confirm模式
channel.confirmSelect();
// 创建一个 type="direct" 、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
// 发送一条持久化的消息: topic hello world !
String message = "topic hello world !";
while(true) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("投递成功");
break;
}
Thread.sleep(5000);
}
}
private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
// 关闭资源
channel.close();
connection.close();
}
private static Connection createConnection() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("echo");
factory.setPassword("123456");
// 和RabbitMQ建立一个链接
return factory.newConnection();
}
}
可能很多人眼尖,一眼就看到我们代码里面的while循环。这个代码跟我们之前的confirm的基本应用没有什么很大的区别,除了while之外。但它确确实实能够给我们带来很好的效果,真实有效的解决消息丢失的问题。
解决方案的应用,随之出现新的问题
while和channel.waitForConfirms()的配合,很大程度上,直接的避免了消息的丢失,或者说解决99%的这样的问题。但是新的问题也出来了,那就是性能问题。不知道有没有人关注过这方面的情况,可能很多人写实例的时候,感觉没啥。while里面也不会直接构成死循环,但是恰恰是这个地方,会产生问题。比如:RabbitMQ宕机的时候,又比如:while在很多个生产者里面应用,网络一有波动就会有大量的重试。给我们系统带来了很大的性能问题。
在这里其实还好,生产者,我们有一些办法去协调它,最大的问题在于消费者
消费者使用ack确认
可能很多人能够想到的问题如:消息重复消费,消息丢失。没错,这也是MQ常见的问题。但是其实这里最大的问题是消息的确认使用不正确,导致死循环
真实案例
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author echo
* @date 2021-01-14 15:05
*/
public class TopicConfirmConsumerTest {
private static final String EXCHANGE_NAME = "exchange_topic";
private static final String QUEUE_NAME = "queue_topic2";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = getConnection();
Channel channel = connection.createChannel();
try {
consumerMsg(channel);
Thread.sleep(50000);
closeConnection(connection, channel);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void consumerMsg(Channel channel) throws IOException {
//声明交换机 Fanout模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
//进行绑定,指定消费那个队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
// try {
// TODO: 真实业务逻辑
int a = 1/0; channel.basicAck(envelope.getDeliveryTag(), false);
// } catch (Exception e) {
// System.out.println("手动确认失败,错误信息:" + e.getMessage());
// // 假若在真实业务逻辑中做了重复校验,我们可以对重复交易做拒绝,同时也可以将消息重新放回队列
// if (envelope.isRedeliver()) {
// // 拒绝消息
// channel.basicReject(envelope.getDeliveryTag(), false);
// } else {
// // 重新放回队列
// channel.basicNack(envelope.getDeliveryTag(), false, true);
// }
// }
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
private static void closeConnection(Connection connection, Channel channel) throws
IOException, TimeoutException {
channel.close();
connection.close();
}
private static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setUsername("echo");
factory.setPassword("123456");
factory.setPort(PORT);
factory.setHost(IP_ADDRESS);
// 和RabbitMQ建立一个链接
return factory.newConnection();
}
}
大家不妨尝试一下这个程序,你会发现当我使用的手动确认模式的时候,如上程序,在执行的过程当中直接报错了,这样会导致mq不断重推消息。当放开注释部分之后,这里死循环是解决了,但是又会有消息重复消费的问题,这就是我们手动确认的时候带来的一个比较严重的问题,解决方案只需要应用了对应的ack代码和做幂等性校验即可。
总结
当我们为了解决一些问题,需要用一些其他组件或者框架的时候,应该注意了解新用来解决问题的技术是否会产生新的问题