SpringBoot集成阿里云Kafka 示例

简介: 消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。本文主要演示如何使用SpringBoot 通过公网方式集成阿里云Kakfa。

Step By Step

1、kafka控制台创建公网类型实例
2、创建SpringBoot项目集成阿里云Kafka
3、发送接收测试


一、kafka控制台创建公网类型实例

1.1 Kafka控制台创建实例

图片.png

1.2 获取认证参数

图片.png

图片.png

二、创建SpringBoot项目集成阿里云Kafka

2.1 创建Spring Boot(2.5.2)项目

图片.png

2.2 依赖

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.3 Sender.class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send(String msg) {
        this.template.sendDefault("my_msg", msg);
        System.out.println("send message:" + msg);
    }
}

2.4 Receiver.class

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}

2.5 KafkaController.class


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private Sender sender;

    @PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}

2.6 application.yml

spring:
  kafka:
    template:
      default-topic: <topic>
    bootstrap-servers: <SSL接入点>
    jaas:
      enabled: true
      loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
      options:
        username: <用户名>
        password: <密码>
    consumer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
      group-id: <group>
      max-poll-records: 2
    producer:
      ssl:
        truststoreLocation: file:/kafka.client.truststore.jks
      retries: 3
      acks: 1
      compression-type: lz4
      buffer-memory: 33554432
      batch-size: 51200
      properties:
        send.buffer.bytes: 262144
        sasl.mechanism: PLAIN
        security.protocol: SASL_SSL
        ssl.endpoint.identification.algorithm:
kafka.client.truststore.jks 下载 地址,证书下载后直接放在C盘根目录下。

2.7 项目结构

图片.png

三、发送接收测试

3.1 启动项目,使用PostMan发送Post请求

图片.png

3.2 项目日志

图片.png

3.3 控制台消息监控查看

图片.png

更多参考

SSL接入点PLAIN机制收发消息

相关文章
|
2月前
|
弹性计算 运维 安全
云迁移最佳实践:HyperMotion助中小企业高效上云,阿里云工具集深度集成三方迁移工具
中小企业上云需求强烈,但面临缺乏了解、无合适方案及成本过高等挑战。为解决这些问题,推出“云迁移HyperMotion阿里云集成版”,提供三步上云、自助迁移、自动适配等能力,助力企业高效、低成本完成迁移。
|
8天前
|
消息中间件 运维 监控
爆款游戏背后:尚娱如何借助阿里云 Kafka Serverless 轻松驾驭“潮汐流量”?
阿里云 Kafka 不仅为尚娱提供了高可靠、低延迟的消息通道,更通过 Serverless 弹性架构实现了资源利用率和成本效益的双重优化,助力尚娱在快速迭代的游戏市场中实现敏捷运营、稳定交付与可持续增长。
|
14天前
|
消息中间件 存储 运维
嘉银科技基于阿里云 Kafka Serverless 提升业务弹性能力,节省成本超过 20%
云消息队列 Kafka 版 Serverless 系列凭借其秒级弹性扩展、按需付费、轻运维的优势,助力嘉银科技业务系统实现灵活扩缩容,在业务效率和成本优化上持续取得突破,保证服务的敏捷性和稳定性,并节省超过 20% 的成本。
102 12
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
660 0
|
4月前
|
消息中间件 存储 大数据
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
593 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
缓存 运维 监控
Anolis OS深度集成运维利器 阿里云操作系统控制台上线
阿里云在百万服务器运维领域的丰富经验打造。
Anolis OS深度集成运维利器 阿里云操作系统控制台上线
|
8月前
|
机器学习/深度学习 人工智能 自然语言处理
企业级API集成方案:基于阿里云函数计算调用DeepSeek全解析
DeepSeek R1 是一款先进的大规模深度学习模型,专为自然语言处理等复杂任务设计。它具备高效的架构、强大的泛化能力和优化的参数管理,适用于文本生成、智能问答、代码生成和数据分析等领域。阿里云平台提供了高性能计算资源、合规与数据安全、低延迟覆盖和成本效益等优势,支持用户便捷部署和调用 DeepSeek R1 模型,确保快速响应和稳定服务。通过阿里云百炼模型服务,用户可以轻松体验满血版 DeepSeek R1,并享受免费试用和灵活的API调用方式。
494 12
|
7月前
|
安全 持续交付 云计算
课时5:阿里云容器服务:最原生的集成Docker和云服务
阿里云容器服务以服务化形式构建容器基础设施,大幅提升开发效率,简化应用部署流程。通过Docker容器和DevOps工具(如Jenkins),实现自动化部署与迭代,优化企业内部复杂部署问题。该服务支持GPU调度、混合云架构无缝迁移,并与阿里云产品体系无缝集成,提供安全防护、网络负载均衡等多重功能支持。凭借微服务架构,帮助企业突破业务瓶颈,提高资源利用率,轻松应对海量流量。
239 0
课时5:阿里云容器服务:最原生的集成Docker和云服务
|
8月前
|
人工智能 自然语言处理 搜索推荐
阿里云 AI 搜索开放平台集成 DeepSeek 模型
阿里云 AI 搜索开放平台最新上线 DeepSeek -R1系列模型。
390 2