RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?

简介: RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?

一、实战场景

用户更新钱包金额

->用户向钱包中转入100元,短信通知用户A目前剩余金额100元

->用户下单商品消费50元,短信通知用户A目前剩余金额50元。


普通消息:不顺序发送余额短信,则用户可能存在先收到余额50元,再收到余额100元的信息,带来不好的用户体验。

代码环节

为了更加体现消息的顺序性差异,我们在一次调用中循环发送10次

发送普通消息

publicBooleanupdateUser(UserUpdateReqDTOuserUpdateReqDTO) {
StringuserTopic=rocketMqConfig.getSyncUserTopic();
IntStream.range(0, 10).forEach(i->{
MessageWrappermessageSend=MessageWrapper.builder()
                    .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+i)
                    .timestamp(System.currentTimeMillis()).build();
MessageWrappermessageSend1=MessageWrapper.builder()
                    .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+i)
                    .timestamp(System.currentTimeMillis()).build();
rocketMQTemplate.syncSend(userTopic, messageSend);
rocketMQTemplate.syncSend(userTopic, messageSend1);
        });
returnBoolean.TRUE;
    }

发送顺序消息

publicBooleanupdateUser(UserUpdateReqDTOuserUpdateReqDTO) {
StringuserTopic=rocketMqConfig.getSyncUserTopic();
IntStream.range(0, 10).forEach(i->{
MessageWrappermessageSend=MessageWrapper.builder()
                    .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+i)
                    .timestamp(System.currentTimeMillis()).build();
MessageWrappermessageSend1=MessageWrapper.builder()
                    .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+i)
                    .timestamp(System.currentTimeMillis()).build();
rocketMQTemplate.syncSendOrderly(userTopic, messageSend, "11111");
rocketMQTemplate.syncSendOrderly(userTopic, messageSend1, "11111");
        });
returnBoolean.TRUE;
    }

消费者

@Service@RocketMQMessageListener(topic="${rocketmq.sync.user-topic}", consumerGroup="user_consumer", selectorExpression="*", consumeMode=ConsumeMode.ORDERLY)
@Slf4jpublicclasssyncUserConsumerimplementsRocketMQListener<MessageWrapper> {
@OverridepublicvoidonMessage(MessageWrappermes) {
log.info("user consumer message : {}", JSON.toJSONString(mes));
    }
}

发送普通消息结果

二、发送顺序消息流程

投递消息队列策略

Hash策略

publicMessageQueueselect(List<MessageQueue>mqs, Messagemsg, Objectarg) {
intvalue=arg.hashCode() %mqs.size();
if (value<0) {
value=Math.abs(value);
        }
returnmqs.get(value);
    }

在顺序消息中,我们使用Hash策略,将同一个HashKey分配到同一个队列中。

// 查询主题下消息队列列表

List<MessageQueue>messageQueueList=this.mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
// 获取指定队列StringuserTopic=NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
mq=mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));


三、保证顺序消费的机制

  1. 根据不同的消息监听器初始化消费消息线程池、定时线程池、扫描过期消息清除线程池。
if (this.getMessageListenerInner() instanceofMessageListenerOrderly) {
this.consumeOrderly=true;
// 顺序消息模式,不初始化扫描过期消息清除线程池this.consumeMessageService=newConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
  1. 启动顺序消息消费者服务。
this.consumeMessageService.start();
  1. 默认每隔20s执行一次锁定分配给自己的消息消费队列。
publicvoidstart() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(newRunnable() {
@Overridepublicvoidrun() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
                } catch (Throwablee) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                }
            }
        }, 1000*1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
publicfinalstaticlongREBALANCE_LOCK_INTERVAL=Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
  1. 消息队列负载

集群模式下,同一个主题内的消费者组内,消费者们共同承担订阅消息队列的消费。

为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费。

获取到消息队列之后向Broker发起锁定该消息队列的请求。

updateProcessQueueTableInRebalance逻辑

主要目的是为了将消息队列上锁,并且创建该消息队列的拉取任务。

  1. 向Broker发起锁定该消息队列的请求。
