SpringBoot 整合 AMQP(RabbitMQ)

简介: SpringBoot整合AMQP(RabbitMQ) 添加pom依赖 org.springframework.boot spring-boot-starter-amqp application.

SpringBoot整合AMQP(RabbitMQ)

  1. 添加pom依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. application.properties配置

    spring.rabbitmq.host=***.***.***.***
    spring.rabbitmq.port=5762
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=***
  3. 在RabbitMQ中所有的消息生产者提交的消息都会交由Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的Queue中。RabbitMQ提供了4种不同策略,分别是Direct、Fanout、Topic、Header,4种策略中前三种使用率较高

    • Direct

      DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,但一条消息到达DirectExchange时会被转发到与该条消息routing key相同的Queue上

      • DirectExchange的配置如下:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitDirectConfig {
            public final static String DIRECTNAME = "ysw-direct";
            @Bean
            Queue queue1(){
                return new Queue("queue-direct1");
            }
        
            @Bean
        
            Queue queue2(){
                return new Queue("queue-direct2");
            }
        
            @Bean
            Queue queue3(){
                return new Queue("queue-direct3");
            }
        
            @Bean
            DirectExchange directExchange(){
                return new DirectExchange(DIRECTNAME,true,false);
            }
        
            @Bean
            Binding binding1(){
                return BindingBuilder.bind(queue1()).to(directExchange()).with("direct1");
            }
        
            @Bean
            Binding binding2(){
                return BindingBuilder.bind(queue2()).to(directExchange()).with("direct2");
            }
        }

        DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,之配置一个Queue的实例即可

      • 配置消费者

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class DirectReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-direct1")
            public void directHandler1(String msg){
                logger.info("\033[30;4m"+"queue-direct1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-direct2")
            public void directHandler2(String msg){
                logger.info("\033[30;4m"+"queue-direct2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-direct3")
            public void directHandler3(String msg){
                logger.info("\033[30;4m"+"queue-direct3:"+msg+"\033[0m");
            }
        }

        通过@RabbitListener注解指定一个方法是一个消费者方法,方法参数就是所接收的消息。

      • 消息发送

        通过注入RabbitTemplate对象来进行消息发送,在这里我通过定时任务使其自定发送,须开启定时任务,详细操作可查看一节

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(fixedDelay = 5000,initialDelay = 3000)
            public void direct(){
                String message = "direct-task";
                logger.info("\033[30;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend("ysw-direct","direct1",message);
                rabbitTemplate.convertAndSend("ysw-direct","direct2",message);
                rabbitTemplate.convertAndSend("queue-direct3",message);
            }
        }

    • Fanout

      FanoutExchange的数据交换策略是把所有到达FanoutExchange的消息转发给所有与他绑定的Queue,在这种策略中,routingkey将不起作用。

      • FanoutExchange的配置方式如下:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitFanoutConfig {
            public final static String FANOUTNAME = "ysw-fanout";
        
            @Bean
            Queue queue4(){
                return new Queue("queue-fanout1");
            }
        
            @Bean
            Queue queue5(){
                return new Queue("queue-fanout2");
            }
        
            @Bean
            Queue queue6(){
                return new Queue("queue-fanout3");
            }
        
            @Bean
            FanoutExchange fanoutExchange(){
                return new FanoutExchange(FANOUTNAME,true,false);
            }
        
            @Bean
            Binding binding4(){
                return BindingBuilder.bind(queue4()).to(fanoutExchange());
            }
        
            @Bean
            Binding binding5(){
                return BindingBuilder.bind(queue5()).to(fanoutExchange());
            }
        
            @Bean
            Binding binding6(){
                return BindingBuilder.bind(queue6()).to(fanoutExchange());
            }
        }
      • 配置消费者

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class FanoutReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-fanout1")
            public void fanoutHandler1(String msg){
                logger.info("\033[31;4m"+"queue-fanout1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-fanout2")
            public void fanoutHandler2(String msg){
                logger.info("\033[31;4m"+"queue-fanout2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-fanout3")
            public void fanoutHandler3(String msg){
                logger.info("\033[31;4m"+"queue-fanout3:"+msg+"\033[0m");
            }
        }
      • 消息发送

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            Scheduled(fixedDelay = 5000,initialDelay = 4000)
            public void fanout(){
                String message = "fanout-task";
                logger.info("\033[31;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend("ysw-fanout",null,message);
            }
        }

    • Topic

      TopicExchange是比较复杂也比较灵活的一种路由策略,在TopicExchange中,Queue通过routingkey绑定到TopicExchange上,当消息发送到TopicExchange后,TopicExchange根据消息的routingkey将消息路由到一个或多个Queue上。

      • TopicExchange配置如下:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitTopicConfig {
            public final static String TOPIC_NAME = "ysw-topic";
            @Bean
            Queue queue7(){
                return new Queue("queue-topic1");
            }
        
            @Bean
            Queue queue8(){
                return new Queue("queue-topic2");
            }
        
            @Bean
            Queue queue9(){
                return new Queue("queue-topic3");
            }
        
            @Bean
            TopicExchange topicExchange(){
                return new TopicExchange(TOPIC_NAME,true,false);
            }
        
            @Bean
            Binding binding7(){
                 /*
                 * 匹配规则
                 * 绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:
                 * 绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:
                 * ①*(星号)仅代表一个单词
                 * ②#(井号)代表任意个单词
                 **/
                return BindingBuilder.bind(queue7()).to(topicExchange()).with("#.topic1");
            }
        
            @Bean
            Binding binding8(){
                return BindingBuilder.bind(queue8()).to(topicExchange()).with("topic2.#");
            }
        
            @Bean
            Binding binding9(){
                return BindingBuilder.bind(queue9()).to(topicExchange()).with("#.topic3.*");
            }
        }
      • 配置消费者

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class TopicReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-topic1")
            public void topicHandler1(String msg){
                logger.info("\033[32;4m"+"queue-topic1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-topic2")
            public void topicHandler2(String msg){
                logger.info("\033[32;4m"+"queue-topic2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-topic3")
            public void topicHandler3(String msg){
                logger.info("\033[32;4m"+"queue-topic3:"+msg+"\033[0m");
            }
        }
      • 消息发送

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(cron = "0-30/6 * * * * ?")
            public void topic(){
                String message = "topic-task";
                int i = 0;
                logger.info("\033[32;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.news",message + 1);//topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.salary",message + 2);//topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.news",message + 3);//topic2
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.item",message + 4);//topic2
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.sth.topic1",message + 5);//topic2&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.sth.topic2",message + 6);//topic2&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3",message + 7);//topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3.news",message + 8);//topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.topic3",message + 9); //topic1&topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.topic3",message + 10);//topic2&topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3.topic1",message + 11);//topic3&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.topic3.topic1",message + 12);//topic1&topic2&topic3
            }
        }

    • Header

      HeaderExchange是一种较少使用的路由策略,HeaderExchange会根据消息的Header将消息路由到不同的Queue上,这种策略也和routingkey无关。

      • HeaderExchange配置如下:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitHeaderConfig {
            public final static String HEADER_NAME = "ysw-header";
            @Bean
            Queue queue10(){
                return new Queue("queue-header1");
            }
        
            @Bean
        
            Queue queue11(){
                return new Queue("queue-header2");
            }
        
            @Bean
            Queue queue12(){
                return new Queue("queue-header3");
            }
        
            @Bean
            HeadersExchange headersExchange(){
                return new HeadersExchange(HEADER_NAME,true,false);
            }
        
            @Bean
            Binding binding10(){
                Map<String,Object> map = new HashMap<>();
                map.put("age", "18");
                map.put("name", "ysw");
                return BindingBuilder.bind(queue10()).to(headersExchange()).whereAny(map).match();
            }
        
            @Bean
            Binding binding11(){
                Map<String,Object> map = new HashMap<>();
                map.put("name", "ysw");
                return BindingBuilder.bind(queue11()).to(headersExchange()).where("age").exists();
            }
        
            @Bean
            Binding binding12(){
                Map<String,Object> map = new HashMap<>();
                map.put("age", "18");
                map.put("name", "ysw");
                return BindingBuilder.bind(queue12()).to(headersExchange()).whereAll(map).match();
            }
        }

        Binding配置注释:whereAny表示消息的Header中只要有一个Header匹配上map中的key/value,就把该消息路由到名为“queue-header1”的Queue上;whereAll方法表示消息的所有Header都要匹配,才将消息路由到名为“queue-header2”的Queue上;where表示只要消息的header中包含age,无论age值为多少,都将消息路由到名为“queue-header2”的Queue上。

      • 配置消费者

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class HeaderReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-header1")
            public void headerHandler1(byte[] msg){
                logger.info("\033[33;4m"+"queue-header1:"+new String(msg,0,msg.length)+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-header2")
            public void headerHandler2(byte[] msg){
                logger.info("\033[33;4m"+"queue-header2:"+new String(msg,0,msg.length)+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-header3")
            public void headerHandler3(byte[] msg){
                logger.info("\033[33;4m"+"queue-header3:"+new String(msg,0,msg.length)+"\033[0m");
            }
        }
      • 消息发送

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(cron = "0-30/3 * * * * ?")
            public void header(){
                String message = "header-task";
                logger.info("\033[33;4m"+message+"\033[0m");
        
                Message message1 = MessageBuilder.withBody("name=name".getBytes())
                .setHeader("name", "aaa").build();
                Message message2 = MessageBuilder.withBody("name=ysw".getBytes())
                .setHeader("name", "ysw").build();
                Message message3 = MessageBuilder.withBody("age=19".getBytes())
                .setHeader("age", "19").build();
                Message message4 = MessageBuilder.withBody("age=18".getBytes())
                .setHeader("age", "18").build();
                Message message5 = MessageBuilder.withBody("name=ysw&age=18".getBytes())
                .setHeader("name", "ysw").setHeader("age","18").build();
                Message message6 = MessageBuilder.withBody("name=ysw&age=19".getBytes())
                .setHeader("name", "ysw").setHeader("age","19").build();
                Message message7 = MessageBuilder.withBody("name=aaa&age=18".getBytes())
                .setHeader("name", "aaa").setHeader("age","18").build();
        
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message1);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message2);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message3);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message4);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message5);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message6);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message7);
            }
        }
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java Maven
一文搞懂Spring Boot整合RocketMQ
一文搞懂Spring Boot整合RocketMQ
91 0
|
29天前
|
NoSQL Java Redis
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!
231 1
|
2月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
32 1
|
3月前
|
消息中间件 存储 安全
SpringBoot与RabbitMQ详解与整合
SpringBoot与RabbitMQ详解与整合
56 0
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
58 0
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
23天前
|
消息中间件 JSON Java
SpringBoot+RabbitMQ 方式收发消息
SpringBoot+RabbitMQ 方式收发消息
14 0
|
2月前
|
消息中间件 JSON Java
如何利用springboot + rabbitmq发送邮件?
RabbitMQ相关知识请参考: RabbitMQ消息确认、消息持久化等核心知识总结 - 简书
29 2
|
3月前
|
消息中间件 数据库 RocketMQ
Springboot+RocketMQ通过事务消息优雅的实现订单支付功能
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
108 0
|
3月前
|
消息中间件 Java RocketMQ
Springboot整合RocketMQ 基本消息处理
1. 同步消息 2. 异步消息 3. 单向消息 4. 延迟消息 5. 批量消息 6. 顺序消息 7. Tag过滤 8. 广播消息