SpringBoot集成阿里云Kafka 示例

简介: 消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。本文主要演示如何使用SpringBoot 通过公网方式集成阿里云Kakfa。

Step By Step

1、kafka控制台创建公网类型实例
2、创建SpringBoot项目集成阿里云Kafka
3、发送接收测试


一、kafka控制台创建公网类型实例

1.1 Kafka控制台创建实例

图片.png

1.2 获取认证参数

图片.png

图片.png

二、创建SpringBoot项目集成阿里云Kafka

2.1 创建Spring Boot(2.5.2)项目

图片.png

2.2 依赖

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.3 Sender.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        this.template.sendDefault("my_msg", msg);
        System.out.println("send message:" + msg);
    }
}

2.4 Receiver.class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}

2.5 KafkaController.class


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}

2.6 application.yml

spring:
  kafka:
    template:
      default-topic: <topic>
    bootstrap-servers: <SSL接入点>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <用户名>
        password: <密码>
    consumer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: <group>
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
kafka.client.truststore.jks 下载 地址,证书下载后直接放在C盘根目录下。

2.7 项目结构

图片.png

三、发送接收测试

3.1 启动项目,使用PostMan发送Post请求

图片.png

3.2 项目日志

图片.png

3.3 控制台消息监控查看

图片.png

更多参考

SSL接入点PLAIN机制收发消息

相关文章
|
1月前
|
云安全 人工智能 安全
Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线
阿里云 AI 安全护栏加入Dify平台,打造可信赖的 AI
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
311 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
3月前
|
弹性计算 运维 安全
云迁移最佳实践:HyperMotion助中小企业高效上云,阿里云工具集深度集成三方迁移工具
中小企业上云需求强烈,但面临缺乏了解、无合适方案及成本过高等挑战。为解决这些问题,推出“云迁移HyperMotion阿里云集成版”,提供三步上云、自助迁移、自动适配等能力,助力企业高效、低成本完成迁移。
125 0
|
2月前
|
消息中间件 运维 监控
爆款游戏背后:尚娱如何借助阿里云 Kafka Serverless 轻松驾驭“潮汐流量”?
阿里云 Kafka 不仅为尚娱提供了高可靠、低延迟的消息通道,更通过 Serverless 弹性架构实现了资源利用率和成本效益的双重优化,助力尚娱在快速迭代的游戏市场中实现敏捷运营、稳定交付与可持续增长。
175 29
|
2月前
|
消息中间件 存储 运维
嘉银科技基于阿里云 Kafka Serverless 提升业务弹性能力,节省成本超过 20%
云消息队列 Kafka 版 Serverless 系列凭借其秒级弹性扩展、按需付费、轻运维的优势,助力嘉银科技业务系统实现灵活扩缩容,在业务效率和成本优化上持续取得突破,保证服务的敏捷性和稳定性,并节省超过 20% 的成本。
185 23
|
4月前
|
JSON 分布式计算 大数据
springboot项目集成大数据第三方dolphinscheduler调度器
springboot项目集成大数据第三方dolphinscheduler调度器
267 3
|
4月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
510 0
第07课:Spring Boot集成Thymeleaf模板引擎
|
4月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
563 2
|
分布式计算 大数据 Java
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
88 0