带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证(下)

发送方确认机制 publisher confirm


publisher-confirms: true #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端


ackpublisher-returns: true #确认消息是否正确到达queue,如果没有则触发,如果有则不触发


ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。


rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    CorrelationDataEx c = (CorrelationDataEx)correlationData;
                    System.out.println("发送消息: " + c.getMsg());
                    System.out.println("HelloSender 消息发送成功 :" + correlationData.toString() );
                    /**
                     * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。
                     */
                } else {
                    System.out.println("HelloSender消息发送失败" + cause);
                }
            });


ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调


rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                 //Users users1 = (Users)message.getBody().toString();
                 //String correlationId = message.getMessageProperties().getCorrelationId();
                 System.out.println("Message : " + new String(message.getBody()));
                 //System.out.println("Message : " + new String(message.getBody()));
                 System.out.println("replyCode : " + replyCode);
                 System.out.println("replyText : " + replyText);  //错误原因
                 System.out.println("exchange : " + exchange);
                 System.out.println("routingKey : " + routingKey);//queue名称
             });


/**
              * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入
              * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
              */
             CorrelationDataEx c = new CorrelationDataEx();
             c.setId(users.getId().toString());
             c.setMsg(users.toString());
             /**
              * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
              * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
              */
             rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
             rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);


消息消费


1.配置


listener:
              simple:
                prefetch: 1               #设置一次处理一个消息
                acknowledge-mode: manual  #设置消费端手动 ack
                concurrency: 3            #设置同时有3个消费者消费,需要3个消费者实例


2.代码


@RabbitHandler
            @RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = "rabbitListenerContainerFactory", concurrency = "2")
            public void process(Users users, Channel channel, Message message) throws IOException {
                System.out.println("HelloReceiver收到  : " + users.toString() + "收到时间" + new Date());
                try {
                    //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
                    // 否则消息服务器以为这条消息没处理掉 后续还会在发
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println("receiver success");
                } catch (IOException e) {
                    e.printStackTrace();
                    //丢弃这条消息,则不会重新发送了
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    System.out.println("receiver fail");
                }
            }



验证


创建消息生产者和消费者


生产者


集群配置:


spring:
      application:
        name: rabbitmq-producer-demo
      rabbitmq:
        # 单点配置
        #host: localhost
        #port: 5672
        # 集群的配置
        addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672
        username: rabbitmq  #guest是缺省,只能localhost网络访问,要访问远程网络,需要创建用户
        password: 123456
        # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
        # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
        # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
        # Virtual Name一般以/开头
        virtual-host: /
        # 确认消息是否正确到达queue,如果没有则触发,如果有则不触发
        publisher-returns: on
        # 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,
        # 只要正确的到达exchange中,broker即可确认该消息返回给客户端ack
        # 如果是simple就不会回调
        publisher-confirm-type: correlated
        template:
          #设置为 on 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
          mandatory: on


队列设置: 设置了queue_sleb_accept队列


