Java:SpringBoot整合Kafka实现数据异步处理

简介: Java:SpringBoot整合Kafka实现数据异步处理

SpringBoot项目中,使用Kafka可以实现数据异步处理

目录

下载安装Kafka

# 从清华镜像下载
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz
# 解压
tar -zxvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2
# 使用单节点的ZooKeeper
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
./bin/kafka-server-start.sh config/server.properties

SpringBoot引入Kafka

项目结构

$ tree 
.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── demo
    │   │               ├── Application.java
    │   │               └── kafkaConsumer
    │   │                   └── DataKafkaConsumer.java
    │   └── resources
    │       └── application.yml
    └── test
        └── java
            └── com
                └── example
                    └── demo
                        └── KafkaProducerTest.java

引入依赖pom.xml

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.1</version>
</dependency>

maven仓库: https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka

SpringBoot和Kafka版本对应关系: https://spring.io/projects/spring-kafka#overview

image.png

完整依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

配置 application.yml

spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 重试次数
      retries: 3
      # 批量发送的消息数量
      batch-size: 1000
      # 32MB的批处理缓冲区
      buffer-memory: 33554432
    # 默认消费者组
    consumer:
      group-id: consumer-group
      # 最早未被消费的offset
      auto-offset-reset: earliest
      # 批量一次最大拉取数据量
      max-poll-records: 4000
      # 是否自动提交
      enable-auto-commit: true
      # 自动提交时间间隔,单位ms
      auto-commit-interval: 1000

消费者

DataKafkaConsumer.java

package com.example.demo.kafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * 消费者
 */
@Component
public class DataKafkaConsumer {
    @KafkaListener(topics = {"data_topic"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.toString());
    }
}

生产者

KafkaProducerTest.java

package com.example.demo;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.LinkedHashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Test
    public void testSend() {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("userId", 1);
        map.put("name", "Tom");
        // 向kafka推送数据
        kafkaTemplate.send("data_topic", JSONObject.toJSONString(map));
    }
}

消息发送和接收

  1. 启动SpringBoot项目
  2. 通过测试类KafkaProducerTest 发送Kafka消息
  3. 消费者监听到的数据
ConsumerRecord(
    topic = data_topic, 
    partition = 0, 
    leaderEpoch = 0, 
    offset = 5000, 
    CreateTime = 1676945951341, 
    serialized key size = -1, 
    serialized value size = 25, 
    headers = RecordHeaders(
        headers = [], 
        isReadOnly = false
    ), 
    key = null, 
    value = {"userId":1,"name":"Tom"}
)

参考


相关文章
|
5天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
560 0
|
2天前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
24 7
|
4天前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
4天前
|
NoSQL Java 数据库
SpringBoot实用开发篇第三章(数据层解决方案操作)
SpringBoot实用开发篇第三章(数据层解决方案操作)
|
2天前
|
Java 数据库连接 数据库
实现Spring Boot与MyBatis结合进行数据库历史数据的定时迁移
实现Spring Boot与MyBatis结合进行数据库历史数据的定时迁移
13 2
|
3天前
|
easyexcel Java API
SpringBoot集成EasyExcel 3.x:高效实现Excel数据的优雅导入与导出
SpringBoot集成EasyExcel 3.x:高效实现Excel数据的优雅导入与导出
13 1
|
5天前
|
Java
在 Java 中,类是一种定义对象的模板,它包含数据成员(字段)和方法。
在 Java 中,类是一种定义对象的模板,它包含数据成员(字段)和方法。
|
1天前
|
存储 Java 关系型数据库
数据类型的取值范围以及Java和Mysql数据库的类型对照--强调时间类型的转换(jdk1.8)
数据类型的取值范围以及Java和Mysql数据库的类型对照--强调时间类型的转换(jdk1.8)
5 0
|
3天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue+uniapp的个人健康数据管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue+uniapp的个人健康数据管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
|
4天前
|
存储 Java API
深入剖析Java Map:不只是存储数据,更是设计艺术的体现!
【6月更文挑战第18天】Java Map是键值对数据结构的艺术,展示了设计效率与易用性的平衡。HashMap利用哈希表实现快速访问,TreeMap通过红黑树保证排序。选择合适的实现类如HashMap、TreeMap或LinkedHashMap至关重要。注意空指针异常,谨慎在遍历时修改Map。Map的高效使用能提升编程效果。