Spring Cloud Stream 整合Kafka

简介: Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。

Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)inputs/outputschannel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:
image.png

  • Binder

负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互

  • inputs/outputs channel

inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息

Spring Cloud Stream主要配置

  • binder

绑定MQ中间件及配置

  • bindings

管理所有的Topic

  • des1tination
    指定发布订阅的Topic
  • contentType
    指定发布订阅消息的格式
  • group
    指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)

示例

高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定生产/消费者,另一种是通过Function的方式绑定生产/消费者。以下代码为使用Function的方式绑定生产/消费

  1. 引入依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>3.1.0</version>
    </dependency>
  2. 配置yml

    spring:
      cloud:
        stream:
          kafka:
            binder:
              auto-create-topics: true   # 自动创建topics
              brokers: ***.****.***.***:9092
          bindings:
            logP-out-0:    # 对用在ProducersConfig中的生产函数logP
              destination: log  # logP将数据发送的topic
              contentType: application/json
            logC-in-0:    # 对用在ConsumersConfig中的生产函数logC
              destination: log
              group: log_group
            addAgeC-in-0:
              destination: addAge
              group: addAge_group
          function:
            definition: logP;logC;addAgeC  # 指定对应的函数为Spring Cloud Stream中的生产消费通道
  3. 编写生产者

    方式1

    @Configuration
    public class ProducersConfig {
    
        private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>();
        
        /**
         * 对应yml中配置的logP-out-0通道,即topic log
         * @return java.util.function.Supplier<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Supplier<Person> logP(){
            return () -> unbounded.poll();
        }
    
        /**
         * 调用本方法向log topic发送消息 
         * 
         * @param person: 
         * @return void
         * @Date 2020-12-27
         **/
        public void log(Person person){
            unbounded.offer(person);
        }
        
    }

    方式2

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            // 通过streamBridge直接对应的topic发送消息
            return streamBridge.send("addAge", person);
        }
        
    }
  4. 编写消费者

    @Configuration
    public class ConsumersConfig {
    
        /**
         * 对应yml中配置的logC-in-0通道,即topic log。
         * 消费topic log中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> logC() {
    
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        /**
         * 对应yml中配置的addAgeC-in-0通道,即topic addAge。
         * 消费topic addAge中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> addAgeC(){
    
            return person -> {
    
                person.setAge(person.getAge() + 10);
                System.out.println("Consumer addAge: " + person.toString());
            };
        }
    }
  5. 发送消息

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @Autowired
        private ProducersConfig producersConfig;
    
        @PostMapping("/log")
        public void log(@RequestBody Person person){
    
            producersConfig.log(person);
        }
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            System.out.println("Producer addAge: " + person.toString());
            return streamBridge.send("addAge", person);
        }
    
    }
相关文章
|
2月前
|
消息中间件 Cloud Native Java
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
|
24天前
|
消息中间件 Java Kafka
Spring整合kafka
Spring整合kafka
|
4月前
|
消息中间件
SpringCloud Stream集成RabbitMQ
SpringCloud Stream集成RabbitMQ
60 0
|
5月前
|
消息中间件 存储 Java
【Spring Cloud Stream 消息驱动】 —— 每天一点小知识
【Spring Cloud Stream 消息驱动】 —— 每天一点小知识
|
6月前
|
消息中间件 Java Kafka
微服务技术系列教程(36) - SpringCloud-使用Kafka实现消息驱动
微服务技术系列教程(36) - SpringCloud-使用Kafka实现消息驱动
48 0
|
2月前
|
消息中间件 SpringCloudAlibaba Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
786 0
|
2月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
51 0
|
2月前
|
消息中间件 Java Kafka
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
|
3月前
|
消息中间件 Java Kafka
玩转Kafka—Spring&Go整合Kafka
玩转Kafka—Spring&Go整合Kafka
37 0
|
3月前
|
消息中间件 Java 开发者
Spring Cloud Stream解密:流式数据在微服务中的魔力
Spring Cloud Stream解密:流式数据在微服务中的魔力
251 1