消息中间件系列教程(12) -RabbitMQ-消息确认机制

简介: 消息中间件系列教程(12) -RabbitMQ-消息确认机制

引言

代码已上传到Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/RabbitMQ-Demo

场景:生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

这个熟悉的场景容易的让我们想到了“事务”,其实RabbitMQ也是有事务机制的。

解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

事务模式:

  • 「txSelect」 :将当前channel设置为transaction模式
  • 「txCommit」 :提交当前事务
  • 「txRollback」 :事务回滚

1. AMQP 事务机制案例

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
  </dependency>
</dependencies>

3.连接工具类

package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置服务器地址
        factory.setHost("127.0.0.1");
        // 3.设置协议端口号
        factory.setPort(5672);
        // 4.设置vhost
        factory.setVirtualHost("OrderHost");
        // 5.设置用户名称
        factory.setUsername("OrderAdmin");
        // 6.设置用户密码
        factory.setPassword("123456");
        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

4.生产者

public class Producer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务
        channel.txSelect();
        String msg = "test transaction msg ...";
        try {
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // int i = 1 / 0;
            channel.txCommit();// 提交事务
            System.out.println("生产者发送消息:" + msg);
        } catch (Exception e) {
            System.out.println("消息进行回滚操作");
            channel.txRollback();// 回滚事务
        } finally {
            channel.close();
            newConnection.close();
        }
    }
}

5.消费者

public class Consumer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.获取通道
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息->" + msgString);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

6.依次启动消费者和生产者,可以看到消费者能获取到消息:

7.现在模拟异常,把生产者的异常代码打开:

8.启动生产者,发现消费者没有获取到消息:

2. Confirm机制

和上面的代码一样,需要修改一下生产者,我们重新新建一个类ConfirmProducer

public class ConfirmProducer {
    private static final String QUEUE_NAME = "test_trans_queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = newConnection.createChannel();
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // confirm机制
        channel.confirmSelect();
        String msg = "test confirm msg ...";
        // 4.发送消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("生产者发送消息:" + msg);
        if (!channel.waitForConfirms()) {
            System.out.println("消息发送失败!");
        } else {
            System.out.println("消息发送成功!");
        }
        channel.close();
        newConnection.close();
    }
}

依次启动消费者和新建的生产者,可以看到生产者发送消息成功,消费者消费消息也成功:

本文完!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
|
4月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
50 0
|
1月前
|
消息中间件 存储 运维
|
29天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
26 0
|
2天前
|
消息中间件 存储 数据安全/隐私保护
RabbitMQ使用教程
RabbitMQ使用教程
9 2
|
1月前
|
消息中间件 Linux 开发工具
Linux系统安装RabbitMQ详细教程
Linux系统安装RabbitMQ详细教程
24 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
3月前
|
消息中间件 Java
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
32 0
|
4月前
|
消息中间件 数据安全/隐私保护 Docker
百度搜索:蓝易云【Docker安装RabbitMQ docker安装RabbitMQ完整详细教程】
通过按照以上步骤,你应该能够成功在Docker上安装并运行RabbitMQ。请记住,具体步骤可能会因Docker版本和操作系统而有所不同。如果遇到任何问题,可以查阅官方文档或社区寻求更多帮助。
122 0
|
4月前
|
消息中间件 JSON 运维
spring boot RabbitMq基础教程(三)
spring boot RabbitMq基础教程
75 1