请问RocketMQ Spring , 如何 让日志里携带 traceId, spanId ?
如果您想要在RocketMQ Spring的日志中携带TraceId和SpanId,则可以使用MDC(Mapped Diagnostic Context)功能。MDC是log4j框架提供的一种上下文信息传递的机制,可以将一些变量绑定到当前线程,使得这些变量在整个线程生命周期内都可以访问。
以下是一个示例配置,可用于设置MDC并在日志中携带TraceId和SpanId:
%d{yyyy-MM-dd HH:mm:ss.SSS} ${PID:- } [%thread] %-5level %logger{36} - traceId:%X{traceId}, spanId:%X{spanId} - %msg%n
在上面的示例中,我们使用了logback作为日志框架,并且定义了一个名为“CONSOLE”的输出器,输出内容包括时间、线程、日志级别、Logger名称、TraceId和SpanId。然后我们将“org.apache.rocketmq”Logger与CONSOLE输出器关联,并通过MDC将TraceId和SpanId设置为全局属性。
接着,在代码中需要生成TraceId和SpanId时,您可以使用Spring Cloud Sleuth库来创建TraceId和SpanId:
import org.springframework.cloud.sleuth.*; // ...
@Autowired(required = false) private Tracer tracer;
public void sendMessage(String message) { if (tracer != null) { Span newSpan = tracer.nextSpan().name("sendMessage").start(); try (Tracer.SpanInScope ws = tracer.withSpan(newSpan.start())) { // Do something with the message // ... } finally { newSpan.tag("message", message); newSpan.finish(); } } else { // Fallback behavior when Tracer is not available // ... } } 在上面的示例中,我们注入了一个名为tracer的Tracer对象,并使用nextSpan()方法创建一个新的Span。然后,我们使用withSpan()方法将当前线程与新的Span关联,并执行需要进行跟踪的操作(例如发送消息)。最后,我们通过tag()方法将消息内容添加到Span中,并调用finish()方法完成Span。
这样,当执行sendMessage()方法时,MDC会自动将TraceId和SpanId设置为全局属性,并在日志中携带它们。
在RocketMQ Spring中,可以通过配置RocketMQ的ProducerInterceptor
和ConsumerInterceptor
来实现在日志中携带traceId
和spanId
。
在ProducerInterceptor
中,你可以通过MessageExt
的putUserProperty
方法将traceId
和spanId
添加到消息的用户属性中,例如:
public class TraceProducerInterceptor implements ProducerInterceptor {
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
@Override
public Message<?> beforeConvert(Message<?> message) {
String traceId = TraceContext.getTraceId();
String spanId = TraceContext.getSpanId();
if (traceId != null && spanId != null) {
MessageHeaders headers = message.getHeaders();
headers.put(TRACE_ID, traceId);
headers.put(SPAN_ID, spanId);
if (message instanceof ErrorMessage) {
ErrorMessage errorMessage = (ErrorMessage) message;
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
originalMessage = beforeConvert(originalMessage);
errorMessage = new ErrorMessage(
errorMessage.getPayload(), originalMessage, errorMessage.getHeaders(), errorMessage.getCause());
}
return errorMessage;
}
}
return message;
}
}
在ConsumerInterceptor
中,你可以通过MessageExt
的getUserProperty
方法获取到消息中的traceId
和spanId
,并将其添加到日志中,例如:
public class TraceConsumerInterceptor implements ConsumerInterceptor {
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
@Override
public void afterReceive(Message<?> message) {
String traceId = message.getHeaders().get(TRACE_ID, String.class);
String spanId = message.getHeaders().get(SPAN_ID, String.class);
if (traceId != null && spanId != null) {
MDC.put(TRACE_ID, traceId);
MDC.put(SPAN_ID, spanId);
}
}
}
在配置文件中添加ProducerInterceptor
和ConsumerInterceptor
:
rocketmq:
producer:
interceptor:
- com.example.TraceProducerInterceptor
consumer:
interceptor:
- com.example.TraceConsumerInterceptor
这样就可以在RocketMQ Spring的日志中携带traceId
和spanId
了。
RocketMQ Spring 支持使用 OpenTracing(或 Jaeger)来实现分布式链路追踪,可以通过在消息发送和消费时设置 traceId 和 spanId 来携带这些信息。
具体来说,可以通过在消息体中设置 traceId 和 spanId,然后在消息发送前将这些信息设置到消息的属性中。例如,可以使用 Tracer 接口来生成 traceId 和 spanId,然后将这些信息设置到消息的属性中,例如:
java Copy @Autowired private Tracer tracer;
@Autowired private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String message) { String traceId = tracer.activeSpan().context().toTraceId(); String spanId = tracer.activeSpan().context().toSpanId();
Message<String> msg = MessageBuilder.withPayload(message)
.setHeader("TRACE_ID", traceId)
.setHeader("SPAN_ID", spanId)
.build();
rocketMQTemplate.syncSend("topic", msg);
} 在消息消费时,可以通过订阅消息时设置 MessageListenerConcurrently 或 MessageListenerOrderly 接口的实现类,然后在 consumeMessage 方法中获取消息属性中的 traceId 和 spanId。例如:
java Copy @Component @RocketMQMessageListener(topic = "topic", consumerGroup = "consumer-group") public class MyMessageListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String traceId = msg.getProperty("TRACE_ID");
String spanId = msg.getProperty("SPAN_ID");
// 处理消息
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} 需要注意的是,使用 OpenTracing(或 Jaeger)进行分布式链路追踪需要在应用中引入相应的依赖包,以及配置相应的参数。具体的配置方式可以参考 OpenTracing 或 Jaeger 的官方文档。
可以通过RocketMQ的消息拦截器(MessageInterceptor)来实现在发送和接收消息时携带traceId和spanId,然后通过日志框架将其记录下来。
具体实现步骤如下:
1、定义一个自定义的MessageInterceptor,实现beforeSend和afterConsume方法,在这两个方法中可以获取traceId和spanId,并将其添加到消息的属性中。
public class TraceMessageInterceptor implements MessageInterceptor {
@Override
public Message<?> beforeSend(Message<?> message, org.apache.rocketmq.common.message.Message rocketmqMessage, SendCallback callback) {
TraceContext traceContext = TraceContext.get();
if (traceContext != null) {
rocketmqMessage.putUserProperty("traceId", traceContext.getTraceId());
rocketmqMessage.putUserProperty("spanId", traceContext.getSpanId());
}
return message;
}
@Override
public void afterConsume(MessageExt messageExt) {
String traceId = messageExt.getUserProperty("traceId");
String spanId = messageExt.getUserProperty("spanId");
TraceContext.set(new TraceContext(traceId, spanId));
}
}
2、在Spring Boot中配置RocketMQ的消息拦截器
@Configuration
public class RocketMQConfig {
@Autowired
private TraceMessageInterceptor traceMessageInterceptor;
@Bean
public DefaultMQProducer defaultMQProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(10000);
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.setSendMessageWithVIPChannel(false);
producer.setSendLatencyFaultEnable(true);
producer.setUnitMode(false);
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
producer.setInterceptors(Arrays.asList(traceMessageInterceptor));
producer.start();
return producer;
}
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer(MQListener mqListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("topic", "*");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setPullBatchSize(32);
consumer.setPullInterval(0);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeTimeout(15);
consumer.setInterceptors(Arrays.asList(traceMessageInterceptor));
consumer.registerMessageListener(mqListener);
consumer.start();
return consumer;
}
}
3、在日志框架中添加traceId和spanId的日志输出
@Slf4j
@Component
public class TraceLogFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
TraceContext traceContext = TraceContext.get();
if (traceContext != null) {
MDC.put("traceId", traceContext.getTraceId());
MDC.put("spanId", traceContext.getSpanId());
}
try {
filterChain.doFilter(request, response);
} finally {
MDC.clear();
}
}
}
通过这种方式,就可以在RocketMQ的消息中携带traceId和spanId,并通过日志框架将其输出到日志中,从而方便地跟踪分布式调用链路。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/