SpringCloud学习笔记(五)-SpringCloudStream集成kafka(上)

简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。


image.png

SpringCloudStream应用模型


一、引入依赖包


<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>


二、自定义信息通道


官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。


@Input注解标识一个输入通道


@Output注解标识一个输出通道


通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。


如下我们自定义信息通道EsChannel

/**
 * 自定义信息通道
 * @author dbq
 * @date 2019/9/26 14:54
 */
public interface EsChannel {
    /**
     * 缺省发送消息通道名称
     */
    String ES_DEFAULT_OUTPUT = "es_default_output";
    /**
     * 缺省接收消息通道名称
     */
    String ES_DEFAULT_INPUT = "es_default_input";
    /**
     * 告警发送消息通道名称
     */
    String ES_ALARM_OUTPUT = "es_alarm_output";
    /**
     * 告警接收消息通道名称
     */
    String ES_ALARM_INPUT = "es_alarm_input";
    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(ES_DEFAULT_OUTPUT)
    MessageChannel sendEsDefaultMessage();
    /**
     * 告警发送消息通道
     * @return channel 返回告警信息发送通道
     */
    @Output(ES_ALARM_OUTPUT)
    MessageChannel sendEsAlarmMessage();
    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(ES_DEFAULT_INPUT)
    MessageChannel recieveEsDefaultMessage();
    /**
     * 告警接收消息通道
     * @return channel 返回告警信息接收通道
     */
    @Input(ES_ALARM_INPUT)
    MessageChannel recieveEsAlarmMessage();
}


三、@EnableBinding使应用程序连接到消息代理


@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
@MapperScan(basePackages = "com.es.mapper")
@EnableBinding(EsChannel.class)
public class EsOnenetApplication {
    public static void main(String[] args) {
        SpringApplication.run(EsOnenetApplication.class, args);
    }
}


四、SpringCloudStream及kafka配置


#==============================================================
#spring-cloud-stream-Kafka配置 开始
#==============================================================
#是否开启kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#缺省的输入、输出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group
spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka
#入站消费者的并发性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2
#告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group
spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka
#kafka配置
spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092
spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 结束
#==============================================================


从上面配置可以看出


1、定义了通道名称及分组,binder代表绑定实现的标识名称(如kafka或者rabbit),与3中的定义名称相对应。


2、定义了入站消费者的并发性,指在一个实例内的并发性,不同实例之间本身就是并发的,默认值为1


spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2


3、定义了kafka连接信息


如果未配置autoCommitOffset,默认自动提交偏移量


详细参数配置可参考官网

相关文章
|
9月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1012 0
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
811 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
10月前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
4230 14
|
12月前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
653 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
508 1
|
11月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
491 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
370 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
1297 9

热门文章

最新文章