源码分析RocketMQ消息轨迹

简介: 源码分析RocketMQ消息轨迹

本文沿着RocketMQ消息轨迹-设计篇的思路,从如下3个方面对其源码进行解读:


  1. 发送消息轨迹
  2. 消息轨迹格式
  3. 存储消息轨迹数据


发送消息轨迹流程


首先我们来看一下在消息发送端如何启用消息轨迹,示例代码如下:

1public class TraceProducer {
 2    public static void main(String[] args) throws MQClientException, InterruptedException {
 3        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);      // @1
 4        producer.setNamesrvAddr("127.0.0.1:9876");
 5        producer.start();
 6        for (int i = 0; i < 10; i++)
 7            try {
 8                {
 9                    Message msg = new Message("TopicTest",
10                        "TagA",
11                        "OrderID188",
12                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
13                    SendResult sendResult = producer.send(msg);
14                    System.out.printf("%s%n", sendResult);
15                }
16
17            } catch (Exception e) {
18                e.printStackTrace();
19            }
20        producer.shutdown();
21    }
22}

从上述代码可以看出其关键点是在创建DefaultMQProducer时指定开启消息轨迹跟踪。我们不妨浏览一下DefaultMQProducer与启用消息轨迹相关的构造函数:

1public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
2public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
3

参数如下:


  • String producerGroup
    生产者所属组名。
  • boolean enableMsgTrace
    是否开启跟踪消息轨迹,默认为false。
  • String customizedTraceTopic
    如果开启消息轨迹跟踪,用来存储消息轨迹数据所属的主题名称,默认为:RMQ_SYS_TRACE_TOPIC。


1.1 DefaultMQProducer构造函数


1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {      // @1
 2    this.producerGroup = producerGroup;
 3    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
 4    //if client open the message trace feature
 5    if (enableMsgTrace) {                                                                                                                                                                                            // @2
 6        try {
 7            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);                                                         
 8            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
 9            traceDispatcher = dispatcher;
10            this.getDefaultMQProducerImpl().registerSendMessageHook(
11                new SendMessageTraceHookImpl(traceDispatcher));                                                                                                                             // @3
12        } catch (Throwable e) {
13            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
14        }
15    }
16}

代码@1:首先介绍一下其局部变量。


  • String producerGroup
    生产者所属组。
  • RPCHook rpcHook
    生产者发送钩子函数。
  • boolean enableMsgTrace
    是否开启消息轨迹跟踪。
  • String customizedTraceTopic
    定制用于存储消息轨迹的数据。


代码@2:用来构建AsyncTraceDispatcher,看其名:异步转发消息轨迹数据,稍后重点关注。


代码@3:构建SendMessageTraceHookImpl对象,并使用AsyncTraceDispatcher用来异步转发。


1.2 SendMessageTraceHookImpl钩子函数


1、SendMessageTraceHookImpl类图



636d91127df348ad7dd921bc7a0aeb30.jpg

  1. SendMessageHook
    消息发送钩子函数,用于在消息发送之前、发送之后执行一定的业务逻辑,是记录消息轨迹的最佳扩展点。
  2. TraceDispatcher
    消息轨迹转发处理器,其默认实现类AsyncTraceDispatcher,异步实现消息轨迹数据的发送。下面对其属性做一个简单的介绍:


  • int queueSize
    异步转发,队列长度,默认为2048,当前版本不能修改。
  • int batchSize
    批量消息条数,消息轨迹一次消息发送请求包含的数据条数,默认为100,当前版本不能修改。
  • int maxMsgSize
    消息轨迹一次发送的最大消息大小,默认为128K,当前版本不能修改。
  • DefaultMQProducer traceProducer
    用来发送消息轨迹的消息发送者。
  • ThreadPoolExecutor traceExecuter
    线程池,用来异步执行消息发送。
  • AtomicLong discardCount
    记录丢弃的消息个数。
  • Thread worker
    woker线程,主要负责从追加队列中获取一批待发送的消息轨迹数据,提交到线程池中执行。
  • ArrayBlockingQueue< TraceContext> traceContextQueue
    消息轨迹TraceContext队列,用来存放待发送到服务端的消息。
  • ArrayBlockingQueue< Runnable> appenderQueue
    线程池内部队列,默认长度1024。
  • DefaultMQPushConsumerImpl hostConsumer
    消费者信息,记录消息消费时的轨迹信息。
  • String traceTopicName
    用于跟踪消息轨迹的topic名称。


2、 SendMessageTraceHookImpl


sendMessageBefore

1public void sendMessageBefore(SendMessageContext context) { 
 2    //if it is message trace data,then it doesn't recorded
 3    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
 4        return;
 5    }
 6    //build the context content of TuxeTraceContext
 7    TraceContext tuxeContext = new TraceContext();
 8    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
 9    context.setMqTraceContext(tuxeContext);
