消息中间件系列教程(12) -RabbitMQ-消息确认机制

简介: 消息中间件系列教程(12) -RabbitMQ-消息确认机制

引言

代码已上传到Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/RabbitMQ-Demo

场景:生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

这个熟悉的场景容易的让我们想到了“事务”,其实RabbitMQ也是有事务机制的。

解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

事务模式:

  • 「txSelect」 :将当前channel设置为transaction模式
  • 「txCommit」 :提交当前事务
  • 「txRollback」 :事务回滚

1. AMQP 事务机制案例

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

4.生产者

public class Producer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务
        channel.txSelect();
        String msg = "test transaction msg ...";
        try {
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // int i = 1 / 0;
            channel.txCommit();// 提交事务
            System.out.println("生产者发送消息:" + msg);
        } catch (Exception e) {
            System.out.println("消息进行回滚操作");
            channel.txRollback();// 回滚事务
        } finally {
            channel.close();
            newConnection.close();
        }
    }
}

5.消费者

public class Consumer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.获取通道
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息->" + msgString);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

6.依次启动消费者和生产者,可以看到消费者能获取到消息:

7.现在模拟异常,把生产者的异常代码打开:

8.启动生产者,发现消费者没有获取到消息:

2. Confirm机制

和上面的代码一样,需要修改一下生产者,我们重新新建一个类ConfirmProducer

public class ConfirmProducer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // confirm机制
        channel.confirmSelect();
        String msg = "test confirm msg ...";
        // 4.发送消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("生产者发送消息:" + msg);
        if (!channel.waitForConfirms()) {
            System.out.println("消息发送失败!");
        } else {
            System.out.println("消息发送成功!");
        }
        channel.close();
        newConnection.close();
    }
}

依次启动消费者和新建的生产者,可以看到生产者发送消息成功,消费者消费消息也成功:

本文完!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
Web App开发 JSON JavaScript
Node.js 中的中间件机制与 Express 应用
Node.js 中的中间件机制与 Express 应用
|
2月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
128 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
2月前
|
JavaScript 安全 中间件
深入浅出Node.js中间件机制
【10月更文挑战第36天】在探索Node.js的奥秘之旅中,中间件的概念如同魔法一般,它让复杂的请求处理变得优雅而高效。本文将带你领略这一机制的魅力,从概念到实践,一步步揭示如何利用中间件简化和增强你的应用。
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
145 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 JavaScript 中间件
深入浅出Node.js中间件机制
【10月更文挑战第24天】在Node.js的世界里,中间件如同厨房中的调料,为后端服务增添风味。本文将带你走进Node.js的中间件机制,从基础概念到实际应用,一探究竟。通过生动的比喻和直观的代码示例,我们将一起解锁中间件的奥秘,让你轻松成为后端料理高手。
46 1
|
2月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
41 0
rabbitmq基础教程(ui,java,springamqp)
|
3月前
|
JavaScript 安全 中间件
深入浅出Node.js中间件机制
【10月更文挑战第4天】在探索Node.js的海洋中,中间件机制犹如一座灯塔,为开发者指引方向。本文将带你一探究竟,从浅入深地理解这一核心概念。我们将通过生动的比喻和实际代码示例,揭示中间件如何在请求和响应之间搭建桥梁,实现功能的扩展与定制。无论你是初学者还是有经验的开发者,这篇文章都将为你提供新的视角和深入的理解。
66 0
|
7月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1962 0