RocketMQ数据存储&集群原理&顺序消费

简介: RocketMQ数据存储&集群原理&顺序消费

正文


一、RocketMQ数据存储原理


111.png


生产者投递消息


生产者在投递消息到mq服务器端,会将该消息存放在commitlog日志文件中(顺序写)。

Mq后台就会开启一个异步的线程将该commitlogoffset实现分配存放到不同队列中。


消费者消费消息:


消费者消费消息的时候订阅到队列(consumerqueue),根据queueoffset 获取到该commitlogoffset

在根据commitlogoffset 去commitlog日志文件中查找到该消息主体返回给客户端。


总结


生产者将消息投递到broker时,会将所有的消息以顺序写的方式追加到Commitlog文件中,MQ开启异步线程将消息分配到相应的队列中(包含commitlogOffset值、msgSize、Tag等信息)。消费者订阅相应的队列,通过consumerQueueOffset的值去获取到commitlogOffset值,然后根据commitlogOffset的值获取到消息体,然后进行消费。

commitlog文件每个文件的大小默认1G ,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648 ,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。

理想状态下一个消费者对应一个队列,如果消费者数量多于队列数量,那么多余的消费者消费不到消息。因此在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。

在集群消费(Clustering)模式下每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给Consumer Group2 消费。 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再消费这条消息。

RocketMQ和kafka一样,消息消费之后并不会立即删除消息,而是通过删除策略删除消息


二、集群原理


同步刷盘和异步刷盘


 RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种


写磁盘方式:


 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入


          优点:性能高


          缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致


 同步刷盘方式:在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。


         优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致


         缺点:性能比异步的低


配置方式在broker.conf中配置


ASYNC_FLUSH 异步刷盘

SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH


同步复制和异步复制


 如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。


      同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态


           优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失


           缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高


     异步复制方式:只要Master写成功即可反馈给客户端写成功状态


           优点:系统拥有较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样


           缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。


配置方式在broker.conf中配置


brokerRole参数


ASYNC_MASTER 同步

SYNC_MASTER 异步

SLAVE 从节点


集群原理


       nameServer:多个Namesrv实例组成集群,但相互独立,没有信息交换。nameserver类似ZK和nacos等注册中心的功能。broker在启动时会将自己的ip和端口号注册到每一个nameserver中,然后与nameserver建立长连接。nameserver每隔30秒会发送一个心跳包,告诉broker自己还存活。而nameServer 定时器每隔10s的时间检测 故障Broker ,如果发生故障Broker 会直接剔除。生产者投递消息时会从nameserver中获取到broker的地址列表,然后进行消息投递。如果生产者在获取到服务列表之后,恰好当前broker宕机,那么生产者默认会有3次重试,如果依然失败,则重新从nameserver获取broker列表,进行消息投递。


主从broker如何保证消息消费一致性


在/data/rocketmq/store-a/config/consumerOffset.json文件中有如下结构


{
        "offsetTable":{
                "%RETRY%test-group@test-group":{0:0
                },
                "%RETRY%order-consumer@order-consumer":{0:0
                },
                "xiaojie-test@test-group":{0:4,1:3,2:4,3:28
                }
        }
}


在"xiaojie-test@test-group":{0:4,1:3,2:4,3:28}


其中 xiaojie-test为topic名称


test-group为消费组的名称


0:4 表示queueId 为0, consumeroffset为4,也就是说队列id为0的消息消费到偏移量为4的位置。


当master节点消费消息时,主节点会将自己commitLog和consumerOffset.json文件异步的同步到salve节点上。


当主节点宕机之后,从节点不能支持写操作,但是可以执行读的操作。但此时主节点的consumerOffset.json中consumeroffset值滞后于主节点,当主节点恢复之后,如何消费呢?答案是主节点恢复之后,会首先同步从节点的consumerOffset.json文件,然后再进行消费。


三、RocktMQ顺序消费


222.png


       消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。


顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


       但正如上图所示,消费者是采用多线程的方式消费的,此时即使投递消息时的队列一致,也不能保证消费的时候就严格按照顺序消费。


官网顺序消费demo


package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Producer,发送顺序消息
*/
public class Producer {
   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       producer.setNamesrvAddr("127.0.0.1:9876");
       producer.start();
       String[] tags = new String[]{"TagA", "TagC", "TagD"};
       // 订单列表
       List<OrderStep> orderList = new Producer().buildOrders();
       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id
           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }
       producer.shutdown();
   }
   /**
    * 订单的步骤
    */
   private static class OrderStep {
       private long orderId;
       private String desc;
       public long getOrderId() {
           return orderId;
       }
       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }
       public String getDesc() {
           return desc;
       }
       public void setDesc(String desc) {
           this.desc = desc;
       }
       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }
   /**
    * 生成模拟订单数据
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();
       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       return orderList;
   }
}


消费者代码


package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       /**
        * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        * 如果非第一次启动,那么按照上次消费的位置继续消费
        */
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
       consumer.subscribe("TopicTest", "TagA || TagC || TagD");
       consumer.registerMessageListener(new MessageListenerOrderly() {
           Random random = new Random();
           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }
               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });
       consumer.start();
       System.out.println("Consumer Started.");
   }
}


Springboot整合顺序消费


package com.xiaojie.rocket.rocket.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description: springboot顺序生产者
 * @date 2021/11/23 22:59
 */
@Component
@Slf4j
public class OrderProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void orderSend() {
        String msg = "这是测试顺序发送消息的内容-------insert";
        String msg1 = "这是测试顺序发送消息的内容-------update";
        String msg2 = "这是测试顺序发送消息的内容-------delete";
        String orderId = UUID.randomUUID().toString();
        SendResult sendResult1 = rocketMQTemplate.syncSendOrderly("test-orderly", msg, orderId);
        log.info(">>>>>>>>>>>>>>>result1{}", sendResult1);
        SendResult sendResult2 = rocketMQTemplate.syncSendOrderly("test-orderly", msg1, orderId);
        log.info(">>>>>>>>>>>>>>>result2{}", sendResult2);
        SendResult sendResult3 = rocketMQTemplate.syncSendOrderly("test-orderly", msg2, orderId);
        log.info(">>>>>>>>>>>>>>>result3{}", sendResult2);
    }
}


package com.xiaojie.rocket.rocket.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Random;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 顺序消费者
 * @date 2021/11/23 23:18
 */
@Component
@RocketMQMessageListener(topic = "test-orderly", consumerGroup = "orderly-1", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class OrderConsumer implements RocketMQListener {
    @Override
    public void onMessage(Object message) {
        try {
            Random r = new Random(100);
            int i = r.nextInt(500);
            Thread.sleep(i);
        } catch (Exception e) {
        }
        log.info("消费者监听到消息:<msg:{}>", message);
    }
}


参考 :https://blog.csdn.net/guyue35/article/details/105674044

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10天前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
22天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
3月前
|
消息中间件 存储 负载均衡
|
3月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
46 2
|
3月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
3月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
3月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
83 0
下一篇
无影云桌面