10    tuxeContext.setTraceType(TraceType.Pub);
11    tuxeContext.setGroupName(context.getProducerGroup());                                                                                                                       // @2
12    //build the data bean object of message trace
13    TraceBean traceBean = new TraceBean();                                                                                                                                                // @3
14    traceBean.setTopic(context.getMessage().getTopic());
15    traceBean.setTags(context.getMessage().getTags());
16    traceBean.setKeys(context.getMessage().getKeys());
17    traceBean.setStoreHost(context.getBrokerAddr());
18    traceBean.setBodyLength(context.getMessage().getBody().length);
19    traceBean.setMsgType(context.getMsgType());
20    tuxeContext.getTraceBeans().add(traceBean);
21}

代码@1:如果topic主题为消息轨迹的Topic,直接返回。


代码@2:在消息发送上下文中,设置用来跟踪消息轨迹的上下环境,里面主要包含一个TraceBean集合、追踪类型(TraceType.Pub)与生产者所属的组。


代码@3:构建一条跟踪消息,用TraceBean来表示,记录原消息的topic、tags、keys、发送到broker地址、消息体长度等消息。


从上文看出,sendMessageBefore主要的用途就是在消息发送的时候,先准备一部分消息跟踪日志,存储在发送上下文环境中,此时并不会发送消息轨迹数据。


sendMessageAfter

1public void sendMessageAfter(SendMessageContext context) {
 2    //if it is message trace data,then it doesn't recorded
 3    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())     // @1
 4        || context.getMqTraceContext() == null) {
 5        return;
 6    }
 7    if (context.getSendResult() == null) {
 8        return;
 9    }
10
11    if (context.getSendResult().getRegionId() == null
12        || !context.getSendResult().isTraceOn()) {
13        // if switch is false,skip it
14        return;
15    }
16
17    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
18    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                                                                                                // @2
19    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());     // @3
20    tuxeContext.setCostTime(costTime);                                                                                                                                      // @4
21    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {                                                                    
22        tuxeContext.setSuccess(true);
23    } else {
24        tuxeContext.setSuccess(false);
25    }
26    tuxeContext.setRegionId(context.getSendResult().getRegionId());                                                                                      
27    traceBean.setMsgId(context.getSendResult().getMsgId());
28    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
29    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
30    localDispatcher.append(tuxeContext);                                                                                                                                   // @5
31}

代码@1:如果topic主题为消息轨迹的Topic,直接返回。


代码@2:从MqTraceContext中获取跟踪的TraceBean,虽然设计成List结构体,但在消息发送场景,这里的数据永远只有一条,即使是批量发送也不例外。


代码@3:获取消息发送耗时。


代码@4:设置costTime(耗时)、success(是否发送成功)、regionId(发送到broker所在的分区)、msgId(消息ID,全局唯一)、offsetMsgId(消息物理偏移量,如果是批量消息,则是最后一条消息的物理偏移量)、storeTime,这里使用的是(客户端发送时间 + 二分之一的耗时)来表示消息的存储时间,这里是一个估值。


代码@5:将需要跟踪的信息通过TraceDispatcher转发到Broker服务器。其代码如下:

1public boolean append(final Object ctx) {
2    boolean result = traceContextQueue.offer((TraceContext) ctx);
3    if (!result) {
4        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
5    }
6    return result;
7}

这里一个非常关键的点是offer方法的使用,当队列无法容纳新的元素时会立即返回false,并不会阻塞。


接下来将目光转向TraceDispatcher的实现。


1.3 TraceDispatcher实现原理


TraceDispatcher,用于客户端消息轨迹数据转发到Broker,其默认实现类:AsyncTraceDispatcher。


TraceDispatcher构造函数


1public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {    
 2    // queueSize is greater than or equal to the n power of 2 of value
 3    this.queueSize = 2048;
 4    this.batchSize = 100;
 5    this.maxMsgSize = 128000;                                        
 6    this.discardCount = new AtomicLong(0L);         
 7    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
 8    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
 9    if (!UtilAll.isBlank(traceTopicName)) {
10        this.traceTopicName = traceTopicName;
11    } else {
12        this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
13    }                   // @1
14    this.traceExecuter = new ThreadPoolExecutor(// :
15        10, //
16        20, //
17        1000 * 60, //
18        TimeUnit.MILLISECONDS, //
19        this.appenderQueue, //
20        new ThreadFactoryImpl("MQTraceSendThread_"));
21    traceProducer = getAndCreateTraceProducer(rpcHook);      // @2
22}

