Spring Cloud Stream 整合Kafka

简介: Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。

Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)inputs/outputschannel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:
image.png

  • Binder

负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互

  • inputs/outputs channel

inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息

Spring Cloud Stream主要配置

  • binder

绑定MQ中间件及配置

  • bindings

管理所有的Topic

  • des1tination
    指定发布订阅的Topic
  • contentType
    指定发布订阅消息的格式
  • group
    指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)

示例

高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定生产/消费者,另一种是通过Function的方式绑定生产/消费者。以下代码为使用Function的方式绑定生产/消费

  1. 引入依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>3.1.0</version>
    </dependency>
  2. 配置yml

    spring:
      cloud:
        stream:
          kafka:
            binder:
              auto-create-topics: true   # 自动创建topics
              brokers: ***.****.***.***:9092
          bindings:
            logP-out-0:    # 对用在ProducersConfig中的生产函数logP
              destination: log  # logP将数据发送的topic
              contentType: application/json
            logC-in-0:    # 对用在ConsumersConfig中的生产函数logC
              destination: log
              group: log_group
            addAgeC-in-0:
              destination: addAge
              group: addAge_group
          function:
            definition: logP;logC;addAgeC  # 指定对应的函数为Spring Cloud Stream中的生产消费通道
  3. 编写生产者

    方式1

    @Configuration
    public class ProducersConfig {
    
        private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>();
        
        /**
         * 对应yml中配置的logP-out-0通道,即topic log
         * @return java.util.function.Supplier<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Supplier<Person> logP(){
            return () -> unbounded.poll();
        }
    
        /**
         * 调用本方法向log topic发送消息 
         * 
         * @param person: 
         * @return void
         * @Date 2020-12-27
         **/
        public void log(Person person){
            unbounded.offer(person);
        }
        
    }

    方式2

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            // 通过streamBridge直接对应的topic发送消息
            return streamBridge.send("addAge", person);
        }
        
    }
  4. 编写消费者

    @Configuration
    public class ConsumersConfig {
    
        /**
         * 对应yml中配置的logC-in-0通道,即topic log。
         * 消费topic log中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> logC() {
    
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        /**
         * 对应yml中配置的addAgeC-in-0通道,即topic addAge。
         * 消费topic addAge中的消息
         *
         * @return java.util.function.Consumer<com.example.kafka.entity.Person>
         * @Date 2020-12-27
         **/
        @Bean
        public Consumer<Person> addAgeC(){
    
            return person -> {
    
                person.setAge(person.getAge() + 10);
                System.out.println("Consumer addAge: " + person.toString());
            };
        }
    }
  5. 发送消息

    @RestController
    public class UserController {
    
        @Autowired
        private StreamBridge streamBridge;
    
        @Autowired
        private ProducersConfig producersConfig;
    
        @PostMapping("/log")
        public void log(@RequestBody Person person){
    
            producersConfig.log(person);
        }
    
        @PostMapping("/addAge")
        public boolean addAge(@RequestBody Person person){
    
            person.setAge(RandomUtil.randomInt(10, 90));
            person.setSuccess(RandomUtil.randomBoolean());
            person.setBirthday(new Date());
    
            System.out.println("Producer addAge: " + person.toString());
            return streamBridge.send("addAge", person);
        }
    
    }
相关文章
|
19天前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
32 3
|
28天前
|
消息中间件 Java Kafka
|
1月前
|
消息中间件 Java Kafka
|
2月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
279 15
|
17天前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
46 0
|
1月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
2月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
31 8
|
23天前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
2月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
58 3
|
1月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
27 0