RocketMQ 实战(六) - 最佳实践

简介: RocketMQ 实战(六) - 最佳实践

1 Producer

  1. 一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置

只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤

message.setTags("TagA"); 
  1. 如有可靠性需要,消息发送成功或者失败,要打印消息日志(sendresult和key信 息)
  2. 如果相同性质的消息量大,使用批量消息,可以提升性能
  3. 建议消息大小不超过512KB
  4. send(msg)会阻塞,如果有性能要求,可以使用异步的方式: send(msg, callback)
  5. 如果在一个JVM中,有多个生产者进行大数据处理,建议:
  • 少数生产者使用异步发送方式(3~5个就够了)
  • 通过setInstanceName方法,给每个生产者设置一个实例名
  1. send消息方法,只要不抛异常,就代表发送成功 , 但是发送成功会有多个状态, 在sendStatus类里定义


14.png

● SEND_ OK : 消息发送成功

● FLUSH_ DISK_ TIMEOUT: 消息发送成功, 但是服务器刷盘超时,消息已经进入

服务器队列,只有此时服务器宕机,消息才会丢失

● FLUSH SLAVE_ TIMEOUT: 消息发送成功,但是服务器同步到Slave时超时,

消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

● SLAVE_ NOT_ AVAILABLE: 消息发送成功, 但是此时slave不可用, 消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

● 如果状态是FLUSH_ DISK_ TIMEOUT或FLUSH SLAVE_ _TIMEOUT,并且Broker正好关闭

此时,可以丢弃这条消息,或者重发。但建议最好重发,由消费端去重


● Producer向Broker发送请求会等待响应,但如果达到最大等待时间,未得到响应,则客户端将抛出RemotingTimeoutException

● 默认等待时间是3秒,如果使用send(msg, timeout),则可以自己设定超时时间,

但超时时间不能设置太小,应为Borker需要一些时间来刷新磁盘或与从属设备同步

● 如果该值超过syncFlushTimeout,则该值可能影响不大,因为Broker可能会在超时之前返回FLUSH_ SLAVE_ TIMEOUT或FLUSH_ SLAVE_ TIMEOUT的响应

image.png

8.对于消息不可丢失应用,务必要有消息重发机制

Producer的send方法本身支持内部重试:

● 至多重试3次

● 如果发送失败,则轮转到下一-个Broker

● 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s

所以,如果本身向broker发送消息产生超时异常,就不会再做重试


以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议将消息存储到db,由后台线程定时重试,保证消息一定到达Broker


2 Consumer

每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题

服务器会为每个消息创建索引(哈希索引),应用可以通过topic, key来查询这条消息内容,以及消息被谁消费

由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突

String orderld =“1250689524981";
message.setKeys(orderld);

console客户端使GUI


16.png

mvn clean package -Dmaven.test.skip=true

18.png

--server.port=8081 --rocketmq.config. namesrvAddr=192.168.1.17:9876

image.png


http://localhost:8081/#/


20.png


2.1 消费者组和订阅

不同的消费群体可以独立地消费同样的主题,并且每个消费者都有自己的消费偏移量(offsets) 。

确保同一组中的每个消费者订阅相同的主题

2.2 消息监听器(MessageListener)

2.2.1 顺序 (Orderly)

消费者将锁定每个MessageQueue,以确保每个消息被一个按顺序使用。

这将导致性能损失

如果关心消息的顺序时,它就很有用了。不建议抛出异常,可以返回

ConsumeOrderlyStatus. SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT代替

2.2.2 消费状态(Consume Status)

21.png

对于MessageListenerConcurrently,可以返回RECONSUME_ LATER告诉消费者,当前不能消费它并且希望以后重新消费。然后可以继续使用其他消息


22.png

对于MessageListenerOrderly, 如果关心顺序,就不能跳过消息,可以返回SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT来告诉消费者等待片刻。


阻塞(Blocking)

不建议阻塞Listener,因为它会阻塞线程池,最终可能会停止消费程序

线程数

DefaultMQPushConsumer

消费者使用一个ThreadPoolExecutor来处理内部的消费,因此可以通过设

23.png

更改它

从何处开始消费


● 当建立一个新的Consumer Group时,需要决定是否需要消费Broker中已经

