Spring Cloud Stream 整合Kafka

简介: Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。

Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)inputs/outputschannel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:
image.png

  • Binder

负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互

  • inputs/outputs channel

inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息

Spring Cloud Stream主要配置

  • binder

绑定MQ中间件及配置

  • bindings

管理所有的Topic

  • des1tination
    指定发布订阅的Topic
  • contentType
    指定发布订阅消息的格式
  • group
    指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)

示例

高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定生产/消费者,另一种是通过Function的方式绑定生产/消费者。以下代码为使用Function的方式绑定生产/消费

  1. 引入依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>3.1.0</version>
    </dependency>
  2. 配置yml

    spring:
      cloud:
        stream:
          kafka:
            binder:
              auto-create-topics: true   # 自动创建topics
              brokers: ***.****.***.***:9092
          bindings:
            logP-out-0:    # 对用在ProducersConfig中的生产函数logP
              destination: log  # logP将数据发送的topic
              contentType: application/json
            logC-in-0:    # 对用在ConsumersConfig中的生产函数logC
              destination: log
              group: log_group
            addAgeC-in-0:
              destination: addAge
              group: addAge_group
          function:
            definition: logP;logC;addAgeC  # 指定对应的函数为Spring Cloud Stream中的生产消费通道
  3. 编写生产者

    方式1

    @Configuration
    public class ProducersConfig {
    
        private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>();
        
        /**
         * 对应yml中配置的logP-out-0通道,即topic log
         * @return java.util.function.Supplier<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Supplier<Person> logP(){
            return () -> unbounded.poll();
        }
    
        /**
         * 调用本方法向log topic发送消息 
         * 
         * @param person: 
         * @return void
         * @Date 2020-12-27
         **/
        public void log(Person person){
            unbounded.offer(person);
        }
        
    }

    方式2

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            // 通过streamBridge直接对应的topic发送消息
            return streamBridge.send("addAge", person);
        }
        
    }
  4. 编写消费者

    @Configuration
    public class ConsumersConfig {
    
        /**
         * 对应yml中配置的logC-in-0通道,即topic log。
         * 消费topic log中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> logC() {
    
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        /**
         * 对应yml中配置的addAgeC-in-0通道,即topic addAge。
         * 消费topic addAge中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> addAgeC(){
    
            return person -> {
    
                person.setAge(person.getAge() + 10);
                System.out.println("Consumer addAge: " + person.toString());
            };
        }
    }
  5. 发送消息

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @Autowired
        private ProducersConfig producersConfig;
    
        @PostMapping("/log")
        public void log(@RequestBody Person person){
    
            producersConfig.log(person);
        }
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            System.out.println("Producer addAge: " + person.toString());
            return streamBridge.send("addAge", person);
        }
    
    }
相关文章
消息中间件 Java Kafka
199 0
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
217 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
191 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
7月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1387 7
|
8月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
408 10
|
9月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
172 5
|
12月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
616 5
|
12月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
451 1
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
249 0
|
消息中间件 Java 持续交付
Spring Cloud Alibaba 项目搭建步骤和注意事项
Spring Cloud Alibaba 项目搭建步骤和注意事项
2305 0
Spring Cloud Alibaba 项目搭建步骤和注意事项
下一篇
oss云网关配置