开发者社区 > 云原生 > 云消息队列 > 正文

请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?

请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?

展开
收起
真的很搞笑 2023-06-11 23:02:17 2095 0
4 条回答
写回答
取消 提交回答
  • 如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。

    以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId:

    %d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} - traceId:%X{traceId}, spanId:%X{spanId} - %msg%n

    在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。

    接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId:

    import org.springframework.cloud.sleuth.*; // ...

    @Autowired(required = false) private Tracer tracer;

    public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name("sendMessage").start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // ... } finally { newSpan.tag("message", message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // ... } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。

    这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。

    2023-06-12 09:11:58
    赞同 展开评论 打赏
  • 在RocketMQ Spring中,可以通过配置RocketMQ的ProducerInterceptorConsumerInterceptor来实现在日志中携带traceIdspanId

    ProducerInterceptor中,你可以通过MessageExtputUserProperty方法将traceIdspanId添加到消息的用户属性中,例如:

    public class TraceProducerInterceptor implements ProducerInterceptor {
        private static final String TRACE_ID = "traceId";
        private static final String SPAN_ID = "spanId";
    
        @Override
        public Message<?> beforeConvert(Message<?> message) {
            String traceId = TraceContext.getTraceId();
            String spanId = TraceContext.getSpanId();
            if (traceId != null && spanId != null) {
                MessageHeaders headers = message.getHeaders();
                headers.put(TRACE_ID, traceId);
                headers.put(SPAN_ID, spanId);
                if (message instanceof ErrorMessage) {
                    ErrorMessage errorMessage = (ErrorMessage) message;
                    Message<?> originalMessage = errorMessage.getOriginalMessage();
                    if (originalMessage != null) {
                        originalMessage = beforeConvert(originalMessage);
                        errorMessage = new ErrorMessage(
                                errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause());
                    }
                    return errorMessage;
                }
            }
            return message;
        }
    }
    

    ConsumerInterceptor中,你可以通过MessageExtgetUserProperty方法获取到消息中的traceIdspanId,并将其添加到日志中,例如:

    public class TraceConsumerInterceptor implements ConsumerInterceptor {
        private static final String TRACE_ID = "traceId";
        private static final String SPAN_ID = "spanId";
    
        @Override
        public void afterReceive(Message<?> message) {
            String traceId = message.getHeaders().get(TRACE_ID, String.class);
            String spanId = message.getHeaders().get(SPAN_ID, String.class);
            if (traceId != null && spanId != null) {
                MDC.put(TRACE_ID, traceId);
                MDC.put(SPAN_ID, spanId);
            }
        }
    }
    

    在配置文件中添加ProducerInterceptorConsumerInterceptor

    rocketmq:
      producer:
        interceptor:
          - com.example.TraceProducerInterceptor
      consumer:
        interceptor:
          - com.example.TraceConsumerInterceptor
    

    这样就可以在RocketMQ Spring的日志中携带traceIdspanId了。

    2023-06-12 08:58:19
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。

    具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如:

    java Copy @Autowired private Tracer tracer;

    @Autowired private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId();

    Message<String> msg = MessageBuilder.withPayload(message)
            .setHeader("TRACE_ID", traceId)
            .setHeader("SPAN_ID", spanId)
            .build();
    rocketMQTemplate.syncSend("topic", msg);
    

    } 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如:

    java Copy @Component @RocketMQMessageListener(topic = "topic", consumerGroup = "consumer-group") public class MyMessageListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String traceId = msg.getProperty("TRACE_ID");
            String spanId = msg.getProperty("SPAN_ID");
            // 处理消息
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    

    } 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。

    2023-06-12 07:59:28
    赞同 展开评论 打赏
  • 可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。

    具体实现步骤如下:

    1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。

    public class TraceMessageInterceptor implements MessageInterceptor {
        @Override
        public Message<?> beforeSend(Message<?> message, org.apache.rocketmq.common.message.Message rocketmqMessage, SendCallback callback) {
            TraceContext traceContext = TraceContext.get();
            if (traceContext != null) {
                rocketmqMessage.putUserProperty("traceId", traceContext.getTraceId());
                rocketmqMessage.putUserProperty("spanId", traceContext.getSpanId());
            }
            return message;
        }
    
        @Override
        public void afterConsume(MessageExt messageExt) {
            String traceId = messageExt.getUserProperty("traceId");
            String spanId = messageExt.getUserProperty("spanId");
            TraceContext.set(new TraceContext(traceId, spanId));
        }
    }
    

    2、在Spring Boot中配置RocketMQ的消息拦截器

    @Configuration
    public class RocketMQConfig {
        @Autowired
        private TraceMessageInterceptor traceMessageInterceptor;
    
        @Bean
        public DefaultMQProducer defaultMQProducer() throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("producer_group");
            producer.setNamesrvAddr("localhost:9876");
            producer.setSendMsgTimeout(10000);
            producer.setVipChannelEnabled(false);
            producer.setRetryTimesWhenSendFailed(3);
            producer.setRetryTimesWhenSendAsyncFailed(3);
            producer.setRetryAnotherBrokerWhenNotStoreOK(true);
            producer.setSendMessageWithVIPChannel(false);
            producer.setSendLatencyFaultEnable(true);
            producer.setUnitMode(false);
            producer.setCompressMsgBodyOverHowmuch(1024 * 4);
            producer.setInterceptors(Arrays.asList(traceMessageInterceptor));
            producer.start();
            return producer;
        }
    
        @Bean
        public DefaultMQPushConsumer defaultMQPushConsumer(MQListener mqListener) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("topic", "*");
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
            consumer.setPullBatchSize(32);
            consumer.setPullInterval(0);
            consumer.setConsumeMessageBatchMaxSize(1);
            consumer.setConsumeTimeout(15);
            consumer.setInterceptors(Arrays.asList(traceMessageInterceptor));
            consumer.registerMessageListener(mqListener);
            consumer.start();
            return consumer;
        }
    }
    

    3、在日志框架中添加traceId和spanId的日志输出

    @Slf4j
    @Component
    public class TraceLogFilter extends OncePerRequestFilter {
        @Override
        protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
            TraceContext traceContext = TraceContext.get();
            if (traceContext != null) {
                MDC.put("traceId", traceContext.getTraceId());
                MDC.put("spanId", traceContext.getSpanId());
            }
    
            try {
                filterChain.doFilter(request, response);
            } finally {
                MDC.clear();
            }
        }
    }
    

    通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。

    2023-06-12 07:57:18
    赞同 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载