【RocketMQ系列七】消费者和生产者的实现细节

简介: 【RocketMQ系列七】消费者和生产者的实现细节

本文首先会介绍消费者的推模式,拉模式,接着会介绍 生产者负载均衡策略。

介绍之前我们首先需要拉取RocketMQ的源码,源码地址是:https://github.com/apache/rocketmq.git

1. 消费者的消费模式

RocketMQ 同时支持消费者的推模式以及拉模式。推模式顾名思义就是broker将消息推送给消费者,拉模式则是消费者主动到队列中拉取消息。默认情况下,RocketMQ使用的是推模式。

在IDEA中导入RocketMQ源码之后,找到 example模块,然后在此模块中找到各种例子。

1.1.推模式

消费者推模式的例子就是 org.apache.rocketmq.example.simple.PushConsumer 。推模式的消费者的实现类是 DefaultMQPushConsumer 。之前的文章已经做了详细介绍,在此就不在赘述了。推模式适合于大部分正常消费的情况

public static final String TOPIC = "TopicTest";
    public static final String CONSUMER_GROUP = "CID_JODIE_1";
    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.subscribe(TOPIC, "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

1.2. 拉模式

消费者拉模式的例子是:org.apache.rocketmq.example.simple.LitePullConsumerAssign 。拉模式主要适用于回溯消费消息。比如:某个消息你消费失败了,你现在想重新消费该消息的情况。我们知道RocketMQ中消息消费完之后不会里面会被删除,默认会在队列中保留48小时。通过broker配置文件中的fileReservedTime参数进行设置。

//1.创建DefaultLitePullConsumer实例
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        //2.启动litePullConsumer实例
        litePullConsumer.start();
        //3.获取TopicTest主题下所有的队列
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        //4.消费者需要拉取的队列的集合
        litePullConsumer.assign(assignList);
        //5.消费者需要定位,哪个队列,多少偏移量的消息。
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commit();
            }
        } finally {
            litePullConsumer.shutdown();
        }

2. 生产者负载均衡策略

我们都知道一个主题下会有多个消息队列(MessageQueue),那么,生产者在发送消息的时候如何选择消息队列呢?

首先找到生产者的示例代码类:org.apache.rocketmq.example.simple.Producer。在该类中找到发送消息的方法 producer.send(msg)

接着找到发送消息的默认实现方法 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

在此方法中可以找到 selectOneMessageQueue 方法,从方法名可以知道此方法就是用来选出一个MessageQueue的

//省略部分代码
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
                //省略部分代码

在selectOneMessageQueue方法中通过调用 tpInfo.selectOneMessageQueue 方法来获取

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
        BrokerFilter brokerFilter = threadBrokerFilter.get();
        brokerFilter.setLastBrokerName(lastBrokerName);
        if (this.sendLatencyFaultEnable) {
            if (resetIndex) {
                tpInfo.resetIndex();
            }
            MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }
            mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
            if (mq != null) {
                return mq;
            }
            return tpInfo.selectOneMessageQueue();
        }
        MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
        if (mq != null) {
            return mq;
        }
        return tpInfo.selectOneMessageQueue();
    }

那么最终的实现逻辑就是在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue 方法中了。我们可以查看此方法。

private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) {
       //省略非核心代码
        if (filter != null && filter.length != 0) {
            for (int i = 0; i < messageQueueList.size(); i++) {
                int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
                MessageQueue mq = messageQueueList.get(index);
                boolean filterResult = true;
                for (QueueFilter f: filter) {
                    Preconditions.checkNotNull(f);
                    filterResult &= f.filter(mq);
                }
                if (filterResult) {
                    return mq;
                }
            }
            return null;
        }

这里的核心代码就是下面这句代码:

int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

首先通过 sendQueue.incrementAndGet() 方法获取当前线程下index值。然后对该主题下所有的队列数进行求模取余。也就是说RocketMQ默认会采取轮询的方式选择消息队列 接着我们来看下该方法的实现。

private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    private final Random random = new Random();
    public int incrementAndGet() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = random.nextInt();
        }
        this.threadLocalIndex.set(++index);
        return index & POSITIVE_MASK;
    }

首先从线程本地变量 threadLocalIndex 中获取索引值index,如果没有的话则随机取一个值。然后将取到index中进行加一操作放回threadLocalIndex中。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5天前
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2天前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
10 2
|
16天前
|
消息中间件 监控 网络安全
在RocketMQ中,生产者提交数据导致连接不上问题
【6月更文挑战第19天】在RocketMQ中,生产者提交数据导致连接不上问题
55 1
|
5天前
|
消息中间件 网络协议 物联网
消息队列 MQ产品使用合集之如何让消费者不从最开始进行消费,而是从最后一条消息开始消费
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件 运维 Apache
消息队列 MQ产品使用合集之消费者在消费完成后没有关闭链接,导致连接数达到上限,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
87 0
RabbitMQ入门指南(九):消费者可靠性
|
2月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
30 1
|
2月前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
5天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。