SpringCloud Day08--消息驱动(SpringCloud Stream)(二)

简介: SpringCloud Day08--消息驱动(SpringCloud Stream)(二)

发送消息接口实现类:

@EnableBinding(Source.class)//可以理解为是一个消息的发送管道的定义
@Slf4j
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;//消息发送管道
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        this.output.send(MessageBuilder.withPayload(serial).build());//创建并发送消息
        log.info("*****serial:{}",serial);
        return serial;
    }
}

controller

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}


  • 测试


1.启动Eureka7001,7002,7004;

2.启动RabbitMQ

3.启动8801

可以看到exchange中定义的交换机.



d7fed7247201599fc1c997b2542666ec.png


4.访问:http://localhost:8801/sendMessage


469ae25e0eaec2887a733363e07fcf68.png

74ab7032338fc7cb81df19d95220b502.png

11.4 消息驱动之消费者


  • 建Module—cloud-stream-rabbitmq-consumer8802
  • POM


<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

YML

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.174.128
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7004.com:7004/eureka  # 集群版
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址
  • 主启动类:
@SpringBootApplication
public class StreamMQMain8802
{
    public static void main(String[] args)
    {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}
  • 业务类
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message <String> message){
        System.out.println("消费者1号,------------>接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
    }
}
  • 测试8801发送8802接收消息


访问:http://localhost:8801/sendMessage



7b6fdc38d93e4a895a6dc3f6bb07df26.png


1dc3fdcd8cf67b37af90b32a8b0eded4.png



11.5 分组消费与持久化


11.5.1 提出问题


  • 依照8802,clone出来一份运行8803
  • 启动Eureka7001,7002,7004、provider8001、consumer8002,8003
  • 运行后存在问题


1.有重复消费问题


30857f14a15c45086403cf3b4d6a1c3f.png

40509db9ebe3b4d106e191561ca39b3b.png9d5173af4113d62ce4c815e0bb0031a6.png


2.消息持久化问题


11.5.2 分析问题


目前是8802/8803同时都收到了,存在重复消费问题


如何解决?—分组和持久化属性group


生产实际案例


ec4b971748e6618c160ad8a25b470cbc.png

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。


不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。


11.5.3 分组案例


1.原理


微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。


2.代码演示


  • 8802/8803都变成不同组,group两个不同

8802修改YML

806505616c8428ad90ea17c0955c6122.png


8803修改YML


b485dc33e391bf05bcf7482b573a330a.png


我们自己配置:

b9bcbb85861985d6afdb2eb4e17ae290.png


分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,本例启动了两个消费微服务(8802/8803)


多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。


结论:还是重复消费


8802/8803都变成相同组,group两个相同

8802/8803实现了轮询分组,每次只有一个消费者.8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费


8802修改YML,修改组属性为:group: atrjxyA


8803修改YML,修改组属性为:group: atrjxyA


结论:同一个组的多个微服务实例,每次只会有一个拿到


3c4a1572ecc441cc1e1845c041216fe9.png

c5387dd7a0c5803723e8ccfb2c7c58f6.png

cefc7feeb0854c16448918d478977900.png


11.5.4 持久化案例


  • 停止8802/8803并去除掉8802的分组group: atrjxyA,8803的分组group: atguiguA没有去掉
  • 8801先发送4条消息到rabbitmq
  • 先启动8802,无分组属性配置,后台没有打出来消息

b5b4fb2dc22f9409d52bfe6d6bd00673.png


  • 再启动8803,有分组属性配置,后台打出来了MQ上的消息


0f5020dba51c94717eb1d8ef96b1b9ff.png


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
前端开发 Java Nacos
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
本文介绍了如何使用Spring Cloud Alibaba 2023.0.0.0技术栈构建微服务网关,以应对微服务架构中流量治理与安全管控的复杂性。通过一个包含鉴权服务、文件服务和主服务的项目,详细讲解了网关的整合与功能开发。首先,通过统一路由配置,将所有请求集中到网关进行管理;其次,实现了限流防刷功能,防止恶意刷接口;最后,添加了登录鉴权机制,确保用户身份验证。整个过程结合Nacos注册中心,确保服务注册与配置管理的高效性。通过这些实践,帮助开发者更好地理解和应用微服务网关。
35 0
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
|
14天前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
|
2月前
|
人工智能 安全 Java
AI 时代:从 Spring Cloud Alibaba 到 Spring AI Alibaba
本次分享由阿里云智能集团云原生微服务技术负责人李艳林主讲,主题为“AI时代:从Spring Cloud Alibaba到Spring AI Alibaba”。内容涵盖应用架构演进、AI agent框架发展趋势及Spring AI Alibaba的重磅发布。分享介绍了AI原生架构与传统架构的融合,强调了API优先、事件驱动和AI运维的重要性。同时,详细解析了Spring AI Alibaba的三层抽象设计,包括模型支持、工作流智能体编排及生产可用性构建能力,确保安全合规、高效部署与可观测性。最后,结合实际案例展示了如何利用私域数据优化AI应用,提升业务价值。
200 4
|
3月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
219 5
|
5月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
91 0
|
7月前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
152 1
|
7月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
102 0
|
7月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
7月前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
7月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?