RocketMQ数据存储&集群原理&顺序消费

简介: RocketMQ数据存储&集群原理&顺序消费

正文


一、RocketMQ数据存储原理


111.png


生产者投递消息


生产者在投递消息到mq服务器端,会将该消息存放在commitlog日志文件中(顺序写)。

Mq后台就会开启一个异步的线程将该commitlogoffset实现分配存放到不同队列中。


消费者消费消息:


消费者消费消息的时候订阅到队列(consumerqueue),根据queueoffset 获取到该commitlogoffset

在根据commitlogoffset 去commitlog日志文件中查找到该消息主体返回给客户端。


总结


生产者将消息投递到broker时,会将所有的消息以顺序写的方式追加到Commitlog文件中,MQ开启异步线程将消息分配到相应的队列中(包含commitlogOffset值、msgSize、Tag等信息)。消费者订阅相应的队列,通过consumerQueueOffset的值去获取到commitlogOffset值,然后根据commitlogOffset的值获取到消息体,然后进行消费。

commitlog文件每个文件的大小默认1G ,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648 ,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。

理想状态下一个消费者对应一个队列,如果消费者数量多于队列数量,那么多余的消费者消费不到消息。因此在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。

在集群消费(Clustering)模式下每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给Consumer Group2 消费。 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再消费这条消息。

RocketMQ和kafka一样,消息消费之后并不会立即删除消息,而是通过删除策略删除消息


二、集群原理


同步刷盘和异步刷盘


 RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种


写磁盘方式:


 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入


          优点:性能高


          缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致


 同步刷盘方式:在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。


         优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致


         缺点:性能比异步的低


配置方式在broker.conf中配置


ASYNC_FLUSH 异步刷盘

SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH


同步复制和异步复制


 如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。


      同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态


           优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失


           缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高


     异步复制方式:只要Master写成功即可反馈给客户端写成功状态


           优点:系统拥有较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样


           缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。


配置方式在broker.conf中配置


brokerRole参数


ASYNC_MASTER 同步

SYNC_MASTER 异步

SLAVE 从节点


集群原理


       nameServer:多个Namesrv实例组成集群,但相互独立,没有信息交换。nameserver类似ZK和nacos等注册中心的功能。broker在启动时会将自己的ip和端口号注册到每一个nameserver中,然后与nameserver建立长连接。nameserver每隔30秒会发送一个心跳包,告诉broker自己还存活。而nameServer 定时器每隔10s的时间检测 故障Broker ,如果发生故障Broker 会直接剔除。生产者投递消息时会从nameserver中获取到broker的地址列表,然后进行消息投递。如果生产者在获取到服务列表之后,恰好当前broker宕机,那么生产者默认会有3次重试,如果依然失败,则重新从nameserver获取broker列表,进行消息投递。


主从broker如何保证消息消费一致性


在/data/rocketmq/store-a/config/consumerOffset.json文件中有如下结构


{
        "offsetTable":{
                "%RETRY%test-group@test-group":{0:0
                },
                "%RETRY%order-consumer@order-consumer":{0:0
                },
                "xiaojie-test@test-group":{0:4,1:3,2:4,3:28
                }
        }
}


在"xiaojie-test@test-group":{0:4,1:3,2:4,3:28}


其中 xiaojie-test为topic名称


test-group为消费组的名称


0:4 表示queueId 为0, consumeroffset为4,也就是说队列id为0的消息消费到偏移量为4的位置。


当master节点消费消息时,主节点会将自己commitLog和consumerOffset.json文件异步的同步到salve节点上。


当主节点宕机之后,从节点不能支持写操作,但是可以执行读的操作。但此时主节点的consumerOffset.json中consumeroffset值滞后于主节点,当主节点恢复之后,如何消费呢?答案是主节点恢复之后,会首先同步从节点的consumerOffset.json文件,然后再进行消费。


三、RocktMQ顺序消费


222.png


       消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。


顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


       但正如上图所示,消费者是采用多线程的方式消费的,此时即使投递消息时的队列一致,也不能保证消费的时候就严格按照顺序消费。


官网顺序消费demo


package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* Producer,发送顺序消息
*/
public class Producer {
   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       producer.setNamesrvAddr("127.0.0.1:9876");
       producer.start();
       String[] tags = new String[]{"TagA", "TagC", "TagD"};
       // 订单列表
       List<OrderStep> orderList = new Producer().buildOrders();
       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id
           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }
       producer.shutdown();
   }
   /**
    * 订单的步骤
    */
   private static class OrderStep {
       private long orderId;
       private String desc;
       public long getOrderId() {
           return orderId;
       }
       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }
       public String getDesc() {
           return desc;
       }
       public void setDesc(String desc) {
           this.desc = desc;
       }
       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }
   /**
    * 生成模拟订单数据
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();
       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);
       return orderList;
   }
}


消费者代码


package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       /**
        * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        * 如果非第一次启动,那么按照上次消费的位置继续消费
        */
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
       consumer.subscribe("TopicTest", "TagA || TagC || TagD");
       consumer.registerMessageListener(new MessageListenerOrderly() {
           Random random = new Random();
           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }
               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });
       consumer.start();
       System.out.println("Consumer Started.");
   }
}


Springboot整合顺序消费


package com.xiaojie.rocket.rocket.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description: springboot顺序生产者
 * @date 2021/11/23 22:59
 */
@Component
@Slf4j
public class OrderProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void orderSend() {
        String msg = "这是测试顺序发送消息的内容-------insert";
        String msg1 = "这是测试顺序发送消息的内容-------update";
        String msg2 = "这是测试顺序发送消息的内容-------delete";
        String orderId = UUID.randomUUID().toString();
        SendResult sendResult1 = rocketMQTemplate.syncSendOrderly("test-orderly", msg, orderId);
        log.info(">>>>>>>>>>>>>>>result1{}", sendResult1);
        SendResult sendResult2 = rocketMQTemplate.syncSendOrderly("test-orderly", msg1, orderId);
        log.info(">>>>>>>>>>>>>>>result2{}", sendResult2);
        SendResult sendResult3 = rocketMQTemplate.syncSendOrderly("test-orderly", msg2, orderId);
        log.info(">>>>>>>>>>>>>>>result3{}", sendResult2);
    }
}


package com.xiaojie.rocket.rocket.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Random;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 顺序消费者
 * @date 2021/11/23 23:18
 */
@Component
@RocketMQMessageListener(topic = "test-orderly", consumerGroup = "orderly-1", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class OrderConsumer implements RocketMQListener {
    @Override
    public void onMessage(Object message) {
        try {
            Random r = new Random(100);
            int i = r.nextInt(500);
            Thread.sleep(i);
        } catch (Exception e) {
        }
        log.info("消费者监听到消息:<msg:{}>", message);
    }
}


参考 :https://blog.csdn.net/guyue35/article/details/105674044

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 负载均衡 监控
【面试问题】RabbitMQ 的集群
【1月更文挑战第27天】【面试问题】RabbitMQ 的集群
|
6月前
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
265 1
|
27天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
25 0
|
7月前
|
消息中间件 存储 网络协议
从原理到实战,手把手教你在项目中使用RabbitMQ
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。 话不多说,上文章目录: 下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。 1. 消息队列 1.1 消息队列模式 消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。 点对点模式 一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。 需要额外注意的是,如果消费者
446 5
|
28天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
5月前
|
消息中间件 Kafka 测试技术
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
微服务轮子项目(33) -RocketMQ特点、安装部署、异常处理、事务消息原理
93 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
1月前
|
消息中间件 存储 缓存
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
365 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
6月前
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
100 0

热门文章

最新文章