概述
简单说明一下图中箭头含义,从 Broker 开始,Broker Master1 和 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。同时每个 Broker 与NameServer集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer中。
Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master.
Consumer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息
1.重要概念
NameServer: 提供轻量级的服务发现和路由。 每个 NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
Broker: 消息中转角色,通过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
Producer:生产者,产生消息的实例,拥有相同 Producer Group 的 Producer 组成一个集群。
Consumer:消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的Consumer组成消费组
ProducerGroup
可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象
producerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,如果Producer中途意外宕机,Broker
会主动回调Producer Group内的任意一台机器来确认事务状态。Producer在启动时,会选择一个namesrv相连,通过topic关系找到
broker,并和存有topic的所有master broker相连,也就是说,消息只会发到master的broker上去。
ConsumerGroup
可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象每个Consumer Group会分别将该Topic的消息消费一遍;
在每一 个Consumer Group内,各Consumer通过负载均衡的方式消费该Topic的消息
将一个Consumer Group对应业务系统中的一个独立的业务模块,是一个比较值得推荐的ConsuerGroup划分方法。
BrokerGroup
和Topic之间的关系是多对多的关系
一个Topic由多个Broker Group提供服务即《RocketMQ用户指南》中提到的多Master,或多Master多Slave模式。
一个Topic由一个Broker Group提供服务即《RocketMQ用户指南》中提到的单Master模式(包含Slave或不包含Slave)。
2.消息存储
(1)消息主体以及元数据都存储在CommitLog当中
(2)Consume Queue相当于kafka中的partition,是一个逻辑队列(也可以理解为数据字典),存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
(3)每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
存储特点
RocketMQ的设计理念很大程度借鉴了kafka,所以有必要介绍下kafka的存储结构设计:
和RocketMQ类似,每个Topic有多个partition(queue),kafka的每个partition都是一个独立的物理文件,消息直接从里面读写。根据之前阿里中间件团队的测试,一旦kafka中Topic的partitoin数量过多,队列文件会过多,会给磁盘的IO读写造成很大的压力,造成tps迅速下降。kafka吞吐受topic数量的影响特别 明显。虽然topic比较小的时候,RocketMQ吞吐较小,但是基本非常稳定
所以RocketMQ进行了上述这样设计,consumerQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。
(图中的目录树中,一个0目录就是一个MapedFileQueue,一个commitLog目录也是一个MapedFileQueue,右侧的000000000就是一个MapedFile。)
MapedFile:每个MapedFile对应的就是一个物理二进制文件了,在代码中负责文件读写的就是MapedByteBuffer和fileChannel。相当于对pageCache文件的封装。
介绍一下各个关键对象的作用:
DefaultMessageStore:这是存储模块里面最重要的一个类,包含了很多对存储文件的操作API,其他模块对消息实体的操作都是通过DefaultMessageStore进行操作。
commitLog:commitLog是所有物理消息实体的存放文件,这篇文章的架构图里可以看得到。其中commitLog持有了MapedFileQueue。
consumeQueue:consumeQueue就对应了相对的每个topic下的一个逻辑队列(rocketMQ中叫queue,kafka的概念里叫partition), 它是一个逻辑队列!存储了消息在commitLog中的offSet。
indexFile:存储具体消息索引的文件,以一个类似hash桶的数据结构进行索引维护。
MapedFileQueue:这个对象包含一个MapedFileList,维护了多个mapedFile,升序存储。一个MapedFileQueue针对的就是一个目录下的所有二进制存储文件。理论上无线增长,定期删除过期文件。
为什么要这样设计?
没有一种方案是银弹,那么RocketMQ这样处理有什么 优缺点 ?
优点:
队列轻量化,单个队列数据量非常少。对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IO WAIT增高。
缺点:
写虽然完全是顺序写,但是读却变成了完全的随机读。
读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。
要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度。
什么时候清理物理消息文件
消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
消息文件过期(默认3天),且到达清理时点(默认是凌晨4点),删除过期文件。
消息文件过期(默认3天),且磁盘空间达到了水位线(默认75%),删除过期文件。
磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作,相比Ext3有非常明显的提升
ConsumeQueue的消息处理
生产者发送消息只是把消息主体存储到了物理文件commitLog中,但是并没有把消息处理到consumeQueue文件中,那么到底是哪里存入的?
任务处理一般都分为两种:
一种是同步,把消息主体存入到commitLog的同时把消息存入ConsumeQueue,RocketMQ的早期版本就是这样处理的。
另一种是异步处理,起一个线程,不停的轮询,将当前的ConsumeQueue中的offSet和commitLog中的offSet进行对比,将多出来的offSet进行解析,然后put到consumeQueue中的MapedFile中。
问题:为什么要改同步为异步处理?应该是为了增加发送消息的吞吐量。
3底层实现
3.1 MappedByteBuffer
•RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度。
3.2 page cache
•通俗的说:pageCache是系统读写磁盘时为了提高性能将部分文件缓存到内存中,下面是详细解释:
•page cache:这里所提及到的page cache,是linux中vfs虚拟文件系统层的cache层,一般pageCache默认是4K大小,它被操作系统的内存管理模块所管理,文件被映射到内存,一般都是被mmap()函数映射上去的。
•mmap()函数会返回一个指针,指向逻辑地址空间中的逻辑地址,逻辑地址通过MMU映射到page cache上。
4.部署模式
4.1多 master 多 slave 异步复制模式(实时性)
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。
Master节点可读可写,但是 slave 只能读不能写,类似于 mysql的主备模式。
优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
4.2多 master 多 slave 同步双写模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
5.RocketMq官方源码结构
rocketmq-broker broker可单独启动
rocketmq-client 提供发送、接受消息的客户端API。rocketmq-common
rocketmq-console-ng rocketmq的监控平台单独启动
rocketmq-example rocketmq用法示例
rocketmq-filtersrv 消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!
rocketmq-namesrv namesrv可单独启动
rocketmq-remoting 基于Netty4的client/server + fastjson序列化 + 自定义二进制协议
rocketmq-srvutil
rocketmq-store 消息、索引存储等
rocketmq-tools 命令行工具
在Eclipse中启动RocketMQ的方法
RocketMq的监控平台
ConsumeFromWhere
消费者从那个位置消费,分别为:
1 CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
2 CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
3 CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始
重复消费问题
Exactly Only Once
(1). 发送消息阶段,不允许发送重复的消息。
(2). 消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复
6.RocketMq的四种消费类型
6.1.普通消费
不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
6.2. 顺序消费
严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push
实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { return ConsumeOrderlyStatus.CONSUME_SUCCESS; } }
6.3.事务消费
可以解决如下类似场景的问题:A用户和B用户的账户体系不在同一台服务器上面,现在A用户向B用户转账100元,为了提高执行效率,就采用消息队列的方式实现异步处理。大致逻辑是A用户扣款100元,然后发送消息给消息队列,B用户的程序从队列中获取转账信息并向B用户上账100元
通过消息的异步事务,可以保证本地事务和消息发送同时执行成功或失败,从而保证了数据的最终一致性
rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。
发送prepare消息复用了普通消息发送,只是给消息增加
rocketmq在发送事物消息时,会先发送一个prepared消息,返回消息所在地址。然后再执行本地事物,根据事物执行结果去更新prepared消息状态。消息接收者只能消费消息集群中消息状态为已提交的消息。
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * 发送事务消息例子 * 回滚需要修改消息状态,一定涉及到根据 Key 去查找 Message 的劢作。 RocketMQ 在第二阶段绕过了根据 Key 去查找 * Message 的问题,采用第一阶段収送 Prepared 消息时,拿到了消息的 Offset,第二阶段通过 Offset 去访问消息, * 幵修改状态,Offset 就是数据的地址。 * RocketMQ 返种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,存在一个显著缺陷,即通过 Offset * 更改数据,会令系统的脏页过多,需要特别关注 */ public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("ExampleTransaction"); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" }; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
第一阶段producer执行本地事务
import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; /** * 执行本地事务 * * 自定义执行本地业务逻辑的类,该类实现LocalTransactionExecuter接口的 * executeLocalTransactionBranch(final Message msg, final Object arg)方法, * 在该方法中执行本地业务逻辑,根据业务逻辑执行情况反馈事务消息的状态(commit或者rollback) * * 1) 一阶段,向broker发送一条prepared的消息,返回消息的offset即消息地址commitLog中消息偏移 * 量。Prepared状态消息不被 消费发送消息ok,执行本地事物分支, 本地事物方法需要实现rocketmq * 的回调接口LocalTransactionExecuter,处理本地事物逻辑返回处理的事物状态 * LocalTransactionState(本处代码) * 2) 二阶段,处理完本地事物中业务得到事物状态, 根据offset查找到commitLog中的prepared消息,设 * 置消息状态commitType或者rollbackType, 让后将信息添加到commitLog中, 其实二阶段生成了 * 两条消息(下处代码) */ public class TransactionExecuterImpl implements LocalTransactionExecuter { private AtomicInteger transactionIndex = new AtomicInteger(1); @Override public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { int value = transactionIndex.getAndIncrement(); if (value == 0) { throw new RuntimeException("Could not find db"); } else if ((value % 5) == 0) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } }
第二阶段:补救一直处于PREPARED状态的消息
import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * 长期未回复的prepare的消息,broker回查producer * 自定义业务类,该类实现TransactionCheckListener接口的checkLocalTransactionState(final MessageExt msg)方法, * 该方法是对于本地业务执行完毕之后发送事务消息状态时失败导致Broker端的事务消息一直处于PREPARED 状态的补救(小概率事件), * 在Broker启动TransactionStateService时,为每个tranStateTable文件创建了一个定时任务,该定时 任务是每隔1分钟遍历一遍文件中的所有消息, * Broker对于长期处于PREPARED状态的事务消息发起回查请求时,Producer在收到回查事务消息状态请求之 后, * 调用该checkLocalTransactionState方法,该方法的请求参数是之前发送的事务消息, * 在该方法中根据此前发送的事务消息来检查该消息对应的本地业务逻辑处理的情况(自己写代码判断之前该 消息对应的本地事务是成功还是失败), * 根据处理情况告知Broker该事务消息的最终状态commit或者rollback * */ public class TransactionCheckListenerImpl implements TransactionCheckListener { private AtomicInteger transactionIndex = new AtomicInteger(0); @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("server checking TrMsg " + msg.toString()); int value = transactionIndex.getAndIncrement(); if ((value % 6) == 0) { throw new RuntimeException("Could not find db"); } else if ((value % 5) == 0) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if ((value % 4) == 0) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.UNKNOW; } }