Spring Boot整合rabbitmq

简介:

Spring Boot整合rabbitmq

rabbitmq的基本概念和其它相关知识请自主去官网学习
rabbitmq官网,
本文只介绍rabbitmq在springboot中如何使用

添加依赖包

    <!--rabbitmq客户端 start-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>
    <!--rabbitmq客户端 end-->

添加配置文件 application.yml
spring:
rabbitmq: #rabbit连接配置信息

host: 39.97.234.52
port: 5672
username: admin
password: admin
virtual-host:  /vhost_1

rabbitmq五种模式的使用

  1. 简单队列
    创建消费者

@Component
public class MQ {

/**
 * 简单队列
 * autoDelete = "true"  表示没有生产者和消费者连接时自动删除
 * durable = "true"  表示队列持久化,默认就是 true
 * @param msg
 */
@RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue",autoDelete = "true", durable = "true"))
public void simpleQueue(String msg) {
    System.out.println("接收  " + msg);
}

}
创建生产者
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void simpleQueue() {
    rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue")
    System.out.println("simple success");
}

}

  1. 工作队列,实现了能者多劳
    创建消费者

@Component
public class MQ {

/**
 * 工作队列,多个消费者消费一个队列
 * <p>
 * AMQP默认实现消费者确认模式,原文如下
 * It's a common mistake to miss the basicAck and Spring AMQP helps to avoid this through its default configuration.
 * The consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery),
 * but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.
 * <p>
 * Fair dispatch vs Round-robin dispatching
 * 官网说: AMQP默认实现消费者fair转发,也就是能者多劳,原文如下(应该是说反了,默认的是250,但是是Round-robin dispatching)
 * However, "Fair dispatch" is the default configuration for Spring AMQP.
 * The AbstractMessageListenerContainer defines the value for DEFAULT_PREFETCH_COUNT to be 250.
 * If the DEFAULT_PREFETCH_COUNT were set to 1 the behavior would be the round robin delivery as described above.
 */
//设置消费者的确认机制,并达到能者多劳的效果
@Bean("workListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    //自动ack,没有异常的情况下自动发送ack
    //auto  自动确认,默认是auto
    //MANUAL  手动确认
    //none  不确认,发完自动丢弃
    containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //拒绝策略,true回到队列 false丢弃,默认是true
    containerFactory.setDefaultRequeueRejected(true);

    //默认的PrefetchCount是250,采用Round-robin dispatching,效率低
    //setPrefetchCount 为 1,即可启用fair 转发
    containerFactory.setPrefetchCount(1);
    return containerFactory;
}

/**
 * 若不使用自定义containerFactory = "workListenerFactory",默认的轮询消费效率低
 *
 * @param s
 */
@RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
public void workQueue1(String s) {
    System.out.println("workQueue 1  " + s);
}

@RabbitListener(queuesToDeclare = @Queue("workQueue"), containerFactory = "workListenerFactory")
public void workQueue2(String s) throws InterruptedException {
    Thread.sleep(1000);
    System.out.println("workQueue 2  " + s);
}

}
创建生产者
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void workQueue() {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("workQueue", i);
    }
    System.out.println("workQueue success");
}

}

  1. 订阅模式
    创建消费者

@Component
public class MQ {

/**
 * 订阅模式  fanout
 */
@RabbitListener(bindings = {
        @QueueBinding(value = @Queue,  //临时路由
                exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
})
public void fanout(String s) {
    System.out.println("订阅模式1    " + s);
}

@RabbitListener(bindings = {
        @QueueBinding(value = @Queue, exchange = @Exchange(value = "exchange1", type = ExchangeTypes.FANOUT))
})
public void fanout2(String s) {
    System.out.println("订阅模式2    " + s);
}

}
创建生产者
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void fanOut() {
    rabbitTemplate.convertAndSend("exchange1", "", "fan out......");
}

}

  1. 路由模式
    创建消费者

@Component
public class MQ {

