SpringBoot使用RabbitMQ(二)

简介: springboot使用rabbitmq发送回调队列

如未看第一章的建议先看第一章基本用法:SpringBoot使用RabbitMQ,这里讲解SpringBoot使用RabbitMQ进行有回调的用法和消费者端手动签收消息的用法。

一、yml文件配置

server:
  port: 8080
spring:
  application:
    name: rabbit-confirm
  rabbitmq:
    template:
      # 使用return-callback时必须设置mandatory为true
      mandatory: true
    # 消息发送到交换机确认机制,是否确认回调
    publisher-confirms: true
    # 消息发送到交换机确认机制,是否返回回调
    publisher-returns: true
    listener:
      simple:
        # 并发消费者初始化值
        concurrency: 5
        # 最大值
        max-concurrency: 10
        # 每个消费者每次监听时可拉取处理的消息数量
        prefetch: 20
        # 确认模式设置为手动签收
        acknowledge-mode: manual

二、定义配置类

/**
 * @author Gjing
 **/
@Configuration
public class ConfirmConfiguration {

    /**
     * 声明confirm.message队列
     */
    @Bean
    public Queue confirmQueue() {
        return new Queue("confirm.message");
    }

    /**
     * 声明一个名为exchange-2的交换机
     */
    @Bean
    public TopicExchange exchange2() {
        return new TopicExchange("exchange-2");
    }

    /**
     * 将confirm.message的队列绑定到exchange-2交换机
     */
    @Bean
    public Binding bindMessage1() {
        return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");
    }
}

三、定义生产者

/**
 * @author Gjing
 **/
@Component
@Slf4j
public class ConfirmProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 如果消息没有到exchange,则confirm回调,ack=false
     * 如果消息到达exchange,则confirm回调,ack=true
     * exchange到queue成功,则不回调return
     * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (!ack) {
            log.error("消息发送失败:correlationData: {},cause: {}", correlationData, cause);
        }else {
            log.info("消息发送成功:correlationData: {},ack: {}", correlationData, ack);
        }
    };

    private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
            log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);

    /**
     * 发送消息
     * @param message 消息内容
     */
    public void send(String message) {
        // 构建回调返回的数据
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(TimeUtil.localDateTimeToStamp(LocalDateTime.now()) + "");

        Message message1 = MessageBuilder.withBody(message.toString().getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                // 将CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,然后人工处理
                .setCorrelationId(correlationData.getId())
                .build();
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
    }
}

四、定义消费者

/**
 * @author Gjing
 **/
@Component
@Slf4j
public class ConfirmConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true")
            ,exchange = @Exchange(value = "exchange-2",type = "topic")
            ,key = "confirm.message"))
    public void receive(String message, Message message1, Channel channel) throws IOException {
        log.info("消费者收到消息:{}", message);
        long deliverTag = message1.getMessageProperties().getDeliveryTag();
        //第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型
        //为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
        channel.basicAck(deliverTag, false);
    }
}

五、创建controller调用

/**
 * @author Gjing
 **/
@RestController
public class ConfirmController {
    @Resource
    private ConfirmProducer confirmProducer;

    @PostMapping("/confirm-message")
    public void confirmMessage() {
        confirmProducer.send("hello confirm message");
    }
}

六、执行结果

1560498638_1_


以上为个人理解,如有误欢迎各位指正

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1044 3
|
5月前
|
消息中间件 Java Maven
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
412 1
|
6月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
6月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
8月前
|
XML 安全 Java
深入实践springboot实战 蓄势待发 我不是雷锋 我是知识搬运工
springboot,说白了就是一个集合了功能的大类库,包括springMVC,spring,spring data,spring security等等,并且提供了很多和可以和其他常用框架,插件完美整合的接口(只能说是一些常用框架,基本在github上能排上名次的都有完美整合,但如果是自己写的一个框架就无法实现快速整合)。
|
5月前
|
缓存 Java Maven
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
128 1