概念
Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。
支持Broker和Consumer端消息过滤,支持发布订阅模型和点对点,支持拉pull和推push两种消息模式,单一队列百万消息、亿级消息堆积,支持单master节点,多master节点,多master多slave节点,任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式,消息失败重试机制、支持特定level的定时消息,新版本底层采用Netty,4.3.x支持分布式事务,适合金融类业务,高可用性跟踪和审计功能。
Broker
服务器上部署的RocketMq进程一般称之为Broker,Broker会接收Producer的消息,持久化到本地,然后push给Consumer,通常使用集群部署。主从之间会有数据同步
NameServer
路由服务,类似与dubbo中的注册中心zk,它存储了Broker的路由信息,供Producer和Consumer使用,不然Producer怎么知道往哪个Broker发送消息。多个NameSever之间没有通信,每个NameSever都会保存所有路由信息。
Producer
生产者,即发送消息的一方,往Broker中写入数据。Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。Producer完全无状态,可集群部署。
Consumer
消费者,即消费消息的一方,从Broker中获取数据。Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
消费者分组(ConsumerGroup)
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。
和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
在消费者分组中,统一定义以下消费行为,同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。
订阅关系:Apache RocketMQ 以消费者分组的粒度管理订阅关系,实现订阅关系的管理和追溯。具体信息,请参见订阅关系(Subscription)。
投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持顺序投递和并发投递,投递方式在消费者分组中统一配置。具体信息,请参见顺序消息。
消费重试策略: 消费者消费消息失败时的重试策略,包括重试次数、死信队列设置等。具体信息
Topic
Topic翻译过来就是主题的意思,但它其实是个抽象概念,我们可以理解成数据集合,比如订单系统有一个Topic叫topic_order_info,这个Topic里面就是订单系统投递的订单信息,如果其他系统想要获取订单信息,就可以从这个Topic中获取。
MessageQueue
MessageQueue即消息队列,在创建Topic的时候会让我们指定MessageQueue的数量,简单来说就是指定Topic中的队列数量。 那么MessageQueue到底是什么呢?这个问题要和Topic、Broker一起来看,大家想一想Topic在Broker中是如何存储的?要知道Broker是集群部署的,如果我们有2个Broker,那Topic中的数据哪些存储在这个Broker,哪些存储在另一个Broker呢?所以RocketMq引入了MessageQueue的概念,本质上是一个数据分片机制。 比如一个Topic指定了4个MessageQueue,该Topic有1W个消息,那么这1W个消息会均匀分配在4个MessageQueue中(实际是根据分配策略),而这4个MessageQueue又是放在Broker上的,一个Broker上存储2个MessageQueue。
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。
消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
应用场景
- 削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
- 异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
- 顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
- 分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
- 大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
- 分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。
普通消息处理
如上所述,注册系统和邮件通知系统之间通过消息队列进行异步处理。注册系统将注册信息写入注册系统之后,发送一条注册成功的消息到消息队列RocketMQ,邮件通知系统订阅消息队列RocketMQ的注册消息,做相应的业务处理,发送注册成功或者失败的邮件。
流程说明如下:
- 注册系统发起注册。
- 注册系统向消息队列RocketMQ发送注册消息成功与否的消息。
2.1. 消息发送成功,进入3。
2.2. 消息发送失败,导致邮件通知系统未收到消息队列RocketMQ发送的注册成功与否的消息,而无法发送邮件,最终邮件通知系统和注册系统之间的状态数据不一致。 - 邮件通知系统收到消息队列RocketMQ的注册成功消息。
- 邮件通知系统发送注册成功邮件给用户。
在这样的情况下,虽然实现了系统间的解耦,上游系统不需要关心下游系统的业务处理结果;但是数据一致性不好处理,如何保证邮件通知系统状态与注册系统状态的最终一致。
事务消息处理
此时,需要利用消息队列RocketMQ所提供的事务消息来实现系统间的状态数据一致性
流程说明如下:
- 注册系统向消息队列RocketMQ发送半事务消息。
1.1. 半事务消息发送成功,进入2。
1.2. 半事务消息发送失败,注册系统不进行注册,流程结束。(最终注册系统与邮件通知系统数据一致) - 注册系统开始注册。
2.1. 注册成功,进入3.1。
2.2. 注册失败,进入3.2。 - 注册系统向消息队列RocketMQ发送半消息状态。
3.1. 提交半事务消息,产生注册成功消息,进入4。
3.2. 回滚半事务消息,未产生注册成功消息,流程结束。
说明 最终注册系统与邮件通知系统数据一致。 - 邮件通知系统接收消息队列RocketMQ的注册成功消息。
- 邮件通知系统发送注册成功邮件。(最终注册系统与邮件通知系统数据一致)
关于分布式事务消息的更多详细内容,请参见事务消息。
消息的顺序收发
消息队列RocketMQ顺序消息分为两种情况:
- 全局顺序:对于指定的一个Topic,所有消息将按照严格的先入先出(FIFO)的顺序,进行顺序发布和顺序消费。
- 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息将按照严格的FIFO的顺序,进行顺序发布和顺序消费,可以保证一个消息被一个进程消费。
在注册场景中,可使用用户ID作为Sharding Key来进行分区,同一个分区下的新建、更新或删除注册信息的消息必须按照FIFO的顺序发布和消费。
削峰填谷
流量削峰也是消息队列RocketMQ的常用场景,一般在秒杀或团队抢购活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列RocketMQ。
秒杀处理流程如下所述:
- 用户发起海量秒杀请求到秒杀业务处理系统。
- 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列RocketMQ。
- 下游的通知系统订阅消息队列RocketMQ的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
- 用户收到秒杀成功的通知。
开发实例:
引入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.3</version> </dependency>
配置参数:
rocketmq.address=127.0.0.1:9876 rocketmq.producer.groupName=groupName rocketmq.producer.sendMsgTimeout=10000 rocketmq.producer.retryWhenSendFailed=3
实例代码:
1、生产者:
/** * 读取配置文件中设置的rocketmq相关属性,创建消息生产者 */ private DefaultMQProducer getRocketMqProducer(){ String mqAddress = PropertyUtil.getProperties("rocketmq.address"); String groupId = PropertyUtil.getProperties("rocketmq.producer.groupName"); String msgTimeout = PropertyUtil.getProperties("rocketmq.producer.sendMsgTimeout"); String retryWhenSendFailed = PropertyUtil.getProperties("rocketmq.producer.retryWhenSendFailed"); // 1 创建消息生产者,指定生成组名 DefaultMQProducer defaultMQProducer = new DefaultMQProducer(groupId); // 2 指定NameServer的地址 defaultMQProducer.setNamesrvAddr(mqAddress); // 3 设置消息超时时间 defaultMQProducer.setSendMsgTimeout(Integer.parseInt(msgTimeout)); // 4 同步发送消息,如果SendMsgTimeout时间内没有发送成功,则重试retryWhenSendFailed次 defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt(retryWhenSendFailed)); return defaultMQProducer; }
2、对普通消息进行监听
//普通消息发送。 MessageBuilder messageBuilder = new MessageBuilder(); Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //消息体。 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } //消费示例一:使用PushConsumer消费普通消息,只需要在消费监听器中处理即可。 MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); //根据消费结果返回状态。 return ConsumeResult.SUCCESS; } }; //消费示例二:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。 List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); }
3、定时/延时MQ消息监听消费
//定时/延时消息发送 MessageBuilder messageBuilder = null; //以下示例表示:延迟时间为10分钟之后的Unix时间戳。 Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000; Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") .setDeliveryTimestamp(deliverTimeStamp) //消息体 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } //消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。 MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView.getDeliveryTimestamp()); //根据消费结果返回状态。 return ConsumeResult.SUCCESS; } }; //消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。 List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); }
4、顺序消息
//顺序消息发送。 MessageBuilder messageBuilder = null; Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。 .setMessageGroup("fifoGroup001") //消息体。 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } //消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。 //消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。 MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView); //根据消费结果返回状态。 return ConsumeResult.SUCCESS; } }; //消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。 //需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。 List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); }
5、事务消息
事务消息相比普通消息发送时需要修改以下几点:
- 发送事务消息前,需要开启事务并关联本地的事务执行。
- 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。 private static boolean checkOrderById(String orderId) { return true; } //演示demo,模拟本地事务的执行结果。 private static boolean doLocalTransaction() { return true; } public static void main(String[] args) throws ClientException { ClientServiceProvider provider = new ClientServiceProvider(); MessageBuilder messageBuilder = new MessageBuilder(); //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。 Producer producer = provider.newProducerBuilder() .setTransactionChecker(messageView -> { /** * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。 * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。 */ final String orderId = messageView.getProperties().get("OrderId"); if (Strings.isNullOrEmpty(orderId)) { // 错误的消息,直接返回Rollback。 return TransactionResolution.ROLLBACK; } return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; }) .build(); //开启事务分支。 final Transaction transaction; try { transaction = producer.beginTransaction(); } catch (ClientException e) { e.printStackTrace(); //事务分支开启失败,直接退出。 return; } Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。 .addProperty("OrderId", "xxx") //消息体。 .setBody("messageBody".getBytes()) .build(); //发送半事务消息 final SendReceipt sendReceipt; try { sendReceipt = producer.send(message, transaction); } catch (ClientException e) { //半事务消息发送失败,事务可以直接退出并回滚。 return; } /** * 执行本地事务,并确定本地事务结果。 * 1. 如果本地事务提交成功,则提交消息事务。 * 2. 如果本地事务提交失败,则回滚消息事务。 * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。 * */ boolean localTransactionOk = doLocalTransaction(); if (localTransactionOk) { try { transaction.commit(); } catch (ClientException e) { // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。 e.printStackTrace(); } } else { try { transaction.rollback(); } catch (ClientException e) { // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。 e.printStackTrace(); } } }
6、消息发送重试机制
Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。
同步发送和异步发送模式均支持消息发送重试。
触发消息发送重试机制的条件如下:
- 客户端消息发送请求调用失败或请求超时网络异常造成连接失败或请求超时。
- 服务端节点处于重启或下线等状态造成连接失败。
- 服务端运行慢造成请求超时。
- 服务端返回失败错误码
参考:https://developer.aliyun.com/article/780968
文章下方有交流学习区!一起学习进步!也可以前往官网,加入官方微信交流群
创作不易,如果觉得文章不错,可以点赞收藏评论
你的支持和鼓励是我创作的动力❗❗❗
官网:Doker 多克;官方旗舰店:淘宝网 - 淘!我喜欢