springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称

简介: springboot 使用注解的方式创建rabbitmq的交换机、路由key、以及监听队列的名称

maven依赖:

<!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

rabbitmq yml文件配置:

#配置rabbitMq 服务器
  rabbitmq:
    host: IP地址
    port: 5672
    username: guest
    password: guest
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

消息的生产者:

package com.sinochem.agency.rabbitmq.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * @author : jiagang
 * @date : Created in 2021/9/26 14:03
 */
@RestController
public class SendMessageTestController {
    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "hello word";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        //将消息携带绑定键值:testRouting 发送到交换机directExchange
        rabbitTemplate.convertAndSend("directExchange", "testRouting", map);
        return "success";
    }
}

消息的消费者端:

package com.sinochem.agency.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @author : jiagang
 * @date : Created in 2021/9/26 14:58
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange("directExchange"), // 交换机(直连)
        key = "testRouting",                // 路由key
        value = @Queue("testQueue"))) //监听队列的名称
public class TestReceiver{
    @RabbitHandler
    public void process(@Payload Map message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception{
        System.out.println("testReceiver消费者收到消息  : " + message.toString());
        channel.basicAck(deliveryTag,true);
    }
}

发送方回调配置文件:

package com.sinochem.agency.rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * rabbit 消息确认回调
 * @author : jiagang
 * @date : Created in 2021/9/26 15:14
 */
@Configuration
public class RabbitConfirmCallbackConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
            }
        });
        return rabbitTemplate;
    }
}

消费方配置文件(手动ack确认):

package com.sinochem.agency.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * rabbit 消费者 消息确认机制配置
 * @author : jiagang
 * @date : Created in 2021/9/26 15:26
 */
@Configuration
public class MessageListenerConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

文章持续更新,可以关注下方公众号或者微信搜一搜「 最后一支迷迭香 」第一时间阅读,获取更完整的链路资料。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6天前
|
安全 数据处理 网络虚拟化
|
6天前
|
运维 Java 程序员
Spring5深入浅出篇:基于注解实现的AOP
# Spring5 AOP 深入理解:注解实现 本文介绍了基于注解的AOP编程步骤,包括原始对象、额外功能、切点和组装切面。步骤1-3旨在构建切面,与传统AOP相似。示例代码展示了如何使用`@Around`定义切面和执行逻辑。配置中,通过`@Aspect`和`@Around`注解定义切点,并在Spring配置中启用AOP自动代理。 进一步讨论了切点复用,避免重复代码以提高代码维护性。通过`@Pointcut`定义通用切点表达式,然后在多个通知中引用。此外,解释了AOP底层实现的两种动态代理方式:JDK动态代理和Cglib字节码增强,默认使用JDK,可通过配置切换到Cglib
|
4天前
|
Java
Springboot 使用自定义注解结合AOP方式校验接口参数
Springboot 使用自定义注解结合AOP方式校验接口参数
Springboot 使用自定义注解结合AOP方式校验接口参数
|
6天前
|
存储 缓存 Java
【JavaEE】Spring中注解的方式去获取Bean对象
【JavaEE】Spring中注解的方式去获取Bean对象
3 0
|
6天前
|
存储 Java 对象存储
【JavaEE】Spring中注解的方式去存储Bean对象
【JavaEE】Spring中注解的方式去存储Bean对象
9 0
|
6天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
6天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
7 0
|
6天前
|
JSON 前端开发 Java
【JAVA进阶篇教学】第七篇:Spring中常用注解
【JAVA进阶篇教学】第七篇:Spring中常用注解
|
6天前
|
JavaScript Java 开发者
Spring Boot中的@Lazy注解:概念及实战应用
【4月更文挑战第7天】在Spring Framework中,@Lazy注解是一个非常有用的特性,它允许开发者控制Spring容器的bean初始化时机。本文将详细介绍@Lazy注解的概念,并通过一个实际的例子展示如何在Spring Boot应用中使用它。
22 2
|
6天前
|
前端开发 Java
SpringBoot之自定义注解参数校验
SpringBoot之自定义注解参数校验
19 2

热门文章

最新文章