【Spring Boot 快速入门】八、Spring Boot集成RabbitMQ

简介: 【Spring Boot 快速入门】八、Spring Boot集成RabbitMQ

初始RabbitMQ


  RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。


RabbitMQ优势


  • 可靠性(Reliability):持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing)
  • 消息集群(Clustering)
  • 高可用(Highly Available Queues)
  • 多种协议(Multi-protocol):支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 多语言客户端(Many Clients):几乎支持所有常用语言Java、.NET、Ruby 等等。
  • 管理界面(Management UI):用户可以监控和管理消息 Broker 的许多方面。
  • 跟踪机制(Tracing)
  • 插件机制(Plugin System):众多查件,方便扩展

RabbitMQ作用


同步变异步


  在用户下单的过程中,比如某东中,下单之后,如果开启微信消息推送和发送邮件。


高内聚低耦合


  当多个系统进行交互时,为了多个业务系统相互独立互不影响,而且可以正常通信,只需推送服务系统去订阅订单服务系统在RabbitMQ上发布的信息,并完成推送服务。


流量削峰


  当大量用户请求服务系统时,若不对用户请求进行数量控制,可能导致服务器崩溃,因此在中间新增一个RabbitMQ消息队列,直接将请求的数据信息放到消息队列中。然后将队列中请求依次发送到业务系统进行业务处理。常用的场景有:秒杀系统,


快速开始


POM


  本次使用Maven进行项目构建,因此引入pom依赖。导入maven依赖需要2个,如下文所示。

<!-- RabbitMQ  strat-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- RabbitMQ  end-->


application.properties


  在application.properties文件当中引入RabbitMQ基本的配置信息,最简单的配置包含:RabbitMQ的主机地址、端口号、基础的用户名和密码。

##rabbitmq 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin


RabbitMqConfig


  RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类。

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息的载体,每个消息都会被投到一个或多个队列。 其中durable: 是否持久化,exclusive: 是否排它。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

  RabbitMqConfig把交换机,队列,通过路由关键字进行绑定。一个交换机可以绑定多个消息队列,也就是消息通过一个交换机


package com.example.demo.config;
import com.example.demo.constant.RabbitConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
/**
 * @ClassName RabbitMqConfig
 * @Description: RabbitMQ配置
 * @Author  JavaZhan @公众号:Java全栈架构师
 * @Date 2020/6/12
 * @Version V1.0
 **/
@Slf4j
public class RabbitMqConfig {
     @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
    /**
     * 队列
     * */
    @Bean
    public Queue testQueue() {
        return new Queue(RabbitConstant.TEST_QUEUE);
    }
    /**
     * 交换机
     * */
    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange(RabbitConstant.TEST_QUEUE_KEE_EXCHANGE,
                true,
                false);
    }
    /**
     * 队列绑定路由和交换机
     * */
    @Bean
    public Binding taskOrderHandleBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with(RabbitConstant.TEST_QUEUE_KEY);
    }
}


消息消费者


/**
 * @ClassName TestConsumer
 * @Description: 消息消费者
 * @Author JavaZhan @公众号:Java全栈架构师
 * @Date 2020/6/12
 * @Version V1.0
 **/
@Component
public class TestConsumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitConstant.TEST_QUEUE),
            key = RabbitConstant.TEST_QUEUE_KEY,
            exchange = @Exchange(value = RabbitConstant.TEST_QUEUE_KEE_EXCHANGE)
    ))
    @RabbitHandler()
    public void getMsg(String messageBody, Message message, Channel channel) throws Exception {
        System.out.println(messageBody);
    }
}


消息生产者


/**
 * @ClassName 消息生产者
 * @Description: TODO
 * @Author JavaZhan @公众号:Java全栈架构师
 * @Date 2020/6/12
 * @Version V1.0
 **/
@Component
public class TestSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String str){
        rabbitTemplate.convertAndSend(RabbitConstant.TEST_QUEUE_KEE_EXCHANGE,RabbitConstant.TEST_QUEUE_KEY,str);
    }
}


RabbitConstant


RabbitMq的基本配置信息,里面可以包含多个交换机、队列和路由。

package com.example.demo.constant;
/**
 * @ClassName RabbitConstant
 * @Description: TODO
 * @Author JavaZhan @公众号:Java全栈架构师
 * @Date 2020/6/12
 * @Version V1.0
 **/
public interface RabbitConstant {
    /**
     * 测试队列
     * */
    String  TEST_QUEUE ="TEST_QUEUE";
    /**
     * 测试路由
     * */
    String  TEST_QUEUE_KEY ="TEST_QUEUE_KEY";
    /**
     * 测试交换机
     * */
    String  TEST_QUEUE_KEE_EXCHANGE ="TEST_QUEUE_KEY_EXCHANGE";
}


测试类


@Test
    void testMqSendMsg(){
        for(int i =0 ;i<20;i++){
            testSender.send("这是消息"+i+"这是一个测试的消息!来自【掘金,小阿杰】");
        }
    }


启动本地MQ


如下图所示,本地已经那种RabbitMQ,启动并登陆访问正常。


image.png


运行测试用例之后,输出


image.png


好了,一个简单的Spring Boot集成RabbitMQ就搭建完成了。


结语


  这样RabbitMQ与Spring Boot集成成功啦。更多的测试大家可以深入研究一下RabbitMQ 集群配置的高可用性。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
27天前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
203 17
Spring Boot 两种部署到服务器的方式
|
4月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
1月前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
348 12
|
1月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
82 8
|
6月前
|
消息中间件 Java 网络架构
|
2月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
47 6
|
3月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
79 2
|
4月前
|
SQL JSON Java
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和PageHelper进行分页操作,并且集成Swagger2来生成API文档,同时定义了统一的数据返回格式和请求模块。
138 1
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
|
3月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
78 0
|
4月前
|
缓存 NoSQL Java
Springboot自定义注解+aop实现redis自动清除缓存功能
通过上述步骤,我们不仅实现了一个高度灵活的缓存管理机制,还保证了代码的整洁与可维护性。自定义注解与AOP的结合,让缓存清除逻辑与业务逻辑分离,便于未来的扩展和修改。这种设计模式非常适合需要频繁更新缓存的应用场景,大大提高了开发效率和系统的响应速度。
118 2

热门文章

最新文章