开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):生产者启动流程】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12470
生产者启动流程
启动流程
生产者启动入口是DefaultMQProducer,它里面有个start方法:
public void start() throws MQclientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccesschannel());
catch (MQclientException e) {
log.warn( "trace dispatcher start failed ", e);
}
}
}
通过start方法启动生产者,首先是设置当前生产者的组名,真正的启动的业务逻辑是在defaultMQProducerImpl类中完成的,这个类在创建DefaultMQProducer时已经完成实例化
public DefaultwQProducer(final string producerGroup,RPCHook rpcHook, boolean enableMsgTrace
final string customizedTraceTopic) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new pefaultMOProducerImp
(defaultMQProducer: this,rpcHook);
真正的启动流程是在defaultMQProducerImpl的start方法中:
//检查生产者组是否满足要求
this.checkconfig(;
//更改当前instanceName为进程IDif(!this.defau7tMQProducer.getProducerGroup().equa1s(Nix411.CLTENT_INNER_PRODUCER_GROUP)){
this.defau1tMQProducer.changeInstanceNameToPID();
}
//获得MQ客户端实例(获取MQ客户端管理器,通过MQClientManger获取客户端示例)
this.mQclientFactory =
MQc1ientManager.getInstance(). getAndcreateMQc1ientInstance(this.defaultNQProducer,rpcHook);
在check.config中完成生产者组名校验工作:
private void checkconfig() throws MQclientException {
validators.checkGroup(this.defaultMQProducer.getProducerGroup());
//组名不能为空,组名不能和默认组名相同
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQclientException("producerGroup is null" , null);
}
If (this.defaultMQProducer.getProducerGroup( ) .equal
s(NixAll.DEFAULT_PRODUCER_GROUP)){
throw new MCclientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GRO!
null);
}
}
说明:
整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientld */,MQClientInstance> factoryTable = new
ConcurrentHashMap<String,MQClientlnstance>();
同一个clientld只会创建一个MQClientInstance。
MQClientinstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道;包括和MQ进行交互时主要通过MQClientInstance向外发送请求。
代码:MQClientManager#getAndCreateMQClientInstan
Ce
pub7ic MoclientInstance getAndcreateMQclientInstance(final clientconfig clientconfig, RPCHook rpcHook) {
//构建客户端ID
string c1ientId = clientconfig.bui1dMQc1ientId(;
//根据客户端ID或者客户端实例
MQclientInstance instance = this.factoryTable.get(clientId);
//实例如果为空就创建新的实例,并添加到实例表中
if (nu17 == instance) {
instance =
new MQclientInstance(clientconfig.cloneclientconfig(),
this.factoryIndexGenerator.getAndIncrement(,clientId,rpcHook);
MQclientInstance prev = this.factoryTab1e.putIfAbsent(clientId,instance);
if (prev != null) {
instance = prev;
1og.warn("Returned Previous MQclientInstance for clientId:[i]",clientId);
}else {
log.info("created new MoclientInstance for clientId:[0]",clientId);
}
}
return instance;
}
拿到客户端实例之后,将当前生产者注册到客户端实例中,注册完成之后,客户端实例就启动了,下面是启动过程。
//注册当前生产者到MQClientInstance管理中,方便后续调用网路请求
boolean registerOK = mClientFactory.registerProducer
(this.defaultMQProducer.getProducerGroup()producer.this);
if ( !registeroK) {
this.servicestate = servicestate.CREATE_JUST;
throw new MQclientException("The producer group["+ this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please."+FAQUrl.suggestTodo(FAQUr1.GRoup_NAME_DUPLICATEnull);
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动生产者
if (startFactory) {
mQclientFactory . start();
}
总结:真正的producer启动过程是在DefaultMQProducerImpl的start方法中完成的,首先是检查生产者组是否符合要求,然后将实例名称改为进程ID、获取MQ客户端实例,得到客户端实例后将当前生产者注册到客户端实例中去启动生产者。