实战:RocketMQ削峰,这一篇就够了

简介: 云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 前言MQ的主要特点为解耦、异步、削峰,该文章主要记录与分享个人在实际项目中的RocketMQ削峰用法,用于减少数据库压力的业务场景,其中RocketMQ的核心组件概念如下: Producer:生产.

云栖号资讯:【点击查看更多行业资讯
在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来!


前言

MQ的主要特点为解耦、异步、削峰,该文章主要记录与分享个人在实际项目中的RocketMQ削峰用法,用于减少数据库压力的业务场景,其中RocketMQ的核心组件概念如下:

  • Producer:生产发送消息
  • Broker:存储Producer发送过来的消息
  • Consumer:从Broker拉取消息并进行消费
  • NameServer:为Producer或Consumer路由到Broker

1

其中消费流程有以下几点是必须注意的:

  • RocketMQ的Consumer获取消息是通过向Broker发送拉取请求获取的,而不是由Broker发送Consumer接收的方式。
  • Consumer每次拉取消息时消息都会被均匀分发到消息队列再进行传输,所以RocketMQ中的很多参数都是针对队列而不是Topic的(这个是重点,顺便吐槽下源码的文档讲的真不清晰,很多都需要自己试错,但Dashboard做得很好),其中每个Broker消息队列(ConsumeQueue)的数量都可以通过RocketMQ DashBoard实时更改调整。

rocketmq-spring-boot-starter 用法简介

当开发中需要快速集成RocketMQ时可以考虑使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成环境,但该框架并不完全具备RocketMQ所有的配置简化,如需批量消费消息便需要自定义一个DefaultMQPushConsumer bean去消费了。

个人在开发中常用的rocketmq-spring-boot-starter相关类:

RocketMQListener接口:消费者都需实现该接口的消费方法onMessage(msg)。

RocketMQPushConsumerLifecycleListener接口:当@RocketMQMessageListener中的配置不足以满足我们的需求时,可以实现该接口直接更改消费者类DefaultMQPushConsumer配置

@RocketMQMessageListener:被该注解标注并实现了接口RocketMQListener的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些常用配置(大部分按默认即可),一般只需更改consumerGroup(消费组)与topic。RocketMQMessageListener中的属性配置是可以使用Placeholder(占位符)从配置文件或配置中心获取的,如下图:

2

业务案例

有一个点赞业务,不限制用户的点赞数只需进行记录(产品需求,开发提议无效),当每个用户都进行x连击享受数量猛增的快感时如果数据库都需要进行x个点赞数据的插入,数据库毫无疑问会塞死导致崩溃。于是想到可以尝试下MQ削峰,比如每秒来了5000消息但数据库只能承受2000,那我消费时每次只拉取消费1600就好了,剩下的放在Broker堆积慢慢消费就好。由于之前的消息中心也在用RocketMQ,于是确认使用RocketMQ来进行削峰。

3

环境配置

文章例子环境:1NameServer + 2Broker + 1Consumer

添加maven依赖

5

application.yml配置

6

点赞接口

PraiseRecord(点赞记录):

7

MessageController(简单的测试接口):

@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }

    // ......

}

由于用户可以连续点赞,所以考虑可以在点赞消息的处理上宽松一点(容许消息丢失)以追求更高的性能,因此选择使用sendOneyWay()进行消息发送。

RocketMQ的消息发送方式主要含syncSend()同步发送、asyncSend()异步发送、sendOneWay()三种方式,sendOneWay()也是异步发送,区别在于不需等待Broker返回确认,所以可能会存在信息丢失的状况,但吞吐量更高,具体需根据业务情况选用。性能:sendOneWay > asyncSend > syncSend

RocketMQTemplate的send()方法默认是同步(syncSend)的,更多可看源码实现。

PraiseListener:点赞消息消费者

@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
    @Resource
    private PraiseRecordService praiseRecordService;

    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 每次拉取的间隔,单位为毫秒
        consumer.setPullInterval(2000);
        // 设置每次从队列中拉取的消息数为16
        consumer.setPullBatchSize(16);
    }
}

单次pull消息的最大数目受broker存储的MessageStoreConfig.maxTransferCountOnMessageInMemory(默认为32)值限制,即若想要消费者从队列拉取的消息数大于32有效(pullBatchSize>32)则需更改Broker的启动参数maxTransferCountOnMessageInMemory值。在MQ削峰的配置参数里,以下几个DefaultMQPushConsumer的参数是需要注意一下的:

  • pullInterval:每次从Broker拉取消息的间隔,单位为毫秒
  • pullBatchSize:每次从Broker队列拉取到的消息数,该参数很容易让人误解,一开始我以为是每次拉取的消息总数,但测试过几次后确认了实质上是从每个队列的拉取数(源码上的注释文档真的很差,跟没有一样),即Consume每次拉取的消息总数如下:
  • EachPullTotal=所有Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize

consumeMessageBatchMaxSize:每次消费(即将多条消息合并为List消费)的最大消息数目,默认值为1,rocketmq-spring-boot-starter 目前不支持批量消费(2.1.0版本)

在消费者开始消息消费时会先从各队列中拉取一条消息进行消费,消费成功后再以每次pullBatchSize的数目进行拉取。

