《微服务实战》 第十六章 Spring cloud stream应用

简介: 《微服务实战》 第十六章 Spring cloud stream应用

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088
spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;
    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088
spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;
    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){
    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey'''
        exchangeType: direct


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9天前
|
Java Nacos Sentinel
Spring Cloud Alibaba:一站式微服务解决方案
Spring Cloud Alibaba(简称SCA) 是一个基于 Spring Cloud 构建的开源微服务框架,专为解决分布式系统中的服务治理、配置管理、服务发现、消息总线等问题而设计。
111 13
Spring Cloud Alibaba:一站式微服务解决方案
|
6天前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
|
15天前
|
XML Java 数据格式
Spring Core核心类库的功能与应用实践分析
【12月更文挑战第1天】大家好,今天我们来聊聊Spring Core这个强大的核心类库。Spring Core作为Spring框架的基础,提供了控制反转(IOC)和依赖注入(DI)等核心功能,以及企业级功能,如JNDI和定时任务等。通过本文,我们将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring Core,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
40 14
|
16天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
16天前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
59 5
|
13天前
|
XML 前端开发 安全
Spring MVC:深入理解与应用实践
Spring MVC是Spring框架提供的一个用于构建Web应用程序的Model-View-Controller(MVC)实现。它通过分离业务逻辑、数据、显示来组织代码,使得Web应用程序的开发变得更加简洁和高效。本文将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring MVC,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
40 2
|
16天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
36 5
|
16天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
27 5
|
16天前
|
Prometheus 监控 Java
如何全面监控所有的 Spring Boot 微服务
如何全面监控所有的 Spring Boot 微服务
30 3
|
18天前
|
Cloud Native 安全 持续交付
深入理解微服务架构及其在现代软件开发中的应用
深入理解微服务架构及其在现代软件开发中的应用
39 3