SpringBoot 整合 RabbitMQ

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: SpringBoot 整合 RabbitMQ

应用场景:

  • 异步处理:如果用户注册后需要给用户发送邮件和短信,那么就可以使用消息队列中间件进行异步发送,因为发送邮件和发送短信这两个操作没有任何的关联顺序。
  • 应用解耦:例如订单系统调用库存系统的时候,可以加个中间件做到应用解耦。
  • 流量控制:流量控制也叫流量削峰,是说当高并发的时候或者是秒杀的场景下,可以将流量都放到消息队列中,然后处理秒杀的业务再以此进行处理,避免高并发下服务宕机。

消息中间件的模式:

  • 点对点发布模式:消息发送者发布消息到消息代理,消息代理将消息放入队列,消息接收者从队列中获取消息,之后队列移除消息,消息可以有多个接收者进行处理
  • 发布订阅模式:消息发送者发送到主题,多个消息接收者对这个主题进行监听,那么就会在消息到达的时候同时受到消息

概念

  • Publisher生产者,会与代理服务器Broker建立一条长连接,并且开辟出很多信道Chanel,通过这些Chanel将消息发送给Broker
  • Message 消息(头,路由键 + 体)
  • Broker:代理服务器(有多个交换机多个队列)
  • VHost:用于做隔离,例如隔离生不同语言之间的消息管理或者是生产环境和开发环境之间的隔离
  • Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
  • Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
  • Exchange 交换机,负责接收消息,交换机和队列之间有绑定关系Binding,交换机根据消息头中的路由键转发到相应的队列里
  • Queue队列,存储消息
  • Queue队列,存储消息
  • Queue队列,存储消息
  • Consumer消费者,一个消费者也可以与代理服务器Broker建立一条长连接,并开辟很多Chanel,这些Chanel可以监听队列中的消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

安装

[root@localhost ~]# docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
[root@localhost ~]# docker update rabbitmq --restart=always
rabbitmq

交换机类型:

  • Direct 直连,交换机会根据路由键精确匹配到一个队列上
  • fanout广播,交换机会给已经绑定的所有队列群发消息
  • topic 根据模糊匹配规则#和*进行路由匹配发送给队列消息,#代表多个字符, * 代表必须要有一个字符

SpringBoot整合RabbitMQ

引入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在主启动类中开启@EnableRabbit

配置yml文件

spring:
  rabbitmq:
    port: 5672
    host: 192.168.100.10
    virtual-host: /
    publisher-confirms: true  #开启手动确认机制
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual

配置mq配置类,不选择的话是使用的jdk虚拟化而是使用JackSon的虚拟化,保证在传输对象到队列的时候显示json数据

@Configuration
public class MyRabbitConfig {
    @Bean 
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

测试,创建exchange、binding、queue、message

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
    @Autowired
    AmqpAdmin amqpAdmin;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg() {
        String msg = "hello world";
        rabbitTemplate.convertAndSend("hello.java.exchange","hello.java.exchange",msg);
        log.info("发送的消息是{}",msg);
    }
    @Test
    public void createExchange() {
        amqpAdmin.declareExchange(new DirectExchange("hello.java.exchange",true,false));
        log.info("exchange创建成功hello.java.exchange");
    }
    @Test
    public void createQueue() {
        Queue queue = new Queue("hello.java.queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("queue创建成功hello.java.queue");
    }
    @Test
    public void createBinding() {
        Binding binding = new Binding("hello.java.queue",
                Binding.DestinationType.QUEUE,
                "hello.java.exchange",
                "hello.java.exchange",
                null
                );
        amqpAdmin.declareBinding(binding);
        log.info("binding成功");
    }

两个重要的注解:

  • @RabbitListener(queues = {“hello.java.queue”})这个注解可以标识在类和方法中
  • @RabbitHandler这个注解只能放在方法上,并且搭配@RabbitListener(queues = {“hello.java.queue”})作为类注解来使用,使用场景是当消息中有不同类型的消息的时候,我们需要让不同的方法接收不同的消息进行处理,这时候我们可是用@RabbitHandler作为方法注解来重载不同的方法,来解决这个问题

消息确认机制

保证消息不丢失,可靠抵达,可以使用事务机制,但是性能会下降250倍,为此可以引入确认机制

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

消息确认机制配置

# RabbitMQ配置
spring.rabbitmq.host=192.168.77.130
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Configuration
public class MyRabbitConfig {
    private RabbitTemplate rabbitTemplate;
    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {
        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });
        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

创建一个订单的延迟队列和普通队列并设置延迟队列的路由键,当消息过期以后路由到取消队列

@Configuration
public class MyRabbitConfig {
    /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // @RabbitListener(queues = "stock.release.stock.queue")
    // public void handle(Message message) {
    //
    // }
    /**
     * 库存服务默认的交换机
     * @return
     */
    @Bean
    public Exchange stockEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
        return topicExchange;
    }
    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        Queue queue = new Queue("stock.release.stock.queue", true, false, false);
        return queue;
    }
    /**
     * 延迟队列
     * @return
     */
    @Bean
    public Queue stockDelay() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息过期时间 2分钟
        arguments.put("x-message-ttl", 120000);
        Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
        return queue;
    }
    /**
     * 交换机与普通队列绑定
     * @return
     */
    @Bean
    public Binding stockLocked() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        //           Map<String, Object> arguments
        Binding binding = new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
        return binding;
    }
    /**
     * 交换机与延迟队列绑定
     * @return
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

向指定交换机和指定队列发送消息

rabbitTemplate.convertAndSend("stock-event-exchange","stock.release",orderEntityTo);

创建监听器监听发来的消息

@Service
@RabbitListener(queues = "order.release.order.queue")
public class OrderCloseListener {
    @Autowired
    OrderService orderService;
    @RabbitHandler
    public void orderClose(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        try {
            orderService.orderClose(orderEntity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java 网络架构
|
2月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
2月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
307 2
|
2月前
|
消息中间件 Java Maven
|
3月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
173 1
|
3月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
3月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
3月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
10天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
43 15
下一篇
无影云桌面