前言
实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。
1、RabbitMQ延迟队列
1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能
- TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
- Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
- Dead Letter Exchanges(DLX),即死信交换机
- Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************/ //创建立即消费队列 @Bean public Queue immediateQueue(){ return new Queue("immediateQueue"); } //创建立即消费交换机 @Bean public DirectExchange immediateExchange(){ return new DirectExchange("immediateExchange"); } @Bean public Binding bindingImmediate(@Qualifier("immediateQueue") Queue queue,@Qualifier("immediateExchange") DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey"); } //创建延迟队列 @Bean public Queue delayQueue(){ Map<String,Object> params = new HashMap<>(); //死信队列转发的死信转发到立即处理信息的交换机 params.put("x-dead-letter-exchange","immediateExchange"); //死信转化携带的routing-key params.put("x-dead-letter-routing-key","immediateRoutingKey"); //设置消息过期时间,单位:毫秒 params.put("x-message-ttl",60 * 1000); return new Queue("delayQueue",true,false,false,params); } @Bean public DirectExchange delayExchange(){ return new DirectExchange("delayExchange"); } @Bean public Binding bindingDelay(@Qualifier("delayQueue") Queue queue,@Qualifier("delayExchange") DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey"); }
@Test public void sendDelay(){ this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic"); }
1.2、方式二:安装延迟队列插件
1.2.1、安装延迟队列插件:
下载解压,到plugins目录,执行以下的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
/**************延迟队列一个单一queue*******************/ @Bean public Queue delayNewQueue(){ return new Queue("delayNewQueue"); } @Bean public CustomExchange delayNewExchange(){ Map<String, Object> args = new HashMap<>(); // 设置类型,可以为fanout、direct、topic args.put("x-delayed-type", "direct"); return new CustomExchange("delayNewExchange","x-delayed-message", true,false,args); } @Bean public Binding bindingNewDelay(@Qualifier("delayNewQueue") Queue queue,@Qualifier("delayNewExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs(); }
@Test public void sendDelay() { //生产端写完了 UserInfo userInfo = new UserInfo(); userInfo.setPassword("13432432"); userInfo.setUserAccount("tiger"); this.rabbitTemplate.convertAndSend("delayNewExchange", "delayNewRoutingKey", userInfo , a -> { //单位毫秒 a.getMessageProperties().setDelay(30000); return a; }); }
2、消息确认机制
消息确认分为两部分: 生产确认 和 消费确认。
生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。
2.1、生产确认
@Bean public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){ RabbitTemplate template = new RabbitTemplate(connectionFactory); //消息发送到交换器Exchange后触发回调 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 可以进行消息入库操作 log.info("消息唯一标识 correlationData = {}", correlationData); log.info("确认结果 ack = {}", ack); log.info("失败原因 cause = {}", cause); } }); // 配置这个,下面的ReturnCallback 才会起作用 template.setMandatory(true); // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发) template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { // 可以进行消息入库操作 log.info("消息主体 message = {}", returnedMessage.getMessage()); log.info("回复码 replyCode = {}", returnedMessage.getReplyCode()); log.info("回复描述 replyText = {}", returnedMessage.getReplyText()); log.info("交换机名字 exchange = {}", returnedMessage.getExchange()); log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey()); } }); return template; }
spring: cloud: nacos: discovery: server-addr: localhost:8848 application: name: drp-user-service #微服务名称 datasource: username: root password: root url: jdbc:mysql://127.0.0.1:3306/drp driver-class-name: com.mysql.cj.jdbc.Driver rabbitmq: host: 127.0.0.1 port: 5672 username: tiger password: tiger virtual-host: tiger_vh # 确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated # 确认消息已发送到队列 publisher-returns: true listener: simple: acknowledge-mode: manual # 开启消息消费手动确认 retry: enabled: true
2.2、消费确认
@RabbitHandler public void process(UserInfo data, Message message, Channel channel){ log.info("收到directQueue队列信息:" + data); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //成功消费确认 channel.basicAck(deliveryTag,true); log.info("消费成功确认完毕。。。。。"); } catch (IOException e) { log.error("确认消息时抛出异常 ", e); // 重新确认,成功确认消息 try { Thread.sleep(50); channel.basicAck(deliveryTag, true); } catch (IOException | InterruptedException e1) { log.error("确认消息时抛出异常 ", e); // 可以考虑入库 } }catch (Exception e){ log.error("业务处理失败", e); try { // 失败确认 channel.basicNack(deliveryTag, false, false); } catch (IOException e1) { log.error("消息失败确认失败", e1); } } }