rocketmq-spring : 实战与源码解析一网打尽

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。

RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。

这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑

1 SDK 简介


项目地址:

https://github.com/apache/rocketmq-spring

rocketmq-spring 的本质是一个 Spring Boot starter

Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。

Spring Boot starter 构造的启动器使用起来非常方便,开发者只需要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。

下面我们看下 rocketmq-spring-boot-starter 的配置:

1、引入依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>

2、约定配置

接下来,我们分别按照生产者和消费者的顺序,详细的讲解消息收发的操作过程。

2 生产者

首先我们添加依赖后,进行如下三个步骤:

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生产者配置非常简单,主要配置名字服务地址生产者组

2、需要发送消息的类中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;

3、发送消息,消息体可以是自定义对象,也可以是 Message 对象

rocketMQTemplate 类包含多钟发送消息的方法:

  1. 同步发送 syncSend
  2. 异步发送 asyncSend
  3. 顺序发送 syncSendOrderly
  4. oneway发送 sendOneWay

下面的代码展示如何同步发送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
         rocketMQTemplate.syncSend(
            destination, 
            MessageBuilder.withPayload(messageContent).
            setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
            build()
          );
if (sendResult != null) {
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
       // send message success ,do something 
    }
}

syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags

第二个参数是:spring-message 规范的 message 对象 ,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。

3 消费者

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、实现消息监听器

@Component
@RocketMQMessageListener(
    consumerGroup = "${rocketmq.consumer1.group}",  //消费组
    topic = "${rocketmq.consumer1.topic}"                      //主题
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
       System.out.println("普通短信:" + message);
    }
}

消费者实现类也可以实现 RocketMQListener\<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据

public void onMessage(MessageExt message) {
    try {
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通短信:" + message);
    } catch (Exception e) {
        logger.error("common onMessage error:", e);
    }
}

4 源码概览

最新源码中,我们可以看到源码中包含四个模块:

1、rocketmq-spring-boot-parent

该模块是父模块,定义项目所有依赖的 jar 包。

2、rocketmq-spring-boot

核心模块,实现了 starter 的核心逻辑。

3、rocketmq-spring-boot-starter

SDK 模块,简单封装,外部项目引用。

4、rocketmq-spring-boot-samples

示例代码模块。这个模块非常重要,当用户使用 SDK 时,可以参考示例快速开发。

5 starter 实现

我们重点分析下 rocketmq-spring-boot 模块的核心源码:

<br/>

spring-boot-starter 实现需要包含如下三个部分:

1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;

2、定义spring.factories 文件

在 resources 包下创建 META-INF 目录后,新建 spring.factories 文件,并在文件中定义自动加载类,文件内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。

3、实现自动加载类

在 RocketMQAutoConfiguration 类的具体实现中,我们重点分析下生产者和消费者是如何分别启动的。

▍生产者发送模板类:RocketMQTemplate

RocketMQAutoConfiguration 类定义了两个默认的 Bean :

首先SpringBoot项目中配置文件中的配置值会根据属性条件绑定到 RocketMQProperties 对象 中,然后使用 RocketMQ 的原生 API 分别创建生产者 Bean 和拉取消费者 Bean , 分别将两个 bean 设置到 RocketMQTemplate 对象中。

两个重点需要强调:

  • 发送消息时,将 spring-message 规范下的消息对象封装成 RocketMQ 消息对象

  • 默认拉取消费者 litePullConsumer 。拉取消费者一般用于大数据批量处理场景 。

    原生使用方式

​ RocketMQTemplate 类封装了拉取消费者的receive方法,以方便开发者使用。

▍自定义消费者类

下图是并发消费者的例子:

消费者示例代码

那么 rocketmq-spring 是如何自动启动消费者呢 ?

spring 容器首先注册了消息监听器后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。

对比并发消费者的例子,我们可以看到: DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 消费逻辑的封装。

封装消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。

首先DefaultRocketMQListenerContainer初始化之后, 获取 onMessage 方法的参数类型 。

然后消费者调用 consumeMessage 处理消息时,封装了一个 handleMessage 方法 ,将原生 RocketMQ 消息对象 MessageExt 转换成 onMessage 方法定义的参数对象,然后调用 rocketMQListener 的 onMessage 方法。

mnjh9

上图右侧标红的代码也就是该方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

6 写到最后

开源项目 rocketmq-spring 有很多值得学习的地方 ,我们可以从如下四个层面逐层进阶:

1、学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;

2、模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);

3、starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;

4、举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
36 2
|
22天前
|
自然语言处理 编译器 Linux
|
16天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
40 9
|
10天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
23天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
40 3
|
24天前
|
UED
<大厂实战经验> Flutter&鸿蒙next 中使用 initState 和 mounted 处理异步请求的详细解析
在 Flutter 开发中,处理异步请求是常见需求。本文详细介绍了如何在 `initState` 中触发异步请求,并使用 `mounted` 属性确保在适当时机更新 UI。通过示例代码,展示了如何安全地进行异步操作和处理异常,避免在组件卸载后更新 UI 的问题。希望本文能帮助你更好地理解和应用 Flutter 中的异步处理。
64 3
|
24天前
|
JavaScript API 开发工具
<大厂实战场景> ~ Flutter&鸿蒙next 解析后端返回的 HTML 数据详解
本文介绍了如何在 Flutter 中解析后端返回的 HTML 数据。首先解释了 HTML 解析的概念,然后详细介绍了使用 `http` 和 `html` 库的步骤,包括添加依赖、获取 HTML 数据、解析 HTML 内容和在 Flutter UI 中显示解析结果。通过具体的代码示例,展示了如何从 URL 获取 HTML 并提取特定信息,如链接列表。希望本文能帮助你在 Flutter 应用中更好地处理 HTML 数据。
103 1
|
17天前
|
前端开发 中间件 PHP
PHP框架深度解析:Laravel的魔力与实战应用####
【10月更文挑战第31天】 本文作为一篇技术深度好文,旨在揭开PHP领域璀璨明星——Laravel框架的神秘面纱。不同于常规摘要的概括性介绍,本文将直接以一段引人入胜的技术剖析开场,随后通过具体代码示例和实战案例,逐步引导读者领略Laravel在简化开发流程、提升代码质量及促进团队协作方面的卓越能力。无论你是PHP初学者渴望深入了解现代开发范式,还是经验丰富的开发者寻求优化项目架构的灵感,本文都将为你提供宝贵的见解与实践指导。 ####
|
20天前
|
前端开发 JavaScript
JavaScript新纪元:ES6+特性深度解析与实战应用
【10月更文挑战第29天】本文深入解析ES6+的核心特性,包括箭头函数、模板字符串、解构赋值、Promise、模块化和类等,结合实战应用,展示如何利用这些新特性编写更加高效和优雅的代码。
39 0
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
下一篇
无影云桌面