一、实战场景
用户更新钱包金额
->用户向钱包中转入100元,短信通知用户A目前剩余金额100元
->用户下单商品消费50元,短信通知用户A目前剩余金额50元。
普通消息:不顺序发送余额短信,则用户可能存在先收到余额50元,再收到余额100元的信息,带来不好的用户体验。
代码环节
为了更加体现消息的顺序性差异,我们在一次调用中循环发送10次
发送普通消息
publicBooleanupdateUser(UserUpdateReqDTOuserUpdateReqDTO) { StringuserTopic=rocketMqConfig.getSyncUserTopic(); IntStream.range(0, 10).forEach(i->{ MessageWrappermessageSend=MessageWrapper.builder() .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+i) .timestamp(System.currentTimeMillis()).build(); MessageWrappermessageSend1=MessageWrapper.builder() .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+i) .timestamp(System.currentTimeMillis()).build(); rocketMQTemplate.syncSend(userTopic, messageSend); rocketMQTemplate.syncSend(userTopic, messageSend1); }); returnBoolean.TRUE; }
发送顺序消息
publicBooleanupdateUser(UserUpdateReqDTOuserUpdateReqDTO) { StringuserTopic=rocketMqConfig.getSyncUserTopic(); IntStream.range(0, 10).forEach(i->{ MessageWrappermessageSend=MessageWrapper.builder() .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+i) .timestamp(System.currentTimeMillis()).build(); MessageWrappermessageSend1=MessageWrapper.builder() .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+i) .timestamp(System.currentTimeMillis()).build(); rocketMQTemplate.syncSendOrderly(userTopic, messageSend, "11111"); rocketMQTemplate.syncSendOrderly(userTopic, messageSend1, "11111"); }); returnBoolean.TRUE; }
消费者
topic="${rocketmq.sync.user-topic}", consumerGroup="user_consumer", selectorExpression="*", consumeMode=ConsumeMode.ORDERLY) (publicclasssyncUserConsumerimplementsRocketMQListener<MessageWrapper> { publicvoidonMessage(MessageWrappermes) { log.info("user consumer message : {}", JSON.toJSONString(mes)); } }
发送普通消息结果
二、发送顺序消息流程
投递消息队列策略
Hash策略
publicMessageQueueselect(List<MessageQueue>mqs, Messagemsg, Objectarg) { intvalue=arg.hashCode() %mqs.size(); if (value<0) { value=Math.abs(value); } returnmqs.get(value); }
在顺序消息中,我们使用Hash策略,将同一个HashKey分配到同一个队列中。
// 查询主题下消息队列列表
List<MessageQueue>messageQueueList=this.mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); // 获取指定队列StringuserTopic=NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq=mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
三、保证顺序消费的机制
- 根据不同的消息监听器初始化消费消息线程池、定时线程池、扫描过期消息清除线程池。
if (this.getMessageListenerInner() instanceofMessageListenerOrderly) { this.consumeOrderly=true; // 顺序消息模式,不初始化扫描过期消息清除线程池this.consumeMessageService=newConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); }
- 启动顺序消息消费者服务。
this.consumeMessageService.start();
- 默认每隔20s执行一次锁定分配给自己的消息消费队列。
publicvoidstart() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(newRunnable() { publicvoidrun() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwablee) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000*1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } publicfinalstaticlongREBALANCE_LOCK_INTERVAL=Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
- 消息队列负载
集群模式下,同一个主题内的消费者组内,消费者们共同承担订阅消息队列的消费。
为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费。
获取到消息队列之后向Broker发起锁定该消息队列的请求。
updateProcessQueueTableInRebalance逻辑
主要目的是为了将消息队列上锁,并且创建该消息队列的拉取任务。
- 向Broker发起锁定该消息队列的请求。
if (isOrder&&!this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; }
- 拉取消费位置。
longnextOffset=-1L; try { nextOffset=this.computePullFromWhereWithException(mq); } catch (Exceptione) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; }
- 加锁成功则创建该消息队列的拉取任务,否则等待其他消费者释放该消息队列的锁。
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequestpullRequest=newPullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed=true;
- 消息拉取
如果消息处理队列没有被上锁,则延后一会儿延迟3s将pullRequest对象放入拉取拉取任务中。
消息消费
- 提交消费请求,消息提交到内部的线程池。
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); 2.ConsumeMessageOrderlyService执行方法。publicvoidsubmitConsumeRequest( finalList<MessageExt>msgs, finalProcessQueueprocessQueue, finalMessageQueuemessageQueue, finalbooleandispathToConsume) { if (dispathToConsume) { ConsumeRequestconsumeRequest=newConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
- 提交消费任务核心逻辑
入口:ConsumeMessageService#ConsumeRequest#run()
3.1 如果消息队列已经下线,则跳出本次消费。
if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; }
3.2 根据前面得到的消费队列,获取对象并且申请一个锁
目的是保证同一时刻,消费队列只会被一个线程池中的一个线程消费。
finalObjectobjLock=messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { //...}
3.3 进入核心逻辑处理
集群模式:前提条件是消息队列上锁成功且锁未过期。
(this.processQueue.isLocked() &&!this.processQueue.isLockExpired())
当消费市场大于MAX_TIME_CONSUME_CONTINUOUSLY设置值,则跳出本次任务,交给线程池其他线程处理。
longinterval=System.currentTimeMillis() -beginTime; if (interval>MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; }
获取消息默认每次拉取一条信息,在之前我们已经循环读取消息list,存入msgTreeMap
现在从msgTreeMap中获取数据,如果数据为空则continueConsume设为false,跳出当前任务。
finalintconsumeBatchSize=ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt>msgs=this.processQueue.takeMessages(consumeBatchSize); 向ConsumeMessageContext对象填充数据,执行消费的钩子函数。ConsumeMessageContextconsumeMessageContext=null; if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext=newConsumeMessageContext(); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // init the consume context typeconsumeMessageContext.setProps(newHashMap<String, String>()); ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); }
申请消费锁
this.processQueue.getConsumeLock().lock();
执行消费注册的消息消费监听器业务逻辑,返回 ConsumeOrderlyStatus 结果。
status=messageListener.consumeMessage( Collections.unmodifiableList(msgs), context);
如果一切正常则返回 ConsumeOrderlyStatus.SUCCESS 值
// 执行commit提交消息消费进度caseSUCCESS:commitOffset=consumeRequest.getProcessQueue().commit(); // 读取旧消息进度,并更新返回Longoffset=this.consumingMsgOrderlyTreeMap.lastKey(); msgCount.addAndGet(0-this.consumingMsgOrderlyTreeMap.size()); for (MessageExtmsg : this.consumingMsgOrderlyTreeMap.values()) { msgSize.addAndGet(0-msg.getBody().length); } this.consumingMsgOrderlyTreeMap.clear(); if (offset!=null) { returnoffset+1; }
如果消息进度偏移量大于0且消费队列没有停止,则更新消息消费进度。
if (commitOffset>=0&&!consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); }
消息队列重试失败:如果重试达到最大次数重试次数并且向Broker服务器发送ACK消息返回成功,将消息存入DLQ队列,被认定消息消费成功,继续执行后面的消息。
/*** Max re-consume times. * In concurrently mode, -1 means 16;* In orderly mode, -1 means Integer.MAX_VALUE.** If messages are re-consumed more than {@link #maxReconsumeTimes} before success.*/privateintmaxReconsumeTimes=-1;
需要注意的是,顺序消息下,默认重试次数是Integer.MAX_VALUE,会一直进行重试,造成消息的阻塞。
总结:
为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费,
从负载均衡方面,向Broker发起锁定该消息队列的请求,上锁成功则新建一个拉取任务PullRequest,
从消息消费方面,批量拉取消息成功后,进行提交消费请求,消息提交到内部的线程池,为了保证消息的顺序性,
我们必须为消费队列上锁,来保证同一时刻消费队列只会被线程池中的一个线程消费。
四、消息消费时保持顺序性
上面的通过源码的阅读,我们知道消费失败是有重试机制,默认重试 16 次,重试的次数达到最大之后,将消息存入DLQ队列,即被认定消息消费成功,这里就会中断重试消息与下一跳消息的顺序性。
例:发送消息顺序为 消息A -> 消息B ->消息C
因为消息B进行最大次数的重试后依然没有成功,消息存入了DLQ队列中,
最终我们的消息顺序变成了 消息A ->消息B,破坏了我们的顺序性。
解决方案:在消费消息前,增加一些前置条件,查询同一个订单号下,上一个消息是否被成功消费或者存入DLQ队列中,可以引入消息辅助表,来进行记录。
五、如何提高顺序消费的消费速度?
根据上面的源码,我们了解到为了满足顺序消费,所以对消费队列进行了加锁,
所以消费端的并发度并不取决消费端线程池的大小,而是取决于分给给消费者的队列数量。
解决方案:提高消费者的队列数量。
六、扩容需要注意什么?
顺序消息在消费消息时会锁定消息消费队列,在分配到消息队列时,能从该队列拉取消息还需要在 Broker 端申请该消费队列的锁。
在进行横向扩容的时候会进行重新负载,为了保证消息能够进入同一个队列,就需要保证在扩容的时候队列中没有滞留的消息。