引言
前面章节介绍了RocketMQ的路由管理模块,它所解决的问题是如何让消息的发 送者以及消费者找到正确的地址信息。本章主要阐述消息在发送之前,如何进行客户端启动的。
- 消息格式
- 启动生产者
- 总结
一、消息格式
要想弄清楚消息发送的过程,我们需要了解消息的格式是怎样的。在common模块中对message进行了定义,如下所示:
以下为message的一个全属性构造函数:
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { this.topic = topic; this.flag = flag; this.body = body; if (tags != null && tags.length() > 0) this.setTags(tags); if (keys != null && keys.length() > 0) this.setKeys(keys); this.setWaitStoreMsgOK(waitStoreMsgOK); }
二、启动生产者
消息生产者相关代码都是在client模块中,DefaultMQProducer该类为默认的消息生产者实现类,它继承了ClientConfig 同时实现了MQProducer 接口。对于RocketMQ,生产者就是发送消息的客户端,所以在发送消息前需要初始化对应的实例来进行消息的发送。
public class DefaultMQProducer extends ClientConfig implements MQProducer { ... }
类图关系如下所示:
DefaultMQProducer
的相关属性如下所示:
/** * 生产者所属组 */ private String producerGroup; /** * 默认的topicKey */ private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; /** * 默认主题中的Broker队列的数量 */ private volatile int defaultTopicQueueNums = 4; /** * 消息发送默认超时时间为3s */ private int sendMsgTimeout = 3000; /** * 消息体大小超过默认值则需要进行压缩,此处默认为4K */ private int compressMsgBodyOverHowmuch = 1024 * 4; /** * 同步方式发送消息的重试次数,默认为为2次,总共会执行3次 */ private int retryTimesWhenSendFailed = 2; /** * 异方式发送消息的重试次数,默认为为2次 */ private int retryTimesWhenSendAsyncFailed = 2; /** * 消息重试时选择另外一个Broker,是否不等待存储结果就进行返回,默认为false */ private boolean retryAnotherBrokerWhenNotStoreOK = false; /** * 允许发送消息的最大消息长度,默认为4M */ private int maxMessageSize = 1024 * 1024 * 4; // 4M
接下来,我们来一起看下消息生产者实例到底是怎么进行启动的。首先看下涉及到的主要方法调用如下所示:
我们来具体看下代码实现,首先是
public void start() throws MQClientException { this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
后面调用defaultMQProducerImpl中的start方法,具体分析如下所示:
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; //检查producerGroup是否符合要求 this.checkConfig(); //修改生产者的instanceName为进程的ID if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } //创建MQClientInstance实例 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); //进行注册 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //想所有的Broker发送心跳 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }
那么在MQClientManager
中进行clientId
进行生成,源码如如下所示:
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { //创建clientId String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); //维护缓存表,一个clientId对应一个MQClientInstance MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
clientId
的组成如下所示,它的组成为IP
+ instanceName
+ unitName
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
三、总结
本文主要叙述了消息的结构以及在消息发送之前,客户端启动的流程是怎样的,而客户端实例的创建时消息发送的前提。在下一篇文章中,将介绍具体的消息发送流程。