    /**
     * 路由模式  DIRECT
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,  //临时路由
                    exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
                    key = {"error", "info"}  //路由键
            )
    })
    public void router(String s) {
        System.out.println("路由模式1    " + s);
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "exchange2", type = ExchangeTypes.DIRECT),
                    key = {"error"}  //路由键
            )
    })
    public void router2(String s) {
        System.out.println("路由模式2    " + s);
    }

}
创建生产者
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void router() {
    rabbitTemplate.convertAndSend("exchange2", "info", "router");
    System.out.println("router");
}

}

  1. 主题模式
    创建消费者

@Component
public class MQ {

        /**
         * topic  topics
         */
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue,
                        exchange = @Exchange(value = "exchange3", type = ExchangeTypes.TOPIC),
                        key = {"user.#"}  //路由键
                )
        })
        public void topic2(String s) {
            System.out.println("topic2    " + s);
        }

}
创建生产者
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void topic() {
    rabbitTemplate.convertAndSend("exchange3", "user.name", "hhh");
    System.out.println("topic");
}

}
默认消息是持久化的,也可以设置不持久化,以简单队列示例
@SpringBootTest
public class RabbitTest {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * AMQP 默认消息是持久化的,但只有在队列也是持久化时才有作用,原文如下:
 * Messages are persistent by default with Spring AMQP.
 * Note the queue the message will end up in needs to be durable as well,
 * otherwise the message will not survive a broker restart as a non-durable queue does not itself survive a restart.
 * <p>
 * MessageProperties类中源码如下:
 * static {
 * DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
 * DEFAULT_PRIORITY = 0;
 * }
 * <p>
 * 如何设置消息不持久化?
 * 设置消息不持久化,默认是持久化的,这里只为记录如何设置消息不持久化,一般不设置
 * 发送消息时,添加 MessagePostProcessor即可,这里使用 lambda 表达式
 * (message) -> {
 * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
 * return message;
 * }
 * <p>
 * 完整示例如下:
 * rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
 * (message) -> {
 * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
 * return message;
 * });
 */
@Test
public void simpleQueue() {
    rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue",
            (message) -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                return message;
            });
    System.out.println("simple success");
}

}
如何设置生产者消息确认,避免消息发送失败而丢失(确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。)
参考资料:https://www.cnblogs.com/wangiqngpei557/p/9381478.html

在配置文件中添加
spring:
rabbitmq: #rabbit连接配置信息

publisher-returns: true             #开启消息从 交换机----》队列发送失败的回调
publisher-confirm-type: correlated  #开启消息从 生产者----》交换机的回调

添加配置类
@Component
public class ProducerConfig {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void pre() {
    /**
     *
     *  消息发送到交换机的回调
     *
     *   public void confirm(CorrelationData correlationData, boolean b, String s) {
     *
     *         System.out.println("消息唯一标识:"+correlationData);
     *         System.out.println("确认结果:"+ b);
     *         System.out.println("失败原因:"+ s);
     *    }
     *
     */
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("setConfirmCallback-------------------");
        System.out.println("correlationData:  " + correlationData);
        System.out.println(ack);
        System.out.println(cause);
        if (ack) {
            System.out.println("发送成功");
        } else {
            System.out.println("发送失败");
            // 可以记录下来,也可以重新发送消息。。。
        }

    });

    /**
     *
     * 消息从交换机发送到队列的回调,只有发送失败时才会回调
     *  public void returnedMessage(Message message, int i, String s, String s1, String s2) {
     *         System.out.println("消息主体 message : "+message);
     *         System.out.println("消息主体 message : "+ i);
     *         System.out.println("描述:"+ s);
     *         System.out.println("消息使用的交换器 exchange : "+ s1);
     *         System.out.println("消息使用的路由键 routing : "+ s2);
     *    }
     */
    rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
        System.out.println("setReturnCallback---------------------");
        System.out.println("消息主体 message : " + message);
        System.out.println("响应码 replyCode: " + replyCode);
        System.out.println("响应内容 replyText:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routeKey : " + routingKey);

        //也可以重新发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, new String(message.getBody()));
        System.out.println("重新发送消息: -----" + new String(message.getBody()));
    });

    /**
     * 网上都说必须设置rabbitTemplate.setMandatory(true),才能触发ReturnCallback回调,
     * 我尝试了一下,并不需要设置为true,交换机发送消息给队列失败时,也能触发回调
     */
    //rabbitTemplate.setMandatory(true);
}

}
代码github地址:https://github.com/1612480331/Spring-Boot-rabbitmq

原文地址https://www.cnblogs.com/yloved/p/12864289.html

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2497 1
|
7月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
473 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
7月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
379 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
11月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
336 32
|
10月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
2070 0
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
3073 17
Spring Boot 两种部署到服务器的方式
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
587 0
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
372 6

热门文章

最新文章