聊聊RocketMQ 消息轨迹

简介: 这篇文章,我们聊一聊 RocketMQ 的**消息轨迹**设计思路。查询消息轨迹可作为生产环境中排查问题强有力的数据支持 ,也是研发同学解决线上问题的重要武器之一。

这篇文章,我们聊一聊 RocketMQ 的消息轨迹设计思路。

查询消息轨迹可作为生产环境中排查问题强有力的数据支持 ,也是研发同学解决线上问题的重要武器之一。

1 基础概念

消息轨迹是指一条消息从生产者发送到 Broker , 再到消费者消费,整个过程中的各个相关节点的时间、状态等数据汇聚而成的完整链路信息。

当我们需要查询消息轨迹时,需要明白一点:消息轨迹数据是存储在 Broker 服务端,我们需要定义一个主题,在生产者,消费者端定义轨迹钩子

2 开启轨迹

2.1 修改 Broker 配置文件

# 开启消息轨迹
traceTopicEnable=true

2.2 生产者配置

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) 

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

在生产者的构造函数里,有两个核心参数:

  • enableMsgTrace:是否开启消息轨迹
  • customizedTraceTopic:记录消息轨迹的 Topic , 默认是:RMQ_SYS_TRACE_TOPIC

执行如下的生产者代码:

public class Producer {
   
   
    public static final String PRODUCER_GROUP = "mytestGroup";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
   
   
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        try {
   
   
            String key = UUID.randomUUID().toString();
            System.out.println(key);
            Message msg = new Message(
                    TOPIC,
                    TAG,
                    key,
                    ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
        // 这里休眠十秒,是为了异步发送轨迹消息成功。
        Thread.sleep(10000);
        producer.shutdown();
    }
}

在生产者代码中,我们指定了消息的 key 属性, 便于对于消息进行高性能检索。

执行成功之后,我们从控制台查看轨迹信息。

从图中可以看到,消息轨迹中存储了消息的存储时间存储服务器IP发送耗时

2.3 消费者配置

和生产者类似,消费者的构造函数可以传递轨迹参数:

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace);

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

执行如下的消费者代码:

public class Consumer {
   
   
    public static final String CONSUMER_GROUP = "exampleGruop";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "example";

    public static void main(String[] args) throws InterruptedException, MQClientException {
   
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP , true);
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   
   
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3 实现原理

轨迹的实现原理主要是在生产者发送、消费者消费时添加相关的钩子。 因此,我们只需要了解钩子的实现逻辑即可。

下面的代码是 DefaultMQProducer 的构造函数。

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
    boolean enableMsgTrace, final String customizedTraceTopic) {
   
   
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    // if client open the message trace feature
    if (enableMsgTrace) {
   
   
        try {
   
   
            //异步轨迹分发器
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.defaultMQProducerImpl);
            traceDispatcher = dispatcher;
            // 发送消息时添加执行钩子
            this.defaultMQProducerImpl.registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
            // 结束事务时添加执行钩子
            this.defaultMQProducerImpl.registerEndTransactionHook(
                new EndTransactionTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
   
   
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

当是否开启轨迹开关打开时,创建异步轨迹分发器 AsyncTraceDispatcher ,然后给默认的生产者实现类在发送消息的钩子 SendMessageTraceHookImpl

//发送消息时添加执行钩子
this.defaultMQProducerImpl.registerSendMessageHook(new SendMessageTraceHookImpl(traceDispatcher));

我们把生产者发送消息的流程简化如下代码 :

//DefaultMQProducerImpl#sendKernelImpl
this.executeSendMessageHookBefore(context);

// 发生消息
this.mQClientFactory.getMQClientAPIImpl().sendMessage(....)

// 生产者发送消息后会执行
this.executeSendMessageHookAfter(context);

进入SendMessageTraceHookImpl 类 ,该类主要有两个方法 sendMessageBeforesendMessageAfter

  1. sendMessageBefore 方法
public void sendMessageBefore(SendMessageContext context) {
   
   
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher)  localDispatcher).getTraceTopicName())) {
   
   
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();
    traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}

发送消息之前,先收集消息的 topic 、tag、key 、存储 Broker 的 IP 地址、消息体的长度等基础信息,并将消息轨迹数据存储在调用上下文中。

  1. sendMessageAfter 方法
public void sendMessageAfter(SendMessageContext context) {
   
   
    // ...省略部分代码 
    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
    tuxeContext.setCostTime(costTime);
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
   
   
        tuxeContext.setSuccess(true);
    } else {
   
   
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);
}

跟踪对象里会保存 costTime (消息发送时间)、success (是否发送成功)、regionId (发送到 Broker 所在的分区) 、 msgId (消息 ID,全局唯一)、offsetMsgId (消息物理偏移量) ,storeTime (存储时间 ) 。

存储时间并没有取消息的实际存储时间,而是估算出来的:客户端发送时间的一般的耗时表示消息的存储时间。

最后将跟踪上下文添加到本地轨迹分发器:

localDispatcher.append(tuxeContext);

下面我们分析下轨迹分发器的原理:

public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) {
   
   
    // 省略代码 ....   
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
   
   
        this.traceTopicName = traceTopicName;
    } else {
   
   
        this.traceTopicName = TopicValidator.RMQ_SYS_TRACE_TOPIC;
    }
    this.traceExecutor = new ThreadPoolExecutor(//
            10, 
            20, 
            1000 * 60, 
            TimeUnit.MILLISECONDS, 
            this.appenderQueue, 
            new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);
}
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
   
   
        if (isStarted.compareAndSet(false, true)) {
   
   
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
}

