第十六章 Spring cloud stream应用

简介: 第十六章 Spring cloud stream应用

前言


https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。


1、stream设计思想


  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。


2、编码常用的注解


组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起


3、编码步骤


3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>


3.2、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组


3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}


3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列


安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true


3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}


3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认


@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){

    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey'''
        exchangeType: direct
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
379 3
|
26天前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
2月前
|
人工智能 监控 安全
如何快速上手【Spring AOP】?核心应用实战(上篇)
哈喽大家好吖~欢迎来到Spring AOP系列教程的上篇 - 应用篇。在本篇,我们将专注于Spring AOP的实际应用,通过具体的代码示例和场景分析,帮助大家掌握AOP的使用方法和技巧。而在后续的下篇中,我们将深入探讨Spring AOP的实现原理和底层机制。 AOP(Aspect-Oriented Programming,面向切面编程)是Spring框架中的核心特性之一,它能够帮助我们解决横切关注点(如日志记录、性能统计、安全控制、事务管理等)的问题,提高代码的模块化程度和复用性。
|
2月前
|
安全 算法 Java
在Spring Boot中应用Jasypt以加密配置信息。
通过以上步骤,可以在Spring Boot应用中有效地利用Jasypt对配置信息进行加密,这样即使配置文件被泄露,其中的敏感信息也不会直接暴露给攻击者。这是一种在不牺牲操作复杂度的情况下提升应用安全性的简便方法。
718 10
|
3月前
|
安全 Java Nacos
0代码改动实现Spring应用数据库帐密自动轮转
Nacos作为国内被广泛使用的配置中心,已经成为应用侧的基础设施产品,近年来安全问题被更多关注,这是中国国内软件行业逐渐迈向成熟的标志,也是必经之路,Nacos提供配置加密存储-运行时轮转的核心安全能力,将在应用安全领域承担更多职责。
|
3月前
|
NoSQL Java Redis
Redis基本数据类型及Spring Data Redis应用
Redis 是开源高性能键值对数据库,支持 String、Hash、List、Set、Sorted Set 等数据结构,适用于缓存、消息队列、排行榜等场景。具备高性能、原子操作及丰富功能,是分布式系统核心组件。
398 2
|
3月前
|
Java Linux 网络安全
Linux云端服务器上部署Spring Boot应用的教程。
此流程涉及Linux命令行操作、系统服务管理及网络安全知识,需要管理员权限以进行配置和服务管理。务必在一个测试环境中验证所有步骤,确保一切配置正确无误后,再将应用部署到生产环境中。也可以使用如Ansible、Chef等配置管理工具来自动化部署过程,提升效率和可靠性。
353 13
|
4月前
|
SQL Java 数据库
解决Java Spring Boot应用中MyBatis-Plus查询问题的策略。
保持技能更新是侦探的重要素质。定期回顾最佳实践和新技术。比如,定期查看MyBatis-Plus的更新和社区的最佳做法,这样才能不断提升查询效率和性能。
167 1
|
5月前
|
安全 Java API
Spring Boot 功能模块全解析:构建现代Java应用的技术图谱
Spring Boot不是一个单一的工具,而是一个由众多功能模块组成的生态系统。这些模块可以根据应用需求灵活组合,构建从简单的REST API到复杂的微服务系统,再到现代的AI驱动应用。
|
4月前
|
Java 数据库 开发者
Spring Boot 框架超级详细总结及长尾关键词应用解析
本文深入讲解Spring Boot框架的核心概念、功能特性及实际应用,涵盖自动配置、独立运行、starter依赖等优势。通过Web开发、微服务架构、批处理等适用场景分析,结合在线书店实战案例,演示项目初始化、数据库设计、分层架构实现全流程。同时探讨热部署、多环境配置、缓存机制与事务管理等高级特性,助你高效掌握Spring Boot开发技巧。代码示例详尽,适合从入门到进阶的学习者。
1253 0