SpringCloud学习笔记(五)-SpringCloudStream集成kafka(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

五、发送消息到输出通道


/**
 * kafka消息发送器
 * @author dbq
 * @date 2019/9/26 17:50
 */
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;
    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }
    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。


六、从输入通道订阅消息


@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
    }
    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
    }
}


从不同的通道实现消息的订阅。


七、这样完整的消息系统就搭建好了,定义Controller发送消息测试


@ApiOperation(value = "test1", httpMethod = "POST")
    @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
    public void test1(String message, HttpServletRequest request,
                             HttpServletResponse response) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
    @ApiOperation(value = "test", httpMethod = "POST")
    @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
    public void test2(String message, HttpServletRequest request,
                      HttpServletResponse response) {
        sender.sendToAlarmChannel(message);
    }


test1:发送消息的缺省消息通道


test2:发送消息到告警消息通道


八、并发性测试


如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1


实例1

2019-09-30 11:13:14------start--------默认消息...
2019-09-30 11:13:24------end--------默认消息


实例2

2019-09-30 11:13:14------start--------默认消息:...
2019-09-30 11:13:24------end--------默认消息
2019-09-30 11:13:24------start--------默认消息:...
2019-09-30 11:13:34------end--------默认消息
2019-09-30 11:13:34------start--------默认消息:...
2019-09-30 11:13:44------end--------默认消息


通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。


如果将concurrency修改为2.


日志如下


实例1

2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------start--------默认消息:...
2019-09-30 11:31:33------end--------默认消息


实例2

2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------


从日志可以看出,实例1中实现了两个线程的并发消费。


相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
376 0
|
4月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
471 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
324 2
|
4月前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
1058 14
|
4月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
|
6月前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
169 8
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
7月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
381 5
|
7月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
168 1
|
3月前
|
负载均衡 Dubbo Java
Spring Cloud Alibaba与Spring Cloud区别和联系?
Spring Cloud Alibaba与Spring Cloud区别和联系?
|
4月前
|
人工智能 SpringCloudAlibaba 自然语言处理
SpringCloud Alibaba AI整合DeepSeek落地AI项目实战
在现代软件开发领域,微服务架构因其灵活性、可扩展性和模块化特性而受到广泛欢迎。微服务架构通过将大型应用程序拆分为多个小型、独立的服务,每个服务运行在其独立的进程中,服务与服务间通过轻量级通信机制(通常是HTTP API)进行通信。这种架构模式有助于提升系统的可维护性、可扩展性和开发效率。
1289 1