Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

简介: Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
    if("1.0".equals(version)) {
        // Version 1.0
    }
    if("2.0".equals(version)) {
        // Version 2.0
    }
}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;
        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }
    }
    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {
        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 : " + payload + ", " + version);
        }
        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 : " + payload + ", " + version);
        }
    }
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    }
}

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0
2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-content-route项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

目录
相关文章
|
9天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
10天前
|
负载均衡 监控 Java
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
我把Spring Cloud的超详细资料介绍给你,面试官不会生气吧?geigei
|
10天前
|
负载均衡 Java 应用服务中间件
Spring Cloud 负载平衡的意义什么?
负载平衡是指将网络流量在多个服务器之间分布,以达到提高系统性能、增强可靠性和提供更好用户体验的目的。在负载平衡的架构中,多个服务器被组织成一个集群,共同处理用户的请求。
37 4
|
11天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
25 0
|
11天前
|
消息中间件 负载均衡 Java
【Spring Cloud 初探幽】
【Spring Cloud 初探幽】
21 1
|
11天前
|
安全 Java Docker
|
11天前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
40 6
|
11天前
|
JSON Java Apache
Spring Cloud Feign 使用Apache的HTTP Client替换Feign原生httpclient
Spring Cloud Feign 使用Apache的HTTP Client替换Feign原生httpclient
|
11天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
11天前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
30 1