12、RabbitMQ教程-使用消息确认机制confirm带来的问题

简介: 12、RabbitMQ教程-使用消息确认机制confirm带来的问题
既然我们有了消息确认机制,我们可以用来解决很多问题,比如:我们用RabbitMQ的在项目之间消失丢失的问题,但是越多的技术的应用,往往会带来新的更多的问题。

防止消息丢失,真实企业这么应用

消息丢失,我们可以用确认机制来解决,实际项目的应用中我们可以这么用,实例如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicConfirmProductTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String ROUTING_KEY = "com.echo.level2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

<html>
<!--在这里插入内容-->
</html>



    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        // 将信道设置为publisher confirm模式
        channel.confirmSelect();
        // 创建一个 type="direct" 、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // 发送一条持久化的消息: topic hello world !
        String message = "topic hello world !";
        while(true) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("投递成功");
                break;
            }
            Thread.sleep(5000);
        }
    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 关闭资源
        channel.close();
        connection.close();
    }

    private static Connection createConnection() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }

}

可能很多人眼尖,一眼就看到我们代码里面的while循环。这个代码跟我们之前的confirm的基本应用没有什么很大的区别,除了while之外。但它确确实实能够给我们带来很好的效果,真实有效的解决消息丢失的问题。

解决方案的应用,随之出现新的问题

while和channel.waitForConfirms()的配合,很大程度上,直接的避免了消息的丢失,或者说解决99%的这样的问题。但是新的问题也出来了,那就是性能问题。不知道有没有人关注过这方面的情况,可能很多人写实例的时候,感觉没啥。while里面也不会直接构成死循环,但是恰恰是这个地方,会产生问题。比如:RabbitMQ宕机的时候,又比如:while在很多个生产者里面应用,网络一有波动就会有大量的重试。给我们系统带来了很大的性能问题。

在这里其实还好,生产者,我们有一些办法去协调它,最大的问题在于消费者

消费者使用ack确认

可能很多人能够想到的问题如:消息重复消费,消息丢失。没错,这也是MQ常见的问题。但是其实这里最大的问题是消息的确认使用不正确,导致死循环

真实案例

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 15:05
 */
public class TopicConfirmConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        try {
            consumerMsg(channel);
            Thread.sleep(50000);
            closeConnection(connection, channel);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void consumerMsg(Channel channel) throws IOException {
        //声明交换机 Fanout模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        //进行绑定,指定消费那个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message: " + new String(body));
//                try {
                    // TODO: 真实业务逻辑
                   int a = 1/0; channel.basicAck(envelope.getDeliveryTag(), false);
//                } catch (Exception e) {
//                    System.out.println("手动确认失败,错误信息:" + e.getMessage());
//                    // 假若在真实业务逻辑中做了重复校验,我们可以对重复交易做拒绝,同时也可以将消息重新放回队列
//                    if (envelope.isRedeliver()) {
//                        // 拒绝消息
//                        channel.basicReject(envelope.getDeliveryTag(), false);
//                    } else {
//                        // 重新放回队列
//                        channel.basicNack(envelope.getDeliveryTag(), false, true);
//                    }
//                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    private static void closeConnection(Connection connection, Channel channel) throws
            IOException, TimeoutException {
        channel.close();
        connection.close();
    }

    private static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }

}

大家不妨尝试一下这个程序,你会发现当我使用的手动确认模式的时候,如上程序,在执行的过程当中直接报错了,这样会导致mq不断重推消息。当放开注释部分之后,这里死循环是解决了,但是又会有消息重复消费的问题,这就是我们手动确认的时候带来的一个比较严重的问题,解决方案只需要应用了对应的ack代码和做幂等性校验即可。

总结

当我们为了解决一些问题,需要用一些其他组件或者框架的时候,应该注意了解新用来解决问题的技术是否会产生新的问题

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
|
1月前
|
消息中间件 存储 运维
|
28天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
26 0
|
1天前
|
消息中间件 存储 数据安全/隐私保护
RabbitMQ使用教程
RabbitMQ使用教程
8 2
|
4月前
|
消息中间件 存储
【RabbitMQ教程】第四章 —— RabbitMQ - 交换机(上)
【RabbitMQ教程】第四章 —— RabbitMQ - 交换机
|
30天前
|
消息中间件 Linux 开发工具
Linux系统安装RabbitMQ详细教程
Linux系统安装RabbitMQ详细教程
24 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
3月前
|
消息中间件 Java
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
31 0
|
4月前
|
消息中间件 数据安全/隐私保护 Docker
百度搜索:蓝易云【Docker安装RabbitMQ docker安装RabbitMQ完整详细教程】
通过按照以上步骤,你应该能够成功在Docker上安装并运行RabbitMQ。请记住,具体步骤可能会因Docker版本和操作系统而有所不同。如果遇到任何问题,可以查阅官方文档或社区寻求更多帮助。
122 0
|
4月前
|
消息中间件 JSON 运维
spring boot RabbitMq基础教程(三)
spring boot RabbitMq基础教程
75 1

热门文章

最新文章