Spring Boot 中使用 RabbitMQ

简介: RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
常用概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
准备
环境安装
任选其一
CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用
http://www.ymq.io/2017/08/16/rabbit-install
CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用
http://www.ymq.io/2017/08/17/rabbit-install-cluster
Github 代码
代码我已放到 Github ,导入spring-boot-rabbitmq 项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

添加依赖
在项目中添加 spring-boot-starter-amqp 依赖

org.springframework.bootspring-boot-starter-amqp

参数配置

spring.application.name=ymq-rabbitmq-spring-boot

spring.rabbitmq.host=10.4.98.15
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

交换机(Exchange)
1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,所有绑定的队列都发送
Direct Exchange

Direct Exchange 是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:""(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
配置队列

@Configurationpublic class RabbitDirectConfig {    @Bean     public Queue helloQueue() {        return new Queue("hello");     }    @Bean     public Queue directQueue() {        return new Queue("direct");     }    //-------------------配置默认的交换机模式,可以不需要配置以下-----------------------------------     @Bean     DirectExchange directExchange() {        return new DirectExchange("directExchange");     }    //绑定一个key "direct",当消息匹配到就会放到这个队列中     @Bean     Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {        return BindingBuilder.bind(directQueue).to(directExchange).with("direct");     }    // 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 多此一举,没必要这样写     //---------------------------------------------------------------------------------------------}

监听队列

@Component@RabbitListener(queues = "hello")public class helloReceiver {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 helloReceiver," + message);
    }
}
@Component@RabbitListener(queues = "direct")public class DirectReceiver {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 DirectReceiver," + message);
    }
}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/**
 * 描述: 默认的交换机模式
 *
 * @author: yanpenglei
 * @create: 2017/10/25 1:03
 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitDirectTest {    @Autowired
    private AmqpTemplate rabbitTemplate;    @Test
    public void sendHelloTest() {

        String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到";

        String routeKey = "hello";

        context = "routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendHelloTest : " + context);        this.rabbitTemplate.convertAndSend(routeKey, context);
    }    @Test
    public void sendDirectTest() {

        String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到";

        String routeKey = "direct";

        String exchange = "directExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendDirectTest : " + context);        // 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 多此一举,没必要这样写
        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

按顺序执行:响应

接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到

接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到

Fanout Exchange

任何发送到Fanout Exchange 的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
配置队列

@Configurationpublic class RabbitFanoutConfig {    final static String PENGLEI = "fanout.penglei.net";    final static String SOUYUNKU = "fanout.souyunku.com";    @Bean     public Queue queuePenglei() {        return new Queue(RabbitFanoutConfig.PENGLEI);     }    @Bean     public Queue queueSouyunku() {        return new Queue(RabbitFanoutConfig.SOUYUNKU);     }    /**      * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。      */     @Bean     FanoutExchange fanoutExchange() {        return new FanoutExchange("fanoutExchange");     }    @Bean     Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queuePenglei).to(fanoutExchange);     }    @Bean     Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {        return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);     } }

监听队列

@Component@RabbitListener(queues = "fanout.penglei.net")public class FanoutReceiver1 {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver1," + message);
    }
}
@Component@RabbitListener(queues = "fanout.souyunku.com")public class FanoutReceiver2 {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver2," + message);
    }
}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/**
 * 描述: 广播模式或者订阅模式队列
 *
 * @author: yanpenglei
 * @create: 2017/10/25 1:08
 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitFanoutTest {    @Autowired
    private AmqpTemplate rabbitTemplate;    @Test
    public void sendPengleiTest() {

        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";

        String routeKey = "topic.penglei.net";

        String exchange = "fanoutExchange";

        System.out.println("sendPengleiTest : " + context);

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }    @Test
    public void sendSouyunkuTest() {

        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";

        String routeKey = "topic.souyunku.com";

        String exchange = "fanoutExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendSouyunkuTest : " + context);        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

按顺序执行:响应

接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到


接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

Topic Exchange

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个标题``(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如#.log.#表示该队列关心所有涉及log的消息(一个RouteKey为MQ.log.error的消息会被转发到该队列)。
4.#表示0个或若干个关键字,表示一个关键字。如topic.能与topic.warn匹配,无法与topic.warn.timeout匹配;但是topic.#能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

配置队列

@Configurationpublic class RabbitTopicConfig {    final static String MESSAGE = "topic.message";    final static String MESSAGES = "topic.message.s";    final static String YMQ = "topic.ymq";    @Bean     public Queue queueMessage() {        return new Queue(RabbitTopicConfig.MESSAGE);     }    @Bean     public Queue queueMessages() {        return new Queue(RabbitTopicConfig.MESSAGES);     }    @Bean     public Queue queueYmq() {        return new Queue(RabbitTopicConfig.YMQ);     }    /**      * 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息      */     @Bean     TopicExchange topicExchange() {        return new TopicExchange("topicExchange");     }    //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息     @Bean     Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {        return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");     }    //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息     @Bean     Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {        return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");     }    //綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息     @Bean     Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {        return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");     } }

监听队列

@Component@RabbitListener(queues = "topic.message")public class TopicReceiver1 {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver1," + message);
    }

}
@Component@RabbitListener(queues = "topic.message.s")public class TopicReceiver2 {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver2," + message);
    }

}
@Component@RabbitListener(queues = "topic.ymq")public class TopicReceiver3 {    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver3," + message);
    }

}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/**
 * 描述: 配置转发消息模式队列
 *
 * @author: yanpenglei
 * @create: 2017/10/25 1:20
 */@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class RabbitTopicTest {    @Autowired
    private AmqpTemplate rabbitTemplate;    @Test
    public void sendMessageTest() {

        String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";

        String routeKey = "topic.message";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendMessageTest : " + context);        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }    @Test
    public void sendMessagesTest() {


        String context = "此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到";

        String routeKey = "topic.message.s";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendMessagesTest : " + context);        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }    @Test
    public void sendYmqTest() {

        String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到";

        String routeKey = "topic.ymq";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendYmqTest : " + context);        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

按顺序执行:响应

接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到


接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到


接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到

更多参考内容:http://www.roncoo.com/article/index?tn=SpringBoot

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
1114 1
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
214 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
191 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
6月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
237 32
|
5月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
930 0
|
10月前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
2549 17
Spring Boot 两种部署到服务器的方式
|
8月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
351 0
|
11月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
260 6
下一篇
oss云网关配置