从零学SpringCloud系列(九):利用kafka实现消息总线Spring Cloud Bus

简介: 从零学SpringCloud系列(九):利用kafka实现消息总线Spring Cloud Bus

一、什么是消息总线


相信大多数读者之前都使用过各种各样的消息队列,例如RabbitMQ、kafka等等,消息总线和他的概念差不多,在微服务系统的架构中,我们通常会使用轻量级的消息代理来 构建一个共用的消息主题让系统中所有的微服务都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以 我们称他们为消息总线。在总线上的各个实例都可以方便的广播一些需要让其他连接到该主题上的实例都知道的消息,例如配置的变更或者其他一些管理操作等。


二、整合消息总线实现配置自动刷新


在上一篇博客中spring cloud config 中我们实现了微服务架构中的分布式配置中心,但是存在一个问题就是,当我们在git上修改了配置以后,需要我们手动通知每一个服务实例,这样的操作在实例较多的项目中是会死人的,这样的问题sping cloud 家族肯定也是会考虑到并且给出解决方案的,下面我们就来搞一下。


2.1 面向客户端基本架构

image.png


当我们系统按照上图启动以后,图中的 serviceA的三个实例会请求Config Server以获取配置,Config Server根据应用配置的规则从Git仓库中获取配置信息并返回。


此时,如果我们想要修改serviceA的配置。首先,去git服务器上修改对应的参数值,但是这样并不会触发serviceA实例的属性更新。此时我们向实例3发送post请求,此时,实例3就会将刷新请求发送到消息总线中,该消息事件会被serviceA的实例1和实例2从总线中获取到,并重新从config server中获取他们的配置信息,从而实现配置信息的动态更新。


2.2 面向服务端的架构


image.png


在之前的架构中,服务的配置更新需要通过具体 服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能 伤心啊功能,但是 这样的结果是,我们指定的应用实例会不同于集群中的其他应用 实例,这样会增加集群内容的复杂度,不利于将来的运维工作。


三、利用kafka实现消息总线


3.1 Spring Boot 整合kafka


参考博文:https://blog.csdn.net/hao134838/article/details/90242719


如果 spring boot 版本采用 2.2.5,则kafka版本使用 2.4.0.RELEASE。


3.2 实现动态 刷新


我们利用上一篇博客中的config 的两个工程来进行改造。


3.2.1 服务端改造


增加依赖:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

增加配置:

spring.kafka.bootstrap-servers=211.159.167.180:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.cloud.bus.enabled=true
management.endpoints.web.exposure.include= *


关于management.endpoints.web.exposure.include= * 的配置需要注意


注意:


如果是yum 的话 ‘’ 需要加 ‘ ’ 单引号*


include: '*' http://localhost:8769/actuator/bus-refresh 刷新所有微服务


include: 'refresh' http://localhost:8769/actuator/bus-refresh 不能访问


3.2.2 客户端改造


增加依赖:

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>

增加配置:


1. management.endpoints.web.exposure.include= *
2. spring.kafka.bootstrap-servers=211.159.167.180:9092
3. spring.cloud.bus.enabled=true


这样就ok 了,启动项目以后,当配置修改以后,我们 给服务端发发送POST请求:http://localhost:7071/actuator/bus-refresh


就可以实现动态刷新:


完整项目地址:https://github.com/zhenghaoxiao/spring-cloud-in-action/tree/bus   bus 分支


3.3 指定刷新范围


在上面的例子中,我们通过向服务端请求/actuator/bus-refresh接口,从而触发总线上所有服务实例刷新,但是在一些特殊场景下,我们希望可以刷新服务中某个具体实例的配置,Spring Cloud Bus 对这种场景也有很好的支持,/actuator/bus-refresh?destination=customers:9000 提供了一个destination参数,用来定位具体要刷新的应用程序。当我们调用带有destination参数的 接口时,此时总线上的个应用实例会根据destination属性的值来判断是否为自己的实例名,若符合才进行配置刷新,若不符合就忽略该 消息。

目录
相关文章
|
5月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
954 7
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
189 10
|
6月前
|
负载均衡 Dubbo Java
Spring Cloud Alibaba与Spring Cloud区别和联系?
Spring Cloud Alibaba与Spring Cloud区别和联系?
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
119 5
|
7月前
|
前端开发 Java Nacos
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
本文介绍了如何使用Spring Cloud Alibaba 2023.0.0.0技术栈构建微服务网关,以应对微服务架构中流量治理与安全管控的复杂性。通过一个包含鉴权服务、文件服务和主服务的项目,详细讲解了网关的整合与功能开发。首先,通过统一路由配置,将所有请求集中到网关进行管理;其次,实现了限流防刷功能,防止恶意刷接口;最后,添加了登录鉴权机制,确保用户身份验证。整个过程结合Nacos注册中心,确保服务注册与配置管理的高效性。通过这些实践,帮助开发者更好地理解和应用微服务网关。
1110 0
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
|
8月前
|
人工智能 安全 Java
AI 时代:从 Spring Cloud Alibaba 到 Spring AI Alibaba
本次分享由阿里云智能集团云原生微服务技术负责人李艳林主讲,主题为“AI时代:从Spring Cloud Alibaba到Spring AI Alibaba”。内容涵盖应用架构演进、AI agent框架发展趋势及Spring AI Alibaba的重磅发布。分享介绍了AI原生架构与传统架构的融合,强调了API优先、事件驱动和AI运维的重要性。同时,详细解析了Spring AI Alibaba的三层抽象设计,包括模型支持、工作流智能体编排及生产可用性构建能力,确保安全合规、高效部署与可观测性。最后,结合实际案例展示了如何利用私域数据优化AI应用,提升业务价值。
749 4
|
9月前
|
负载均衡 Java 开发者
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
深入探索Spring Cloud与Spring Boot:构建微服务架构的实践经验
517 5
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
523 5
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
278 1
|
11月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
730 0