高性能消息中间件 RabbitMQ(四)

简介: 高性能消息中间件 RabbitMQ(四)

4.编写生产者

SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。

package com.zj;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        /*
        * 参数一:交换机名称
        * 参数二:路由关键字
        * 参数三:要发送的消息
        */
        rabbitTemplate.convertAndSend("boot_topic_exchange","message","hello MQ");
    }
}

5.编写消费者

我们编写另一个SpringBoot项目作为RabbitMQ的消费者,因为在同一个项目中的话直接方法调用就可以。

1、创建项目导入依赖。

2、编写配置文件,和生产者的相同

3、编写消费者,监听队列

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

4、运行项目。观察管控台队列和控制台

五、消息的可靠性投递

5.1 概念

RabbitMQ消息投递的路径为:

生产者--->交换机--->队列--->消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
#日志格式
logging:
  pattern:
    console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列:

package com.zj.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME = "boot_topic_exchange";
    private final String QUEUE_NAME = "boot_queue";
    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型和名称
                .durable(true) // 是否持久化
                .build();
    }
    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue() {
        return new Queue(QUEUE_NAME); // 队列名
    }
    // 交换机绑定队列
    @Bean
    public Binding bindMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                    @Qualifier(QUEUE_NAME) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("#.message.#")
                .noargs();
    }
}

创建生产者

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String message){
        System.out.println("发送短信:"+message);
    }
}

5.2 确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

1、生产者配置文件开启确认模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated

2、生产者定义确认模式的回调方法,并模拟向不存在的交换机aaa发送消息。

package com.zj;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        //定义确认模式的回调方法,当消息向交换机发送后会调用confirm方法。
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关配置信息
             * @param ack  交换机是否收到消息
             * @param cause  失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  if(ack){
                      System.out.println("消息接受成功");
                  }else {
                      System.out.println("消息接受失败:"+cause);
                      //做一些处理让消息再次发送
                  }
            }
        });
        rabbitTemplate.convertAndSend("aaa","message","hello MQ");
    }
}

3、运行结果

消息接受失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'aaa' in vhost '/', class-id=60, method-id=40)

5.3 退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

1、生产者配置文件开启退回模式

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated
      # 开启回退模式
    publisher-returns: true

2、生产者定义退回模式的回调方法,模拟向不存在的队列bbb发送消息。

@SpringBootTest
class DemoApplicationTests {
    @Resource
    public RabbitTemplate rabbitTemplate;
    @Test
    public void testProducer(){
        //定义退回模式的回调方法,只有交换机将消息发送到队列失败后才会执行该方法。
         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
             /**
              *
              * @param returnedMessage 失败后将失败信息封装到该参数
              */
             @Override
             public void returnedMessage(ReturnedMessage returnedMessage) {
                 System.out.println("消息对象:"+returnedMessage);
                 System.out.println("错误码:"+returnedMessage.getReplyCode());
                 System.out.println("错误信息:"+returnedMessage.getReplyText());
                 System.out.println("交换机:"+returnedMessage.getExchange());
                 System.out.println("路由键:"+returnedMessage.getRoutingKey());
                 //处理消息……
             }
         });
        rabbitTemplate.convertAndSend("boot_topic_exchange","bbb","hello MQ");
    }
}
消息对象:ReturnedMessage [message=(Body:'hello MQ' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic_exchange, routingKey=bbb]
错误码:312
错误信息:NO_ROUTE
交换机:boot_topic_exchange
路由键:bbb

5.4 Ack手动签收

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge="none"
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"

1.消费者配置开启手动签收

spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: MQzhang
    password: MQzhang
    virtual-host: /
      # 开启手动签收
    listener:
      simple:
        acknowledge-mode: manual

2、消费者处理消息时定义手动签收和拒绝签收的情况

package com.zj.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ACKConsumer {
    // 监听队列
    /**
     *
     * @param message  消息对象
     * @param channel   信道对象,用于手动接受消息
     */
    @RabbitListener(queues = "boot_queue")
    public void listen_message(Message message, Channel channel) throws IOException {
        //deliveryTag:消息投递序号,每次投递该值都会+1.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //签收消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
             //拒签消息
            /*
             * 参数一:消息投递序号
             * 参数二:一次是否可以签收多条消息
             * 参数三:拒签后消息是否重回队列(处在队列中的消息会不断的再向消费者发送消息)
             */
            channel.basicNack(deliveryTag,true,true);
            System.out.println("消息消费失败");
        }
    }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
216 3
|
19天前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
19天前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
19天前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
2月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
2月前
|
消息中间件 监控 负载均衡
中间件RabbitMQ性能瓶颈
【7月更文挑战第13天】
121 11
|
2月前
|
消息中间件 NoSQL Kafka
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
消息中间件(RocketMQ、RabbitMQ、ActiveMQ、Redis、kafka、ZeroMQ)以及之间的区别
|
2月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
下一篇
DDNS