【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)

本文涉及的产品
性能测试 PTS,5000VUM额度
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。

前言

大家好,我是小郭,在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。

启动流程图

网络异常,图片无法展示
|

Push和Pull的区别

Apache RocketMQ在消费者服务中,为我们提供了Push模式也提供了Pull模式

那他们主要有什么区别呢?

  • Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

BROADCASTING 和 CLUSTERING 模式的区别

BROADCASTING(广播模式):当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

CLUSTERING(集群模式):当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。

Consumer启动流程

我们直接跑一个官方提供的Demo,大家也可以去官网上去下载源码

public static void main(String[] args) throws InterruptedException, MQClientException {
    /*
     * Instantiate with specified consumer group name.
     * 消费者模式有两种 推和拉
     */
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    /*
     * Specify where to start in case the specific consumer group is a brand-new one.
     * 指定消费从哪里开始
     */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    /*
     * Subscribe one more topic to consume.
     * 设置监听主题以及过滤条件
     */
    consumer.subscribe("TopicTest999", "*");
    /*
     *  Register callback to execute on arrival of messages fetched from brokers.
     *  注册消息监听器
     */
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            //System.out.println("待消费条数:"+ msgs.size());
            LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
            /*try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
            LOGGER.info("success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    /*
     *  Launch the consumer instance.
     */
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

前置设置

  1. 指定namesrvAddr地址
consumer.setNamesrvAddr("127.0.0.1:9876");
  1. 指定消费从哪里开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • CONSUME_FROM_LAST_OFFSET 从最新的消息开始消费
  • CONSUME_FROM_FIRST_OFFSET 从最新的位点开始消费
  • CONSUME_FROM_TIMESTAMP 从指定的时间戳开始消费
  1. 指定负载均衡策略
  • AllocateMessageQueueAveragely:平均连续分配算法。
  • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
  • AllocateMachineRoomNearby:机房内优先就近分配。
  • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
// AllocateMessageQueueByConfig
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setBrokerName("broker-a");
        messageQueue.setQueueId(2);
        messageQueue.setTopic("TopicTest");
allocateMessageQueueByConfig.setMessageQueueList(Collections.singletonList(messageQueue));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
  • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
// AllocateMessageQueueByMachineRoom    
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
            // Broker部署
            @Override
            public String brokerDeployIn(MessageQueue messageQueue) {
                System.out.println(messageQueue.getBrokerName().split("-")[0]);
                return messageQueue.getBrokerName().split("-")[0];
            }
            // 消费端部署
            @Override
            public String consumerDeployIn(String clientID) {
                    System.out.println(clientID.split("-")[0]);
                return clientID.split("-")[0];
            }
        };
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
  1. 设置监听主题以及过滤条件

Tag过滤,用于对某个Topic下的消息进行分类,

消息发送到名称为TopicTest999的Topic中,被各个不同的系统所订阅,我们可以利用Tag来区分

consumer.subscribe("TopicTest999", "order");
consumer.subscribe("TopicTest999", "user");
  1. 注册消息监听器

注册消息监听器的目的就是为了接收消息,RocketMQ本身为我们提供了两种模式

  • 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
                /*try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                LOGGER.info("success");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
  • 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

他们主要的区别继承MessageListener接口的实现

除了这一些重要的参数以外,RocketMQ为我们提供了其他非常丰富的配置,我总结在了下图

网络异常,图片无法展示
|

启动流程源码跟踪

需要注意的是,在配置后我们才能去调用启动方法

1. 设置消费者分组后,DefaultMQPushConsumer调用start()启动消费者

入口:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start

@Override
public void start() throws MQClientException {
    // step 1 设置消费者分组
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

2. 根据serviceState状态启动消费者,当服务未创建时,才能启动成功

在这里主要做了五件事

  1. 检查核心参数是否都配置了
private void checkConfig() throws MQClientException {
         // 检查消费者组,是否满足条件
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
            ...
        }
        if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
            ...
        }
        if (null == this.defaultMQPushConsumer.getMessageModel()) {
            ...
        }
        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
            ...
        }
        // allocateMessageQueueStrategy
        // subscription
        // messageListener
        // consumeThreadMin
        // consumeThreadMax
        // consumeConcurrentlyMaxSpan
        // pullThresholdForQueue
        // pullThresholdForTopic
        // pullThresholdSizeForQueue
        // pullInterval
        // consumeMessageBatchMaxSize
        // pullBatchSize

主要是进行了参数配置的校验,如果一些参数设置不合理的,在这里就会抛出异常,终止了消费者服务的启动,这里的配置对后面的使用会产生一定的影响,所以我们在配置的时候需要更加的谨慎

  1. 复制订阅信息,生成重试主题
private void copySubscription() throws MQClientException {
    try {
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                // 更新内部订阅关系
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        // 默认情况下我们是CLUSTERING模式
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                // 创建重试主题
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                // 将重试主题放入订阅关系容器中
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

获取配置的订阅关系,因为setSubscription()方法已经被作废,subscription都是为空的,在下面他会去维护一个subscriptionInner

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();

subscriptionInner保存了我们在前置配置的时候插入的订阅关系

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
25天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
83 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
18天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
65 9
|
22天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!

相关产品

  • 云消息队列 MQ