Spring Boot整合kafka

简介: 本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。

Spring Boot整合kafka

此处简单记录一下 SpringBoot 和 Kafka 的整合。 先初始化一个SpringBoot工程

搭建kafka环境

这里从用docker方式搭建kafka,kafka需要注册到注册中心上,所以要先安装zookeeper zookeeper的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.1'

services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    volumes:
      - ./config:/conf
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1

接下来是kafka的docker-compose.yaml文件

yaml

代码解读

复制代码

version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181

  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka启动后初始化一个有2个partition(分区)0个副本名叫kafeidou的topic
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper

引入依赖

xml

代码解读

复制代码

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.properties中添加配置

Propreties

代码解读

复制代码

# 生产者配置
spring.kafka.bootstrap-servers=127.0.0.1:14993
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# key的序列化方式为String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value的序列化方式为字节数组
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 消费者配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.group-id=story-has-you
# 自动偏移量设置
spring.kafka.consumer.auto-offset-reset=earliest
# 消费者改成手动提交
spring.kafka.consumer.enable-auto-commit=false
# 手动ack
spring.kafka.advertised.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual
spring.kafka.listener.missing-topics-fatal=false

新增配置类

kafkaConfig.java

java

代码解读

复制代码

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

/**
 * The type Kafka config.
 *
 * @author fangxi created by 2021/6/15
 */
@Configuration
public class KafkaConfig {

    /**
     * The Kafka template.
     */
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    /**
     * 初始化topic
     *
     * @return the new topic
     */
    @Bean
    public NewTopic changChunFawRealFaultInfo() {
        return new NewTopic("story-has-you", 1, (short) 1);
    }


    /**
     * 配置kafka的重试次数
     *
     * @param configurer           the configurer
     * @param kafkaConsumerFactory the kafka consumer factory
     * @return the concurrent kafka listener container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // 最大重试次数3次
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 3)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

生产者发送

java

代码解读

复制代码

@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;

kafkaTemplate.send("story-has-you", "hello");

消费者监听

java

代码解读

复制代码


/**
    * On message.
    *
    * @param records the records
    */
@KafkaListener(topics = "story-has-you", groupId = "story-has-you", containerFactory = "kafkaListenerContainerFactory")
public void qingDaoMessage(ConsumerRecord<String, byte[]> records, Acknowledgment ack) {
    try {
		String data = new String(records.value());
        if (data == null) {
            return;
        }
		// 打印hello
        log.info("从kafka接收到消息, {}", data)
    } catch (Exception e) {
        log.error("kafka处理消息异常", e);
    } finally {
		// 手动ack, 通知kafka已经消费
        ack.acknowledge();
    }

}


转载来源:https://juejin.cn/post/7225809587676004413

相关文章
|
2月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
100 10
|
3月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
|
6月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
280 5
|
6月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
135 1
|
7月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
136 2
|
9月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
172 0
|
4月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
7月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
281 1
|
7月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
168 1
|
9月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
539 9