代码@1:初始化核心属性,该版本这些值都是“固化”的,用户无法修改。


  • queueSize
    队列长度,默认为2048,异步线程池能够积压的消息轨迹数量。
  • batchSize
    一次向Broker批量发送的消息条数,默认为100.
  • maxMsgSize
    向Broker汇报消息轨迹时,消息体的总大小不能超过该值,默认为128k。
  • discardCount
    整个运行过程中,丢弃的消息轨迹数据,这里要说明一点的是,如果消息TPS发送过大,异步转发线程处理不过来时,会主动丢弃消息轨迹数据。
  • traceContextQueue
    traceContext积压队列,客户端(消息发送、消息消费者)在收到处理结果后,将消息轨迹提交到噶队列中,则会立即返回。
  • appenderQueue
    提交到Broker线程池中队列。
  • traceTopicName
    用于接收消息轨迹的Topic,默认为RMQ_SYS_TRANS_HALF_TOPIC。
  • traceExecuter
    用于发送到Broker服务的异步线程池,核心线程数默认为10,最大线程池为20,队列堆积长度2048,线程名称:MQTraceSendThread_。、
  • traceProducer
    发送消息轨迹的Producer。


代码@2:调用getAndCreateTraceProducer方法创建用于发送消息轨迹的Producer(消息发送者),下面详细介绍一下其实现。


getAndCreateTraceProducer详解


1private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
 2        DefaultMQProducer traceProducerInstance = this.traceProducer;
 3        if (traceProducerInstance == null) {  //@1
 4            traceProducerInstance = new DefaultMQProducer(rpcHook);
 5            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
 6            traceProducerInstance.setSendMsgTimeout(5000);
 7            traceProducerInstance.setVipChannelEnabled(false);
 8            // The max size of message is 128K
 9            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
10        }
11        return traceProducerInstance;
12    }

代码@1:如果还未建立发送者,则创建用于发送消息轨迹的消息发送者,其GroupName为:_INNER_TRACE_PRODUCER,消息发送超时时间5s,最大允许发送消息大小118K。


start


1public void start(String nameSrvAddr) throws MQClientException {
 2    if (isStarted.compareAndSet(false, true)) {     // @1
 3        traceProducer.setNamesrvAddr(nameSrvAddr);
 4        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
 5        traceProducer.start();
 6    }
 7    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
 8    this.worker.setDaemon(true);
 9    this.worker.start();                                                                                   
10    this.registerShutDownHook();
11}

开始启动,其调用的时机为启动DefaultMQProducer时,如果启用跟踪消息轨迹,则调用之。


代码@1:如果用于发送消息轨迹的发送者没有启动,则设置nameserver地址,并启动着。


代码@2:启动一个线程,用于执行AsyncRunnable任务,接下来将重点介绍。


AsyncRunnable


1class AsyncRunnable implements Runnable {
 2         private boolean stopped;
 3    public void run() {
 4        while (!stopped) {
 5            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);     // @1
 6            for (int i = 0; i < batchSize; i++) {
 7                TraceContext context = null;
 8                try {
 9                    //get trace data element from blocking Queue — traceContextQueue
10                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);        // @2
11                } catch (InterruptedException e) {
12                }
13                if (context != null) {
14                    contexts.add(context);
15                } else {
16                    break;
17                }
18            }
19            if (contexts.size() > 0) {                                                                               :
20                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);  // @3
21                traceExecuter.submit(request);                                                               
22            } else if (AsyncTraceDispatcher.this.stopped) {
23                this.stopped = true;
24            }
25        }
26    }
27}

代码@1:构建待提交消息跟踪Bean,每次最多发送batchSize,默认为100条。


代码@2:从traceContextQueue中取出一个待提交的TraceContext,设置超时时间为5s,即如何该队列中没有待提交的TraceContext,则最多等待5s。


代码@3:向线程池中提交任务AsyncAppenderRequest。


AsyncAppenderRequest#sendTraceData


1public void sendTraceData(List<TraceContext> contextList) {
 2    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
 3    for (TraceContext context : contextList) {        //@1
 4        if (context.getTraceBeans().isEmpty()) {
 5            continue;
 6        }
 7        // Topic value corresponding to original message entity content
 8        String topic = context.getTraceBeans().get(0).getTopic();     // @2
 9        // Use  original message entity's topic as key
10        String key = topic;
11        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
12        if (transBeanList == null) {
13            transBeanList = new ArrayList<TraceTransferBean>();
14            transBeanMap.put(key, transBeanList);
15        }
16        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);    // @3
17        transBeanList.add(traceData);
18    }
19    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {       // @4
20        flushData(entry.getValue());
21    }
22}

代码@1:遍历收集的消息轨迹数据。


代码@2:获取存储消息轨迹的Topic。


代码@3:对TraceContext进行编码,这里是消息轨迹的传输数据,稍后对其详细看一下,了解其上传的格式。


