SpringCloud学习笔记(五)-SpringCloudStream集成kafka(上)

简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。


image.png

SpringCloudStream应用模型


一、引入依赖包


<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>


二、自定义信息通道


官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。


@Input注解标识一个输入通道


@Output注解标识一个输出通道


通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。


如下我们自定义信息通道EsChannel

/**
 * 自定义信息通道
 * @author dbq
 * @date 2019/9/26 14:54
 */
public interface EsChannel {
    /**
     * 缺省发送消息通道名称
     */
    String ES_DEFAULT_OUTPUT = "es_default_output";
    /**
     * 缺省接收消息通道名称
     */
    String ES_DEFAULT_INPUT = "es_default_input";
    /**
     * 告警发送消息通道名称
     */
    String ES_ALARM_OUTPUT = "es_alarm_output";
    /**
     * 告警接收消息通道名称
     */
    String ES_ALARM_INPUT = "es_alarm_input";
    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(ES_DEFAULT_OUTPUT)
    MessageChannel sendEsDefaultMessage();
    /**
     * 告警发送消息通道
     * @return channel 返回告警信息发送通道
     */
    @Output(ES_ALARM_OUTPUT)
    MessageChannel sendEsAlarmMessage();
    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(ES_DEFAULT_INPUT)
    MessageChannel recieveEsDefaultMessage();
    /**
     * 告警接收消息通道
     * @return channel 返回告警信息接收通道
     */
    @Input(ES_ALARM_INPUT)
    MessageChannel recieveEsAlarmMessage();
}


三、@EnableBinding使应用程序连接到消息代理


@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
@MapperScan(basePackages = "com.es.mapper")
@EnableBinding(EsChannel.class)
public class EsOnenetApplication {
    public static void main(String[] args) {
        SpringApplication.run(EsOnenetApplication.class, args);
    }
}


四、SpringCloudStream及kafka配置


#==============================================================
#spring-cloud-stream-Kafka配置 开始
#==============================================================
#是否开启kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#缺省的输入、输出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group
spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka
#入站消费者的并发性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2
#告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group
spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka
#kafka配置
spring.cloud.stream.kafka.binder.brokers=172.*.*.6:9092,172.*.*.7:9092,172.*.*.8:9092
spring.cloud.stream.kafka.binder.zkNodes=172.*.*.6:2181,172.*.*.7:2181,172.*.*.8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 结束
#==============================================================


从上面配置可以看出


1、定义了通道名称及分组,binder代表绑定实现的标识名称(如kafka或者rabbit),与3中的定义名称相对应。


2、定义了入站消费者的并发性,指在一个实例内的并发性,不同实例之间本身就是并发的,默认值为1


spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2


3、定义了kafka连接信息


如果未配置autoCommitOffset,默认自动提交偏移量


详细参数配置可参考官网

相关文章
|
12天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
12天前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
69 0
|
12天前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
136 0
|
12天前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
78 2
|
12天前
|
SpringCloudAlibaba 负载均衡 Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
77 1
|
12天前
|
Java Nacos Sentinel
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(九)Nacos+Sentinel+Seata
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(九)Nacos+Sentinel+Seata
270 0
|
12天前
|
消息中间件 SpringCloudAlibaba Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
804 0
|
12天前
|
SpringCloudAlibaba Java 测试技术
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
51 1
|
12天前
|
SpringCloudAlibaba 负载均衡 Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(五)OpenFeign的使用
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(五)OpenFeign的使用
53 0
|
12天前
|
负载均衡 算法 Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(四)Ribbon的使用
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(四)Ribbon的使用
31 0

热门文章

最新文章