存在的历史消息。

● CONSUME_ FROM LAST_ OFFSET将忽略历史消息,并消费此后生成的任何

内容。

● CONSUME_ FROM_ FIRST_ OFFSET将消耗Broker中存在的所有消息。还可以使用CONSUME_ FROM_ TIMESTAMP 来消费在指定的时间戳之后生成的消息。

重复(幂等性)

RocketMQ无法避免消息重复,如果业务对重复消费非常敏感,务必在业务层面做去重:

● 通过记录消息唯一键进行去重

● 使用业务层面的状态机制去重


3 最佳实践之 NameServer

在Apache RocketMQ中,NameServer用于协调分布式系统的每个组件,主要通过管理主题路由信息来实现协调。


管理由两部分组成:

  1. Brokers定期更新保存在每个名称服务器中的元数据
  2. 名称服务器是为客户端提供最新的路由信息服务的,包括生产者、消费者和命令行客户端。


因此,在启动brokers和clients之前,我们需要告诉他们如何通过给他们提

供的一个名称服务器地址列表来访问名称服务器。

在Apache RocketMQ中,可以用四种方式完成。


3.1 编程方式

  • 对于brokers,我们可以在broker的配置文件中指定
namesrvAddr=name-server-ip1:port;name-server-ip2:port
  • 对于生产者和消费者,我们可以给他们提供姓名服务器地址列表如下:


DefaultMQProducer producer = new DefaultMQProducer(" please_ rename_ unique_ group name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port"); 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(" please_ rename_ unique_ _group_ name");
consumer.setNamesrvAddr(" name-server1-ip:port;name-server2-ip:port");
  • 如果从shell中使用管理命令行,也可以这样指定:
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
  • 一个简单的例子,在NameServer节点上查询集群信息:
sh mqadmin -n localhost:9876 clusterList
  • 如果将管理工具集成到自己的项目中,可以这样
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(" please_ rename_ _unique_ group_ _name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

3.2 Java参数

NameServer的地址列表也可以通过java参数rocketmq.namesrv.addr

在启动之前指定

3.3 环境变量

可以设置NAMESRV_ ADDR环境变量。如果设置了,Broker和clients将检 查并使用其值

3.4 HTTP端点(HTTP Endpoint)

如果没有使用前面提到的方法指定NameServer地址列表,Apache RocketMQ将每2分钟发送一次HTTP请求,以获取和更新NameServer地址列表,初始延迟10秒。

默认情况下,访问的HTTP地址是:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

通过Java参数rocketmq.namesrv.domain,可以修改jmenv.tbsite.net

通过Java参数rocketmq.namesrv.domain.subgroup,可以修改nsaddr

3.5 优先级

编程方式> Java参数>环境变量> HTTP方式

4 JVM与Linux内核配置

4.1 JVM配置

推荐使用JDK 1.8版本,使用服务器编译器和8g堆。

设置相同的Xms和Xmx值,以防止JVM动态调整堆大小以获得更好的性能。

简单的JVM配置如下所示:

-server -Xms8g -Xmx8g -Xmn4g

如果不关心Broker的启动时间,可以预先触摸Java堆,以确保在JVM初始化期间分配页是更好的选择。

-XX:+AlwaysPreTouch
  • 禁用偏置锁定可能会减少JVM暂停:
-XX: UseBiasedL ocking
  • 对于垃圾回收,建议使用G1收集器:
-XX:+UseG1GC -XX:G1HeapRegionSize= 16m -XX:G lReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

这些GC选项看起来有点激进,但事实证明它在生产环境中具有良好的性能。

-XX:MaxGCPauseMillis不要设置太小的值,否则JVM将使用一个小的新生代,这将导致非常频繁的新生代GC。

  • 推荐使用滚动GC日志文件:
-XX:+UseGCLogFileRotation -Xx:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
  • 如果写入GC文件会增加代理的延迟,请将重定向GC日志文件考虑在内存文件系统中:
-Xloggc:/dev/shm/mq_ gc. _%p.log

4.2 Linux内核配置

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
494 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
1月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
588 1
|
6月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
6月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
6月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
4月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
3226 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
6月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
6月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
6月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
6月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。