SpringBoot+RabbitMQ 实现手动消息确认(ACK)上

简介: SpringBoot+RabbitMQ 实现手动消息确认(ACK)

一、前言

前几天我研究了关于springboot整合简单消息队列,实现springboot推送消息至队列中,消费者成功消费。同时也加了消息转发器,对消息转发器各种类型的配置等做了总结。

但是,主要还有一点,我一直存在疑问:如何确保消息成功被消费者消费?

说到这里,我相信很多人会说使用ack啊,关闭队列自动删除啊什么的。主要是道理大家都懂,我要实际的代码,网上找了半天,和我设想的有很大差异,还是自己做研究总结吧。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、准备

本次写案例,就按照最简单的方式,direct方式进行配置吧,实际流程如下所示:

微信图片_20220908150446.png

  • 消息转发器类型: direct直连方式。
  • 消息队列: 暂时采取公平分发方式。
  • 实现流程: 消息生产者生产的消息发送至队列中,由两个消费者获取并消费,消费完成后,清楚消息队列中的消息。

所以我们接下来先写配置和demo。

2.1、依赖引入

再一般的springboot 2.1.4项目中,添加一个pom依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2、连接yml的配置

我们这边暂时只有一个rabbitmq,所以连接操作,基本rabbitmq的信息配置问题直接再yml中编写就可以了。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xiangjiao
    password: bunana
    virtual-host: /xiangjiao
    publisher-confirms: true   #开启发送确认
    publisher-returns: true  #开启发送失败回退
    #开启ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

2.3、config注入配置

我们根据图示

微信图片_20220908150510.png

知道我们必须配置以下东西:

  • 一个消息转发器,我们取名directExchangeTx
  • 一个消息队列,取名directQueueTx,并将其绑定至指定的消息转发器上。

所以我们的配置文件需要这么写:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 直连交换机,发送指定队列信息,但这个队列后有两个消费者同时进行消费
 * @author 7651
 *
 */
@Configuration
public class DirectExchangeTxQueueConfig {
 @Bean(name="getDirectExchangeTx")
 public DirectExchange getDirectExchangeTx(){
  return new DirectExchange("directExchangeTx", true, false);
 }
 @Bean(name="getQueueTx")
 public Queue getQueueTx(){
  return new Queue("directQueueTx", true, false, false);
 }
 @Bean
 public Binding getDirectExchangeQueueTx(
   @Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
   @Qualifier(value="getQueueTx") Queue getQueueTx){
  return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
 }
}

2.4、消费者的配置

有了队列和消息转发器,消息当然需要去消费啊,所以我们接下来配置消息消费者。

微信图片_20220908150545.png

从图中,我们看出,我们需要配置两个消息消费者,同时监听一个队列,所以我们的配置类为:

