RabbitMq数据发送监听

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMq数据发送监听

前言

在工作中使用rabbit传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败
如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了ConfirmCallback与ReturnCallback,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;

ConfirmCallback与ReturnCallback也被称为Rabbitmq的消息确认机制;

这两个方法主要解决是否发送到交换机和是否发送到队列的监控问题

image.png

实现

@Slf4j
@Configuration
public class RabbitCallBackConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value(value = "${spring.profiles.active}")
    private String activeProfile;

    @SneakyThrows
    @PostConstruct
    public void enableConfirmCallBack() {
        //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
        //correlationData 可在发送时指定消息唯一 id
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            if (!ack) {
                // 消息没有发送到交换机,则发送邮件通知
                rabbitTemplate.send(MessageQueueContant.MESSAGE_EXCHANGE + "_" + activeProfile,
                        MessageQueueContant.SEND_MAIL_QUEUE_KEY,
                        getMessage("【重要】 消息未发送到交换机", "", null));
            }
        }));

        //当消息成功发送到交换机没有路由到队列触发此监听
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                rabbitTemplate.send(MessageQueueContant.MESSAGE_EXCHANGE + "_" + activeProfile,
                        MessageQueueContant.SEND_MAIL_QUEUE_KEY,
                        getMessage(String.format("【重要】 消息未路由到队列,交换机名称:%s", exchange),
                                new String(message.getBody(), StandardCharsets.UTF_8), message)));
    }

    @SneakyThrows
    private Message getMessage(String title, String content, Message message) {
        MailModel mailModel = new MailModel();
        mailModel.setAddressee(Arrays.asList("yangjiawen@dstcar.com", "fengtianzhu@dstcar.com", "dailei@dstcar.com"));
        mailModel.setTitle(Objects.toString(title, "【重要】队列发送消息失败 -- " + this.activeProfile));
        mailModel.setText(content);
        return new Message(JsonUtil.writeValueAsBytes(mailModel), getMessageProperties(message));
    }

    private MessageProperties getMessageProperties(Message message) {
        MessageProperties messageProperties = Objects.isNull(message) ?
                new MessageProperties() : message.getMessageProperties();
        messageProperties.setContentType("application/json");
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return messageProperties;
    }

}

关联技术

  rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            if (!ack) {
                // 消息没有发送到交换机,则发送邮件通知
                rabbitTemplate.send(MessageQueueContant.MESSAGE_EXCHANGE + "_" + activeProfile,
                        MessageQueueContant.SEND_MAIL_QUEUE_KEY,
                        getMessage("【重要】 消息未发送到交换机", "", null));
            }
        }));

这段代码使用了一个技术,就是方法的入参是函数式接口,如果不熟悉java8看起来这段代码多少有些疑惑,我们来看下这个方法的入参

    public void setConfirmCallback(ConfirmCallback confirmCallback) {
        Assert.state(this.confirmCallback == null || this.confirmCallback == confirmCallback,
                "Only one ConfirmCallback is supported by each RabbitTemplate");
        this.confirmCallback = confirmCallback;
    }

入参是ConfirmCallback类,我们来看下这个类

@FunctionalInterface
    public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

    }

可以看到他是一个接口类,但是加了@FunctionalInterface注解来修饰,说明他是一个函数式接口,那函数式接口有什么作用呢,看了下面的代码基本就知道了。
我定义一个函数式接口

@FunctionalInterface
public interface FunctionInterface {

    public void sendInfo(String aa,String bb);
}

我们调用一下他

public class TestInteface {

    public static void main(String[] args) {
        TestEntity ss = new TestEntity();
        ss.setTeeee((aa, bb)->{
            System.out.println(aa);
        });
    }

    static class TestEntity{
        public void setTeeee(FunctionInterface functionInterface){
            functionInterface.sendInfo("2","4");
        }
    }


}

这时候控制台输出式 2,我们可以看到函数式接口有一个很不一样的地方,就是当把他作为参数时,我们可以在传参时给他写实现,这样就可以增强一个接口的灵活性了。
其实java8有一个我们熟悉的类,专门处理这种情况,就是Consumer类

@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);


    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}

这种特性能帮我们实现什么呢?其实有一种我们很常见的场景使用函数式接口作为参数就有很大的用处

    /**
     * 环绕增强
     */
    public void convert(String data, Consumer extendFunctionBefore, Consumer extendFunctionAfter) {
        Optional.ofNullable(extendFunctionBefore).ifPresent(s -> s.accept(1));
        System.out.println(data);
        Optional.ofNullable(extendFunctionAfter).ifPresent(s -> s.accept(1));
    }

这么处理我们就可以在我们想执行的方法前后灵活加上我们想执行的逻辑

结语

本来只是想说一下mq的,但是随着对实现的好奇,就一点点的说到了java8,技术总是一环套一环。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 安全 物联网
MQTT常见问题之新增自定义主题后平台侧收不到发布的数据如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
物联网 网络性能优化 API
MQTT常见问题之单个消息发送数据不能超过64k如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 存储 监控
|
3月前
|
机器学习/深度学习 开发工具
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
57 1
|
21天前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
EMQ
|
1月前
|
传感器 人工智能 安全
EMQX 与 MQTT: AI 大模型时代的分布式数据中枢
在以数据为核心的 AI 时代,基于 MQTT 协议的消息服务器 EMQX 能帮助企业更好的利用人工智能和机器学习模型,是智能化系统中核心的数据基础软件。
EMQ
164 3
|
21天前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
21天前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之如何设置nameserver监听的IP
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
701 3
|
3月前
|
消息中间件 JavaScript Java
MQ产品使用合集之视觉智能平台人脸搜索1:N怎么更新人脸数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。