生产者启动流程|学习笔记

简介: 快速学习生产者启动流程

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)生产者启动流程】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12470


生产者启动流程

 

启动流程

image.png

生产者启动入口是DefaultMQProducer,它里面有个start方法:

public void start() throws MQclientException {

this.setProducerGroup(withNamespace(this.producerGroup));

this.defaultMQProducerImpl.start();

if (null != traceDispatcher) {

try {

traceDispatcher.start(this.getNamesrvAddr(), this.getAccesschannel());

catch (MQclientException e) {

log.warn( "trace dispatcher start failed ", e);

}

}

}

通过start方法启动生产者,首先是设置当前生产者的组名,真正的启动的业务逻辑是在defaultMQProducerImpl类中完成的,这个类在创建DefaultMQProducer时已经完成实例化

public DefaultwQProducer(final string producerGroup,RPCHook rpcHook, boolean enableMsgTrace

final string customizedTraceTopic) {

this.producerGroup = producerGroup;

defaultMQProducerImpl = new pefaultMOProducerImp

(defaultMQProducer: this,rpcHook);

真正的启动流程是在defaultMQProducerImpl的start方法中:

//检查生产者组是否满足要求

this.checkconfig(;

//更改当前instanceName为进程IDif(!this.defau7tMQProducer.getProducerGroup().equa1s(Nix411.CLTENT_INNER_PRODUCER_GROUP)){

this.defau1tMQProducer.changeInstanceNameToPID();

}

//获得MQ客户端实例(获取MQ客户端管理器,通过MQClientManger获取客户端示例)

this.mQclientFactory =

MQc1ientManager.getInstance(). getAndcreateMQc1ientInstance(this.defaultNQProducer,rpcHook);

在check.config中完成生产者组名校验工作:

private void checkconfig() throws MQclientException {validators.checkGroup(this.defaultMQProducer.getProducerGroup());

//组名不能为空,组名不能和默认组名相同

if (null == this.defaultMQProducer.getProducerGroup()) {

throw new MQclientException("producerGroup is null" , null);

}

If (this.defaultMQProducer.getProducerGroup( ) .equal

s(NixAll.DEFAULT_PRODUCER_GROUP)){

throw new MCclientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GRO!

null);

}

}

说明:

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientld */,MQClientInstance> factoryTable = new

ConcurrentHashMap<String,MQClientlnstance>();

同一个clientld只会创建一个MQClientInstance。

MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道;包括和MQ进行交互时主要通过MQClientInstance向外发送请求。

代码:MQClientManager#getAndCreateMQClientInstan

Ce

pub7ic MoclientInstance getAndcreateMQclientInstance(final clientconfig clientconfig, RPCHook rpcHook) {

//构建客户端ID

string c1ientId = clientconfig.bui1dMQc1ientId(;

//根据客户端ID或者客户端实例

MQclientInstance instance = this.factoryTable.get(clientId);

//实例如果为空就创建新的实例,并添加到实例表中

if (nu17 == instance) {

instance =

new MQclientInstance(clientconfig.cloneclientconfig(),

this.factoryIndexGenerator.getAndIncrement(,clientId,rpcHook);

MQclientInstance prev = this.factoryTab1e.putIfAbsent(clientId,instance);

if (prev != null) {

instance = prev;

1og.warn("Returned Previous MQclientInstance for clientId:[i]",clientId);

}else {

log.info("created new MoclientInstance for clientId:[0]",clientId);

}

}

return instance;

}

拿到客户端实例之后,将当前生产者注册到客户端实例中,注册完成之后,客户端实例就启动了,下面是启动过程。

//注册当前生产者到MQClientInstance管理中,方便后续调用网路请求

boolean registerOK = mClientFactory.registerProducer

(this.defaultMQProducer.getProducerGroup()producer.this);

if ( !registeroK) {

this.servicestate = servicestate.CREATE_JUST;

throw new MQclientException("The producer group["+ this.defaultMQProducer.getProducerGroup()

+ "] has been created before, specify another name please."+FAQUrl.suggestTodo(FAQUr1.GRoup_NAME_DUPLICATEnull);this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

//启动生产者

if (startFactory) {

mQclientFactory . start();

}

总结:真正的producer启动过程是在DefaultMQProducerImpl的start方法中完成的,首先是检查生产者组是否符合要求,然后将实例名称改为进程ID、获取MQ客户端实例,得到客户端实例后将当前生产者注册到客户端实例中去启动生产者。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
5 张图带你理解 RocketMQ 消费者启动过程
5 张图带你理解 RocketMQ 消费者启动过程
246 0
5 张图带你理解 RocketMQ 消费者启动过程
|
12月前
|
消息中间件 存储 网络协议
RibbitMQ学习笔记之MQ 的相关概念
RibbitMQ学习笔记之MQ 的相关概念
64 0
|
消息中间件 存储 算法
RocketMQ消费者启动流程
问题 消费者启动的时候,去哪拿的消息呢?
153 0
RocketMQ消费者启动流程
|
消息中间件 缓存 负载均衡
RocketMQ中生产者发消息前为啥一定要调用start()方法?
RocketMQ中生产者发消息前为啥一定要调用start()方法?
161 0
RocketMQ中生产者发消息前为啥一定要调用start()方法?
|
存储 消息中间件 负载均衡
消息消费启动流程|学习笔记
快速学习消息消费启动流程
80 0
消息消费启动流程|学习笔记
|
消息中间件 存储 RocketMQ
生产者核心类介绍|学习笔记
快速学习生产者核心类介绍
68 0
生产者核心类介绍|学习笔记
|
前端开发 Java Nacos
服务生产者 | 学习笔记
快速学习服务生产者。
67 0
服务生产者 | 学习笔记
|
消息中间件 RocketMQ 开发者
SCS 介绍及 RocketMQ Binder 的基本实现原理 | 学习笔记
快速学习 SCS 介绍及 RocketMQ Binder 的基本实现原理,介绍了 SCS 介绍及 RocketMQ Binder 的基本实现原理系统机制, 以及在实际应用过程中如何使用。
161 0
SCS 介绍及 RocketMQ Binder 的基本实现原理 | 学习笔记
|
Java Maven Nacos
创建生产者实现服务的注册 | 学习笔记
快速学习 创建生产者实现服务的注册
52 0
|
消息中间件 存储 Java
RocketMQ 消费者启动流程
RocketMQ 消费者启动流程
413 0
RocketMQ 消费者启动流程