开发者社区 > 云原生 > 消息队列 > 正文

请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?

请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?

展开
收起
cuicuicuic 2023-06-11 23:02:17 1232 0
4 条回答
写回答
取消 提交回答
  • 如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。

    以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId:

    %d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} - traceId:%X{traceId}, spanId:%X{spanId} - %msg%n

    在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。

    接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId:

    import org.springframework.cloud.sleuth.*; // ...

    @Autowired(required = false) private Tracer tracer;

    public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name("sendMessage").start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // ... } finally { newSpan.tag("message", message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // ... } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。

    这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。

    2023-06-12 09:11:58
    赞同 展开评论 打赏
  • 在RocketMQ Spring中,可以通过配置RocketMQ的ProducerInterceptorConsumerInterceptor来实现在日志中携带traceIdspanId

    ProducerInterceptor中,你可以通过MessageExtputUserProperty方法将traceIdspanId添加到消息的用户属性中,例如:

    public class TraceProducerInterceptor implements ProducerInterceptor {
        private static final String TRACE_ID = "traceId";
        private static final String SPAN_ID = "spanId";
    
        @Override
        public Message<?> beforeConvert(Message<?> message) {
            String traceId = TraceContext.getTraceId();
            String spanId = TraceContext.getSpanId();
            if (traceId != null && spanId != null) {
                MessageHeaders headers = message.getHeaders();
                headers.put(TRACE_ID, traceId);
                headers.put(SPAN_ID, spanId);
                if (message instanceof ErrorMessage) {
                    ErrorMessage errorMessage = (ErrorMessage) message;
                    Message<?> originalMessage = errorMessage.getOriginalMessage();
                    if (originalMessage != null) {
                        originalMessage = beforeConvert(originalMessage);
                        errorMessage = new ErrorMessage(
                                errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause());
                    }
                    return errorMessage;
                }
            }
            return message;
        }
    }
    

    ConsumerInterceptor中,你可以通过MessageExtgetUserProperty方法获取到消息中的traceIdspanId,并将其添加到日志中,例如:

    public class TraceConsumerInterceptor implements ConsumerInterceptor {
        private static final String TRACE_ID = "traceId";
        private static final String SPAN_ID = "spanId";
    
        @Override
        public void afterReceive(Message<?> message) {
            String traceId = message.getHeaders().get(TRACE_ID, String.class);
            String spanId = message.getHeaders().get(SPAN_ID, String.class);
            if (traceId != null && spanId != null) {
                MDC.put(TRACE_ID, traceId);
                MDC.put(SPAN_ID, spanId);
            }
        }
    }
    

    在配置文件中添加ProducerInterceptorConsumerInterceptor

    rocketmq:
      producer:
        interceptor:
          - com.example.TraceProducerInterceptor
      consumer:
        interceptor:
          - com.example.TraceConsumerInterceptor
    

    这样就可以在RocketMQ Spring的日志中携带traceIdspanId了。

    2023-06-12 08:58:19
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。

    具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如:

    java Copy @Autowired private Tracer tracer;

    @Autowired private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId();

    Message<String> msg = MessageBuilder.withPayload(message)
            .setHeader("TRACE_ID", traceId)
            .setHeader("SPAN_ID", spanId)
            .build();
    rocketMQTemplate.syncSend("topic", msg);
    

    } 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如:

    java Copy @Component @RocketMQMessageListener(topic = "topic", consumerGroup = "consumer-group") public class MyMessageListener implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String traceId = msg.getProperty("TRACE_ID");
            String spanId = msg.getProperty("SPAN_ID");
            // 处理消息
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    

    } 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。

    2023-06-12 07:59:28
    赞同 展开评论 打赏
  • 可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。

    具体实现步骤如下:

    1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。

    public class TraceMessageInterceptor implements MessageInterceptor {
        @Override
        public Message<?> beforeSend(Message<?> message, org.apache.rocketmq.common.message.Message rocketmqMessage, SendCallback callback) {
            TraceContext traceContext = TraceContext.get();
            if (traceContext != null) {
                rocketmqMessage.putUserProperty("traceId", traceContext.getTraceId());
                rocketmqMessage.putUserProperty("spanId", traceContext.getSpanId());
            }
            return message;
        }
    
        @Override
        public void afterConsume(MessageExt messageExt) {
            String traceId = messageExt.getUserProperty("traceId");
            String spanId = messageExt.getUserProperty("spanId");
            TraceContext.set(new TraceContext(traceId, spanId));
        }
    }
    

    2、在Spring Boot中配置RocketMQ的消息拦截器

    @Configuration
    public class RocketMQConfig {
        @Autowired
        private TraceMessageInterceptor traceMessageInterceptor;
    
        @Bean
        public DefaultMQProducer defaultMQProducer() throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("producer_group");
            producer.setNamesrvAddr("localhost:9876");
            producer.setSendMsgTimeout(10000);
            producer.setVipChannelEnabled(false);
            producer.setRetryTimesWhenSendFailed(3);
            producer.setRetryTimesWhenSendAsyncFailed(3);
            producer.setRetryAnotherBrokerWhenNotStoreOK(true);
            producer.setSendMessageWithVIPChannel(false);
            producer.setSendLatencyFaultEnable(true);
            producer.setUnitMode(false);
            producer.setCompressMsgBodyOverHowmuch(1024 * 4);
            producer.setInterceptors(Arrays.asList(traceMessageInterceptor));
            producer.start();
            return producer;
        }
    
        @Bean
        public DefaultMQPushConsumer defaultMQPushConsumer(MQListener mqListener) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("topic", "*");
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
            consumer.setPullBatchSize(32);
            consumer.setPullInterval(0);
            consumer.setConsumeMessageBatchMaxSize(1);
            consumer.setConsumeTimeout(15);
            consumer.setInterceptors(Arrays.asList(traceMessageInterceptor));
            consumer.registerMessageListener(mqListener);
            consumer.start();
            return consumer;
        }
    }
    

    3、在日志框架中添加traceId和spanId的日志输出

    @Slf4j
    @Component
    public class TraceLogFilter extends OncePerRequestFilter {
        @Override
        protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
            TraceContext traceContext = TraceContext.get();
            if (traceContext != null) {
                MDC.put("traceId", traceContext.getTraceId());
                MDC.put("spanId", traceContext.getSpanId());
            }
    
            try {
                filterChain.doFilter(request, response);
            } finally {
                MDC.clear();
            }
        }
    }
    

    通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。

    2023-06-12 07:57:18
    赞同 展开评论 打赏

多个子产品线联合打造金融级高可用消息服务以及对物联网的原生支持,覆盖多行业。

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    PostgresChina2018_赖思超_PostgreSQL10_hash索引的WAL日志修改版final 立即下载
    Kubernetes下日志实时采集、存储与计算实践 立即下载
    日志数据采集与分析对接 立即下载