RocketMQ生产者核心详解
核心参数详解
- ProducerGroup:组名在一个应用里面是唯一的。
- CreateTopicKey:实际生产中不会使此参数进行生产者创建Topic。
- defaultTopicQueueNums:默认大小为4,一个topic下默认挂载的是四个队列。
- sendMsgTimeout:单位ms,消息发送的超时时间。
- compressMsgBodyOverHowmuch:默认压缩字节4096,自动压缩机制,当消息超过4096就会压缩。
- retryTimesWhenSendFailed:同步重发次数。
- retryAnotherBrokerWhenNotStoreOK:默认false,没有存储成功的话,是否可以向其它Broker存储。
- maxMessageSize:默认128k,最大消息长度。
主从同步机制解析
我们之前已经了解了,当一条消息发送到Master节点时候,会将消息同步到Slave节点。但是怎么做的呢?
首先主从同步需要同步哪些内容?
第一点就是元数据信息同步,第二点就是消息数据的同步。
元数据信息:是指topic config配置信息,还有consumer的offset(消费端的进度信息)。
需要注意的是,并不是即时同步,而是底层代码启动定时任务去同步的。
同步信息:数据内容+元数据信息。
数据内容:commitlog实际消息的存储信息,是实时同步的,并且底层使用的是Socket而不是Netty。
元数据信息:slave和master基于commitlog里面的数据不断对比,然后不断的同步。
元数据丢失是可以接受的,可以恢复。如果元数据在slave和master里面不一致,可以做恢复,可以调整offset位置或者重启consumer。需要注意的是:commitlog里面的数据丢失了,无法恢复。
主从同步相关源码
如果Broker角色为从服务器,会通过定时任务调用syncAll。
我们点击syncAll()方法。从主服务器定时同步topic配置信息、消息消费偏移量、延迟队列偏移量、消费组订阅信息。
commitlog数据同步代码
HAConnection主要用于消息读写操作。里面包含两个内部类:ReadSocketService、WriteSocketService。
Master节点:
- AcceptSocketService:接收Slave节点连接。
- HAConnection
- ReadSocketService:读来自Slave节点的数据。
- WriteSocketService:写往到Slave节点的数据。
Slave节点:
- HAService
- HAClient:对Master节点连接、读写数据。
通信协议:Master节点与Slave节点通信协议很简单,只有如下两条。
对象 | 用途 | 第几位 | 字段 | 数据类型 | 字节数 | 说明 |
Slave=>Master | 上报CommitLog已经同步到的物理位置 | 0 | maxPhyOffset | Long | 8 | CommitLog最大物理位置 |
Master=>Slave | 传输新的CommitLog数据 | 0 | fromPhyOffset | Long | 8 | CommitLog开始物理位置 |
1 | size | Int | 4 | 传输的数据长度 | ||
2 | body | Bytes | size | 传输的数据 |
知道了每个类的大概用途,下面我们看一下代码。
在HAService中我们可以看到ConnectMaster(),用于连接Master的方法。
使用NIO函数:目的很明显,就是为了更加的高效。
private boolean connectMaster() throws ClosedChannelException { if (null == socketChannel) { String addr = this.masterAddress.get(); if (addr != null) { SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); if (socketAddress != null) { this.socketChannel = RemotingUtil.connect(socketAddress); if (this.socketChannel != null) { this.socketChannel.register(this.selector, SelectionKey.OP_READ); } } } this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); this.lastWriteTimestamp = System.currentTimeMillis(); } return this.socketChannel != null; }
我们可以点开RemotingUtil.connect(socketAddress),然后继续跟进去
public static SocketChannel connect(SocketAddress remote) { return connect(remote, 1000 * 5); //连接远程地址,超时时间5000ms }
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) { SocketChannel sc = null; try { sc = SocketChannel.open(); //打开channel sc.configureBlocking(true); // 设置同步阻塞 sc.socket().setSoLinger(false, -1); //设置关闭socket的延迟事件,当线程执行到socket的close()方法时候,进入阻塞状态,知道底层数据发送完成,或者超过了延迟时间,才从close()方法返回 sc.socket().setTcpNoDelay(true);//禁止使用Nagle算法,使用小数据即时传输 sc.socket().setReceiveBufferSize(1024 * 64);//设置缓冲区大小 sc.socket().setSendBufferSize(1024 * 64);//设置发送缓冲区大小 sc.socket().connect(remote, timeoutMillis);//连接 sc.configureBlocking(false); //不清楚为什么设置回去了? return sc; } catch (Exception e) { if (sc != null) { try { sc.close(); } catch (IOException e1) { e1.printStackTrace(); } } } return null; }
接下来我们看另一个重要的方法:dispatchReadRequest()。
读取Master传输的CommitLog数据,并返回是否OK。
如果读取到数据,就写入CommitLog。
如果发生异常:
- Master传输的数据开始位置Offset不等于Slave的CommitLog数据最大Offset。
- 上报到Master进度失败。
从dispatchReadRequest( )方法里可以看到,Slave使用dispatchPostion变量来指定每次处理的位置,其目的是为了应对粘包问题。每次提取数据的body部分,追加到CommitLog,当添加成功一次就马上向Master上报此次的进度。
private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size int readSocketPos = this.byteBufferRead.position(); while (true) { // begin -> 读取到请求数据 int diff = this.byteBufferRead.position() - this.dispatchPostion; if (diff >= msgHeaderSize) { // 读取MasterPhyOffset、BodySize,使用dispatchPostion的原因是:处理数据"粘包"导致数据读取不完整 long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); // 获取slave节点上commitLog文件最大的offset位置 long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); if (slavePhyOffset != 0) { // 校验 Master传输来的数据offset 是否和 Slave的CommitLog数据最大offset 是否相同 if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } // 读取到消息 if (diff >= (msgHeaderSize + bodySize)) { // 写入CommitLog byte[] bodyData = new byte[bodySize]; this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); this.byteBufferRead.get(bodyData); HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); // 设置处理到的位置 this.byteBufferRead.position(readSocketPos); this.dispatchPostion += msgHeaderSize + bodySize; // 上报到Master进度 if (!reportSlaveMaxOffsetPlus()) { return false; } //继续读数据 continue; } } // 空间写满,重新分配空间 if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true; }
消息同步发送机制分析
消息的同步发送:Producer.send(msg)
同步发送消息核心实现:DefaultMQProducerImpl
消息的异步发送:Producer.send(msg,SendCallback sendCallback)
异步发送消息核心实现:DefaultMQProducerImpl
producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送结果:" + sendResult); } @Override public void onException(Throwable e) { System.out.println("消息发送失败:" + e); } });
我们可以看一下源码
最终调用sendDefaultImpl()
方法,在此方法中主要做了:
- 查找路由信息
- 使用故障容错组件选择消息队列。
private SendResult sendDefaultImpl( Message msg, // 发送的消息 final CommunicationMode communicationMode, // 网络通信的模式:同步、异步、单向 final SendCallback sendCallback, // 消息发送后的回调函数,主要用在异步发送 final long timeout // 超时时间 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 检查消息发送客户端是否是在运行状态 this.makeSureStateOK(); // 检查消息,再一次检查 Validators.checkMessage(msg, this.defaultMQProducer); // 生成一个调用编号,用于下面打印日志,标记为同一次发送消息 final long invokeID = random.nextLong(); // 开始时间戳 long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 只在有路由信息的时候,且路由信息正常(有消息队列) if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 次数,同步=重试次数+1,异步=1, int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 当前为第几次发送 int times = 0; // 存储每次发送消息选择的Broker名称 String[] brokersSent = new String[timesTotal]; // 循环timesTotal次数进行发送,直到发送成功为止 for (; times < timesTotal; times++) { // 选择的broker String lastBrokerName = null == mq ? null : mq.getBrokerName(); //根据路由信息和Broker选择消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; // 设置当前发送的broker brokersSent[times] = mq.getBrokerName(); try { // 开始时间 beginTimestampPrev = System.currentTimeMillis(); // 如果重试次数大于0,表明已经重试了 if (times > 0) { msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } // 已经用了的时间 long costTime = beginTimestampPrev - beginTimestampFirst; // 超时了,就不继续了,直接退出循环,也就是重试必须在设置的超时时间以内才重新发送 if (timeout < costTime) { callTimeout = true; break; } // 找到后消息队列和路由信息后处理 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 结束时间戳 endTimestamp = System.currentTimeMillis(); // 没有出现异常时的更新故障容错 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // 根据不同的发送方式返回不同的结果 // 异步和单向直接返回null switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 如果返回的结果不是OK的话且能重试那么就重试,如果得到的结果不是 SEND_OK // 没有返回结果时,比如超时了,那么此时就直接进行重试 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 同步发送成功但存储有问题时候并且配置存储异常时重新发送开关时,进行重试 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } // 如果不重试的话直接返回结果了 return sendResult; // 如果通信模式是其他,那么直接返回 default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); // 更新故障容错 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { // 如果选择到的消息队列为空,那么直接退出循环 break; } } // if (sendResult != null) { return sendResult; } // 重试仍然失败 String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); // 如果超时了就抛出异常 if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } // 出现其他异常的情况 if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } validateNameServerSetting(); throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
消息的返回状态
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE, }
FLUSH_DISK_TIMEOUT
如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。
FLUSH_SLAVE_TIMEOUT
如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。
SLAVE_NOT_AVAILABLE
如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。
SEND_OK
这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH。
如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不关紧要的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制。
消息的延迟投递
延迟消息:消息到达Broker后,要在特定的时间后才会被Consumer消费。
目前只支持固定精度的定时消息。
MessageSoreConfig类中有messageDelayLevel属性。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; message.setDelayTimeLevel(1); // 也就是延迟1秒之后投递
消息的自定义投递规则
实现消息的自定义投递,我们需要在发送的时候去指定某一个队列。重写MessageQueueSelector的select方法。 SendResult sr = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer queueNumber = (Integer)arg; return mqs.get(queueNumber); } }, 2); //发送同一topic第二个队列里面 System.err.println(sr);
RocketMQ消费者核心详解
PushConsumer核心参数详解
- consumeFromWhere:消费者从那个位置开始消费。
- CONSUME_FROM_LAST_OFFSET: 第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。
- allocateMessageQueueStrategy:默认AllocateMessageQueueAveragely,Rebalance(轮询)算法实现策略。
- subscription:订阅。
- offsetStore:消息进度存储,存储实际的偏移量,两种实现:分为本地和远程的存储。
- consumeThreadMin/consumeThreadMax:线程池的数量。
- consumeConcurrentlyMaxSpan/pullThresholdForQueue:单队列并行消费允许的最大跨度,默认2000;拉消息本地队列缓存消息最大数,默认1000。
- pullInterval:默认0,拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒。
- pullBatchSize: 默认32, 批量拉消息,一次最多拉多少条。
- consumeMessageBatchMaxSize: 默认1,批量消费,一次消费多少条消息。
PushConsumer消费模式-集群模式
RocketMQ有两种消费模式:Broadcasting广播模式,Clustering集群模式,默认的是集群消费模式。
Clustering集群模式(默认):
- 通过consumer.setMessageModel(MessageModel.CLUSTERING)进行设置。
- GroupName用于把多个Consumer组织到一起。
- 相同GroupName的Consumer只消费所订阅消息的一部分,即ConsumerGroup中的Consumer实例平均分摊消费topic的消息。
- 目的:达到天然的负载均衡机制。
- 消息的消费进度,即consumerOffset.json保存在broker上。
- 消息消费失败后,consumer会发回broker,broker根据消费失败次数设置不同的delayLevel进行重发。
- 相同topic不同的consumerGroup组成伪广播模式,可达到所有consumer都会收到消息。
PushConsumer消费模式-广播模式
- 通过consumer.setMessageModel(MessageModel.BROADCASTING)进行设置。
- 消息的消费进度保存在consumer的机器上。
- 同一个ConsumerGroup里的Consumer都消费订阅Topic的全部信息。
- 不同ConsumerGroup里的Consumer可以实现根据tags进行消费即:
consumer1.subscribe("test_model_topic","TagA"); consumer2.subscribe("test_model_topic","TagB");
- 消息消费失败后直接丢弃,不会发回broker进行重新投递。
- 由于所有consumer都需要收到消息,所以不存在负载均衡策略。
消息存储核心-Offset存储
Offset是消息消费进度的核心,指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行消息的定位
Offset的存储实现分为远程文件类型和本地文件类型两种:集群模式下offset存在broker
中; 广播模式下offset存在consumer
中。
RocketMQ默认是集群消费模式Clustering,采用远程文件存储Offset,即存储在broker中本质是因为多消费模式,每个Consumer只消费所订阅主题的一部分,这种情况下就需要由Broker去控制Offset的值,使用RemoteBrokerOffsetStore来实现。
在广播模式下,由于每个Consumer都会收到消息且消费,那么各个Consumer之间没有任何干扰,都是独立线程消费,所以使用LocalFileOffsetStore,即把Offset存储到本地。
PushConsumer消费者长轮询模式
DefaultPushConsumer是使用长轮询模式进行实现的。
常见的数据同步方式有下面几种:
- push:producer发送消息后,broker马上把消息投递给consumer。这种方式好在实时性比较高,但是会增加broker的负载;而且消费端能力不同,如果push推送过快,消费端会出现很多问题。
- pull:producer发送消息后,broker什么也不做,等着consumer自己来读取。它的优点在于主动权在消费者端,可控性好;但是间隔时间不好设置,间隔太短浪费资源,间隔太长又会消费不及时。
- 长轮询机制:当consumer过来请求时,broker会保持当前连接一段时间 默认15s,如果这段时间内有消息到达,则立刻返回给consumer;15s没消息的话则返回空然后重新请求。这种方式的缺点就是服务端要保存consumer状态,客户端过多会一直占用资源。
consumer是长轮询拉消息,当consumer拉消息时,broker端如果没有新消息,broker会通过PullRequestHoldService服务hold住这个请求。
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 检查是否有新的消息 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); }
RocketMQ消费者-PullConsumer使用
pull方式主要做了三件事:
- 获取MessageQueue并遍历
- 维护OffsetStore
- 根据不同的消息状态做不同的处理
DefaultMQPullConsumer,Pull模式简单样例
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/27 * @Description: */ public class Consumer { // Map<key,value> key为指定队列,value为这个队列拉取数据的最后位置 private static final Map<MessageQueue, Long> offsetTable = new HashMap<>(); public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) { try { String group_name = "test_pull_producer_name"; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name); consumer.setNamesrvAddr(NAME_SRV_ADDR); consumer.start(); //从topicTest这个主题去获取所有队列(默认会有4个队列) Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("test_pull_topic"); //遍历每一个队列进行数据拉取 for (MessageQueue messageQueue : messageQueues) { System.out.println("consumer from the queue:" + messageQueue); SINGLE_MQ: while (true) { try { //从queue中获取数据,从什么位置开始拉取数据,单次最多拉取32条数据 PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), 32); System.out.println(pullResult); System.out.println(pullResult.getPullStatus()); putMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: System.out.println("没有新的数据"); break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } } catch (MQClientException e) { e.printStackTrace(); } } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offsetTable.get(mq); if (offset != null) { return offset; } return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offsetTable.put(mq, offset); } }
RocketMQ Pull模式下提供的负载均衡样例(基于MQPullConsumerScheduleService)
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */ public class PullConsumerScheduleService { public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) throws MQClientException { String group_name = "test_pull_consumer_name"; final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name); scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(NAME_SRV_ADDR); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.registerPullTaskCallback("test_pull_topic", (mq, context) -> { MQPullConsumer consumer = context.getPullConsumer(); System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------"); try { // 获取从哪里拉取 long offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) { offset = 0; } PullResult pullResult = consumer.pull(mq, "*", offset, 32); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> list = pullResult.getMsgFoundList(); for (MessageExt msg : list) { //消费数据 System.out.println(new String(msg.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); // 设置再过3000ms后重新拉取 context.setPullNextDelayTimeMillis(3000); } catch (Exception e) { e.printStackTrace(); } }); scheduleService.start(); } }
核心原理解析
RocketMQ消息的存储结构
如下图所示:
- 消息主体以及元数据都存储在CommitLog文件当中,完全顺序写,随机读。
- Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
- 每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
同步刷盘和异步刷盘
RocketMQ消息存储:内存+磁盘存储,两种刷盘方式。
RocketMQ和Redis等其他存储系统类似,提供了同步和异步两种刷盘方式,同步刷盘方式能够保证数据被写入硬盘,做到真正的持久化,但是也会让系统的写入速度受制于磁盘的IO速度;而异步刷盘方式在将数据写入缓冲之后就返回,提供了系统的IO速度,却存在系统发生故障时未来得及写入硬盘的数据丢失的风险。
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种:
- 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。
- 同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个。
同步复制和异步复制
同一组broker中有Master和Slave,消息需要从Master复制到Slave上,那么有同步和异步两种复制方式。
同步复制:是等Master和Slave均写成功后才反馈给客户端写成功状态。
异步复制:是只要Master写成功即可反馈给客户端写成功状态。
两种复制方式对比:
- 异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失。
- 同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
配置方式:
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁的触发写磁盘动作,会明显降低性能。
通常情况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。
高可用机制
当Master节点繁忙,可自动切换到Slave节点读取信息。
当Master节点down机或不可用时,rocketmq基于raft 协议支持主从切换,引入了多副本机制,即DLedger,支持主从切换,即当一个复制组内的主节点宕机后,会在该复制组内触发重新选主,选主完成后即可继续提供消息写功能。
NameServer协调服务
Namesrv的功能,就相当于RPC或微服务中的注册中心。对于MQ而言,broker启动,将自身创建的topic等信息注册到Namesrv上。consumer和producer需要配置namesrv的地址,启动后,首先和namesrv建立长连接,并获取相应的topic信息(比如,哪些broker有topic路由信息),然后再和broker建立长连接。Namesrv本身无状态,可集群横向扩展部署。所有的注册信息,都保存在namesrv的类似map内存数据结构中。
public class RouteInfoManager { private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; }