一、前言
前几天我研究了关于springboot整合简单消息队列,实现springboot推送消息至队列中,消费者成功消费。同时也加了消息转发器,对消息转发器各种类型的配置等做了总结。
但是,主要还有一点,我一直存在疑问:如何确保消息成功被消费者消费?
说到这里,我相信很多人会说使用ack啊,关闭队列自动删除啊什么的。主要是道理大家都懂,我要实际的代码,网上找了半天,和我设想的有很大差异,还是自己做研究总结吧。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
二、准备
本次写案例,就按照最简单的方式,direct方式进行配置吧,实际流程如下所示:
- 消息转发器类型: direct直连方式。
- 消息队列: 暂时采取公平分发方式。
- 实现流程: 消息生产者生产的消息发送至队列中,由两个消费者获取并消费,消费完成后,清楚消息队列中的消息。
所以我们接下来先写配置和demo。
2.1、依赖引入
再一般的springboot 2.1.4项目中,添加一个pom依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.2、连接yml的配置
我们这边暂时只有一个rabbitmq,所以连接操作,基本rabbitmq的信息配置问题直接再yml中编写就可以了。
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: xiangjiao password: bunana virtual-host: /xiangjiao publisher-confirms: true #开启发送确认 publisher-returns: true #开启发送失败回退 #开启ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #采取手动应答 #concurrency: 1 # 指定最小的消费者数量 #max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true # 是否支持重试
2.3、config注入配置
我们根据图示
知道我们必须配置以下东西:
- 一个消息转发器,我们取名
directExchangeTx
。 - 一个消息队列,取名
directQueueTx
,并将其绑定至指定的消息转发器上。
所以我们的配置文件需要这么写:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 直连交换机,发送指定队列信息,但这个队列后有两个消费者同时进行消费 * @author 7651 * */ @Configuration public class DirectExchangeTxQueueConfig { @Bean(name="getDirectExchangeTx") public DirectExchange getDirectExchangeTx(){ return new DirectExchange("directExchangeTx", true, false); } @Bean(name="getQueueTx") public Queue getQueueTx(){ return new Queue("directQueueTx", true, false, false); } @Bean public Binding getDirectExchangeQueueTx( @Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx, @Qualifier(value="getQueueTx") Queue getQueueTx){ return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey"); } }
2.4、消费者的配置
有了队列和消息转发器,消息当然需要去消费啊,所以我们接下来配置消息消费者。
从图中,我们看出,我们需要配置两个消息消费者,同时监听一个队列,所以我们的配置类为:
消费者一:
import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component @RabbitListener(queues="directQueueTx") public class Consumer1 { @RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { //拿到消息延迟消费 try { Thread.sleep(1000*1); } catch (InterruptedException e) { e.printStackTrace(); } try { /** * 确认一条消息:<br> * channel.basicAck(deliveryTag, false); <br> * deliveryTag:该消息的index <br> * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br> */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("get msg1 success msg = "+msg); } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败 /** * 拒绝确认消息:<br> * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br> * deliveryTag:该消息的index<br> * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br> * requeue:被拒绝的是否重新入队列 <br> */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println("get msg1 failed msg = "+msg); /** * 拒绝一条消息:<br> * channel.basicReject(long deliveryTag, boolean requeue);<br> * deliveryTag:该消息的index<br> * requeue:被拒绝的是否重新入队列 */ //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
消息消费者二:
import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component @RabbitListener(queues="directQueueTx") public class Consumer2 { @RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { //拿到消息延迟消费 try { Thread.sleep(1000*3); } catch (InterruptedException e) { e.printStackTrace(); } try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("get msg2 success msg = "+msg); } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println("get msg2 failed msg = "+msg); } } }
两个消费者之间唯一的区别在于两者获取消息后,延迟时间不一致。
2.5、消息生产者
有了消息消费者,我们需要有一个方式提供消息并将消息推送到消息队列中。
public interface IMessageServcie { public void sendMessage(String exchange,String routingKey,Object msg); } import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import cn.linkpower.service.IMessageServcie; @Component public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback { private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class); @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMessage(String exchange,String routingKey,Object msg) { //消息发送失败返回到队列中, yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); //消息消费者确认收到消息后,手动ack回执 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); //发送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ log.info("---- confirm ----ack==true cause="+cause); }else{ log.info("---- confirm ----ack==false cause="+cause); } } }
除了定义好了消息发送的工具服务接口外,我们还需要一个类,实现请求时产生消息,所以我们写一个controller。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import cn.linkpower.service.IMessageServcie; @Controller public class SendMessageTx { @Autowired private IMessageServcie messageServiceImpl; @RequestMapping("/sendMoreMsgTx") @ResponseBody public String sendMoreMsgTx(){ //发送10条消息 for (int i = 0; i < 10; i++) { String msg = "msg"+i; System.out.println("发送消息 msg:"+msg); messageServiceImpl.sendMessage("directExchangeTx", "directQueueTxRoutingKey", msg); //每两秒发送一次 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } return "send ok"; } }
运行springboot项目,访问指定的url,是可以观察到消息产生和消费的。
有些人会问,写到这里就够了吗,你这和之前博客相比,和没写一样啊,都是教我们如何配置,如何生产消息,如何消费消息。
所以接下来的才是重点了,我们一起研究一个事,当我们配置的消费者二出现消费消息时,出问题了,你如何能够保证像之前那样,消费者一处理剩下的消息?
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
三、ack配置和测试
3.1、模拟消费者二出问题
我们发送的消息格式都是 msg1、msg2、…
所以,我们不妨这么想,当我消费者二拿到的消息msg后面的数字大于3,表示我不要了。
import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component @RabbitListener(queues="directQueueTx") public class Consumer2 { @RabbitHandler public void process(String msg,Channel channel, Message message) throws IOException { //拿到消息延迟消费 try { Thread.sleep(1000*3); } catch (InterruptedException e) { e.printStackTrace(); } try { if(!isNull(msg)){ String numstr = msg.substring(3); Integer num = Integer.parseInt(numstr); if(num >= 3){ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.out.println("get msg2 basicNack msg = "+msg); }else{ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("get msg2 basicAck msg = "+msg); } } } catch (Exception e) { //消费者处理出了问题,需要告诉队列信息消费失败 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); System.err.println("get msg2 failed msg = "+msg); } } public static boolean isNull(Object obj){ return obj == null || obj == ""||obj == "null"; } }
再次请求接口,我们统计日志信息打印发现:
发现:
当我们对消息者二进行限制大于等于3时,不接受消息队列传递来的消息时,消息队列会随机重发那条消息,直至消息发送至完好的消费者一时,才会把消息消费掉。