RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?

简介: RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?

2019 年 1 月,孵化 6 个月的 RocketMQ-Spring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)的故事


时隔两年,RocketMQ-Spring 正式发布 2.2.0。在这期间,RocketMQ-Spring 迭代了数个版本,以 RocketMQ-Spring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了Spring 的官网,Spring 布道师 baeldung 向国外同学介绍如何使用 RocketMQ-Spring,越来越多国内外的同学开始使用 RocketMQ-Spring 收发消息,RocketMQ-Spring 仓库的 star 数也在短短两年时间内超越了 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。

1.png


RocketMQ-Spring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQ-Spring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。

遵循 Spring Messaging API 规范


Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。


1. 发送端


RocketMQ-Spring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQ-Spring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。

2.png


除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQ-Spring 内置完成),在消费端约定好对应的 Schema 即可正常收发。

RocketMQTemplate Send API: 
  SendResult syncSend(String destination, Object payload) 
  SendResult syncSend(String destination, Message<?> message)  
    void asyncSend(String destination, Message<?> message, SendCallback sendCallback)  
    void asyncSend(String destination, Message<?> message, SendCallback sendCallback)  
……


2. 消费端


在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。


@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}")public class StringConsumer implements RocketMQListener<String> {    
    @Override    
    public void onMessage(String message) {
       System.out.printf("------- StringConsumer received: %s \n", message);    }
       }


除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。

配置文件resource/application.properties:
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
Pull Consumer代码:
while(!isStop) {
  List<String> messages = rocketMQTemplate.receive(String.class);
    System.out.println(messages);
}

丰富的消息类型


RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/one-way、顺序、延迟、批量、事务以及 Request-Reply 消息。在这里,主要介绍较为特殊的事务消息和 request-reply 消息。

1. 事务消息


RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。

// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field 'transName'
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // ... local transaction process, return bollback, commit or unknown
        return RocketMQLocalTransactionState.UNKNOWN;
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // ... check transaction status and return bollback, commit or unknown
        return RocketMQLocalTransactionState.COMMIT;
    }
}

在 2.1.0 版本中,RocketMQ-Spring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。

3.png


2. Request-Reply 消息


在 2.1.0 版本中,RocketMQ-Spring 开始支持 Request-Reply 消息。Request-Reply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQ-Spring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。


// 同步发送request并且等待String类型的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 异步发送request并且等待User类型的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
    @Override public void onSuccess(User message) {
      ……
    }
    @Override public void onException(Throwable e) {
      ……
    }
});

在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。

@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
    @Override
    public String onMessage(String message) {
        ……
        return "reply string";
    }
}


RocketMQ-Spring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在RocketMQ-Spring Github Wiki 中有更加详细的用法和常见问题解答。


据统计,从 RocketMQ-Spring 发布第一个正式版本以来,RocketMQ-Spring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
19天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
72 0
|
6月前
|
消息中间件 监控 Java
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
使用Spring Boot结合ActiveMQ和MQTT实现消息的发送和接收
574 3
|
5月前
|
消息中间件 Java 数据安全/隐私保护
Spring Boot与RabbitMQ的集成
Spring Boot与RabbitMQ的集成
|
5月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
5月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6月前
|
消息中间件 JavaScript Java
消息队列 MQ产品使用合集之如何嵌入到Spring Boot中运行
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
6月前
|
消息中间件 Java Spring
实现Spring Boot与RabbitMQ消息中间件的无缝集成
实现Spring Boot与RabbitMQ消息中间件的无缝集成
|
6月前
|
消息中间件 Java RocketMQ
教程:Spring Boot整合RocketMQ的配置与优化
教程:Spring Boot整合RocketMQ的配置与优化
下一篇
DataWorks