SpringCloud学习之SpringCloudStream&集成kafka

简介: 一、关于Spring-Cloud-Stream   Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

SCSt与粘合剂

  应用程序通过Spring Cloud Stream注入到输入和输出通道与外界进行通信。根据此规则我们很容易的实现消息传递,订阅消息与消息中转。并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  在用例图中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于三层架构下利用dao屏蔽service与数据库的实现的原理。

  springcloud默认提供了rabbitmq与kafka的实现。

 

二、springcloud集成kafka

1、添加gradle依赖:

dependencies{
    compile('org.springframework.cloud:spring-cloud-stream')
    compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    compile('org.springframework.kafka:spring-kafka')
}
View Code

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

  Sink接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}
View Code

  Source接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
View Code

  Processor接口:

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}
View Code

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

package com.bdqn.lyrk.shop.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ShopChannel {

    /**
     * 发消息的通道名称
     */
    String SHOP_OUTPUT = "shop_output";

    /**
     * 消息的订阅通道名称
     */
    String SHOP_INPUT = "shop_input";

    /**
     * 发消息的通道
     *
     * @return
     */
    @Output(SHOP_OUTPUT)
    MessageChannel sendShopMessage();

    /**
     * 收消息的通道
     *
     * @return
     */
    @Input(SHOP_INPUT)
    SubscribableChannel recieveShopMessage();


}
View Code

 

3、定义服务类

package com.bdqn.lyrk.shop.server;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ShopService {

    @Resource(name = ShopChannel.SHOP_OUTPUT)
    private MessageChannel sendShopMessageChannel;

    @GetMapping("/sendMsg")
    public String sendShopMessage(String content) {
        boolean isSendSuccess = sendShopMessageChannel.
                send(MessageBuilder.withPayload(content).build());
        return isSendSuccess ? "发送成功" : "发送失败";
    }

    @StreamListener(ShopChannel.SHOP_INPUT)
    public void receive(Message<String> message) {
        System.out.println(message.getPayload());
    }
}
View Code

  这里面大家注意 @StreamListener。这个注解可以监听输入通道里的消息内容,注解里面的属性指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息。使用@Resource注入时需要指定我们刚才定义的输出通道名称

 

4、定义启动类

package com.bdqn.lyrk.shop;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(ShopChannel.class)
public class ShopServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopServerApplication.class, args);
    }
}
View Code

  注意@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称,当然这里也可以传多个相关的接口

5、定义application.yml文件

spring:
  application:
    name: shop-server
  cloud:
    stream:
      bindings:
        #配置自己定义的通道与哪个中间件交互
        shop_input: #ShopChannel里Input和Output的值
          destination: zhibo #目标主题
        shop_output:
          destination: zhibo
      default-binder: kafka #默认的binder是kafka
  kafka:
    bootstrap-servers: localhost:9092 #kafka服务地址
    consumer:
      group-id: consumer1
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1
server:
  port: 8100
View Code

  这里是重头戏,我们必须指定所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

 

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果

 

目录
相关文章
|
1月前
|
测试技术
软件质量保护与测试(第2版)学习总结第十三章 集成测试
本文是《软件质量保护与测试》(第2版)第十三章的学习总结,介绍了集成测试的概念、主要任务、测试层次与原则,以及集成测试的不同策略,包括非渐增式集成和渐增式集成(自顶向下和自底向上),并通过图示详细解释了集成测试的过程。
61 1
软件质量保护与测试(第2版)学习总结第十三章 集成测试
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
109 1
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
31 2
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
36 1
|
1月前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
105 0
|
1月前
|
机器学习/深度学习 算法 前端开发
集成学习任务七和八、投票法与bagging学习
集成学习任务七和八、投票法与bagging学习
18 0
|
1月前
|
机器学习/深度学习 算法
【机器学习】迅速了解什么是集成学习
【机器学习】迅速了解什么是集成学习
|
3月前
|
人工智能
LLama+Mistral+…+Yi=? 免训练异构大模型集成学习框架DeePEn来了
【8月更文挑战第6天】DeePEn是一种免训练异构大模型集成学习框架,旨在通过融合多个不同架构和参数的大模型输出概率分布,提升整体性能。它首先将各模型输出映射至统一概率空间,然后进行聚合,并最终反转回单一模型空间以生成输出。实验证明,在知识问答和推理任务上,DeePEn相比单一大模型如LLaMA和Mistral有显著提升,但其效果受模型质量和数量影响,并且计算成本较高。[论文: https://arxiv.org/abs/2404.12715]
44 1
|
3月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
82 0
|
2月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
下一篇
无影云桌面