高性能消息中间件 RabbitMQ(四)

简介: 高性能消息中间件 RabbitMQ(四)

4.编写生产者

SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。

package com.zj;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        /*
        * 参数一:交换机名称
        * 参数二:路由关键字
        * 参数三:要发送的消息
        */
        rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ");
    }
}

5.编写消费者

我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。

1、创建项目导入依赖。

2、编写配置文件,和生产者的相同

3、编写消费者,监听队列

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

4、运行项目。观察管控台队列和控制台

五、消息的可靠性投递

5.1 概念

RabbitMQ消息投递的路径为:

生产者--->交换机--->队列--->消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列:

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";
    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
                .durable(true) // 是否持久化
                .build();
    }
    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }
    // 交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                    @Qualifier(QUEUE_NAME) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}

创建生产者

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

5.2 确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

1、生产者配置文件开启确认模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated

2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。

package com.zj;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关配置信息
             * @param ack  交换机是否收到消息
             * @param cause  失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  if(ack){
                      System.out.println("消息接受成功");
                  }else {
                      System.out.println("消息接受失败:"+cause);
                      //做一些处理让消息再次发送
                  }
            }
        });
        rabbitTemplate.convertAndSend("aaa","message","hello MQ");
    }
}

3、运行结果

消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)

5.3 退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

1、生产者配置文件开启退回模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated
      # 开启回退模式
    publisher-returns: true

2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。
         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
             /**
              *
              * @param returnedMessage 失败后将失败信息封装到该参数
              */
             @Override
             public void returnedMessage(ReturnedMessage returnedMessage) {
                 System.out.println("消息对象:"+returnedMessage);
                 System.out.println("错误码:"+returnedMessage.getReplyCode());
                 System.out.println("错误信息:"+returnedMessage.getReplyText());
                 System.out.println("交换机:"+returnedMessage.getExchange());
                 System.out.println("路由键:"+returnedMessage.getRoutingKey());
                 //处理消息……
             }
         });
        rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ");
    }
}
消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb]
错误码:312
错误信息:NO_ROUTE
交换机:boot_topic_exchange
路由键:bbb

5.4 Ack手动签收

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"

1.消费者配置开启手动签收

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
      # 开启手动签收
    listener:
      simple:
        acknowledge-mode: manual

2、消费者处理消息时定义手动签收和拒绝签收的情况

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ACKConsumer {
    // 监听队列
    /**
     *
     * @param message  消息对象
     * @param channel   信道对象,用于手动接受消息
     */
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
        //deliveryTag:消息投递序号,每次投递该值都会+1.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //签收消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
             //拒签消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息)
             */
            channel.basicNack(deliveryTag,true,true);
            System.out.println("消息消费失败");
        }
    }
}


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
3550 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
965 2
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
1563 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
11月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
9月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
610 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
1074 100
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
544 114

热门文章

最新文章