Java:SpringBoot整合Kafka实现数据异步处理

简介: Java:SpringBoot整合Kafka实现数据异步处理

SpringBoot项目中,使用Kafka可以实现数据异步处理

目录

下载安装Kafka

# 从清华镜像下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz
# 解压
tar -zxvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
# 使用单节点的ZooKeeper
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
./bin/kafka-server-start.sh config/server.properties

SpringBoot引入Kafka

项目结构

$ tree 
.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── demo
    │   │               ├── Application.java
    │   │               └── kafkaConsumer
    │   │                   └── DataKafkaConsumer.java
    │   └── resources
    │       └── application.yml
    └── test
        └── java
            └── com
                └── example
                    └── demo
                        └── KafkaProducerTest.java

引入依赖pom.xml

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.1</version>
</dependency>

maven仓库: https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka

SpringBoot和Kafka版本对应关系: https://spring.io/projects/spring-kafka#overview

image.png

完整依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

配置 application.yml

spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 重试次数
      retries: 3
      # 批量发送的消息数量
      batch-size: 1000
      # 32MB的批处理缓冲区
      buffer-memory: 33554432
    # 默认消费者组
    consumer:
      group-id: consumer-group
      # 最早未被消费的offset
      auto-offset-reset: earliest
      # 批量一次最大拉取数据量
      max-poll-records: 4000
      # 是否自动提交
      enable-auto-commit: true
      # 自动提交时间间隔,单位ms
      auto-commit-interval: 1000

消费者

DataKafkaConsumer.java

package com.example.demo.kafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * 消费者
 */
@Component
public class DataKafkaConsumer {
    @KafkaListener(topics = {"data_topic"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.toString());
    }
}

生产者

KafkaProducerTest.java

package com.example.demo;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.LinkedHashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Test
    public void testSend() {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("userId", 1);
        map.put("name", "Tom");
        // 向kafka推送数据
        kafkaTemplate.send("data_topic", JSONObject.toJSONString(map));
    }
}

消息发送和接收

  1. 启动SpringBoot项目
  2. 通过测试类KafkaProducerTest 发送Kafka消息
  3. 消费者监听到的数据
ConsumerRecord(
    topic = data_topic, 
    partition = 0, 
    leaderEpoch = 0, 
    offset = 5000, 
    CreateTime = 1676945951341, 
    serialized key size = -1, 
    serialized value size = 25, 
    headers = RecordHeaders(
        headers = [], 
        isReadOnly = false
    ), 
    key = null, 
    value = {"userId":1,"name":"Tom"}
)

参考


相关文章
|
28天前
|
前端开发 JavaScript Java
java常用数据判空、比较和类型转换
本文介绍了Java开发中常见的数据处理技巧,包括数据判空、数据比较和类型转换。详细讲解了字符串、Integer、对象、List、Map、Set及数组的判空方法,推荐使用工具类如StringUtils、Objects等。同时,讨论了基本数据类型与引用数据类型的比较方法,以及自动类型转换和强制类型转换的规则。最后,提供了数值类型与字符串互相转换的具体示例。
|
2天前
|
存储 Java BI
java怎么统计每个项目下的每个类别的数据
通过本文,我们详细介绍了如何在Java中统计每个项目下的每个类别的数据,包括数据模型设计、数据存储和统计方法。通过定义 `Category`和 `Project`类,并使用 `ProjectManager`类进行管理,可以轻松实现项目和类别的数据统计。希望本文能够帮助您理解和实现类似的统计需求。
38 17
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
72 5
|
2月前
|
JSON Java 程序员
Java|如何用一个统一结构接收成员名称不固定的数据
本文介绍了一种 Java 中如何用一个统一结构接收成员名称不固定的数据的方法。
26 3
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
56 1
|
2月前
|
Java 程序员 容器
Java中的变量和常量:数据的‘小盒子’和‘铁盒子’有啥不一样?
在Java中,变量是一个可以随时改变的数据容器,类似于一个可以反复打开的小盒子。定义变量时需指定数据类型和名称。例如:`int age = 25;` 表示定义一个整数类型的变量 `age`,初始值为25。 常量则是不可改变的数据容器,类似于一个锁死的铁盒子,定义时使用 `final` 关键字。例如:`final int MAX_SPEED = 120;` 表示定义一个名为 `MAX_SPEED` 的常量,值为120,且不能修改。 变量和常量的主要区别在于变量的数据可以随时修改,而常量的数据一旦确定就不能改变。常量主要用于防止意外修改、提高代码可读性和便于维护。
|
2月前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
125 2
|
2月前
|
Java
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式
Java 8 引入的 Streams 功能强大,提供了一种简洁高效的处理数据集合的方式。本文介绍了 Streams 的基本概念和使用方法,包括创建 Streams、中间操作和终端操作,并通过多个案例详细解析了过滤、映射、归并、排序、分组和并行处理等操作,帮助读者更好地理解和掌握这一重要特性。
37 2
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
111 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
63 1