In a Spring Boot project, Kafka can be used to process data asynchronously.
Download and Install Kafka

```bash
# Download from the Tsinghua mirror
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.2/kafka_2.13-2.8.2.tgz

# Extract
tar -zxvf kafka_2.13-2.8.2.tgz
cd kafka_2.13-2.8.2

# Start a single-node ZooKeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka
./bin/kafka-server-start.sh config/server.properties
```
Integrate Kafka into Spring Boot

Project structure
```
$ tree
.
├── pom.xml
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── example
    │   │           └── demo
    │   │               ├── Application.java
    │   │               └── kafkaConsumer
    │   │                   └── DataKafkaConsumer.java
    │   └── resources
    │       └── application.yml
    └── test
        └── java
            └── com
                └── example
                    └── demo
                        └── KafkaProducerTest.java
```
Add the dependency to pom.xml

```xml
<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.9.1</version>
</dependency>
```
Maven repository: https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka

Spring Boot / Spring for Apache Kafka version compatibility: https://spring.io/projects/spring-kafka#overview

Complete pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--kafka-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
Configure application.yml

```yaml
spring:
  kafka:
    # Kafka broker address; for a cluster, list multiple addresses separated by commas
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # Number of retries
      retries: 3
      # Producer batch size in bytes
      batch-size: 1000
      # Total producer buffer memory (32 MB)
      buffer-memory: 33554432
    consumer:
      # Default consumer group
      group-id: consumer-group
      # Start from the earliest unconsumed offset
      auto-offset-reset: earliest
      # Maximum number of records returned in a single poll
      max-poll-records: 4000
      # Whether to auto-commit offsets
      enable-auto-commit: true
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
```
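The listener below assumes the `data_topic` topic exists. If you prefer not to rely on broker-side automatic topic creation, one option is to declare a `NewTopic` bean; Spring Boot auto-configures a `KafkaAdmin` that creates declared topics on startup. A minimal sketch, where the class name `KafkaTopicConfig` and the partition/replica counts are illustrative choices for the single-node setup above:

```java
package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Optional: declare the topic so the auto-configured KafkaAdmin creates it
 * on startup instead of relying on broker auto-creation.
 */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic dataTopic() {
        // 1 partition / replication factor 1 are assumptions that fit the
        // single-node broker started earlier; adjust for a real cluster.
        return TopicBuilder.name("data_topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```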
Consumer
DataKafkaConsumer.java
```java
package com.example.demo.kafkaConsumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Consumer
 */
@Component
public class DataKafkaConsumer {

    @KafkaListener(topics = {"data_topic"})
    public void consumer(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.toString());
    }
}
```
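Since the producer test below sends the payload as a JSON string, a listener can also parse the value back with fastjson, which is already on the classpath. A minimal sketch assuming the same topic and payload shape; the class name and the separate `groupId` are illustrative:

```java
package com.example.demo.kafkaConsumer;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Illustrative variant of the listener above that parses the JSON value
 * back into a Map. A separate group id is used so both listeners receive
 * every message rather than splitting the topic between them.
 */
@Component
public class DataJsonKafkaConsumer {

    @KafkaListener(topics = {"data_topic"}, groupId = "consumer-group-json")
    public void consume(ConsumerRecord<String, String> record) {
        // fastjson's JSONObject implements Map<String, Object>
        Map<String, Object> data = JSONObject.parseObject(record.value());
        System.out.println("userId=" + data.get("userId") + ", name=" + data.get("name"));
    }
}
```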
Producer
KafkaProducerTest.java
```java
package com.example.demo;

import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.LinkedHashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSend() {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("userId", 1);
        map.put("name", "Tom");
        // Send data to Kafka
        kafkaTemplate.send("data_topic", JSONObject.toJSONString(map));
    }
}
```
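`kafkaTemplate.send()` is asynchronous, so the test can finish before the broker acknowledges the record. With spring-kafka 2.9.x, `send()` returns a `ListenableFuture`, so one way to make the test fail on delivery errors is to block on the result with a timeout. A minimal sketch (class name and payload are illustrative):

```java
package com.example.demo;

import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerSyncTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSendAndWait() throws Exception {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("userId", 2);
        map.put("name", "Jerry");

        // Block (with a timeout) so the test fails if the broker
        // does not acknowledge the record.
        SendResult<String, String> result = kafkaTemplate
                .send("data_topic", JSONObject.toJSONString(map))
                .get(10, TimeUnit.SECONDS);

        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + ", offset " + result.getRecordMetadata().offset());
    }
}
```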
Sending and receiving messages

- Start the Spring Boot application
- Send a Kafka message with the KafkaProducerTest test class
- Data received by the consumer listener:
```
ConsumerRecord(
    topic = data_topic,
    partition = 0,
    leaderEpoch = 0,
    offset = 5000,
    CreateTime = 1676945951341,
    serialized key size = -1,
    serialized value size = 25,
    headers = RecordHeaders(headers = [], isReadOnly = false),
    key = null,
    value = {"userId":1,"name":"Tom"}
)
```