大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka (正在更新…)

章节内容

上节我们完成了:


topics.sh、producer.sh、consumer.sh 脚本的基本使用

pom.xml 配置

JavaAPI的使用:producer 和 consumer

简单介绍

在Spring Boot中使用Kafka,是构建分布式消息驱动应用程序的一种常见方法。Kafka的强大之处在于其高吞吐量、低延迟和良好的可扩展性,非常适合处理大量实时数据。


Kafka的基本概念

Producer(生产者): 负责向Kafka的主题(topic)发送消息。

Consumer(消费者): 从Kafka的主题中读取消息。

Broker(代理): Kafka集群中的节点,负责消息的存储和传输。

Topic(主题): 类似于消息队列的概念,用于分类和组织消息。一个topic可以有多个分区(partition),每个分区是一个日志(log)。

Partition(分区): Kafka中的主题被分成多个分区,每个分区内部的消息是有序的,但分区之间是无序的。

Consumer Group(消费者组): 一组消费者组成的一个逻辑订阅者,保证每条消息在消费者组中只被一个消费者消费。

spring-kafka

Spring-Kafka 是 Spring 框架对 Apache Kafka 的集成,使得在 Spring 应用中使用 Kafka 更加简便和直观。它提供了一系列功能和配置选项来帮助开发者快速构建基于消息驱动的微服务架构。


KafkaTemplate

KafkaTemplate 是 Spring-Kafka 提供的用于发送消息的核心类。它简化了生产者与 Kafka 交互的过程。你可以通过这个类轻松地将消息发送到 Kafka 的主题中。


KafkaListener

@KafkaListener 是用于消费 Kafka 消息的注解。通过这个注解,可以非常方便地定义消息消费者,处理从指定主题接收到的消息。


Spring-Kafka 的配置

Spring-Kafka 支持通过配置文件来配置 Kafka 客户端的属性。这些配置可以在 application.properties 或 application.yml 中指定。


架构图

上节已经出现过了,这里再放一次

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置文件

我们常见的配置文件如下图:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: my-topic

Producer

编写代码

编写了一个KafkaProducerController

里边写了两个方法,都是使用了 KafkaTemplate 的工具。

@RestController
public class KafkaProducerController {

    @Resource
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @RequestMapping("/sendSync/{message}")
    public String sendSync(@PathVariable String message) {
        ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);
        try {
            SendResult<Integer, String> result = future.get();
            System.out.println(result.getProducerRecord().key() + "->" +
                    result.getProducerRecord().partition() + "->" +
                    result.getProducerRecord().timestamp());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Success";
    }

    @RequestMapping("/sendAsync/{message}")
    public String sendAsync(@PathVariable String message) {
        ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送失败!");
                ex.printStackTrace();
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("发送成功");
                System.out.println(result.getProducerRecord().key() + "->" +
                        result.getProducerRecord().partition() + "->" +
                        result.getProducerRecord().timestamp());
            }
        });
        return "Success";
    }

}

测试结果

http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222

我们观察控制台的效果如下:

Consumer

编写代码

编一个类来实现Consumer:

@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = {"wzk_topic_test"})
    public void consume(ConsumerRecord<Integer, String> consumerRecord) {
        System.out.println(
                consumerRecord.topic() + "\t"
                        + consumerRecord.partition() + "\t"
                        + consumerRecord.offset() + "\t"
                        + consumerRecord.key() + "\t"
                        + consumerRecord.value());
    }

}

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test  0 13  1 wzktest
wzk_topic_test  0 14  2 wzktest222
wzk_topic_test  0 15  2 wzktest222222

控制台的截图如下:

目录
相关文章
|
2月前
|
Java Linux iOS开发
如何配置 Java 环境变量:设置 JAVA_HOME 和 PATH
本文详细介绍如何在Windows和Linux/macOS系统上配置Java环境变量。
1671 12
|
2月前
|
Java
CentOS7.8配置Adoptium-Java17运行环境
本指南介绍如何设置清华镜像源并安装 Temurin-17-JRE 运行环境。首先,编辑 `/etc/yum.repos.d/adoptium.repo` 文件,配置清华镜像源。接着,使用 `yum install -y temurin-17-jre` 命令安装 Temurin-17-JRE,并通过 `java --version` 验证安装成功。相关配置和操作界面截图附后。
63 8
|
2月前
|
前端开发 NoSQL Java
【Java若依框架】RuoYi-Vue的前端和后端配置步骤和启动步骤
本文介绍了如何配置和启动基于Java的若依(RuoYi)项目,涵盖后端和前端的详细步骤。首先,准备Redis、MySQL以及IDE(如Idea和VS)。接着,通过GitHub获取代码并导入到IDE中,执行必要的SQL文件和配置数据库密码。然后,启动Redis并进行相关配置。最后,按照前端配置步骤克隆前端代码库,打开终端执行命令完成前端配置。整个过程详细记录了每一步的操作,帮助开发者顺利部署若依项目。 如果你觉得有帮助,请点赞、关注和收藏,这将是我持续分享的动力!
565 1
|
2月前
|
前端开发 Java 开发工具
Git使用教程-将idea本地Java等文件配置到gitte上【保姆级教程】
本内容详细介绍了使用Git进行版本控制的全过程,涵盖从本地仓库创建到远程仓库配置,以及最终推送代码至远程仓库的步骤。
53 0
|
10月前
|
Java
使用Java代码打印log日志
使用Java代码打印log日志
354 1
|
Java BI API
在Java代码中打日志需要注意什么?
日志是什么?日志是你在代码运行时打印出来的一些数据和记录,是快速排查问题的好帮手,是撕逼和甩锅的利器!
733 0
|
缓存 Java 网络架构
别在 Java 代码里乱打日志了,这才是正确的打日志姿势!
别在 Java 代码里乱打日志了,这才是正确的打日志姿势!
180 0
|
Java BI Apache
在Java代码中打日志需要注意什么?
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 为什么要打日志? 日志是什么?日志是你在代码运行时打印出来的一些数据和记录,是快速排查问题的好帮手! 做一件事情之前,先思考为什么。
在Java代码中打日志需要注意什么?
|
缓存 架构师 搜索推荐
别在 Java 代码里乱打日志了,这才是正确的日志打印姿势!
使用门面模式的日志框架,有利于维护和各个类的日志处理方式统一。
|
Java Android开发 C语言
02_JNI中Java代码调用C代码,Android中使用log库打印日志,javah命令的使用,Android.mk文件的编写,交叉编译
 1  编写以下案例(下面的三个按钮都调用了底层的C语言): 项目案例的代码结构如下: 2 编写DataProvider的代码: package com.example.ndkpassdata;   public class DataProvider {         /**      * 计算x和y的加法  apktools      *
1389 0