Spring Boot 整合 RabbitMQ

简介: Spring Boot 整合 RabbitMQ

Spring Boot 整合RabbitMQ


搭建环境


创建测试项目:test_rabbitmq_boot


6575fd56cc7ab489737120c5bf97cfbb.jpg


添加依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xc_test_parent</artifactId>
        <groupId>com.czxy.xuecheng</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>test_rabbitmq_boot</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>


添加yml文件


server:
  port: 8090
spring:
  application:
    name: test_rabbitmq_producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    passowrd: guest
    virtualHost: /


创建启动类:TestRabbitMQBootApplication


package com.czxy.xuecheng;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/> 
 * Created by liangtong.
 */
@SpringBootApplication
public class TestRabbitMQBootApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestRabbitMQBootApplication.class, args);
    }
}


配置类


6f247f6930cb983bd2a4fbe4cbab4245.jpg


package com.czxy.xuecheng.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/> 
 * Created by liangtong.
 */
@Configuration
public class RabbitConfig {
    // 交换机名称
    public static final String EXCHANGE_TOPIC_INFORM = "inform_exchange_topic";
    //队列名称
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    />  交换机配置
     *  ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
     *  channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
     * @return
     */
    @Bean(EXCHANGE_TOPIC_INFORM)
    public Exchange exchange_topic() {
        //durable(true)持久化,消息队列重启后交换机仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
    }
    /> 
     * 声明队列
     * channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
     * @return
     */
    @Bean(QUEUE_INFORM_SMS)
    public Queue queue_inform_sms(){
        return new Queue(QUEUE_INFORM_SMS);
    }
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue queue_inform_email(){
        return new Queue(QUEUE_INFORM_EMAIL,true,false,false);
    }
    /> 
     * 绑定队列到交换机
     * channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPIC_INFORM, "inform.#.email.#");
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding binding_queue_inform_sms(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
    }
    @Bean
    public Binding binding_queue_inform_email(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
    }
}


生产者


7b886c656e25ec8f229f4c234d8b2817.jpg


package com.czxy.xuecheng;
import com.czxy.xuecheng.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/> 
 * Created by liangtong.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestRabbitMQBootApplication.class)
public class Producer05Topic {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendEmail() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.email", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i ++) {
            String message = "email inform to user" + i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.email",message);
            System.out.println("Send Message is:'" + message + "'");
        }
    }
    @Test
    public void testSendSms() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i ++) {
            String message = "sms inform to user" + i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms",message);
            System.out.println("Send Message is:'" + message + "'");
        }
    }
    @Test
    public void testSendSmsAndEmail() {
        //channel.basicPublish(EXCHANGE_TOPIC_INFORM, "inform.sms.email", null, message.getBytes());
        for(int i = 0 ; i < 5 ; i ++) {
            String message = "sms and email inform to user" + i;
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPIC_INFORM,"inform.sms.email",message);
            System.out.println("Send Message is:'" + message + "'");
        }
    }
}


消费者


package com.czxy.xuecheng.listener;
import com.czxy.xuecheng.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/> 
 * Created by liangtong.
 */
@Component
public class Consumer05Topic {
    @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_EMAIL)
    public void receiveEmail(String msg , Message message){
        System.out.println("receive message is:" + msg);
    }
/*
    @RabbitListener(queues = RabbitConfig.QUEUE_INFORM_SMS)
    public void receiveSmS(String msg , Message message){
        System.out.println("receive message is:" + msg);
    }
    */
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
15小时前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
SpringBoot实现RabbitMQ的广播交换机(SpringAMQP 实现Fanout广播交换机)
19 2
|
15小时前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
15小时前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
5 0
|
16小时前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
30 1
|
15小时前
|
消息中间件 人工智能 Java
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
26 1
|
15小时前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
35 1
|
15小时前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
37 0
|
15小时前
|
消息中间件 Java
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
SpringBoot实现RabbitMQ的通配符交换机(SpringAMQP 实现Topic交换机)
11 1
|
15小时前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
SpringBoot实现RabbitMQ的定向交换机(SpringAMQP 实现Direct定向交换机)
20 1
|
15小时前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
SpringBoot实现RabbitMQ的WorkQueue(SpringAMQP 实现WorkQueue)
20 2