一文看懂RocketMQ生产者发送消息源码解析(中)

简介: 一文看懂RocketMQ生产者发送消息源码解析

启动过程



通过单元测试中的代码可以看到,在init()和terminate()这两个测试方法中,分别执行了Producer的start和shutdown方法。

说明RocketMQ Producer是个有状态服务,在发送消息前需要先启动Producer。这个启动过程,实际上就是为了发消息做的准备工作,所以,在分析发消息流程之前,我们需要先理清Producer中维护了哪些状态,在启动过程中,Producer都做了哪些初始化的工作。有了这个基础才能分析其发消息的实现流程。


init()


image.png


  • 创建DefaultMQProducer实例
  • 设置一些参数值
  • 然后调用start。


跟进start方法的实现,继续分析其初始化过程。

  • DefaultMQProducer#start()直接调用DefaultMQProducerImpl#start()

image.png

image.png    


ocketMQ使用一个成员变量serviceState来记录和管理自身的服务状态,这实际上是(State Pattern)设计模式的变体。

状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。

与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为,在管理比较简单的状态时,使用这种设计会让代码更加简洁。这种模式非常广泛地用于管理有状态的类,推荐你在日常开发中使用。


在设计状态的时候,有两个要点是需要注意的



不仅要设计正常的状态,还要设计中间状态和异常状态,否则,一旦系统出现异常,你的状态就不准确了,很难处理这种异常状态。比如在这段代码中,RUNNING和SHUTDOWN_ALREADY是正常状态,CREATE_JUST是一个中间状态,START_FAILED是一个异常状态。

这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。

比如这里,只有处于CREATE_JUST态才能转为RUNNING状,可确保这服务一次性,只能启动一次。避免了多次启动服务。



启动过程的实现:


通过一个单例模式(Singleton Pattern)的MQClientManager获取MQClientInstance的实例mQClientFactory,没有则自动创建新的实例

在mQClientFactory中注册自己

启动mQClientFactory

给所有Broker发送心跳。



其中实例mQClientFactory对应的类MQClientInstance是RocketMQ客户端中的顶层类,大多数情况下,可以简单地理解为每个客户端对应类MQClientInstance的一个实例。这个实例维护着客户端的大部分状态信息,以及所有的Producer、Consumer和各种服务的实例,想要学习客户端整体结构的同学可以从分析这个类入手,逐步细化分析下去。


我们进一步分析一下MQClientInstance#start()中的代码:


image.png


DefaultMQProducerImpl:Producer的内部实现类,大部分Producer的业务逻辑,也就是发消息的逻辑,都在这类。

image.png


消息发送过程


接下来我们一起分析Producer发送消息的流程。


在Producer的接口MQProducer中,定义了19个不同参数的发消息的方法,按照发送方式不同可以分成三类:


单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;

同步发送(Sync):发送消息后等待响应;

异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。

这三类发送实现基本上是相同的,异步发送稍微有一点儿区别,我们看一下异步发送的实现方法"DefaultMQProducerImpl#send()"(对应源码中的1132行):

@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        try {
                            sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
                                timeout - costTime);
                        } catch (MQBrokerException e) {
                            throw new MQClientException("unknownn exception", e);
                        }
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("exector rejected ", e);
    }
}

我们可以看到,RocketMQ使用了一个ExecutorService来实现异步发送:使用asyncSenderExecutor的线程池,异步调用方法sendSelectImpl(),继续发送消息的后续工作,当前线程把发送任务提交给asyncSenderExecutor就可以返回了。单向发送和同步发送的实现则是直接在当前线程中调用方法sendSelectImpl()。


我们来继续看方法sendSelectImpl()的实现:

// 省略部分代码
MessageQueue mq = null;
// 选择将消息发送到哪个队列(Queue)中
try {
    List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
    Message userMessage = MessageAccessor.cloneMessage(msg);
    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
    userMessage.setTopic(userTopic);
    mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
    throw new MQClientException("select message queue throwed exception.", e);
}
// 省略部分代码
// 发送消息
if (mq != null) {
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
    throw new MQClientException("select message queue return null.", null);
}
// 省略部分代码

方法sendSelectImpl()中主要的功能就是选定要发送的队列,然后调用方法sendKernelImpl()发送消息。


选择哪个队列发送由MessageQueueSelector#select决定。

RocketMQ使用策略模式解决不同场景下需要使用不同队列选择算法问题。


RocketMQ提供了很多MessageQueueSelector的实现,例如随机选择策略,哈希选择策略和同机房选择策略等


image.pngimage.png

image.png


也可以自己实现选择策略。如果要保证相同key消息的严格顺序,你需要使用哈希选择策略,或提供一个自己实现的选择策略。

再看方法

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
360 4
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 网络安全 开发工具
消息队列 MQ产品使用合集之使用grpc proxy,生产者心跳并没有发送至Default中,如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
1289 1
RocketMQ消息重试机制解析!
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
379 3
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
563 2
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
231 1
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
454 2
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
717 2

推荐镜像

更多
  • DNS