代码@4:将编码后的数据发送到Broker服务器。


TraceDataEncoder#encoderFromContextBean


根据消息轨迹跟踪类型,其格式会有一些不一样,下面分别来介绍其合适。


PUB(消息发送)

1case Pub: {
 2    TraceBean bean = ctx.getTraceBeans().get(0);
 3    //append the content of context and traceBean to transferBean's TransData
 4    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
 5      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
 6      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
 7      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
 8      .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
 9      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
10      .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
11      .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
12      .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
13      .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
14      .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
15      .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
16      .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
17     .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
18}

消息轨迹数据的协议使用字符串拼接,字段的分隔符号为1,整个数据以2结尾,感觉这个设计还是有点“不可思议”,为什么不直接使用json协议呢?


SubBefore(消息消费之前)

1for (TraceBean bean : ctx.getTraceBeans()) {
 2    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
 3      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
 4      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
 5      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
 6      .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
 7      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
 8      .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
 9      .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
10    }
11}

轨迹就是按照上述顺序拼接而成,各个字段使用1分隔,每一条记录使用2结尾。


SubAfter(消息消费后)

1case SubAfter: {
 2    for (TraceBean bean : ctx.getTraceBeans()) {
 3        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
 4          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
 5          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
 6          .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
 7          .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
 8          .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
 9          .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
10        }
11    }
12}

格式编码一样,就不重复多说。


经过上面的源码跟踪,消息发送端的消息轨迹跟踪流程、消息轨迹数据编码协议就清晰了,接下来我们使用一张序列图来结束本部分的讲解。

ebc4378885474bfdf1ef1cc545f838ed.jpg


其实行文至此,只关注了消息发送的消息轨迹跟踪,消息消费的轨迹跟踪又是如何呢?其实现原理其实是一样的,就是在消息消费前后执行特定的钩子函数,其实现类为ConsumeMessageTraceHookImpl,由于其实现与消息发送的思路类似,故就不详细介绍了。


消息轨迹数据如何存储


其实从上面的分析,我们已经得知,RocketMQ的消息轨迹数据存储在到Broker上,那消息轨迹的主题名如何指定?其路由信息又怎么分配才好呢?是每台Broker上都创建还是只在其中某台上创建呢?RocketMQ支持系统默认与自定义消息轨迹的主题。


2.1 使用系统默认的主题名称


RocketMQ默认的消息轨迹主题为:RMQ_SYS_TRACE_TOPIC,那该Topic需要手工创建吗?其路由信息呢?

1{
 2    if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {    // @1
 3        String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
 4        TopicConfig topicConfig = new TopicConfig(topic);
 5        this.systemTopicList.add(topic);
 6        topicConfig.setReadQueueNums(1);                                              // @2
 7        topicConfig.setWriteQueueNums(1);
 8        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
 9    }
10}

上述代码出自TopicConfigManager的构造函数,在Broker启动的时候会创建topicConfigManager对象,用来管理topic的路由信息。


代码@1:如果Broker开启了消息轨迹跟踪(traceTopicEnable=true)时,会自动创建默认消息轨迹的topic路由信息,注意其读写队列数为1。


2.2 用户自定义消息轨迹主题


在创建消息发送者、消息消费者时,可以显示的指定消息轨迹的Topic,例如:

1public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)
2
3public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
4        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
5

通过customizedTraceTopic来指定消息轨迹Topic。


温馨提示:通常在生产环境上,不会开启自动创建主题,故需要RocketMQ运维管理人员提前创建好Topic。


好了,本文就介绍到这里了,详细介绍了RocktMQ消息轨迹的实现原理,下一篇,我们将进入到多副本的学习中。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
7月前
|
消息中间件 负载均衡 应用服务中间件
MQ产品使用合集之使用的RocketMQ5.1.3时,grpc客户端没有产生消息轨迹如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
169 3
|
6月前
|
消息中间件 网络性能优化
消息队列 MQ产品使用合集之通过MQTT控制台查询不到设备轨迹或消息轨迹是什么原因
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7月前
|
物联网 开发工具
MQTT常见问题之查轨迹失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
消息中间件 物联网 RocketMQ
MQTT常见问题之RocketMQ到MQTT的消息轨迹查询失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
7月前
|
物联网 Serverless
MQTT常见问题之通过mqtt控制台查询不到设备轨迹如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
消息中间件 负载均衡 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
199 2
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
|
存储 消息中间件 Java
聊聊RocketMQ 消息轨迹
这篇文章,我们聊一聊 RocketMQ 的**消息轨迹**设计思路。 查询消息轨迹可作为生产环境中排查问题强有力的数据支持 ,也是研发同学解决线上问题的重要武器之一。
聊聊RocketMQ 消息轨迹
下一篇
DataWorks