if (isOrder&&!this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
  1. 拉取消费位置。
longnextOffset=-1L;
try {
nextOffset=this.computePullFromWhereWithException(mq);
} catch (Exceptione) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
  1. 加锁成功则创建该消息队列的拉取任务,否则等待其他消费者释放该消息队列的锁。
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequestpullRequest=newPullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed=true;
  1. 消息拉取

如果消息处理队列没有被上锁,则延后一会儿延迟3s将pullRequest对象放入拉取拉取任务中。

消息消费

  1. 提交消费请求,消息提交到内部的线程池。

// 提交消费请求,消息提交到内部的线程池

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
2.ConsumeMessageOrderlyService执行方法。publicvoidsubmitConsumeRequest(
finalList<MessageExt>msgs,
finalProcessQueueprocessQueue,
finalMessageQueuemessageQueue,
finalbooleandispathToConsume) {
if (dispathToConsume) {
ConsumeRequestconsumeRequest=newConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
    }
}
  1. 提交消费任务核心逻辑

入口:ConsumeMessageService#ConsumeRequest#run()

3.1 如果消息队列已经下线,则跳出本次消费。

if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

3.2 根据前面得到的消费队列,获取对象并且申请一个锁

目的是保证同一时刻,消费队列只会被一个线程池中的一个线程消费。

finalObjectobjLock=messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
//...}

3.3 进入核心逻辑处理

集群模式:前提条件是消息队列上锁成功且锁未过期。

(this.processQueue.isLocked() &&!this.processQueue.isLockExpired())

当消费市场大于MAX_TIME_CONSUME_CONTINUOUSLY设置值,则跳出本次任务,交给线程池其他线程处理。

longinterval=System.currentTimeMillis() -beginTime;
if (interval>MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}

获取消息默认每次拉取一条信息,在之前我们已经循环读取消息list,存入msgTreeMap

现在从msgTreeMap中获取数据,如果数据为空则continueConsume设为false,跳出当前任务。

finalintconsumeBatchSize=ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt>msgs=this.processQueue.takeMessages(consumeBatchSize);
向ConsumeMessageContext对象填充数据,执行消费的钩子函数。ConsumeMessageContextconsumeMessageContext=null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext=newConsumeMessageContext();
consumeMessageContext        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context typeconsumeMessageContext.setProps(newHashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

申请消费锁

this.processQueue.getConsumeLock().lock();

执行消费注册的消息消费监听器业务逻辑,返回 ConsumeOrderlyStatus 结果。

status=messageListener.consumeMessage(
Collections.unmodifiableList(msgs), context);

如果一切正常则返回 ConsumeOrderlyStatus.SUCCESS 值

// 执行commit提交消息消费进度caseSUCCESS:commitOffset=consumeRequest.getProcessQueue().commit();
// 读取旧消息进度,并更新返回Longoffset=this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0-this.consumingMsgOrderlyTreeMap.size());
for (MessageExtmsg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0-msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset!=null) {
returnoffset+1;
}


如果消息进度偏移量大于0且消费队列没有停止,则更新消息消费进度。

if (commitOffset>=0&&!consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}


消息队列重试失败:如果重试达到最大次数重试次数并且向Broker服务器发送ACK消息返回成功,将消息存入DLQ队列,被认定消息消费成功,继续执行后面的消息。

/*** Max re-consume times. * In concurrently mode, -1 means 16;* In orderly mode, -1 means Integer.MAX_VALUE.** If messages are re-consumed more than {@link #maxReconsumeTimes} before success.*/privateintmaxReconsumeTimes=-1;

需要注意的是,顺序消息下,默认重试次数是Integer.MAX_VALUE,会一直进行重试,造成消息的阻塞。

image.png

总结:

为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费,
从负载均衡方面,向Broker发起锁定该消息队列的请求,上锁成功则新建一个拉取任务PullRequest,
从消息消费方面,批量拉取消息成功后,进行提交消费请求,消息提交到内部的线程池,为了保证消息的顺序性,
我们必须为消费队列上锁,来保证同一时刻消费队列只会被线程池中的一个线程消费。

四、消息消费时保持顺序性

上面的通过源码的阅读,我们知道消费失败是有重试机制,默认重试 16 次,重试的次数达到最大之后,将消息存入DLQ队列,即被认定消息消费成功,这里就会中断重试消息与下一跳消息的顺序性。

例:发送消息顺序为 消息A -> 消息B ->消息C

因为消息B进行最大次数的重试后依然没有成功,消息存入了DLQ队列中,

最终我们的消息顺序变成了 消息A ->消息B,破坏了我们的顺序性。

解决方案:在消费消息前,增加一些前置条件,查询同一个订单号下,上一个消息是否被成功消费或者存入DLQ队列中,可以引入消息辅助表,来进行记录。

五、如何提高顺序消费的消费速度?

根据上面的源码,我们了解到为了满足顺序消费,所以对消费队列进行了加锁,

所以消费端的并发度并不取决消费端线程池的大小,而是取决于分给给消费者的队列数量。

解决方案:提高消费者的队列数量。

六、扩容需要注意什么?

顺序消息在消费消息时会锁定消息消费队列,在分配到消息队列时,能从该队列拉取消息还需要在 Broker 端申请该消费队列的锁。

在进行横向扩容的时候会进行重新负载,为了保证消息能够进入同一个队列,就需要保证在扩容的时候队列中没有滞留的消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88524 9
|
10月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
1199 5
|
4月前
|
消息中间件 存储 运维
|
4月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
1月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
2月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
MetaQ/RocketMQ 原理问题之在解耦场景中,消息队列工作的问题如何解决
|
4月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108497 287
|
3月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
3月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
4月前
|
消息中间件 Cloud Native 物联网
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
本文主要介绍业务消息的应用解耦场景,具体解耦什么? RocketMQ 在业务消息场景的基础特性。业界那么多消息队列能实现应用解耦,RocketMQ 在基础特性上有哪些增强?
125436 2
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?