Spring Cloud Alibaba 七天训练营(六)分布式消息(事件)驱动

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 对文档有任何问题,请在评论区留言!
文档目录

1. 简介

事件驱动架构(Event-driven 架构,简称 EDA)是软件设计领域内的一套程序设计模型。这套模型的意义是所有的操作通过事件的发送/接收来完成。举个例子,比如一个订单的创建在传统软件设计中服务端通过接口暴露创建订单的动作,然后客户端访问创建订单。在事件驱动设计里,订单的创建通过接收订单事件来完成,这个过程中有事件发送者和事件接受者这两个模块,事件发送者的作用是发送订单事件,事件接受者的作用的接收订单事件。Spring Cloud Stream 是一套基于消息的事件驱动开发框架,它提供了一套全新的消息编程模型,此模型屏蔽了底层具体消息中间件的使用方式。开发者们使用这套模型可以完成基于消息的事件驱动应用开发。

2. 学习目标

  • 掌握 Spring 对消息的编程模型封装
  • 掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的发送和接收
  • 掌握 RocketMQ 整合 Spring Cloud Bus 完成远程事件的发送和接收

3. 详细内容

  • 概念理解:指导读者理解 Spring 的消息编程模型
  • 消息发送/接收:实战 Spring Cloud Steam RocketMQ Binder
  • 事件发送/接收: 实战 Spring Cloud Bus RocketMQ

4. 理解 Spring 消息编程模型

首先我们来看这个场景,不同的消息中间件发送消息的代码:

1.png


每个消息中间件都有自己的消息模型编程,他们的代码编写方式都不一致。同样地,在消息的订阅方面,也是不同的代码。这个时候如果某天想把 Kafka 切换到 RocketMQ,必须得修改大量代码。

Spring 生态里有两个消息相关的模块和项目,分别是 spring-messaging 模块和 Spring Integration 项目,它们对消息的编程模型进行了统一,不论是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被统一称为 org.springframework.messaging.Message 接口。

Message 接口有两个方法,分别是 getPayload 以及 getHeaders 用于获取消息体以及消息头。如图所示,这也意味着一个消息 Message 由 Header 和 Payload 组成:

2.png

Payload 是一个泛型,意味是消息体可以放任意数据类型。Header 是一个 MessageHeaders 类型的消息头。

有了消息之后,这个消息被发送到哪里呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被发送到消息通道里,然后再通过消息处理器 MessageHandler 去处理消息通道里的消息:

3.png

消息处理这里又会遇到一个问题。如果消息通道里只有 1 个消息,但是消息处理器有 N 个,这个时候要被哪个消息处理器处理呢?这里又涉及一个消息分发器的问题。UnicastingDispatcher 表示单播的处理方式,消息会通过负载均衡被分发到某一个消息处理器上,BroadcastingDispatcher 表示广播的方式,消息会被所有的消息处理器处理。

4.png

5. Spring Cloud Stream

Spring Cloud Stream 是一套基于消息的事件驱动开发框架。

Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:

5.png


如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 [4 ]()行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。

6.png

以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:

@SpringBootApplication@EnableBinding({Source.class, Sink.class})  // ①
public class SCSApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(SCSApplication.class)
            .web(WebApplicationType.NONE).run(args);
    }
    @Autowired
    Source source;  // ②
    @Bean
    public CommandLineRunner runner() {
        return (args) -> {
            source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build());  // ③
        };
    }
    @StreamListener(Sink.INPUT)  // ④
    @SendTo(Source.OUTPUT)  // ⑤
    public String receive(String msg) {
        return msg.toUpperCase();
    }
}
  1. 使用 [@EnableBinding ] 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 [@Output ] 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 [@Input ] 注解修饰表示这是一个 Input Binding。[@EnableBinding ] 注解会针对这两个接口生成动态代理。
  2. 注入 [@EnableBinding ]注解对于 Source 接口生成的动态代理。
  3. 使用 [@EnableBinding ] 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。
  4. [@StreamListener ] 注解订阅 [@EnableBinding ]注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的topic 和 group。
  5. 消息处理结果发送到[@EnableBinding ]注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。

上述代码需要配置信息:

spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka

spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq

这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。

所以这段代码的意思是以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。

当然,你也可以直接通过沙箱环境直接查看案例

相关文章
|
14天前
|
Java Nacos Sentinel
Spring Cloud Alibaba:一站式微服务解决方案
Spring Cloud Alibaba(简称SCA) 是一个基于 Spring Cloud 构建的开源微服务框架,专为解决分布式系统中的服务治理、配置管理、服务发现、消息总线等问题而设计。
151 13
Spring Cloud Alibaba:一站式微服务解决方案
|
19天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
52 5
|
20天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
37 3
|
1月前
|
存储 人工智能 Java
Spring AI Alibaba 配置管理,用 Nacos 就够了
本文通过一些实操案例展示了 Spring AI Alibaba + Nacos 在解决 AI 应用中一系列复杂配置管理挑战的方案,从动态 Prompt 模板的灵活调整、模型参数的即时优化,到敏感信息的安全加密存储。Spring AI Alibaba 简化了对接阿里云通义大模型的流程,内置 Nacos 集成也为开发者提供了无缝衔接云端配置托管的捷径,整体上极大提升了 AI 应用开发的灵活性和响应速度。
219 13
|
1月前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
基于开源框架Spring AI Alibaba快速构建Java应用
|
26天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
46 6
|
25天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
37 1
|
2月前
|
人工智能 开发框架 Java
总计 30 万奖金,Spring AI Alibaba 应用框架挑战赛开赛
Spring AI Alibaba 应用框架挑战赛邀请广大开发者参与开源项目的共建,助力项目快速发展,掌握 AI 应用开发模式。大赛分为《支持 Spring AI Alibaba 应用可视化调试与追踪本地工具》和《基于 Flow 的 AI 编排机制设计与实现》两个赛道,总计 30 万奖金。
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
22天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
42 8