5 张图带你彻底理解 RocketMQ 轨迹消息

简介: 5 张图带你彻底理解 RocketMQ 轨迹消息

大家好,我是君哥。

为了方便跟踪消息发送和消费的轨迹,RocketMQ 引入了轨迹消息,今天一起来学习一下。

1 开启轨迹消息

默认情况下,RocketMQ 是不开启轨迹消息的,需要我们手工开启。

1.1 Broker

Broker 端开启轨迹消息,需要增加下面的配置:

traceTopicEnable=true

1.2 生产者

对于生产者端,要开启轨迹消息,需要在定义生产者时增加参数。定义消费者使用类 DefaultMQProducer,这个类支持开启轨迹消息的构造函数如下:

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)

从上面的构造函数可以看出,自定义消费者时,不仅可以定义开启轨迹消息,还可以指定轨迹消息发送的 Topic。如果不指定轨迹消息的 Topic,默认发送的 Topic 是 RMQ_SYS_TRACE_TOPIC。

1.3 消费者

对于消费者,要开启轨迹消息,需要在定义消费者时增加参数。定义消费者使用类 DefaultMQPushConsumer,这个类支持开启轨迹消息的构造函数如下:

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

2 生产者处理

首先看一个支持轨迹消息的生产者示例:

DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true, "");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

下面是一张生产者端的 UML 类图:

微信图片_20221213115338.png

在 DefaultMQProducer 创建时,会初始化 defaultMQProducerImpl、traceDispatcher 和钩子函数 SendMessageHook。

2.1 生产者初始化

生产者初始化代码如下:

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
 boolean enableMsgTrace, final String customizedTraceTopic) {
 this.namespace = namespace;
 this.producerGroup = producerGroup;
 defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
 if (enableMsgTrace) {
  try {
   AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
   dispatcher.setHostProducer(this.defaultMQProducerImpl);
   traceDispatcher = dispatcher;
   //注册轨迹消息钩子函数
   this.defaultMQProducerImpl.registerSendMessageHook(
    new SendMessageTraceHookImpl(traceDispatcher));
   //省略事务消息的钩子注册
  } catch (Throwable e) {
  }
 }
}

初始化的代码中,传入了是否开启轨迹消息(enableMsgTrace)和自定义轨迹消息的 Topic(customizedTraceTopic),同时初始化了 traceDispatcher 并注册了钩子函数 SendMessageTraceHook。

生产者启动时 defaultMQProducerImpl 和 traceDispatcher 也会启动,代码如下:

public void start() throws MQClientException {
 this.setProducerGroup(withNamespace(this.producerGroup));
 this.defaultMQProducerImpl.start();
 if (null != traceDispatcher) {
  try {
   traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  } catch (MQClientException e) {
   log.warn("trace dispatcher start failed ", e);
  }
 }
}

2.2 traceDispatcher 启动

生产者初始化的时候初始化了 traceDispatcher。traceDispatcher 是轨迹消息的处理器,AsyncTraceDispatcher 构造函数定义一个专门发送轨迹消息的生产者 traceProducer(DefaultMQProducer 类型)。

注意:traceProducer 发送消息的最大值 maxMessageSize 是 128k,虽然 maxMessageSize 初始值被定义为 4M,但是创建 traceProducer 时赋值 128k。

上面提到,生产者启动时 traceDispatcher 也会启动,看一下它的启动方法:

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
 if (isStarted.compareAndSet(false, true)) {
  traceProducer.setNamesrvAddr(nameSrvAddr);
  traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
  traceProducer.start();
 }
 this.accessChannel = accessChannel;
 this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
 this.worker.setDaemon(true);
 this.worker.start();
 this.registerShutDownHook();
}

可以看到,traceDispatcher 的启动首先启动了 traceProducer,然后启动了一个异步线程 AsyncRunnable,下面看一下 run 方法:

public void run() {
 while (!stopped) {
     //batchSize=100
  List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
  //traceContextQueue队列长度等于1024
  synchronized (traceContextQueue) {
   for (int i = 0; i < batchSize; i++) {
    TraceContext context = null;
    try {
     //get trace data element from blocking Queue - traceContextQueue
     context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
    if (context != null) {
     contexts.add(context);
    } else {
     break;
    }
   }
   if (contexts.size() > 0) {
    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
    traceExecutor.submit(request);
   } else if (AsyncTraceDispatcher.this.stopped) {
    this.stopped = true;
   }
  }
 }
}

从上面的代码可以看到,每次从 traceContextQueue 中拉取 100 条 TraceContext,然后通过 AsyncAppenderRequest 异步发送出去。

注意:

  1. 发送轨迹消息时需要组装消息进行批量发送,每次发送的消息大小不超过 128k;
  2. 如果保存轨迹消息的 Broker 有多个,则需要按照轮询的方式依次发送到不同的 Broker 上,具体代码见 AsyncTraceDispatcher 类中的 sendTraceDataByMQ 方法。

2.3 钩子函数

看到这里相信你一定会有一个疑问,traceContextQueue 中的消息是从哪儿来的呢?答案是生产者初始化时定义的 SendMessageTraceHook。

看一下发送消息的代码:

//DefaultMQProducerImpl 类
private SendResult sendKernelImpl(final Message msg,
 final MessageQueue mq,
 final CommunicationMode communicationMode,
 final SendCallback sendCallback,
 final TopicPublishInfo topicPublishInfo,
 final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 //省略部分代码
 SendMessageContext context = null;
 if (brokerAddr != null) {
  try {
            //省略部分代码
   if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    //1.发送消息前执行钩子函数
    this.executeSendMessageHookBefore(context);
   }
   SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
   //省略requestHeader封装代码
   SendResult sendResult = null;
   //-------------2.这里发送消息-------------
   if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    //3.发送消息后执行钩子函数
    this.executeSendMessageHookAfter(context);
   }
   return sendResult;
  } 
  //catch和finally省略
 }
 throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