@Configuration
    public class RabbitConfig {
        /**
         * 投保消息交换机的名字
         */
        public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept";
        /**
         * 投保消息队列
         */
        public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept";
        /**
         * 投保消息路由键
         */
        public static final String ROUTING_KEY_ACCEPT = "routing_key_accept";
        /**
         *  投保消息死信交换机
         */
        public static final String DLX_EXCHANGE_SLEB_ACCEPT = "exchange_dlx_sleb_accept";
        /**
         * 投保消息死信队列
         */
        public static final String DLX_QUEUE_SLEB_ACCEPT = "queue_dlx_sleb_accept";
        /**
         *  常用交换器类型如下:
         *       Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送".
         *       即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
         *       Topic(TopicExchange):按规则转发消息(最灵活)。
         *       Headers(HeadersExchange):设置header attribute参数类型的交换机。
         *       Fanout(FanoutExchange):转发消息到所有绑定队列。
         *
         * 下面都是采用direct, 必须严格匹配exchange和queue
         * 投保消息交换机
         */
        @Bean("slebAcceptExchange")
        DirectExchange slebAcceptExchange() {
            return ExchangeBuilder.directExchange(EXCHANGE_SLEB_ACCEPT).durable(true).build();
        }
        /**
         * 第二个参数 durable: 是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,
         *                 当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
         * 第三个参数 execulusive: 表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * 第四个参数 autoDelete: 当没有生成者/消费者使用此队列时,此队列会被自动删除。(即当最后一个消费者退订后即被删除)
         *
         * 这儿是(queue)队列持久化(durable=true),exchange也需要持久化
         * ********************死信队列**********************************************************
         *            x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
         *            x-dead-letter-routing-key  这里声明当前队列的死信路由key
         *            采用死信队列,才会用到下面的参数
         *            Map<String, Object> args = new HashMap<>(2);
         *            args.put("x-dead-letter-exchange", DLX_EXCHANGE_SLEB_ACCEPT);
         *            args.put("x-dead-letter-routing-key", ROUTING_KEY_ACCEPT);
         *            return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).withArguments(args).build();
         * ********************死信队列**********************************************************
         * 投保消息队列
         */
        @Bean("slebAcceptQueue")
        public Queue slebAcceptQueue() {
            return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).build();
        }
        /**
         * 交换机、队列、绑定
         */
        @Bean("bindingSlebAcceptExchange")
        Binding bindingSlebAcceptExchange(@Qualifier("slebAcceptQueue") Queue queue,
                                          @Qualifier("slebAcceptExchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
        }
        /**
         * 投保死信交换机
         */
        @Bean("slebDlxAcceptExchange")
        DirectExchange slebDlxAcceptExchange() {
            return ExchangeBuilder.directExchange(DLX_EXCHANGE_SLEB_ACCEPT).durable(true).build();
        }
        /**
         * 投保死信队列
         */
        @Bean("slebDlxAcceptQueue")
        public Queue slebDlxAcceptQueue() {
            return QueueBuilder.durable(DLX_QUEUE_SLEB_ACCEPT).build();
        }
        /**
         * 死信交换机、队列、绑定
         */
        @Bean("bindingDlxSlebAcceptExchange")
        Binding bindingDlxSlebAcceptExchange(@Qualifier("slebDlxAcceptQueue") Queue     queue, @Qualifier("slebDlxAcceptExchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
        }


生产消息


@Service
    public class AcceptProducerServiceImpl implements AcceptProducerService {
        private final Logger logger = LoggerFactory.getLogger(AcceptProducerServiceImpl.class);
        private final RabbitTemplate rabbitTemplate;
        public AcceptProducerServiceImpl(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
        @Override
        public void sendMessage(PolicyModal policyModal) {
            logger.info("开始发送时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                    + ",保单号: " + policyModal.getPolicyNo()
                    + ",发送内容: " + policyModal.toString());
            /*
             * policyDataEx继承CorrelationData, 把需要发送消息的关键字段加入
             * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
             * policyno为唯一的值
             */
            PolicyDataEx policyDataEx = new PolicyDataEx();
            policyDataEx.setId(policyModal.getPolicyNo());
            policyDataEx.setMessage(policyModal.toString());
            /*
             * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
             * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
             */
            //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SLEB_ACCEPT, RabbitConfig.ROUTING_KEY_ACCEPT, policyModal, policyDataEx);
        }


运行验证


http://localhost:9020/sendsing


image.png


查看3台服务器控制台:看到已经创建了镜像队列,并且有一个消息在队列里面:


image.png


image.png


消费者


配置


spring:
      application:
        name: rabbitmq-consumer-demo
      rabbitmq:
        # 单点配置
        #host: localhost
        #port: 5672
        # 集群的配置
        addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672
        username: rabbitmq
        password: 123456
        # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
        # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
        # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
        # Virtual Name一般以/开头
        virtual-host: /
        listener:
          simple:
            prefetch: 1               #设置一次处理一个消息
            acknowledge-mode: manual  #设置消费端手动 ack
            concurrency: 3            #设置同时有3个消费者消费
            #消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)


配置队列名称,主要名称和生产者里面的名称一样


public class RabbitMQConfigInfo {
        /**
         * 投保消息队列
         */
        public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept";
        /**
         * 投保消息交换机的名字
         */
        public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept";
        /**
         * 投保消息路由键
         */
        public static final String ROUTING_KEY_ACCEPT = "routing_key_accept";
    }


消费


@Service
    public class RabbitConsumerServiceImpl implements RabbitConsumerService {
        private final Logger logger = LoggerFactory.getLogger(RabbitConsumerServiceImpl.class);
        @RabbitHandler
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = QUEUE_SLEB_ACCEPT, durable = "true"),
                exchange = @Exchange(name = EXCHANGE_SLEB_ACCEPT,
                        ignoreDeclarationExceptions = "true"),
                key = {ROUTING_KEY_ACCEPT}
        ))
        @Override
        public void process(Channel channel, Message message) throws IOException {
            String jsonStr = new String(message.getBody());
            logger.info("接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                    + "\n,消息:" + jsonStr);
            //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。
            PolicyModal policyModal = JsonUtils.JSON2Object(jsonStr, PolicyModal.class);
            assert policyModal != null;
            try {
                //将message中的body获取出来, 转换为PolicyModal,再获取policyno
                //更根据policyno新数据库里面的标志,
                // todo
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
                // 否则消息服务器以为这条消息没处理掉 后续还会在发
                //throw new IOException("myself");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                /*logger.info("接收处理成功:\n"
                        + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                        + ",保单号: " + policyModal.getPolicyNo()
                        + "\n,消息:" + new String(message.getBody()));
    */
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息,则不会重新发送了
                //一般不丢弃,超时后mq自动会转到死信队列(如果设置了超时时间和死信交换机和队列后)
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                logger.info("接收处理失败:\n"
                        + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                        + ",保单号: " + policyModal.getPolicyNo()
                        + "\n,消息:" + new String(message.getBody()));
            }
        }
    }


启动验证


image.png


在看各个服务器控制台:消息已经被消费,队列里面消息为0


image.png

image.png


结束


技术文章难写,这个花了前后一个礼拜的时间,希望对大家有帮助。有要验证代码的,可以发邮件:lazasha@163.com联系我,我给你发。懒,没空上github,回来再说。


END



相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
边缘计算 负载均衡 NoSQL
FreeMQTT Plus: 一个新型 MQTT Broker 集群的实现
FreeMQTT Plus 是一款基于 MQTT 协议的高性能消息中间件,采用分布式架构解决单点瓶颈问题。其核心由 Nginx 负载均衡器、黑(A)节点(MQTT Broker)、白(B)节点(消息路由)和日志(L)节点组成。通过无主从设计,支持高可用性、负载均衡与灵活扩展。针对会话同步、消息路由等挑战,FreeMQTT Plus 利用 MQTT5 特性定义元命令,实现节点间高效通信,无需依赖第三方组件。适用于物联网海量设备接入与高并发场景,为未来边缘计算和多级集群部署提供坚实基础。
1342 74
|
9月前
|
消息中间件 监控 RocketMQ
Docker部署RocketMQ5.2.0集群
本文详细介绍了如何使用Docker和Docker Compose部署RocketMQ 5.2.0集群。通过创建配置文件、启动集群和验证容器状态,您可以快速搭建起一个RocketMQ集群环境。希望本文能够帮助您更好地理解和应用RocketMQ,提高消息中间件的部署和管理效率。
1179 91
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
|
11月前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
2209 2
|
11月前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
1546 0
ly~
|
消息中间件 搜索推荐 大数据
一般情况下在 RocketMQ 中添加 access key 的步骤: 一、确定配置文件位置 RocketMQ 的配置文件通常位于安装目录下的 conf 文件夹中。你需要找到 broker.conf 或相关的配置文件。 二、编辑配置文件 打开配置文件,查找与 ACL(访问控制列表)相关的配置部分。 在配置文件中添加以下内容:
大数据广泛应用于商业、金融、医疗和政府等多个领域。在商业上,它支持精准营销、客户细分及流失预测,并优化供应链管理;金融领域则利用大数据进行风险评估、市场预测及欺诈检测;医疗行业通过大数据预测疾病、提供个性化治疗;政府运用大数据进行城市规划和公共安全管理;工业领域则借助大数据进行设备维护、故障预测及质量控制。
ly~
846 2
|
消息中间件 存储 负载均衡
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
208 2
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
338 0
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建