本文沿着RocketMQ消息轨迹-设计篇的思路,从如下3个方面对其源码进行解读:
- 发送消息轨迹
- 消息轨迹格式
- 存储消息轨迹数据
发送消息轨迹流程
首先我们来看一下在消息发送端如何启用消息轨迹,示例代码如下:
1public class TraceProducer { 2 public static void main(String[] args) throws MQClientException, InterruptedException { 3 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // @1 4 producer.setNamesrvAddr("127.0.0.1:9876"); 5 producer.start(); 6 for (int i = 0; i < 10; i++) 7 try { 8 { 9 Message msg = new Message("TopicTest", 10 "TagA", 11 "OrderID188", 12 "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 13 SendResult sendResult = producer.send(msg); 14 System.out.printf("%s%n", sendResult); 15 } 16 17 } catch (Exception e) { 18 e.printStackTrace(); 19 } 20 producer.shutdown(); 21 } 22}
从上述代码可以看出其关键点是在创建DefaultMQProducer时指定开启消息轨迹跟踪。我们不妨浏览一下DefaultMQProducer与启用消息轨迹相关的构造函数:
1public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) 2public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) 3
参数如下:
- String producerGroup
生产者所属组名。 - boolean enableMsgTrace
是否开启跟踪消息轨迹,默认为false。 - String customizedTraceTopic
如果开启消息轨迹跟踪,用来存储消息轨迹数据所属的主题名称,默认为:RMQ_SYS_TRACE_TOPIC。
1.1 DefaultMQProducer构造函数
1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // @1 2 this.producerGroup = producerGroup; 3 defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); 4 //if client open the message trace feature 5 if (enableMsgTrace) { // @2 6 try { 7 AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); 8 dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); 9 traceDispatcher = dispatcher; 10 this.getDefaultMQProducerImpl().registerSendMessageHook( 11 new SendMessageTraceHookImpl(traceDispatcher)); // @3 12 } catch (Throwable e) { 13 log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); 14 } 15 } 16}
代码@1:首先介绍一下其局部变量。
- String producerGroup
生产者所属组。 - RPCHook rpcHook
生产者发送钩子函数。 - boolean enableMsgTrace
是否开启消息轨迹跟踪。 - String customizedTraceTopic
定制用于存储消息轨迹的数据。
代码@2:用来构建AsyncTraceDispatcher,看其名:异步转发消息轨迹数据,稍后重点关注。
代码@3:构建SendMessageTraceHookImpl对象,并使用AsyncTraceDispatcher用来异步转发。
1.2 SendMessageTraceHookImpl钩子函数
1、SendMessageTraceHookImpl类图
- SendMessageHook
消息发送钩子函数,用于在消息发送之前、发送之后执行一定的业务逻辑,是记录消息轨迹的最佳扩展点。 - TraceDispatcher
消息轨迹转发处理器,其默认实现类AsyncTraceDispatcher,异步实现消息轨迹数据的发送。下面对其属性做一个简单的介绍:
- int queueSize
异步转发,队列长度,默认为2048,当前版本不能修改。 - int batchSize
批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。 - int maxMsgSize
消息轨迹一次发送的最大消息大小,默认为128K,当前版本不能修改。 - DefaultMQProducer traceProducer
用来发送消息轨迹的消息发送者。 - ThreadPoolExecutor traceExecuter
线程池,用来异步执行消息发送。 - AtomicLong discardCount
记录丢弃的消息个数。 - Thread worker
woker线程,主要负责从追加队列中获取一批待发送的消息轨迹数据,提交到线程池中执行。 - ArrayBlockingQueue< TraceContext> traceContextQueue
消息轨迹TraceContext队列,用来存放待发送到服务端的消息。 - ArrayBlockingQueue< Runnable> appenderQueue
线程池内部队列,默认长度1024。 - DefaultMQPushConsumerImpl hostConsumer
消费者信息,记录消息消费时的轨迹信息。 - String traceTopicName
用于跟踪消息轨迹的topic名称。
2、 SendMessageTraceHookImpl
sendMessageBefore
1public void sendMessageBefore(SendMessageContext context) { 2 //if it is message trace data,then it doesn't recorded 3 if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { // @1 4 return; 5 } 6 //build the context content of TuxeTraceContext 7 TraceContext tuxeContext = new TraceContext(); 8 tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1)); 9 context.setMqTraceContext(tuxeContext); 10 tuxeContext.setTraceType(TraceType.Pub); 11 tuxeContext.setGroupName(context.getProducerGroup()); // @2 12 //build the data bean object of message trace 13 TraceBean traceBean = new TraceBean(); // @3 14 traceBean.setTopic(context.getMessage().getTopic()); 15 traceBean.setTags(context.getMessage().getTags()); 16 traceBean.setKeys(context.getMessage().getKeys()); 17 traceBean.setStoreHost(context.getBrokerAddr()); 18 traceBean.setBodyLength(context.getMessage().getBody().length); 19 traceBean.setMsgType(context.getMsgType()); 20 tuxeContext.getTraceBeans().add(traceBean); 21}
代码@1:如果topic主题为消息轨迹的Topic,直接返回。
代码@2:在消息发送上下文中,设置用来跟踪消息轨迹的上下环境,里面主要包含一个TraceBean集合、追踪类型(TraceType.Pub)与生产者所属的组。
代码@3:构建一条跟踪消息,用TraceBean来表示,记录原消息的topic、tags、keys、发送到broker地址、消息体长度等消息。
从上文看出,sendMessageBefore主要的用途就是在消息发送的时候,先准备一部分消息跟踪日志,存储在发送上下文环境中,此时并不会发送消息轨迹数据。
sendMessageAfter
1public void sendMessageAfter(SendMessageContext context) { 2 //if it is message trace data,then it doesn't recorded 3 if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) // @1 4 || context.getMqTraceContext() == null) { 5 return; 6 } 7 if (context.getSendResult() == null) { 8 return; 9 } 10 11 if (context.getSendResult().getRegionId() == null 12 || !context.getSendResult().isTraceOn()) { 13 // if switch is false,skip it 14 return; 15 } 16 17 TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); 18 TraceBean traceBean = tuxeContext.getTraceBeans().get(0); // @2 19 int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); // @3 20 tuxeContext.setCostTime(costTime); // @4 21 if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { 22 tuxeContext.setSuccess(true); 23 } else { 24 tuxeContext.setSuccess(false); 25 } 26 tuxeContext.setRegionId(context.getSendResult().getRegionId()); 27 traceBean.setMsgId(context.getSendResult().getMsgId()); 28 traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); 29 traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); 30 localDispatcher.append(tuxeContext); // @5 31}
代码@1:如果topic主题为消息轨迹的Topic,直接返回。
代码@2:从MqTraceContext中获取跟踪的TraceBean,虽然设计成List结构体,但在消息发送场景,这里的数据永远只有一条,即使是批量发送也不例外。
代码@3:获取消息发送耗时。
代码@4:设置costTime(耗时)、success(是否发送成功)、regionId(发送到broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime,这里使用的是(客户端发送时间 + 二分之一的耗时)来表示消息的存储时间,这里是一个估值。
代码@5:将需要跟踪的信息通过TraceDispatcher转发到Broker服务器。其代码如下:
1public boolean append(final Object ctx) { 2 boolean result = traceContextQueue.offer((TraceContext) ctx); 3 if (!result) { 4 log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); 5 } 6 return result; 7}
这里一个非常关键的点是offer方法的使用,当队列无法容纳新的元素时会立即返回false,并不会阻塞。
接下来将目光转向TraceDispatcher的实现。
1.3 TraceDispatcher实现原理
TraceDispatcher,用于客户端消息轨迹数据转发到Broker,其默认实现类:AsyncTraceDispatcher。
TraceDispatcher构造函数
1public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { 2 // queueSize is greater than or equal to the n power of 2 of value 3 this.queueSize = 2048; 4 this.batchSize = 100; 5 this.maxMsgSize = 128000; 6 this.discardCount = new AtomicLong(0L); 7 this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024); 8 this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); 9 if (!UtilAll.isBlank(traceTopicName)) { 10 this.traceTopicName = traceTopicName; 11 } else { 12 this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; 13 } // @1 14 this.traceExecuter = new ThreadPoolExecutor(// : 15 10, // 16 20, // 17 1000 * 60, // 18 TimeUnit.MILLISECONDS, // 19 this.appenderQueue, // 20 new ThreadFactoryImpl("MQTraceSendThread_")); 21 traceProducer = getAndCreateTraceProducer(rpcHook); // @2 22}
代码@1:初始化核心属性,该版本这些值都是“固化”的,用户无法修改。
- queueSize
队列长度,默认为2048,异步线程池能够积压的消息轨迹数量。 - batchSize
一次向Broker批量发送的消息条数,默认为100. - maxMsgSize
向Broker汇报消息轨迹时,消息体的总大小不能超过该值,默认为128k。 - discardCount
整个运行过程中,丢弃的消息轨迹数据,这里要说明一点的是,如果消息TPS发送过大,异步转发线程处理不过来时,会主动丢弃消息轨迹数据。 - traceContextQueue
traceContext积压队列,客户端(消息发送、消息消费者)在收到处理结果后,将消息轨迹提交到噶队列中,则会立即返回。 - appenderQueue
提交到Broker线程池中队列。 - traceTopicName
用于接收消息轨迹的Topic,默认为RMQ_SYS_TRANS_HALF_TOPIC。 - traceExecuter
用于发送到Broker服务的异步线程池,核心线程数默认为10,最大线程池为20,队列堆积长度2048,线程名称:MQTraceSendThread_。、 - traceProducer
发送消息轨迹的Producer。
代码@2:调用getAndCreateTraceProducer方法创建用于发送消息轨迹的Producer(消息发送者),下面详细介绍一下其实现。
getAndCreateTraceProducer详解
1private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { 2 DefaultMQProducer traceProducerInstance = this.traceProducer; 3 if (traceProducerInstance == null) { //@1 4 traceProducerInstance = new DefaultMQProducer(rpcHook); 5 traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); 6 traceProducerInstance.setSendMsgTimeout(5000); 7 traceProducerInstance.setVipChannelEnabled(false); 8 // The max size of message is 128K 9 traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); 10 } 11 return traceProducerInstance; 12 }
代码@1:如果还未建立发送者,则创建用于发送消息轨迹的消息发送者,其GroupName为:_INNER_TRACE_PRODUCER,消息发送超时时间5s,最大允许发送消息大小118K。
start
1public void start(String nameSrvAddr) throws MQClientException { 2 if (isStarted.compareAndSet(false, true)) { // @1 3 traceProducer.setNamesrvAddr(nameSrvAddr); 4 traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); 5 traceProducer.start(); 6 } 7 this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); // @2 8 this.worker.setDaemon(true); 9 this.worker.start(); 10 this.registerShutDownHook(); 11}
开始启动,其调用的时机为启动DefaultMQProducer时,如果启用跟踪消息轨迹,则调用之。
代码@1:如果用于发送消息轨迹的发送者没有启动,则设置nameserver地址,并启动着。
代码@2:启动一个线程,用于执行AsyncRunnable任务,接下来将重点介绍。
AsyncRunnable
1class AsyncRunnable implements Runnable { 2 private boolean stopped; 3 public void run() { 4 while (!stopped) { 5 List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize); // @1 6 for (int i = 0; i < batchSize; i++) { 7 TraceContext context = null; 8 try { 9 //get trace data element from blocking Queue — traceContextQueue 10 context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2 11 } catch (InterruptedException e) { 12 } 13 if (context != null) { 14 contexts.add(context); 15 } else { 16 break; 17 } 18 } 19 if (contexts.size() > 0) { : 20 AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); // @3 21 traceExecuter.submit(request); 22 } else if (AsyncTraceDispatcher.this.stopped) { 23 this.stopped = true; 24 } 25 } 26 } 27}
代码@1:构建待提交消息跟踪Bean,每次最多发送batchSize,默认为100条。
代码@2:从traceContextQueue中取出一个待提交的TraceContext,设置超时时间为5s,即如何该队列中没有待提交的TraceContext,则最多等待5s。
代码@3:向线程池中提交任务AsyncAppenderRequest。
AsyncAppenderRequest#sendTraceData
1public void sendTraceData(List<TraceContext> contextList) { 2 Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>(); 3 for (TraceContext context : contextList) { //@1 4 if (context.getTraceBeans().isEmpty()) { 5 continue; 6 } 7 // Topic value corresponding to original message entity content 8 String topic = context.getTraceBeans().get(0).getTopic(); // @2 9 // Use original message entity's topic as key 10 String key = topic; 11 List<TraceTransferBean> transBeanList = transBeanMap.get(key); 12 if (transBeanList == null) { 13 transBeanList = new ArrayList<TraceTransferBean>(); 14 transBeanMap.put(key, transBeanList); 15 } 16 TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); // @3 17 transBeanList.add(traceData); 18 } 19 for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) { // @4 20 flushData(entry.getValue()); 21 } 22}
代码@1:遍历收集的消息轨迹数据。
代码@2:获取存储消息轨迹的Topic。
代码@3:对TraceContext进行编码,这里是消息轨迹的传输数据,稍后对其详细看一下,了解其上传的格式。
代码@4:将编码后的数据发送到Broker服务器。
TraceDataEncoder#encoderFromContextBean
根据消息轨迹跟踪类型,其格式会有一些不一样,下面分别来介绍其合适。
PUB(消息发送)
1case Pub: { 2 TraceBean bean = ctx.getTraceBeans().get(0); 3 //append the content of context and traceBean to transferBean's TransData 4 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// 5 .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// 6 .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// 7 .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// 8 .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// 9 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// 10 .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// 11 .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// 12 .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// 13 .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)// 14 .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// 15 .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// 16 .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// 17 .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); 18}
消息轨迹数据的协议使用字符串拼接,字段的分隔符号为1,整个数据以2结尾,感觉这个设计还是有点“不可思议”,为什么不直接使用json协议呢?
SubBefore(消息消费之前)
1for (TraceBean bean : ctx.getTraceBeans()) { 2 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// 3 .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// 4 .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// 5 .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// 6 .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// 7 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// 8 .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// 9 .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// 10 } 11}
轨迹就是按照上述顺序拼接而成,各个字段使用1分隔,每一条记录使用2结尾。
SubAfter(消息消费后)
1case SubAfter: { 2 for (TraceBean bean : ctx.getTraceBeans()) { 3 sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// 4 .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// 5 .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// 6 .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// 7 .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// 8 .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// 9 .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); 10 } 11 } 12}
格式编码一样,就不重复多说。
经过上面的源码跟踪,消息发送端的消息轨迹跟踪流程、消息轨迹数据编码协议就清晰了,接下来我们使用一张序列图来结束本部分的讲解。
其实行文至此,只关注了消息发送的消息轨迹跟踪,消息消费的轨迹跟踪又是如何呢?其实现原理其实是一样的,就是在消息消费前后执行特定的钩子函数,其实现类为ConsumeMessageTraceHookImpl,由于其实现与消息发送的思路类似,故就不详细介绍了。
消息轨迹数据如何存储
其实从上面的分析,我们已经得知,RocketMQ的消息轨迹数据存储在到Broker上,那消息轨迹的主题名如何指定?其路由信息又怎么分配才好呢?是每台Broker上都创建还是只在其中某台上创建呢?RocketMQ支持系统默认与自定义消息轨迹的主题。
2.1 使用系统默认的主题名称
RocketMQ默认的消息轨迹主题为:RMQ_SYS_TRACE_TOPIC,那该Topic需要手工创建吗?其路由信息呢?
1{ 2 if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { // @1 3 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); 4 TopicConfig topicConfig = new TopicConfig(topic); 5 this.systemTopicList.add(topic); 6 topicConfig.setReadQueueNums(1); // @2 7 topicConfig.setWriteQueueNums(1); 8 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); 9 } 10}
上述代码出自TopicConfigManager的构造函数,在Broker启动的时候会创建topicConfigManager对象,用来管理topic的路由信息。
代码@1:如果Broker开启了消息轨迹跟踪(traceTopicEnable=true)时,会自动创建默认消息轨迹的topic路由信息,注意其读写队列数为1。
2.2 用户自定义消息轨迹主题
在创建消息发送者、消息消费者时,可以显示的指定消息轨迹的Topic,例如:
1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) 2 3public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 4 AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) 5
通过customizedTraceTopic来指定消息轨迹Topic。
温馨提示:通常在生产环境上,不会开启自动创建主题,故需要RocketMQ运维管理人员提前创建好Topic。
好了,本文就介绍到这里了,详细介绍了RocktMQ消息轨迹的实现原理,下一篇,我们将进入到多副本的学习中。