上面的代码展示了分发器的构造函数和启动方法,构造函数创建了一个发送消息的线程池 traceExecutor ,启动 start 后会启动一个 worker线程

class AsyncRunnable implements Runnable {
   
   
    private boolean stopped;
    @Override
    public void run() {
   
   
        while (!stopped) {
   
   
            synchronized (traceContextQueue) {
   
   
                long endTime = System.currentTimeMillis() + pollingTimeMil;
                while (System.currentTimeMillis() < endTime) {
   
   
                    try {
   
   
                        TraceContext traceContext = traceContextQueue.poll(
                                endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS
                        );
                        if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) {
   
   
                            // get the topic which the trace message will send to
                            String traceTopicName = this.getTraceTopicName(traceContext.getRegionId());

                            // get the traceDataSegment which will save this trace message, create if null
                            TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName);
                            if (traceDataSegment == null) {
   
   
                                traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId());
                                taskQueueByTopic.put(traceTopicName, traceDataSegment);
                            }

                            // encode traceContext and save it into traceDataSegment
                            // NOTE if data size in traceDataSegment more than maxMsgSize,
                            //  a AsyncDataSendTask will be created and submitted
                            TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext);
                            traceDataSegment.addTraceTransferBean(traceTransferBean);
                        }
                    } catch (InterruptedException ignore) {
   
   
                        log.debug("traceContextQueue#poll exception");
                    }
                }
                // NOTE send the data in traceDataSegment which the first TraceTransferBean
                //  is longer than waitTimeThreshold
                sendDataByTimeThreshold();
                if (AsyncTraceDispatcher.this.stopped) {
   
   
                    this.stopped = true;
                }
            }
        }
  }

worker 启动后,会从轨迹上下文队列 traceContextQueue 中不断的取出轨迹上下文,并将上下文转换成轨迹数据片段 TraceDataSegment

为了提升系统的性能,并不是每一次从队列中获取到数据就直接发送到 MQ ,而是积累到一定程度的临界点才触发这个操作,我们可以简单的理解为批量操作

这里面有两个维度 :

  1. 轨迹数据片段的数据大小大于某个数据大小阈值。笔者认为这段 RocketMQ 4.9.4 版本代码存疑,因为最新的 5.0 版本做了优化。

    if (currentMsgSize >= traceProducer.getMaxMessageSize()) {
         
         
        List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);
        AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);
        traceExecutor.submit(asyncDataSendTask);
        this.clear();
    }
    
  2. 当前时间 - 轨迹数据片段的首次存储时间 是否大于刷新时间 ,也就是每500毫秒刷新一次。

    private void sendDataByTimeThreshold() {
         
         
        long now = System.currentTimeMillis();
        for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
         
         
            if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
         
         
                taskInfo.sendAllData();
            }
        }
    }
    

轨迹数据存储的格式如下:

TraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData
case Pub: {
   
   
  sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)
    .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
    .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)
    .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)
    .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)
    .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)
    .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}
break;

轨迹消息数据

注意:

分隔符 CONTENT_SPLITOR = (char) 1 它在内存中的值是:00000001 , 但是 char i = '1' 它在内存中的值是 49 ,即 00110001。


参考资料:

阿里云文档:

https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/user-guide/query-a-message-trace

石臻臻:

https://mp.weixin.qq.com/s/saYD3mG9F1z-oAU6STxewQ

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 负载均衡 应用服务中间件
MQ产品使用合集之使用的RocketMQ5.1.3时,grpc客户端没有产生消息轨迹如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
164 3
|
5月前
|
消息中间件 网络性能优化
消息队列 MQ产品使用合集之通过MQTT控制台查询不到设备轨迹或消息轨迹是什么原因
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
6月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
6月前
|
物联网 开发工具
MQTT常见问题之查轨迹失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
消息中间件 物联网 RocketMQ
MQTT常见问题之RocketMQ到MQTT的消息轨迹查询失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
物联网 Serverless
MQTT常见问题之通过mqtt控制台查询不到设备轨迹如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
消息中间件 存储 uml
5 张图带你彻底理解 RocketMQ 轨迹消息
5 张图带你彻底理解 RocketMQ 轨迹消息
480 0
5 张图带你彻底理解 RocketMQ 轨迹消息
|
存储 消息中间件 JSON
源码分析RocketMQ消息轨迹
源码分析RocketMQ消息轨迹
源码分析RocketMQ消息轨迹
|
存储 消息中间件 数据库
RocketMQ消息轨迹-设计篇
RocketMQ消息轨迹-设计篇
RocketMQ消息轨迹-设计篇
|
消息中间件 Apache 数据安全/隐私保护
Apache RocketMQ 发布 v4.4.0,新添权限控制和消息轨迹特性
近日,Apache RocketMQ 发布了 v4.4.0,该版本主要增加了权限控制(ACL)和消息轨迹(Message Trace)两大特性,并做了8项优化,和修复了4处bug。 权限控制(ACL) 该特性主要为 RocketMQ提供权限访问控制。
5214 8

相关产品

  • 云消息队列 MQ
  • 下一篇
    无影云桌面