【Spring Cloud Stream 消息驱动】 —— 每天一点小知识

简介: 【Spring Cloud Stream 消息驱动】 —— 每天一点小知识

🐳Spring Cloud Stream 消息驱动

在微服务架构中,消息驱动是一种常见的通信方式,它通过解耦和异步处理提供了可靠的服务间通信机制。Spring Cloud Stream 是 Spring Cloud 生态系统中的一个模块,它提供了一种简化和统一的方式来构建基于消息驱动的应用程序。本文将介绍 Spring Cloud Stream 的基本概念和用法,并通过一个案例来说明如何使用 Spring Cloud Stream 实现消息驱动。


消息驱动概述

 💧在传统的应用程序架构中,服务间的通信通常是通过直接调用来完成的,这种紧耦合的方式会导致系统的可扩展性和灵活性受限。而消息驱动则提供了一种解耦的方式,将消息作为信息载体,在不同的服务之间传递和处理。消息驱动的架构模式具有以下优势:

  • 解耦性:消息驱动通过将消息作为中介来实现服务之间的通信,服务之间不直接依赖于彼此的存在和实现细节,从而实现解耦。
  • 异步处理:消息驱动允许发送方将消息发送到消息队列后立即返回,而不需要等待接收方处理完毕。这种异步处理的方式可以提高系统的响应性和吞吐量。
  • 可靠性:消息驱动使用消息队列来存储和传递消息,消息队列通常具备持久化、可靠性和高可用性的特性,从而确保消息的可靠传递。

 💧Spring Cloud Stream 提供了一种简化和统一的编程模型,使得开发人员可以更轻松地使用消息驱动来构建应用程序。它提供了抽象的消息绑定层,使得应用程序可以与不同的消息中间件(如 RabbitMQ、Kafka 等)进行集成,而无需关心底层消息中间件的细节。

案例说明

 💧我们以一个在线商城的订单系统为例来说明如何使用 Spring Cloud Stream 实现消息驱动。订单系统包括两个微服务:订单服务和库存服务。当用户下单时,订单服务将订单信息发布到消息队列中,库存服务订阅订单消息并处理库存扣减操作。

 💧接下来,让我们一步一步实现这个案例,并使用 RabbitMQ 作为消息中间件。

消息驱动之生产者

 💧首先,我们需要创建订单服务作为消息驱动的生产者。订单服务负责将订单信息发布到消息队列中。

  1. 在您的订单服务项目中,添加以下依赖:
<
dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.properties 文件中添加 RabbitMQ 的连接信息:
spring.cloud.stream.bindings.output.destination=orders
spring.cloud.stream.bindings.output.content-type=application/json
spring.rabbitmq.host=<RabbitMQ 主机名>
spring.rabbitmq.port=<RabbitMQ 端口>
spring.rabbitmq.username=<RabbitMQ 用户名>
spring.rabbitmq.password=<RabbitMQ 密码>
  1. 创建一个名为 OrderProducer 的类,用于发送订单消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Source.class)
public class OrderProducer {
    private final Source source;
    @Autowired
    public OrderProducer(Source source) {
        this.source = source;
    }
    public void sendOrderMessage(Order order) {
        source.output().send(MessageBuilder.withPayload(order).build());
    }
}
  1. 创建一个名为 Order 的类,用于表示订单信息:
public class Order {
    private String orderId;
    private String customerId;
    // 其他订单属性和方法省略...
    // 构造函数、getter、setter 省略...
}
  1. 在需要发送订单消息的地方,使用 OrderProducer 发送消息。例如,在一个订单控制器中:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
    private final OrderProducer orderProducer;
    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }
    @PostMapping("/place-order")
    public String placeOrder(@RequestBody Order order) {
        // 处理订单逻辑...
        // 发送订单消息
        orderProducer.sendOrderMessage(order);
        return "Order placed successfully";
    }
}

 💧通过完成上述步骤,订单服务就成为了消息驱动的生产者,它将订单信息发布到名为 “orders” 的消息队列中。

 💧接下来,我们将创建库存服务作为消息驱动的消费者,并处理订单消息中的库存扣减操作。

消息驱动之消费者

 💧我们将创建库存服务作为消息驱动的消费者,它将订阅订单消息并处理库存扣减操作。

  1. 在库存服务项目中,添加以下依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.properties 文件中添加 RabbitMQ 的连接信息,与订单服务中的配置保持一致。
  2. 创建一个名为 OrderConsumer 的类,用于接收和处理订单消息:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class OrderConsumer {
    @StreamListener(Sink.INPUT)
    public void processOrderMessage(Order order) {
        // 处理订单消息,执行库存扣减操作
        // ...
        // 打印日志
        System.out.println("Received order: " + order.getOrderId());
    }
}
  1. 启动库存服务应用程序。库存服务现在已经成为消息驱动的消费者,并订阅了名为 “orders” 的消息队列。每当有订单消息到达队列时,processOrderMessage 方法将被调用,并处理相应的库存扣减操作。

 💧通过以上步骤,我们完成了消息驱动的生产者和消费者的搭建。订单服务作为生产者将订单消息发布到消息队列中,而库存服务作为消费者订阅订单消息并处理相应的业务逻辑。

分组与持久化

 💧Spring Cloud Stream 还提供了一些高级功能,例如消费者分组和消息持久化,以满足更复杂的应用需求。

消费者分组

 💧消费者分组可以确保相同分组名称的消费者实例共享消息的处理负载。这对于水平扩展和负载均衡非常有用。

 💧要为消费者设置分组,只需在消费者类上添加 @EnableBinding 注解,并在 @StreamListener 注解中指定分组名称,如下所示:

@Component
@EnableBinding(Sink.class)
public class OrderConsumer {
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='order' and headers['group']=='group1'")
    public void processOrderMessage(Order order) {
        // 处理订单消息
        // ...
    }
}

消息持久化

 💧消息持久化是确保消息在发生故障或重启后仍然可靠地传递的重要机制。Spring Cloud Stream 默认情况下会将消息持久化到消息中间件中,但需要确保消息中间件也配置了持久化机制。

 💧例如,对于 RabbitMQ,可以通过在 application.properties 文件中设置以下属性来启用消息持久化:

spring.cloud.stream.rabbit.bindings.input.consumer.de
clare-durable-queue=true
spring.cloud.stream.rabbit.bindings.input.consumer.durable-subscription=true

 💧这将确保消费者队列和订阅是持久化的。

总结

通过使用 Spring Cloud Stream,我们可以轻松构建基于消息驱动的应用程序。本文介绍了消息驱动的概念,通过一个在线商城的订单系统案例演示了如何使用 Spring Cloud Stream 来实现消息驱动。我们创建了订单服务作为消息驱动的生产者,将订单信息发布到消息队列中;同时创建了库存服务作为消息驱动的消费者,订阅订单消息并处理库存扣减操作。此外,还介绍了消费者分组和消息持久化的高级功能。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
21天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
39 5
|
21天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
31 5
|
24天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
36 1
|
3月前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
2859 13
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15051 32
|
5月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
582 15
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
72 0
|
5月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
126 3
|
4月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
下一篇
DataWorks