springboot之rabbit - producer的confirm和consumer的ack模式

简介: 本篇和大家分享的是关于rabbit的生产和消费方的一些实用的操作;正如文章标题,主要内容如producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失。

本篇和大家分享的是关于rabbit的生产和消费方的一些实用的操作;正如文章标题,主要内容如producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失

  • producer的confirm模式
  • consumer的ack模式

producer的confirm模式

首先,有这样一种业务场景1:a系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加,因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信;
此时插入mq消息的服务为了保证给所有用户发消息,并且要在短时间内插入完成(因此用到了异步插入方式(快速)),我们就需要知道每次插入mq是否成功,如果不成功那我们可以收集失败的信息后补发(因此confirm模式排上了用场);如图设计:
image
在springboot中可以使用基于amqp封装的工厂类来开启confirm模式,然后通过RabbitTemplate模板来设置回调函数,如下代码:

    ///region producer生产 - confirm模式

    public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) {
        return this.getRabbitTemplate(this.connectionFactory(), confirmCallback);
    }

    public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //product开启confirm模式
        connectionFactory.setPublisherConfirms(true);
        //设置confirm回调处理
        template.setConfirmCallback(confirmCallback);
        return template;
    }
    ///endregion

这里通过RabbitTemplate.ConfirmCallback函数编程来传递我们自定义的回调方法,如下收集confirm返回的结果信息:

        RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink());
        RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> {
            System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";状态:" + b + ";信息:" + c);
        });

最后再通过RabbitTemplate实例的convertAndSend方法发送mq信息,我们能够在日志中看到如下记录的信息:
image
这里的状态true:表示send成功,false:表示send失败;通常false的时候信息c会有响应的错误提示,这里把网络断开,如下错误提示:
image

consumer的ack模式

再来,有这样一种场景2:短信服务去消费mq队列信息时,倘若服务调用的运营商发送短信接口异常了(短信运营商接口欠费),我们此时的短信是发送失败的,用户也收不到短信,但是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外);想要mq消息在业务逻辑异常时还存在,那么可以使用ack方式;

image
在springboot中可以使用基于amqp封装的工厂类关闭自动ack模式,改为手动ack方式;只有当业务代码流程走完后,最后通过代码设置ack标识,来通知rabbit消息可以丢弃了;如果设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在无法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中):

    ///region consumer监听 - 手动ack
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        return this.listenerContainerFactory(this.connectionFactory());
    }

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //代码手动ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //开启消费者数量
        factory.setConcurrentConsumers(2);
        //每次接受数据量,默认250
        factory.setPrefetchCount(300);
        return factory;
    }
    ///endregion

通过连接工厂设置手动ack方式,然后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,如下:

    @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"})
    private void firstNodeListener(String msg, Channel channel, Message message) {
        try {
            long deliverTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("firstNodeListener - 消费消息 [" + deliverTag + "] - " + msg);
            channel.basicAck(deliverTag, true);
        } catch (Exception ex) {
        }
    }

这里ack主要根据mq消息的唯一编号(deliverTag)来通知;如果我们不设置ack确认,那么消息状态会是这样如下rabbit管理后台:
image

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
10月前
|
负载均衡 IDE Java
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
465 9
SpringBoot整合XXL-JOB【04】-  以GLUE模式运行与执行器负载均衡策略
|
Kubernetes 虚拟化 网络架构
在K8S中,flannel有几种模式?
在K8S中,flannel有几种模式?
|
弹性计算 Kubernetes Serverless
Kubernetes 的架构问题之Serverless Container中不支持特权模式的问题如何解决
Kubernetes 的架构问题之Serverless Container中不支持特权模式的问题如何解决
202 9
|
存储 Kubernetes 前端开发
k8s部署DataEase1.16.0cluster模式
k8s部署DataEase1.16.0cluster模式
|
存储 Kubernetes 安全
在K8S中,你用的flannel是哪个工作模式及fannel的底层原理如何实现数据报文转发的?
在K8S中,你用的flannel是哪个工作模式及fannel的底层原理如何实现数据报文转发的?
|
Kubernetes 负载均衡 算法
k8s学习--kube-proxy的三种工作模式详细解释
k8s学习--kube-proxy的三种工作模式详细解释
580 0
|
传感器 人工智能 前端开发
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
智慧校园电子班牌,坐落于班级的门口,适合于各类型学校的场景应用,班级学校日常内容更新可由班级自行管理,也可由学校统一管理。让我们一起看看,电子班牌有哪些功能呢?
706 4
JAVA语言VUE2+Spring boot+MySQL开发的智慧校园系统源码(电子班牌可人脸识别)Saas 模式
|
存储 设计模式 Java
Spring Boot中的事件溯源模式
Spring Boot中的事件溯源模式
|
Kubernetes Linux 容器
K8S中,flannel有几种模式?
K8S中,flannel有几种模式?
|
Kubernetes 负载均衡 监控
在K8S中,kube-proxy的工作模式是什么?
在K8S中,kube-proxy的工作模式是什么?

热门文章

最新文章

下一篇
开通oss服务