《微服务实战》 第十六章 Spring cloud stream应用

简介: 《微服务实战》 第十六章 Spring cloud stream应用

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088
spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;
    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088
spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;
    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){
    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey'''
        exchangeType: direct


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
44 2
|
10天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。首先,创建并配置 Spring Boot 项目,实现后端 API;然后,使用 Ant Design Pro Vue 创建前端项目,配置动态路由和菜单。通过具体案例,展示了如何快速搭建高效、易维护的项目框架。
88 62
|
7天前
|
人工智能 前端开发 Java
基于开源框架Spring AI Alibaba快速构建Java应用
本文旨在帮助开发者快速掌握并应用 Spring AI Alibaba,提升基于 Java 的大模型应用开发效率和安全性。
基于开源框架Spring AI Alibaba快速构建Java应用
|
8天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个前后端分离的应用框架,实现动态路由和菜单功能。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,帮助开发者提高开发效率和应用的可维护性。
21 2
|
16天前
|
自然语言处理 Java API
Spring Boot 接入大模型实战:通义千问赋能智能应用快速构建
【10月更文挑战第23天】在人工智能(AI)技术飞速发展的今天,大模型如通义千问(阿里云推出的生成式对话引擎)等已成为推动智能应用创新的重要力量。然而,对于许多开发者而言,如何高效、便捷地接入这些大模型并构建出功能丰富的智能应用仍是一个挑战。
69 6
|
21天前
|
人工智能 开发框架 Java
总计 30 万奖金,Spring AI Alibaba 应用框架挑战赛开赛
Spring AI Alibaba 应用框架挑战赛邀请广大开发者参与开源项目的共建,助力项目快速发展,掌握 AI 应用开发模式。大赛分为《支持 Spring AI Alibaba 应用可视化调试与追踪本地工具》和《基于 Flow 的 AI 编排机制设计与实现》两个赛道,总计 30 万奖金。
|
20天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
52 2
|
19天前
|
JSON Java 数据格式
【微服务】SpringCloud之Feign远程调用
本文介绍了使用Feign作为HTTP客户端替代RestTemplate进行远程调用的优势及具体使用方法。Feign通过声明式接口简化了HTTP请求的发送,提高了代码的可读性和维护性。文章详细描述了Feign的搭建步骤,包括引入依赖、添加注解、编写FeignClient接口和调用代码,并提供了自定义配置的示例,如修改日志级别等。
44 1
|
15天前
|
存储 Java 数据管理
强大!用 @Audited 注解增强 Spring Boot 应用,打造健壮的数据审计功能
本文深入介绍了如何在Spring Boot应用中使用`@Audited`注解和`spring-data-envers`实现数据审计功能,涵盖从添加依赖、配置实体类到查询审计数据的具体步骤,助力开发人员构建更加透明、合规的应用系统。
|
22天前
|
XML Java 数据格式
Spring IOC容器的深度解析及实战应用
【10月更文挑战第14天】在软件工程中,随着系统规模的扩大,对象间的依赖关系变得越来越复杂,这导致了系统的高耦合度,增加了开发和维护的难度。为解决这一问题,Michael Mattson在1996年提出了IOC(Inversion of Control,控制反转)理论,旨在降低对象间的耦合度,提高系统的灵活性和可维护性。Spring框架正是基于这一理论,通过IOC容器实现了对象间的依赖注入和生命周期管理。
53 0