前言
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。
思考问题
- 对于过期的请求,发送者是如何处理的?
- MQClientFactory的作用是什么?
默认消息发送者-工作原理
源码入口:org.apache.rocketmq.client.producer.DefaultMQProducer#start
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("xxx:9876"); producer.start();
- serviceState状态为CREATE_JUST,服务将创建未启动 默认的状态为CREATE_JUST
private ServiceState serviceState = ServiceState.CREATE_JUST;
- 检查配置
入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkConfig
主要就是为了检查生产者分组是否符合要求
- 分组不能为空
- 符合正则规则
- 字符串长度不能大于255
- 不能使用默认分组DEFAULT_PRODUCER
private void checkConfig() throws MQClientException { Validators.checkGroup(this.defaultMQProducer.getProducerGroup()); if (null == this.defaultMQProducer.getProducerGroup()) { throw new MQClientException("producerGroup is null", null); } if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) { throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.", null); } }
- 如果分组不是CLIENT_INNER_PRODUCER,则修改生产者InstanceNama为进程ID
入口:org.apache.rocketmq.client.ClientConfig#changeInstanceNameToPID
InstanceNama:客户端实例名称,是客户端标识 CID 的组成部分
public void changeInstanceNameToPID() { if (this.instanceName.equals("DEFAULT")) { this.instanceName = UtilAll.getPid() + "#" + System.nanoTime(); } }
- 生成MQ客户端工厂MQClientFactory
入口: org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
创建MQClientInstance实例,才用单例模式,保证JVM中只存在一个实例
将实例保存进ConcurrentMap
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
一个clientId只会对应一个实例,根据客户端ID获取实例,如果未获取到,则初始化一个客户端实例
在这里进行了两次instance的检查,利用恶汉模式,确保只存在一个实例
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); //第二次检查,如果出现重复Key,则添加失败 if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
- 向MQClientInstance注册发送者,方便后续网络请求
入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer
将生产者保存进ConcurrentMap
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; }
- 更改serviceState状态为运行中
this.serviceState = ServiceState.RUNNING;
- 开启定时线程池扫描过期请求
入口:org.apache.rocketmq.client.producer.RequestFutureHolder#startScheduledTask
public synchronized void startScheduledTask(DefaultMQProducerImpl producer) { this.producerSet.add(producer); if (null == scheduledExecutorService) { this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService")); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { RequestFutureHolder.getInstance().scanExpiredRequest(); } catch (Throwable e) { log.error("scan RequestFutureTable exception", e); } } }, 1000 * 3, 1000, TimeUnit.MILLISECONDS); } }
默认消息发送者的有七个步骤,通过检查配置、生成MQ客户端工厂、注册发送者、开启过期扫描来完成消息发送者的启动。
回答思考的问题
- 对于过期的请求,发送者是如何处理的? 主要通过开启线程池,定期进行过期请求的扫描,然后进行请求的移除
- MQClientFactory的在消息发送者中作用是什么?MQClientFactory就是一个MQClientInstance实例,通过单例模式,生成一个可复用的实例
- 在消息生产者启动的过程中,进行了消息生产者的注册和启动
- Start函数中的MQClientAPIImpl对象用来负责底层消息通信,然后启动pullMessageService和rebalanceService。
- 执行定时任务,获取NameServer地址、更新TopicRoute信息、清理离线的Broker和保存消费者的Offset。
总结
一图总结