SpringCloud学习之SpringCloudStream&集成kafka

简介: 一、关于Spring-Cloud-Stream   Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

SCSt与粘合剂

  应用程序通过Spring Cloud Stream注入到输入和输出通道与外界进行通信。根据此规则我们很容易的实现消息传递,订阅消息与消息中转。并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  在用例图中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于三层架构下利用dao屏蔽service与数据库的实现的原理。

  springcloud默认提供了rabbitmq与kafka的实现。

 

二、springcloud集成kafka

1、添加gradle依赖:

dependencies{
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
}
View Code

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

  Sink接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}
View Code

  Source接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
View Code

  Processor接口:

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}
View Code

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

package com.bdqn.lyrk.shop.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ShopChannel {

    /**
     * 发消息的通道名称
     */
    String SHOP_OUTPUT = "shop_output";

    /**
     * 消息的订阅通道名称
     */
    String SHOP_INPUT = "shop_input";

    /**
     * 发消息的通道
     *
     * @return
     */
    @Output(SHOP_OUTPUT)
    MessageChannel sendShopMessage();

    /**
     * 收消息的通道
     *
     * @return
     */
    @Input(SHOP_INPUT)
    SubscribableChannel recieveShopMessage();


}
View Code

 

3、定义服务类

package com.bdqn.lyrk.shop.server;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ShopService {

    @Resource(name = ShopChannel.SHOP_OUTPUT)
    private MessageChannel sendShopMessageChannel;

    @GetMapping("/sendMsg")
    public String sendShopMessage(String content) {
        boolean isSendSuccess = sendShopMessageChannel.
                send(MessageBuilder.withPayload(content).build());
        return isSendSuccess ? "发送成功" : "发送失败";
    }

    @StreamListener(ShopChannel.SHOP_INPUT)
    public void receive(Message<String> message) {
        System.out.println(message.getPayload());
    }
}
View Code

  这里面大家注意 @StreamListener。这个注解可以监听输入通道里的消息内容,注解里面的属性指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息。使用@Resource注入时需要指定我们刚才定义的输出通道名称

 

4、定义启动类

package com.bdqn.lyrk.shop;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(ShopChannel.class)
public class ShopServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopServerApplication.class, args);
    }
}
View Code

  注意@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称,当然这里也可以传多个相关的接口

5、定义application.yml文件

spring:
  application:
    name: shop-server
  cloud:
    stream:
      bindings:
        #配置自己定义的通道与哪个中间件交互
        shop_input: #ShopChannel里Input和Output的值
          destination: zhibo #目标主题
        shop_output:
          destination: zhibo
      default-binder: kafka #默认的binder是kafka
  kafka:
    bootstrap-servers: localhost:9092 #kafka服务地址
    consumer:
      group-id: consumer1
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1
server:
  port: 8100
View Code

  这里是重头戏,我们必须指定所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

 

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果

 

目录
相关文章
|
6月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
648 0
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
589 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
122 5
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
545 5
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
313 1
|
11月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
213 1
|
2月前
|
Java Spring 容器
SpringBoot自动配置的原理是什么?
Spring Boot自动配置核心在于@EnableAutoConfiguration注解,它通过@Import导入配置选择器,加载META-INF/spring.factories中定义的自动配置类。这些类根据@Conditional系列注解判断是否生效。但Spring Boot 3.0后已弃用spring.factories,改用新格式的.imports文件进行配置。
734 0
|
6月前
|
前端开发 Java 数据库
微服务——SpringBoot使用归纳——Spring Boot集成Thymeleaf模板引擎——Thymeleaf 介绍
本课介绍Spring Boot集成Thymeleaf模板引擎。Thymeleaf是一款现代服务器端Java模板引擎,支持Web和独立环境,可实现自然模板开发,便于团队协作。与传统JSP不同,Thymeleaf模板可以直接在浏览器中打开,方便前端人员查看静态原型。通过在HTML标签中添加扩展属性(如`th:text`),Thymeleaf能够在服务运行时动态替换内容,展示数据库中的数据,同时兼容静态页面展示,为开发带来灵活性和便利性。
313 0
|
2月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
386 0
第07课:Spring Boot集成Thymeleaf模板引擎
|
6月前
|
XML Java 数据库连接
微服务——SpringBoot使用归纳——Spring Boot集成MyBatis——基于 xml 的整合
本教程介绍了基于XML的MyBatis整合方式。首先在`application.yml`中配置XML路径,如`classpath:mapper/*.xml`,然后创建`UserMapper.xml`文件定义SQL映射,包括`resultMap`和查询语句。通过设置`namespace`关联Mapper接口,实现如`getUserByName`的方法。Controller层调用Service完成测试,访问`/getUserByName/{name}`即可返回用户信息。为简化Mapper扫描,推荐在Spring Boot启动类用`@MapperScan`注解指定包路径避免逐个添加`@Mapper`
296 0

热门文章

最新文章