PraiseListener中设置了每次拉取的间隔为2s,每次从队列拉取的消息数为16,在搭建了2master broker且broker上

writeQueueNums=readQueueNums=4的环境下每次拉取的消息理论数值为16 2 4 = 128,在第一次从各队列拉取1条消息(即共8条)后消费成功后会每次就会拉取最多128条消息进行消费,想验证下的可以把onMessage()的insert()改为log.info("1")然后统计单位秒内打印的日志数是否为128。

8

根据以上配置单Conumer情况下每2s理论消费为128,即每2秒数据库新增的点赞数据大概为128条左右,有20%偏差都在个人可接受范围内,然后对点赞接口进行简单压测1s 2000请求校验MQ效果,根据消费配置理论上需要16次拉取即需32s才能消费完,压测后查看数据库校验效果:

9

11
12

由上图可以看出除第一次2s和最后一次2s外数据库每2s的插入数据数和一般都在128附近波动,也用了34s(因第一次拉取数较少所以比理论多花费一次拉取)消费的偏差大小可能会受每次拉取数pullBatchSize、Broker上的消息队列数、网络波动等情况影响,但需要的目的已经达到了,我只想把单位时间内过多的数据库操作交给MQ做分隔成多个单位时间内的小批量操作,消息过多就堆积,当请求峰值过了后直到MQ堆积的消息消费完前数据库的插入数依旧会与峰值期的插入数相差不大,达到了MQ削峰填谷的效果。

上线了但消费效率预估失误如何动态更改消费效率 ?

当把拉取数pullBatchSize设置Broker的默认最大传输值32了,线上又不想重启Broker更改maxTransferCountOnMessageInMemory参数,如有2个Broker且queue都为4,那么拉取消费效率才为32 2 4 = 256,如果想要动态调整,可以从Broker数或Broker队列数下手,可以将Broker的writeQueueNums、readQueueNums增大,如都改为8,那么效率就成了32 2 8 = 512。需要注意的是更改完queues后必须去Dashboard的Topic下的CONSUMER MANAGER查看新增的队列上是否都有Consumer成功注册上去了,因为遇到了在测试与生产上使用rocketmq-spring-boot-starter @RocketMQListener标注消费者不会自动注册到新队列上的情况,但没排除是不是RocketMQ版本的原因(个人本地的版本比环境上的高了一个小版本0.0.1,本地没出现没消费者注册到新队列上的问题),而是使用了自定义DefaultMQPushConsumer bean(原生的方式都是没有问题的)的备用方案。当再启动新的消费者应用时CONSUMER MANAGER(下图)中就会出现 新Consumer数 * 各Broker队列数和的队列行。

13

如何使用RocketMQ批量消费 ?

虽然点赞业务使用MQ单条插入后TPS已经达到当前业务指标要求了,但考虑到如果后续要求在不添加机器数的情况下增加TPS,且数据量还没到分库分表的程度,个人就打算从批量消费下手,由一次插入一条点赞记录改为一次性插入多条(insertBatch)。当然能满足现有需求能不做肯定不做的,过度优化过分碍事,但想多点方案不会坏事。rocketmq-spring-boot-starter并没有提供批量消费的功能,所以要批量消费消息需要自定义DefaultMQPushConsumer并配置其consumeMessageBatchMaxSize属性。consumeMessageBatchMaxSize属性默认值为1,即每次只消费一条消息,需要注意的是该属性也会受pullBatchSize影响,如果consumeMessageBatchMaxSize为32但pullBatchSize只为12,那么每次批量消费的最大消息数也就只有12。

如下为个人测试批量消费Consumer的测试bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");

15

    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
            -> {
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
        msgs.forEach(msg -> {
            userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
            queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
        });
        log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
        /*
          处理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
         */
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    return consumer;
}

如果默认配置情况下log打印出的userInfo size恒为1,但由于设置了consumeMessageBatchMaxSize与pullBatchSize,且pullBatchSize较小,所以每次消费的消息数最大值为12,如下图:

14

【云栖号在线课堂】每天都有产品技术专家分享!
课程地址:https://yqh.aliyun.com/live

立即加入社群,与专家面对面,及时了解课程最新动态!
【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK

原文发布时间:2020-04-23
本文作者:WilsonHe
本文来自:“掘金”,了解相关信息可以关注“掘金”

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
96 0
|
6月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
|
6月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
154 0
|
4月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
162 14
|
6月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
88 0
|
6月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
6月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
6月前
|
消息中间件 中间件 Java
RocketMQ实战教程之几种MQ优缺点以及选型
该文介绍了几种主流消息中间件,包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。ActiveMQ和RabbitMQ是较老牌的选择,前者在中小企业中常见,后者因强大的并发能力和活跃社区而流行。RocketMQ是阿里巴巴的开源产品,适用于大规模分布式系统,尤其在数据可靠性方面进行了优化。Kafka最初设计用于大数据日志处理,强调高吞吐量。在选择MQ时,考虑因素包括性能、功能、开发语言、社区支持、学习难度、稳定性和集群功能。小型公司推荐使用RabbitMQ,而大型公司则可在RocketMQ和Kafka之间根据具体需求抉择。
下一篇
无影云桌面