Java:SpringBoot整合Kafka实现数据异步处理

简介: Java:SpringBoot整合Kafka实现数据异步处理

SpringBoot项目中,使用Kafka可以实现数据异步处理

目录

下载安装Kafka

# 从清华镜像下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz
# 解压
tar -zxvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
# 使用单节点的ZooKeeper
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
./bin/kafka-server-start.sh config/server.properties

SpringBoot引入Kafka

项目结构

$ tree 
.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── demo
    │   │               ├── Application.java
    │   │               └── kafkaConsumer
    │   │                   └── DataKafkaConsumer.java
    │   └── resources
    │       └── application.yml
    └── test
        └── java
            └── com
                └── example
                    └── demo
                        └── KafkaProducerTest.java

引入依赖pom.xml

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.1</version>
</dependency>

maven仓库: https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka

SpringBoot和Kafka版本对应关系: https://spring.io/projects/spring-kafka#overview

image.png

完整依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

配置 application.yml

spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 重试次数
      retries: 3
      # 批量发送的消息数量
      batch-size: 1000
      # 32MB的批处理缓冲区
      buffer-memory: 33554432
    # 默认消费者组
    consumer:
      group-id: consumer-group
      # 最早未被消费的offset
      auto-offset-reset: earliest
      # 批量一次最大拉取数据量
      max-poll-records: 4000
      # 是否自动提交
      enable-auto-commit: true
      # 自动提交时间间隔,单位ms
      auto-commit-interval: 1000

消费者

DataKafkaConsumer.java

package com.example.demo.kafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * 消费者
 */
@Component
public class DataKafkaConsumer {
    @KafkaListener(topics = {"data_topic"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.toString());
    }
}

生产者

KafkaProducerTest.java

package com.example.demo;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.LinkedHashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Test
    public void testSend() {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("userId", 1);
        map.put("name", "Tom");
        // 向kafka推送数据
        kafkaTemplate.send("data_topic", JSONObject.toJSONString(map));
    }
}

消息发送和接收

  1. 启动SpringBoot项目
  2. 通过测试类KafkaProducerTest 发送Kafka消息
  3. 消费者监听到的数据
ConsumerRecord(
    topic = data_topic, 
    partition = 0, 
    leaderEpoch = 0, 
    offset = 5000, 
    CreateTime = 1676945951341, 
    serialized key size = -1, 
    serialized value size = 25, 
    headers = RecordHeaders(
        headers = [], 
        isReadOnly = false
    ), 
    key = null, 
    value = {"userId":1,"name":"Tom"}
)

参考


相关文章
|
4月前
|
Java API 开发工具
【Azure Developer】Java代码实现获取Azure 资源的指标数据却报错 "invalid time interval input"
在使用 Java 调用虚拟机 API 获取指标数据时,因本地时区设置非 UTC,导致时间格式解析错误。解决方法是在代码中手动指定时区为 UTC,使用 `ZoneOffset.ofHours(0)` 并结合 `withOffsetSameInstant` 方法进行时区转换,从而避免因时区差异引发的时间格式问题。
271 4
|
4月前
|
算法 Java
Java多线程编程:实现线程间数据共享机制
以上就是Java中几种主要处理多线程序列化资源以及协调各自独立运行但需相互配合以完成任务threads 的技术手段与策略。正确应用上述技术将大大增强你程序稳定性与效率同时也降低bug出现率因此深刻理解每项技术背后理论至关重要.
359 16
|
5月前
|
数据采集 JSON Java
Java爬虫获取1688店铺所有商品接口数据实战指南
本文介绍如何使用Java爬虫技术高效获取1688店铺商品信息,涵盖环境搭建、API调用、签名生成及数据抓取全流程,并附完整代码示例,助力市场分析与选品决策。
|
5月前
|
数据采集 存储 前端开发
Java爬虫性能优化:多线程抓取JSP动态数据实践
Java爬虫性能优化:多线程抓取JSP动态数据实践
|
5月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
294 7
|
6月前
|
Java 数据库连接 API
Java 8 + 特性及 Spring Boot 与 Hibernate 等最新技术的实操内容详解
本内容涵盖Java 8+核心语法、Spring Boot与Hibernate实操,按考试考点分类整理,含技术详解与代码示例,助力掌握最新Java技术与应用。
198 2
|
7月前
|
Java 数据库连接 API
Java 对象模型现代化实践 基于 Spring Boot 与 MyBatis Plus 的实现方案深度解析
本文介绍了基于Spring Boot与MyBatis-Plus的Java对象模型现代化实践方案。采用Spring Boot 3.1.2作为基础框架,结合MyBatis-Plus 3.5.3.1进行数据访问层实现,使用Lombok简化PO对象,MapStruct处理对象转换。文章详细讲解了数据库设计、PO对象实现、DAO层构建、业务逻辑封装以及DTO/VO转换等核心环节,提供了一个完整的现代化Java对象模型实现案例。通过分层设计和对象转换,实现了业务逻辑与数据访问的解耦,提高了代码的可维护性和扩展性。
299 1
|
7月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
401 0
|
前端开发 Java Spring
Java:SpringBoot实现文件上传
Java:SpringBoot实现文件上传
356 0