由于 sendKernelImpl 代码比较多,我这里只贴了骨架代码。我在上面加了注释,可以看到在发送消息前后都会执行钩子函数。

在发送消息前,通过调用钩子函数封装一个轨迹消息。发送消息后,再通过钩子函数对轨迹消息进行完善,主要加入消息发送结果、发送消息花费时间等属性,然后把轨迹消息加到 traceContextQueue 上。轨迹消息包含的内容如下图:

微信图片_20221213115409.png

轨迹消息的内容比较多,包含了发送消息的详细信息,比如:Topic、Message、MessageQueue、Group、生产者地址(clientHost)、消息发送结果等。

3 Broker 处理

轨迹消息发送到 Broker 后,会保存到 Broker 上,默认保存的 Topic 是 RMQ_SYS_TRACE_TOPIC。Broker 启动时,会自动初始化默认 Topic 的路由配置,代码如下:

//TopicConfigManager 类
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
 TopicConfig topicConfig = new TopicConfig(topic);
 TopicValidator.addSystemTopic(topic);
 topicConfig.setReadQueueNums(1);
 topicConfig.setWriteQueueNums(1);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

前面提到过,生产者也可以自己定义轨迹消息 Topic,不过需要在 Broker 上提前创建好自定义的 Topic。

如果想要轨迹消息和业务消息隔离,可以专门用一个 Broker 来保存轨迹消息,这样需要单独在这个 Broker 上开启轨迹消息。

4 消费端处理

消费端对轨迹消息的处理跟生产端非常类似。首先我们看一下消费端处理的 UML 类图:

微信图片_20221213115437.png

我们以推模式处理并发消息为例,ConsumeMessageConcurrentlyService 在消费消息前,通过 DefaultMQPushConsumerImpl 调用了钩子函数 executeHookBefore,消费消息后通过 DefaultMQPushConsumerImpl 调用了钩子函数 executeHookAfter。代码如下:

//ConsumeMessageConcurrentlyService 类
public void run() {
 //省略部分逻辑
 ConsumeMessageContext consumeMessageContext = null;
 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  consumeMessageContext = new ConsumeMessageContext();
  consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
  consumeMessageContext.setProps(new HashMap<String, String>());
  consumeMessageContext.setMq(messageQueue);
  consumeMessageContext.setMsgList(msgs);
  consumeMessageContext.setSuccess(false);
  //1.消费消息前执行钩子函数
  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
 }
    //省略部分逻辑
 try {
    //2.消费消息
  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 //省略部分逻辑
 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  consumeMessageContext.setStatus(status.toString());
  consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
  //3.消费消息前执行钩子函数
  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
 }
    //省略部分逻辑
}

如果消费端开启轨迹消息,就会初始化 traceDispatcher 并且注册钩子函数。

if (enableMsgTrace) {
 try {
  AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
  dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
  traceDispatcher = dispatcher;
  this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
   new ConsumeMessageTraceHookImpl(traceDispatcher));
 } catch (Throwable e) {
  log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
 }
}

可以看到,traceDispatcher 跟生产者使用的都是 AsyncTraceDispatcher,处理逻辑完全一样。

同样,钩子函数的使用跟生产者也类似,在消费消息之前调用钩子函数(executeHookBefore)封装轨迹消息,在消费消息之后再次调用钩子函数(executeHookAfter)完善轨迹消息。消费端轨迹消息的内容如下图:

微信图片_20221213115502.png

5 总结

本文主要讲解了 RocketMQ 的轨迹消息实现机制。轨迹消息分为生产端和消费端的轨迹消息,生产端和消费端 RocketMQ 都提供了构造函数来指定是否开启轨迹消息。通过钩子函数,把轨迹消息加入队列,也就是变量 traceContextQueue,而 traceDispatcher 则以 100 条为单位不停地从队列中拉取消息进行组装并发送到 Broker。如下图:

微信图片_20221213115524.png

理解了 traceDispatcher 和钩子函数 ,就很容易理解 RocketMQ 轨迹消息的处理逻辑了。

在 Broker 端,则通过增加配置参数 traceTopicEnable 来指定是否存储轨迹消息。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
659 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
21天前
|
消息中间件 网络性能优化
消息队列 MQ产品使用合集之通过MQTT控制台查询不到设备轨迹或消息轨迹是什么原因
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
消息中间件 负载均衡 应用服务中间件
MQ产品使用合集之使用的RocketMQ5.1.3时,grpc客户端没有产生消息轨迹如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
2月前
|
物联网 开发工具
MQTT常见问题之查轨迹失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 RocketMQ
MQTT常见问题之RocketMQ到MQTT的消息轨迹查询失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
物联网 Serverless
MQTT常见问题之通过mqtt控制台查询不到设备轨迹如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
9月前
|
存储 消息中间件 Java
聊聊RocketMQ 消息轨迹
这篇文章,我们聊一聊 RocketMQ 的**消息轨迹**设计思路。 查询消息轨迹可作为生产环境中排查问题强有力的数据支持 ,也是研发同学解决线上问题的重要武器之一。
聊聊RocketMQ 消息轨迹
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67629 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
452 1
5张图带你理解 RocketMQ 顺序消息实现机制