SpringBoot整合Kafka简单配置实现生产消费

简介: 本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考

*本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考

spring kafka 文档:https: //docs.spring.io/spring-kafka/docs/current/reference/html

前提条件

项目环境

  1. 创建Springboot项目。
  2. pom.xml文件中引入kafka依赖。
<dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
</dependencies>

创建Topic

创建topic命名为testtopic并指定2个分区。

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

配置信息

application.yml配置文件信息

spring:
  application:
    name: kafka_springboot
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      #ACK机制,默认为1 (0,1,-1)
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 自定义分区策略
        partitioner:
          class: org.bg.kafka.PartitionPolicy

    consumer:
      #设置是否自动提交,默认为true
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。
      auto-offset-reset: latest
      #批量消费时每次poll的数量
      #max-poll-records: 5
    listener:
      #      当每一条记录被消费者监听器处理之后提交
      #      RECORD,
      #      当每一批数据被消费者监听器处理之后提交
      #      BATCH,
      #      当每一批数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交
      #      TIME,
      #      当每一批数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交
      #      COUNT,
      #      #TIME | COUNT 有一个条件满足时提交
      #      COUNT_TIME,
      #      #当每一批数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()后提交:
      #      MANUAL,
      #      # 手动调用Acknowledgment.acknowledge()后立即提交
      #      MANUAL_IMMEDIATE;
      ack-mode: manual
      #批量消费
      type: batch

更多配置信息查看KafkaProperties

生产消息

@Component
public class Producer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String msg) {
        kafkaTemplate.send(new ProducerRecord<String, String>("testtopic", "key111", msg));
    }
}

生产自定义分区策略

package org.bg.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionPolicy implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }


    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

kafkaTemplate.send(new ProducerRecord<String, String>("testtopic",1, "key111", msg));

消费消息


/**
 * 自定义seek参考
 * https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
 */
@Component
public class Consumer implements ConsumerSeekAware{


    @KafkaListener(topics = {"testtopic"},groupId = "test_group",clientIdPrefix = "bg",id = "testconsumer")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
        System.out.println(records.size());
        System.out.println(records.toString());
        ack.acknowledge();
    }


    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        //按照时间戳设置偏移
        callback.seekToTimestamp(assignments.keySet(),1670233826705L);
        //设置偏移到最近
        callback.seekToEnd(assignments.keySet());
        //设置偏移到最开始
        callback.seekToBeginning(assignments.keySet());
        //指定 offset
        for (TopicPartition topicPartition : assignments.keySet()) {
            callback.seek(topicPartition.topic(),topicPartition.partition(),0L);
        }

    }

}

offset设置方式

如代码所示,实现ConsumerSeekAware接口,设置offset几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

代码仓库

https://gitee.com/codeWBG/learn_kafka

相关文章
|
2月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
83 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
28天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
2月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
31 2
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
70 2
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
152 0
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
128 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
113 0
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
38 0