Spring Boot整合kafka

简介: 本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。

Spring Boot整合kafka

此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程

搭建kafka环境

这里从用docker方式搭建kafka,kafka需要注册到注册中心上,所以要先安装zookeeper zookeeper的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.1'

services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    volumes:
      - ./config:/conf
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1

接下来是kafka的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper

引入依赖

xml

代码解读

复制代码

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties中添加配置

Propreties

代码解读

复制代码

# 生产者配置
spring.kafka.bootstrap-servers=127.0.0.1:14993
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# key的序列化方式为String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value的序列化方式为字节数组
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 消费者配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.group-id=story-has-you
# 自动偏移量设置
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者改成手动提交
spring.kafka.consumer.enable-auto-commit=false
# 手动ack
spring.kafka.advertised.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual
spring.kafka.listener.missing-topics-fatal=false

新增配置类

kafkaConfig.java

java

代码解读

复制代码

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

/**
 * The type Kafka config.
 *
 * @author fangxi created by 2021/6/15
 */
@Configuration
public class KafkaConfig {

    /**
     * The Kafka template.
     */
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * 初始化topic
     *
     * @return the new topic
     */
    @Bean
    public NewTopic changChunFawRealFaultInfo() {
        return new NewTopic("story-has-you", 1, (short) 1);
    }


    /**
     * 配置kafka的重试次数
     *
     * @param configurer           the configurer
     * @param kafkaConsumerFactory the kafka consumer factory
     * @return the concurrent kafka listener container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // 最大重试次数3次
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 3)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

生产者发送

java

代码解读

复制代码

@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;

kafkaTemplate.send("story-has-you", "hello");

消费者监听

java

代码解读

复制代码


/**
    * On message.
    *
    * @param records the records
    */
@KafkaListener(topics = "story-has-you", groupId = "story-has-you", containerFactory = "kafkaListenerContainerFactory")
public void qingDaoMessage(ConsumerRecord<String, byte[]> records, Acknowledgment ack) {
    try {
		String data = new String(records.value());
        if (data == null) {
            return;
        }
		// 打印hello
        log.info("从kafka接收到消息, {}", data)
    } catch (Exception e) {
        log.error("kafka处理消息异常", e);
    } finally {
		// 手动ack, 通知kafka已经消费
        ack.acknowledge();
    }

}


转载来源:https://juejin.cn/post/7225809587676004413

相关文章
消息中间件 Java Kafka
403 0
|
4月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
341 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
10月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
547 10
|
11月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
216 5
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
674 5
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
256 0
spring boot 集成kafka
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
18061 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
XML 消息中间件 Apache
spring集成kafka
一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'   二、发消息(生产者) 2.1 xml配置 1 2 6 7 8 9 ...
1677 0
|
3月前
|
Java 测试技术 数据库连接
【SpringBoot(四)】还不懂文件上传?JUnit使用?本文带你了解SpringBoot的文件上传、异常处理、组件注入等知识!并且带你领悟JUnit单元测试的使用!
Spring专栏第四章,本文带你上手 SpringBoot 的文件上传、异常处理、组件注入等功能 并且为你演示Junit5的基础上手体验
945 2
|
3月前
|
JavaScript Java Maven
【SpringBoot(二)】带你认识Yaml配置文件类型、SpringMVC的资源访问路径 和 静态资源配置的原理!
SpringBoot专栏第二章,从本章开始正式进入SpringBoot的WEB阶段开发,本章先带你认识yaml配置文件和资源的路径配置原理,以方便在后面的文章中打下基础
424 3