【消息中间件】默认RocketMQ消息发送者是如何启动的?

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。

前言

上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。

思考问题

  1. 对于过期的请求,发送者是如何处理的?
  2. MQClientFactory的作用是什么?

默认消息发送者-工作原理

源码入口:org.apache.rocketmq.client.producer.DefaultMQProducer#start

DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("xxx:9876");
producer.start();
  1. serviceState状态为CREATE_JUST,服务将创建未启动 默认的状态为CREATE_JUST
private ServiceState serviceState = ServiceState.CREATE_JUST;
  1. 检查配置

入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkConfig

主要就是为了检查生产者分组是否符合要求

  1. 分组不能为空
  2. 符合正则规则
  3. 字符串长度不能大于255
  4. 不能使用默认分组DEFAULT_PRODUCER
private void checkConfig() throws MQClientException {
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
    if (null == this.defaultMQProducer.getProducerGroup()) {
        throw new MQClientException("producerGroup is null", null);
    }
    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}
  1. 如果分组不是CLIENT_INNER_PRODUCER,则修改生产者InstanceNama为进程ID

入口:org.apache.rocketmq.client.ClientConfig#changeInstanceNameToPID

InstanceNama:客户端实例名称,是客户端标识 CID 的组成部分

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}
  1. 生成MQ客户端工厂MQClientFactory

入口: org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

创建MQClientInstance实例,才用单例模式,保证JVM中只存在一个实例

将实例保存进ConcurrentMap

private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();

一个clientId只会对应一个实例,根据客户端ID获取实例,如果未获取到,则初始化一个客户端实例

在这里进行了两次instance的检查,利用恶汉模式,确保只存在一个实例

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        //第二次检查,如果出现重复Key,则添加失败
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
  1. 向MQClientInstance注册发送者,方便后续网络请求

入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer

将生产者保存进ConcurrentMap

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = 
                            new ConcurrentHashMap<String, MQProducerInner>();
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }
    return true;
}
  1. 更改serviceState状态为运行中
this.serviceState = ServiceState.RUNNING;
  1. 开启定时线程池扫描过期请求

入口:org.apache.rocketmq.client.producer.RequestFutureHolder#startScheduledTask

public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
    this.producerSet.add(producer);
    if (null == scheduledExecutorService) {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    RequestFutureHolder.getInstance().scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
    }
}

默认消息发送者的有七个步骤,通过检查配置、生成MQ客户端工厂、注册发送者、开启过期扫描来完成消息发送者的启动。

回答思考的问题

  1. 对于过期的请求,发送者是如何处理的? 主要通过开启线程池,定期进行过期请求的扫描,然后进行请求的移除
  2. MQClientFactory的在消息发送者中作用是什么?MQClientFactory就是一个MQClientInstance实例,通过单例模式,生成一个可复用的实例
  1. 在消息生产者启动的过程中,进行了消息生产者的注册和启动
  1. Start函数中的MQClientAPIImpl对象用来负责底层消息通信,然后启动pullMessageService和rebalanceService。
  1. 执行定时任务,获取NameServer地址、更新TopicRoute信息、清理离线的Broker和保存消费者的Offset。

总结

一图总结

网络异常,图片无法展示
|

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
5月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
5月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
3月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
200 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
4月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
175 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
3月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
818 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67828 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2814 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
737 1
5张图带你理解 RocketMQ 顺序消息实现机制

相关产品

  • 云消息队列 MQ