Spring Boot整合kafka

简介: 本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。

Spring Boot整合kafka

此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程

搭建kafka环境

这里从用docker方式搭建kafka,kafka需要注册到注册中心上,所以要先安装zookeeper zookeeper的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.1'

services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    volumes:
      - ./config:/conf
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1

接下来是kafka的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper

引入依赖

xml

代码解读

复制代码

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties中添加配置

Propreties

代码解读

复制代码

# 生产者配置
spring.kafka.bootstrap-servers=127.0.0.1:14993
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# key的序列化方式为String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value的序列化方式为字节数组
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 消费者配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.group-id=story-has-you
# 自动偏移量设置
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者改成手动提交
spring.kafka.consumer.enable-auto-commit=false
# 手动ack
spring.kafka.advertised.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual
spring.kafka.listener.missing-topics-fatal=false

新增配置类

kafkaConfig.java

java

代码解读

复制代码

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

/**
 * The type Kafka config.
 *
 * @author fangxi created by 2021/6/15
 */
@Configuration
public class KafkaConfig {

    /**
     * The Kafka template.
     */
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * 初始化topic
     *
     * @return the new topic
     */
    @Bean
    public NewTopic changChunFawRealFaultInfo() {
        return new NewTopic("story-has-you", 1, (short) 1);
    }


    /**
     * 配置kafka的重试次数
     *
     * @param configurer           the configurer
     * @param kafkaConsumerFactory the kafka consumer factory
     * @return the concurrent kafka listener container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // 最大重试次数3次
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 3)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

生产者发送

java

代码解读

复制代码

@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;

kafkaTemplate.send("story-has-you", "hello");

消费者监听

java

代码解读

复制代码


/**
    * On message.
    *
    * @param records the records
    */
@KafkaListener(topics = "story-has-you", groupId = "story-has-you", containerFactory = "kafkaListenerContainerFactory")
public void qingDaoMessage(ConsumerRecord<String, byte[]> records, Acknowledgment ack) {
    try {
		String data = new String(records.value());
        if (data == null) {
            return;
        }
		// 打印hello
        log.info("从kafka接收到消息, {}", data)
    } catch (Exception e) {
        log.error("kafka处理消息异常", e);
    } finally {
		// 手动ack, 通知kafka已经消费
        ack.acknowledge();
    }

}


转载来源:https://juejin.cn/post/7225809587676004413

相关文章
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
144 10
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
427 5
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
191 1
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
167 2
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
205 0
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17746 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
spring集成kafka
一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'   二、发消息(生产者) 2.1 xml配置 1 2 6 7 8 9 ...
1628 0
|
13天前
|
SpringBoot自动配置的原理是什么?
Spring Boot自动配置核心在于@EnableAutoConfiguration注解,它通过@Import导入配置选择器,加载META-INF/spring.factories中定义的自动配置类。这些类根据@Conditional系列注解判断是否生效。但Spring Boot 3.0后已弃用spring.factories,改用新格式的.imports文件进行配置。
105 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问