Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)
、inputs/outputs
channel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:
- Binder
负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互
- inputs/outputs channel
inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息
Spring Cloud Stream主要配置
binder
绑定MQ中间件及配置
bindings
管理所有的Topic
des1tination
指定发布订阅的TopiccontentType
指定发布订阅消息的格式group
指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)
示例
高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定生产/消费
者,另一种是通过Function
的方式绑定生产/消费
者。以下代码为使用Function
的方式绑定生产/消费
者
引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>3.1.0</version> </dependency>
配置yml
spring: cloud: stream: kafka: binder: auto-create-topics: true # 自动创建topics brokers: ***.****.***.***:9092 bindings: logP-out-0: # 对用在ProducersConfig中的生产函数logP destination: log # logP将数据发送的topic contentType: application/json logC-in-0: # 对用在ConsumersConfig中的生产函数logC destination: log group: log_group addAgeC-in-0: destination: addAge group: addAge_group function: definition: logP;logC;addAgeC # 指定对应的函数为Spring Cloud Stream中的生产消费通道
编写生产者
方式1
@Configuration public class ProducersConfig { private BlockingQueue<Person> unbounded = new LinkedBlockingQueue<>(); /** * 对应yml中配置的logP-out-0通道,即topic log * @return java.util.function.Supplier<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Supplier<Person> logP(){ return () -> unbounded.poll(); } /** * 调用本方法向log topic发送消息 * * @param person: * @return void * @Date 2020-12-27 **/ public void log(Person person){ unbounded.offer(person); } }
方式2
@RestController public class UserController { @Autowired private StreamBridge streamBridge; @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); // 通过streamBridge直接对应的topic发送消息 return streamBridge.send("addAge", person); } }
编写消费者
@Configuration public class ConsumersConfig { /** * 对应yml中配置的logC-in-0通道,即topic log。 * 消费topic log中的消息 * * @return java.util.function.Consumer<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Consumer<Person> logC() { return person -> { System.out.println("Received: " + person); }; } /** * 对应yml中配置的addAgeC-in-0通道,即topic addAge。 * 消费topic addAge中的消息 * * @return java.util.function.Consumer<com.example.kafka.entity.Person> * @Date 2020-12-27 **/ @Bean public Consumer<Person> addAgeC(){ return person -> { person.setAge(person.getAge() + 10); System.out.println("Consumer addAge: " + person.toString()); }; } }
发送消息
@RestController public class UserController { @Autowired private StreamBridge streamBridge; @Autowired private ProducersConfig producersConfig; @PostMapping("/log") public void log(@RequestBody Person person){ producersConfig.log(person); } @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); System.out.println("Producer addAge: " + person.toString()); return streamBridge.send("addAge", person); } }