消费者一:

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer1 {
 @RabbitHandler
 public void process(String msg,Channel channel, Message message) throws IOException {
  //拿到消息延迟消费
  try {
   Thread.sleep(1000*1);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  try {
   /**
    * 确认一条消息:<br>
    * channel.basicAck(deliveryTag, false); <br>
    * deliveryTag:该消息的index <br>
    * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
    */
   channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
   System.out.println("get msg1 success msg = "+msg);
  } catch (Exception e) {
   //消费者处理出了问题,需要告诉队列信息消费失败
   /**
    * 拒绝确认消息:<br>
    * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
    * deliveryTag:该消息的index<br>
    * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br>
    * requeue:被拒绝的是否重新入队列 <br>
    */
   channel.basicNack(message.getMessageProperties().getDeliveryTag(),
     false, true);
   System.err.println("get msg1 failed msg = "+msg);
   /**
    * 拒绝一条消息:<br>
    * channel.basicReject(long deliveryTag, boolean requeue);<br>
    * deliveryTag:该消息的index<br>
    * requeue:被拒绝的是否重新入队列 
    */
   //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  }
 }
}

消息消费者二:

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
 @RabbitHandler
 public void process(String msg,Channel channel, Message message) throws IOException {
  //拿到消息延迟消费
  try {
   Thread.sleep(1000*3);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  try {
   channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
   System.out.println("get msg2 success msg = "+msg);
  } catch (Exception e) {
   //消费者处理出了问题,需要告诉队列信息消费失败
   channel.basicNack(message.getMessageProperties().getDeliveryTag(),
     false, true);
   System.err.println("get msg2 failed msg = "+msg);
  }
 }
}

两个消费者之间唯一的区别在于两者获取消息后,延迟时间不一致。

2.5、消息生产者

有了消息消费者,我们需要有一个方式提供消息并将消息推送到消息队列中。

public interface IMessageServcie {
 public void sendMessage(String exchange,String routingKey,Object msg);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.linkpower.service.IMessageServcie;
@Component
public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {
 private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @Override
 public void sendMessage(String exchange,String routingKey,Object msg) {
  //消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  rabbitTemplate.setMandatory(true);
  //消息消费者确认收到消息后,手动ack回执
  rabbitTemplate.setConfirmCallback(this);
  rabbitTemplate.setReturnCallback(this);
  //发送消息
  rabbitTemplate.convertAndSend(exchange,routingKey,msg);
 }
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
 }
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
  log.info("correlationData -->"+correlationData.toString());
  if(ack){
   log.info("---- confirm ----ack==true  cause="+cause);
  }else{
   log.info("---- confirm ----ack==false  cause="+cause);
  }
 }
}

除了定义好了消息发送的工具服务接口外,我们还需要一个类,实现请求时产生消息,所以我们写一个controller。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.linkpower.service.IMessageServcie;
@Controller
public class SendMessageTx {
 @Autowired
 private IMessageServcie messageServiceImpl;
 @RequestMapping("/sendMoreMsgTx")
 @ResponseBody
 public String sendMoreMsgTx(){
  //发送10条消息
  for (int i = 0; i < 10; i++) {
   String msg = "msg"+i;
   System.out.println("发送消息  msg:"+msg);
   messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg);
   //每两秒发送一次
   try {
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  return "send ok";
 }
}

运行springboot项目,访问指定的url,是可以观察到消息产生和消费的。

有些人会问,写到这里就够了吗,你这和之前博客相比,和没写一样啊,都是教我们如何配置,如何生产消息,如何消费消息。

所以接下来的才是重点了,我们一起研究一个事,当我们配置的消费者二出现消费消息时,出问题了,你如何能够保证像之前那样,消费者一处理剩下的消息?

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

三、ack配置和测试

3.1、模拟消费者二出问题

我们发送的消息格式都是 msg1、msg2、…

所以,我们不妨这么想,当我消费者二拿到的消息msg后面的数字大于3,表示我不要了。

import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
@RabbitListener(queues="directQueueTx")
public class Consumer2 {
 @RabbitHandler
 public void process(String msg,Channel channel, Message message) throws IOException {
  //拿到消息延迟消费
  try {
   Thread.sleep(1000*3);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  try {
   if(!isNull(msg)){
    String numstr = msg.substring(3);
    Integer num = Integer.parseInt(numstr);
    if(num >= 3){
     channel.basicNack(message.getMessageProperties().getDeliveryTag(),
       false, true);
     System.out.println("get msg2 basicNack msg = "+msg);
    }else{
     channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     System.out.println("get msg2 basicAck msg = "+msg);
    }
   }
  } catch (Exception e) {
   //消费者处理出了问题,需要告诉队列信息消费失败
   channel.basicNack(message.getMessageProperties().getDeliveryTag(),
     false, true);
   System.err.println("get msg2 failed msg = "+msg);
  }
 }
 public static boolean isNull(Object obj){
  return obj == null || obj == ""||obj == "null";
 }
}

再次请求接口,我们统计日志信息打印发现:

微信图片_20220908150618.png

发现:

当我们对消息者二进行限制大于等于3时,不接受消息队列传递来的消息时,消息队列会随机重发那条消息,直至消息发送至完好的消费者一时,才会把消息消费掉。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
6天前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
77 1
|
6天前
|
消息中间件 Java Apache
SpringBoot集成RocketMq
RocketMQ 是一款开源的分布式消息中间件,采用纯 Java 编写,支持事务消息、顺序消息、批量消息、定时消息及消息回溯等功能。其优势包括去除对 ZooKeeper 的依赖、支持异步和同步刷盘、高吞吐量及消息过滤等特性。RocketMQ 具备高可用性和高可靠性,适用于大规模分布式系统,能有效保障消息传输的一致性和顺序性。
348 2
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
176 32
|
3月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
540 0
|
消息中间件 Java 网络架构
|
9月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
142 6
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
2013 3
|
消息中间件 Java Maven