Spring Cloud Alibaba,Spring Cloud Stream 事件驱动(五)

简介: Spring Cloud Alibaba,Spring Cloud Stream 事件驱动(五)

1. 简介

事件驱动架构(Event-driven 架构,简称 EDA)是软件设计领域内的一套程序设计模型。这套模型的意义是所有的操作通过事件的发送/接收来完成。举个例子,比如一个订单的创建在传统软件设计中服务端通过接口暴露创建订单的动作,然后客户端访问创建订单。在事件驱动设计里,订单的创建通过接收订单事件来完成,这个过程中有事件发送者和事件接受者这两个模块,事件发送者的作用是发送订单事件,事件接受者的作用的接收订单事件。Spring Cloud Stream 是一套基于消息的事件驱动开发框架,它提供了一套全新的消息编程模型,此模型屏蔽了底层具体消息中间件的使用方式。开发者们使用这套模型可以完成基于消息的事件驱动应用开发。

2. 学习目标

  • 掌握 Spring 对消息的编程模型封装
  • 掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的发送和接收
  • 掌握 RocketMQ 整合 Spring Cloud Bus 完成远程事件的发送和接收

3. 详细内容

  • 概念理解:指导读者理解 Spring 的消息编程模型
  • 消息发送/接收:实战 Spring Cloud Steam RocketMQ Binder
  • 事件发送/接收: 实战 Spring Cloud Bus RocketMQ

4. 理解 Spring 消息编程模型

首先我们来看这个场景,不同的消息中间件发送消息的代码:

image.png每个消息中间件都有自己的消息模型编程,他们的代码编写方式都不一致。同样地,在消息的订阅方面,也是不同的代码。这个时候如果某天想把 Kafka 切换到 RocketMQ,必须得修改大量代码。

Spring 生态里有两个消息相关的模块和项目,分别是 spring-messaging 模块和 Spring Integration 项目,它们对消息的编程模型进行了统一,不论是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被统一称为 org.springframework.messaging.Message 接口。

Message 接口有两个方法,分别是 getPayload 以及 getHeaders 用于获取消息体以及消息头。如图所示,这也意味着一个消息 Message 由 Header 和 Payload 组成:

image.png

Payload 是一个泛型,意味是消息体可以放任意数据类型。Header 是一个 MessageHeaders 类型的消息头。

有了消息之后,这个消息被发送到哪里呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被发送到消息通道里,然后再通过消息处理器 MessageHandler 去处理消息通道里的消息:

image.png消息处理这里又会遇到一个问题。如果消息通道里只有 1 个消息,但是消息处理器有 N 个,这个时候要被哪个消息处理器处理呢?这里又涉及一个消息分发器的问题。UnicastingDispatcher 表示单播的处理方式,消息会通过负载均衡被分发到某一个消息处理器上,BroadcastingDispatcher 表示广播的方式,消息会被所有的消息处理器处理。

image.png

5. Spring Cloud Stream

Spring Cloud Stream 是一套基于消息的事件驱动开发框架。

Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:

image.png如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 4行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。

image.png以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:

@SpringBootApplication@EnableBinding({Source.class, Sink.class})  // ①
public class SCSApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(SCSApplication.class)
            .web(WebApplicationType.NONE).run(args);
    }
    @Autowired
    Source source;  // ②
    @Bean
    public CommandLineRunner runner() {
        return (args) -> {
            source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build());  // ③
        };
    }
    @StreamListener(Sink.INPUT)  // ④
    @SendTo(Source.OUTPUT)  // ⑤
    public String receive(String msg) {
        return msg.toUpperCase();
    }
}
  1. 使用 @EnableBinding 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 @Output 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 @Input 注解修饰表示这是一个 Input Binding。@EnableBinding 注解会针对这两个接口生成动态代理。
  2. 注入 @EnableBinding 注解对于 Source 接口生成的动态代理。
  3. 使用 @EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。
  4. @StreamListener 注解订阅 @EnableBinding 注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的topic 和 group。
  5. 消息处理结果发送到@EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。

上述代码需要配置信息:

spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq

这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。

所以这段代码的意思是以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。

目录
相关文章
|
12天前
|
SpringCloudAlibaba 负载均衡 Dubbo
【SpringCloud Alibaba系列】Dubbo高级特性篇
本章我们介绍Dubbo的常用高级特性,包括序列化、地址缓存、超时与重试机制、多版本、负载均衡。集群容错、服务降级等。
【SpringCloud Alibaba系列】Dubbo高级特性篇
|
12天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
12天前
|
SpringCloudAlibaba JavaScript Dubbo
【SpringCloud Alibaba系列】Dubbo dubbo-admin安装教程篇
本文介绍了 Dubbo-Admin 的安装和使用步骤。Dubbo-Admin 是一个前后端分离的项目,前端基于 Vue,后端基于 Spring Boot。安装前需确保开发环境(Windows 10)已安装 JDK、Maven 和 Node.js,并在 Linux CentOS 7 上部署 Zookeeper 作为注册中心。
【SpringCloud Alibaba系列】Dubbo dubbo-admin安装教程篇
|
12天前
|
SpringCloudAlibaba Dubbo Java
【SpringCloud Alibaba系列】Dubbo基础入门篇
Dubbo是一款高性能、轻量级的开源Java RPC框架,提供面向接口代理的高性能RPC调用、智能负载均衡、服务自动注册和发现、运行期流量调度、可视化服务治理和运维等功能。
【SpringCloud Alibaba系列】Dubbo基础入门篇
|
12天前
|
人工智能 前端开发 Java
Spring AI Alibaba + 通义千问,开发AI应用如此简单!!!
本文介绍了如何使用Spring AI Alibaba开发一个简单的AI对话应用。通过引入`spring-ai-alibaba-starter`依赖和配置API密钥,结合Spring Boot项目,只需几行代码即可实现与AI模型的交互。具体步骤包括创建Spring Boot项目、编写Controller处理对话请求以及前端页面展示对话内容。此外,文章还介绍了如何通过添加对话记忆功能,使AI能够理解上下文并进行连贯对话。最后,总结了Spring AI为Java开发者带来的便利,简化了AI应用的开发流程。
201 0
|
27天前
|
Java Nacos Sentinel
Spring Cloud Alibaba:一站式微服务解决方案
Spring Cloud Alibaba(简称SCA) 是一个基于 Spring Cloud 构建的开源微服务框架,专为解决分布式系统中的服务治理、配置管理、服务发现、消息总线等问题而设计。
213 13
Spring Cloud Alibaba:一站式微服务解决方案
|
2月前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
241 12
基于开源框架Spring AI Alibaba快速构建Java应用
|
2月前
|
存储 人工智能 Java
Spring AI Alibaba 配置管理,用 Nacos 就够了
本文通过一些实操案例展示了 Spring AI Alibaba + Nacos 在解决 AI 应用中一系列复杂配置管理挑战的方案,从动态 Prompt 模板的灵活调整、模型参数的即时优化,到敏感信息的安全加密存储。Spring AI Alibaba 简化了对接阿里云通义大模型的流程,内置 Nacos 集成也为开发者提供了无缝衔接云端配置托管的捷径,整体上极大提升了 AI 应用开发的灵活性和响应速度。
263 13
|
1月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
128 5
|
3月前
|
人工智能 Java API
阿里云开源 AI 应用开发框架:Spring AI Alibaba
近期,阿里云重磅发布了首款面向 Java 开发者的开源 AI 应用开发框架:Spring AI Alibaba(项目 Github 仓库地址:alibaba/spring-ai-alibaba),Spring AI Alibaba 项目基于 Spring AI 构建,是阿里云通义系列模型及服务在 Java AI 应用开发领域的最佳实践,提供高层次的 AI API 抽象与云原生基础设施集成方案,帮助开发者快速构建 AI 应用。本文将详细介绍 Spring AI Alibaba 的核心特性,并通过「智能机票助手」的示例直观的展示 Spring AI Alibaba 开发 AI 应用的便利性。示例源
1629 12