前言
我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer
,类型的代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("42.192.50.8:9876"); try { producer.start(); producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } 复制代码
上述代码中,在消息发送之前调用了start()
方法,如果不调用start()
方法,直接发送消息,那么会出现以下报错:
报错消息里面很明显地告知我们,目前这个DefaultMQProducer
状态没有准备好,还不能发送消息。为了一探究竟,我们得去看看start()
里面究竟做了什么操作呢?
一探究竟
我们根据源码一路走下来,可以追踪到DefaultMQProducerImpl.start(final boolean startFactory)
这个方法:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 创建MQClientInstance this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 注册Producer到MQClientInstance中 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); // 启动MQClientInstance实例 if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } 复制代码
上述代码主要做了以下几点:
1.创建MQClientInstance实例;
2.注册Producer到MQClientInstance实例中;
3.启动MQClientInstance实例;
MQClientInstance
实例并不是每次都会创建的,它创建出来也会缓存的MQClientManager
中,不过根据源码来看的话,每次创建Producer
都会对应创建一个新的MQClientInstance
实例,所以一般情况下不建议一个应用服务中重复创建Producer
;
最终start()
方法的关键实现逻辑还是需要进入MQClientInstance.start()
中:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // 如果namesrv地址为null,那么就需要自己找namesrv地址 if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // 开启一个请求响应渠道,没猜错的话,应该是netty实现的 this.mQClientAPIImpl.start(); // 开启定时任务 this.startScheduledTask(); // 开启拉消息服务 this.pullMessageService.start(); // 开启负载均衡服务 this.rebalanceService.start(); // 再开启一个默认生产者,这个生产者不需要启动MQClientInstance实例 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } } 复制代码
看样子,这才是start()方法真正要做的事情:
1.找namesrv地址,应该是后面需要使用namesrv地址查询对应的
broker
;2.开启Netty客户端的初始化,包括与namesrv建立信道;另外开启两个定时任务,一个清除列表中过期的请求,第二个就是筛选可用的namesrv服务;
3.开启一些定时任务;包括如果没有设置namesrv地址的话,会从指定站点拉namesrv地址;清除下线broker并发送心跳给所有的broker等工作;
4.因为当前是生产者,所以pullMessageService很快就结束;
5.生产者不需要做负载均衡,所以rebalanceService很快也结束;
6.给默认创建的生产者执行一下start()方法,其实啥也没做;
上述大多数任务都是给消费者使用的,作为生产者,唯一起作用的就是前三步,查找namesrv地址、第二步与namesrv建立通信以及第三步对broker的一些定时清理工作;不过没有发生消息之前,是不会从远程获取任何数据的。所以综上所述,start()方法里面只做了以下两件事情:
1.与namesrv建立通信渠道,它甚至都没有从namesrv获取任何数据;
2.启动一些定时任务,包括清理下线的broker;
小结
虽然在生产者中,start()方法里面真正做的事情比较少,但是却是非常有必要的。发送消息之前,我们没有使用start()方法,导致消息发送失败,是因为生产者与namesrv之间的通信渠道没有建立。