12、RabbitMQ教程-使用消息确认机制confirm带来的问题

简介: 12、RabbitMQ教程-使用消息确认机制confirm带来的问题
既然我们有了消息确认机制,我们可以用来解决很多问题,比如:我们用RabbitMQ的在项目之间消失丢失的问题,但是越多的技术的应用,往往会带来新的更多的问题。

防止消息丢失,真实企业这么应用

消息丢失,我们可以用确认机制来解决,实际项目的应用中我们可以这么用,实例如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 14:35
 */
public class TopicConfirmProductTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String ROUTING_KEY = "com.echo.level2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

<html>
<!--在这里插入内容-->
</html>



    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = createConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        sendMsg(channel);
        closeConnection(connection, channel);
    }

    private static void sendMsg(Channel channel) throws IOException, InterruptedException {
        // 将信道设置为publisher confirm模式
        channel.confirmSelect();
        // 创建一个 type="direct" 、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        // 发送一条持久化的消息: topic hello world !
        String message = "topic hello world !";
        while(true) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("投递成功");
                break;
            }
            Thread.sleep(5000);
        }
    }

    private static void closeConnection(Connection connection, Channel channel) throws IOException, TimeoutException {
        // 关闭资源
        channel.close();
        connection.close();
    }

    private static Connection createConnection() throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("echo");
        factory.setPassword("123456");
        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }

}

可能很多人眼尖,一眼就看到我们代码里面的while循环。这个代码跟我们之前的confirm的基本应用没有什么很大的区别,除了while之外。但它确确实实能够给我们带来很好的效果,真实有效的解决消息丢失的问题。

解决方案的应用,随之出现新的问题

while和channel.waitForConfirms()的配合,很大程度上,直接的避免了消息的丢失,或者说解决99%的这样的问题。但是新的问题也出来了,那就是性能问题。不知道有没有人关注过这方面的情况,可能很多人写实例的时候,感觉没啥。while里面也不会直接构成死循环,但是恰恰是这个地方,会产生问题。比如:RabbitMQ宕机的时候,又比如:while在很多个生产者里面应用,网络一有波动就会有大量的重试。给我们系统带来了很大的性能问题。

在这里其实还好,生产者,我们有一些办法去协调它,最大的问题在于消费者

消费者使用ack确认

可能很多人能够想到的问题如:消息重复消费,消息丢失。没错,这也是MQ常见的问题。但是其实这里最大的问题是消息的确认使用不正确,导致死循环

真实案例

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author echo
 * @date 2021-01-14 15:05
 */
public class TopicConfirmConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic2";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        try {
            consumerMsg(channel);
            Thread.sleep(50000);
            closeConnection(connection, channel);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void consumerMsg(Channel channel) throws IOException {
        //声明交换机 Fanout模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        //进行绑定,指定消费那个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message: " + new String(body));
//                try {
                    // TODO: 真实业务逻辑
                   int a = 1/0; channel.basicAck(envelope.getDeliveryTag(), false);
//                } catch (Exception e) {
//                    System.out.println("手动确认失败,错误信息:" + e.getMessage());
//                    // 假若在真实业务逻辑中做了重复校验,我们可以对重复交易做拒绝,同时也可以将消息重新放回队列
//                    if (envelope.isRedeliver()) {
//                        // 拒绝消息
//                        channel.basicReject(envelope.getDeliveryTag(), false);
//                    } else {
//                        // 重新放回队列
//                        channel.basicNack(envelope.getDeliveryTag(), false, true);
//                    }
//                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    private static void closeConnection(Connection connection, Channel channel) throws
            IOException, TimeoutException {
        channel.close();
        connection.close();
    }

    private static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // 和RabbitMQ建立一个链接
        return factory.newConnection();
    }

}

大家不妨尝试一下这个程序,你会发现当我使用的手动确认模式的时候,如上程序,在执行的过程当中直接报错了,这样会导致mq不断重推消息。当放开注释部分之后,这里死循环是解决了,但是又会有消息重复消费的问题,这就是我们手动确认的时候带来的一个比较严重的问题,解决方案只需要应用了对应的ack代码和做幂等性校验即可。

总结

当我们为了解决一些问题,需要用一些其他组件或者框架的时候,应该注意了解新用来解决问题的技术是否会产生新的问题

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
3月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
38 0
rabbitmq基础教程(ui,java,springamqp)
|
8月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
8月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
115 0
|
5月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
263 2
|
6月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
5月前
|
消息中间件 监控 Ubuntu
RabbitMQ安装配置,超详细版教程
以上步骤为您提供了在Linux环境下安装RabbitMQ的详细过程。安装Erlang作为基础,然后通过添加官方源并安装RabbitMQ本身,最后对服务进行配置并启用Web管理界面。这些步骤操作简单直观,只需要跟随上述指南,即可在短时间内将RabbitMQ服务器运行起来,并进行进一步的配置和管理。不要忘记硬件和网络资源对性能的影响,确保RabbitMQ能够满足您的应用需求。
327 0
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
88 0
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
80 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
70 0