集成Kafka到Spring Boot项目中的步骤和配置

简介: 集成Kafka到Spring Boot项目中的步骤和配置

集成Kafka到Spring Boot项目中的步骤和配置如下:

 

### 1. 添加依赖

 

首先,确保在`pom.xml`中添加Spring Kafka依赖:

```xml
    org.springframework.kafka
    spring-kafka
    2.7.6 
```

### 2. 配置Kafka连接信息

 

在`application.properties`或`application.yml`中配置Kafka的连接信息:

 

#### 单个Kafka Broker的配置示例:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group-id
```

#### 多个Kafka Broker的配置示例:

```properties
spring.kafka.bootstrap-servers=broker1:9092,broker2:9092,broker3:9092
spring.kafka.consumer.group-id=my-group-id
```

### 3. 编写Kafka生产者

 

创建一个Kafka生产者来发送消息到指定的Topic:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaProducer {
 
    @Autowired
    private KafkaTemplate kafkaTemplate;
 
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```

### 4. 编写Kafka消费者

 

创建一个Kafka消费者来监听指定的Topic,并处理接收到的消息:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsumer {
 
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void listen(String message) {
        System.out.println("Received Message in group my-group-id: " + message);
        // 处理接收到的消息逻辑
    }
}
```

 

### 5. 配置序列化器和反序列化器(可选)

 

默认情况下,Spring Kafka使用`StringSerializer`和`StringDeserializer`来序列化和反序列化消息。如果需要使用其他格式,如JSON,可以配置对应的序列化器和反序列化器。

 

例如,配置使用JSON格式的序列化器和反序列化器:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.KafkaTemplate;
@Configuration
```

### 1. 使用JSON格式的序列化器和反序列化器

 

如果你希望在消息的传输过程中使用JSON格式,可以配置如下的序列化器和反序列化器。

 

#### 添加依赖

 

确保在`pom.xml`中添加相应的依赖:

```xml
    org.springframework.kafka
    spring-kafka
    org.springframework.kafka
    spring-kafka-json
```

#### 配置KafkaTemplate使用JSON序列化器

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
 
@Configuration
public class KafkaConfig {
 
    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()),
                new JsonSerializer<>());
    }
 
    // 如果需要其他配置,比如Bootstrap地址,可以在这里进行配置
}
```

#### 配置KafkaConsumerFactory使用JSON反序列化器

 

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class KafkaConfig {
 
    // 配置消费者工厂,使用JSON反序列化器
    @Bean
    public DefaultKafkaConsumerFactory kafkaConsumerFactory() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 如果你信任所有的包,可以使用通配符*
 
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(YourMessageClass.class));
    }
 
    // 配置Kafka监听器容器
    @Bean
    public ConcurrentMessageListenerContainer kafkaListenerContainerFactory() {
        ContainerProperties containerProperties = new ContainerProperties("my-topic");
        return new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory(), containerProperties);
    }
}
```

 

### 2. 手动配置Topic

 

如果需要在应用启动时自动创建Topic,可以进行如下配置:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
 
@Configuration
@EnableKafka
public class KafkaTopicConfig {
 
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```

### 3. 添加Kafka Admin Bean

 

如果需要在应用启动时执行Kafka管理操作(如创建Topic),可以配置KafkaAdmin Bean:

 

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class KafkaConfig {
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        return new KafkaAdmin(configs);
    }
}
```

### 总结

 

以上是一些常用的Spring Boot集成Kafka的配置和补充内容,根据你的具体需求和项目情况,可以选择性地应用这些配置。配置的核心在于正确设置Kafka的连接信息、序列化器和反序列化器,以及必要的Topic管理。

目录
相关文章
|
25天前
|
JSON 安全 算法
|
5天前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
17 2
|
1月前
|
设计模式 前端开发 Java
Spring MVC——项目创建和建立请求连接
MVC是一种软件架构设计模式,将应用分为模型、视图和控制器三部分。Spring MVC是基于MVC模式的Web框架,通过`@RequestMapping`等注解实现URL路由映射,支持GET和POST请求,并可传递参数。创建Spring MVC项目与Spring Boot类似,使用`@RestController`注解标记控制器类。
34 1
Spring MVC——项目创建和建立请求连接
|
1月前
|
Java 关系型数据库 MySQL
Maven——创建 Spring Boot项目
Maven 是一个项目管理工具,通过配置 `pom.xml` 文件自动获取所需的 jar 包,简化了项目的构建和管理过程。其核心功能包括项目构建和依赖管理,支持创建、编译、测试、打包和发布项目。Maven 仓库分为本地仓库和远程仓库,远程仓库包括中央仓库、私服和其他公共库。此外,文档还介绍了如何创建第一个 SpringBoot 项目并实现简单的 HTTP 请求响应。
116 1
Maven——创建 Spring Boot项目
|
1月前
|
Java 关系型数据库 MySQL
如何使用 maven 创建一个 Spring Boot项目
Maven 是一个强大的项目管理工具,通过配置 `pom.xml` 文件自动获取所需的 jar 包,提高开发效率。其核心功能包括项目构建和依赖管理。项目构建支持编译、测试、打包和发布等流程,而依赖管理则通过中央仓库、本地仓库和私有服务器获取和管理项目依赖。示例中展示了如何创建第一个 SpringBoot 项目并实现简单接口。
24 1
如何使用 maven 创建一个 Spring Boot项目
|
17天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
74 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
1月前
|
Java 应用服务中间件 Android开发
Eclipse创建Spring项目
本文介绍了在Eclipse中创建Spring项目的步骤,包括如何配置Tomcat服务器、创建项目、部署项目到Tomcat以及添加Spring框架所需的JAR包。
53 1
Eclipse创建Spring项目
|
1月前
|
Java Spring
ij社区版如何创建spring项目
如何在IntelliJ IDEA社区版中创建Spring项目,包括安装Spring Boot Helper插件的步骤和创建过程。
65 1
ij社区版如何创建spring项目
|
1月前
|
Java Apache Maven
Java/Spring项目的包开头为什么是com?
本文介绍了 Maven 项目的初始结构,并详细解释了 Java 包命名惯例中的域名反转规则。通过域名反转(如 `com.example`),可以确保包名的唯一性,避免命名冲突,提高代码的可读性和逻辑分层。文章还讨论了域名反转的好处,包括避免命名冲突、全球唯一性、提高代码可读性和逻辑分层。最后,作者提出了一个关于包名的问题,引发读者思考。
Java/Spring项目的包开头为什么是com?