正文
一、RocketMQ数据存储原理
生产者投递消息
生产者在投递消息到mq服务器端,会将该消息存放在commitlog日志文件中(顺序写)。
Mq后台就会开启一个异步的线程将该commitlogoffset实现分配存放到不同队列中。
消费者消费消息:
消费者消费消息的时候订阅到队列(consumerqueue),根据queueoffset 获取到该commitlogoffset
在根据commitlogoffset 去commitlog日志文件中查找到该消息主体返回给客户端。
总结
生产者将消息投递到broker时,会将所有的消息以顺序写的方式追加到Commitlog文件中,MQ开启异步线程将消息分配到相应的队列中(包含commitlogOffset值、msgSize、Tag等信息)。消费者订阅相应的队列,通过consumerQueueOffset的值去获取到commitlogOffset值,然后根据commitlogOffset的值获取到消息体,然后进行消费。
commitlog文件每个文件的大小默认1G ,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648 ,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。
理想状态下一个消费者对应一个队列,如果消费者数量多于队列数量,那么多余的消费者消费不到消息。因此在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。
在集群消费(Clustering)模式下每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给Consumer Group2 消费。 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再消费这条消息。
RocketMQ和kafka一样,消息消费之后并不会立即删除消息,而是通过删除策略删除消息
二、集群原理
同步刷盘和异步刷盘
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种
写磁盘方式:
异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
优点:性能高
缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致
同步刷盘方式:在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。
优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致
缺点:性能比异步的低
配置方式在broker.conf中配置
ASYNC_FLUSH 异步刷盘
SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
同步复制和异步复制
如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态
优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失
缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高
异步复制方式:只要Master写成功即可反馈给客户端写成功状态
优点:系统拥有较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样
缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。
配置方式在broker.conf中配置
brokerRole参数
ASYNC_MASTER 同步
SYNC_MASTER 异步
SLAVE 从节点
集群原理
nameServer:多个Namesrv实例组成集群,但相互独立,没有信息交换。nameserver类似ZK和nacos等注册中心的功能。broker在启动时会将自己的ip和端口号注册到每一个nameserver中,然后与nameserver建立长连接。nameserver每隔30秒会发送一个心跳包,告诉broker自己还存活。而nameServer 定时器每隔10s的时间检测 故障Broker ,如果发生故障Broker 会直接剔除。生产者投递消息时会从nameserver中获取到broker的地址列表,然后进行消息投递。如果生产者在获取到服务列表之后,恰好当前broker宕机,那么生产者默认会有3次重试,如果依然失败,则重新从nameserver获取broker列表,进行消息投递。
主从broker如何保证消息消费一致性
在/data/rocketmq/store-a/config/consumerOffset.json文件中有如下结构
{ "offsetTable":{ "%RETRY%test-group@test-group":{0:0 }, "%RETRY%order-consumer@order-consumer":{0:0 }, "xiaojie-test@test-group":{0:4,1:3,2:4,3:28 } } }
在"xiaojie-test@test-group":{0:4,1:3,2:4,3:28}
其中 xiaojie-test为topic名称
test-group为消费组的名称
0:4 表示queueId 为0, consumeroffset为4,也就是说队列id为0的消息消费到偏移量为4的位置。
当master节点消费消息时,主节点会将自己commitLog和consumerOffset.json文件异步的同步到salve节点上。
当主节点宕机之后,从节点不能支持写操作,但是可以执行读的操作。但此时主节点的consumerOffset.json中consumeroffset值滞后于主节点,当主节点恢复之后,如何消费呢?答案是主节点恢复之后,会首先同步从节点的consumerOffset.json文件,然后再进行消费。
三、RocktMQ顺序消费
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
但正如上图所示,消费者是采用多线程的方式消费的,此时即使投递消息时的队列一致,也不能保证消费的时候就严格按照顺序消费。
官网顺序消费demo
package org.apache.rocketmq.example.order2; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * Producer,发送顺序消息 */ public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 订单列表 List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加个时间前缀 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根据订单id选择发送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//订单id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 订单的步骤 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模拟订单数据 */ private List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
消费者代码
package org.apache.rocketmq.example.order2; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("127.0.0.1:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
Springboot整合顺序消费
package com.xiaojie.rocket.rocket.producer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * @author xiaojie * @version 1.0 * @description: springboot顺序生产者 * @date 2021/11/23 22:59 */ @Component @Slf4j public class OrderProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void orderSend() { String msg = "这是测试顺序发送消息的内容-------insert"; String msg1 = "这是测试顺序发送消息的内容-------update"; String msg2 = "这是测试顺序发送消息的内容-------delete"; String orderId = UUID.randomUUID().toString(); SendResult sendResult1 = rocketMQTemplate.syncSendOrderly("test-orderly", msg, orderId); log.info(">>>>>>>>>>>>>>>result1{}", sendResult1); SendResult sendResult2 = rocketMQTemplate.syncSendOrderly("test-orderly", msg1, orderId); log.info(">>>>>>>>>>>>>>>result2{}", sendResult2); SendResult sendResult3 = rocketMQTemplate.syncSendOrderly("test-orderly", msg2, orderId); log.info(">>>>>>>>>>>>>>>result3{}", sendResult2); } }
package com.xiaojie.rocket.rocket.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Random; /** * @author xiaojie * @version 1.0 * @description: 顺序消费者 * @date 2021/11/23 23:18 */ @Component @RocketMQMessageListener(topic = "test-orderly", consumerGroup = "orderly-1", consumeMode = ConsumeMode.ORDERLY) @Slf4j public class OrderConsumer implements RocketMQListener { @Override public void onMessage(Object message) { try { Random r = new Random(100); int i = r.nextInt(500); Thread.sleep(i); } catch (Exception e) { } log.info("消费者监听到消息:<msg:{}>", message); } }