Java整合Kafka实现生产及消费

简介: Java整合Kafka实现生产及消费

前提条件

  1. 创建maven项目。
  2. pom.xml文件中引入kafka依赖。
    <dependencies>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
             <version>2.1.0</version>
         </dependency>
    </dependencies>
    

    创建Topic

    创建topic命名为testtopic并指定2个分区。
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

生产消息

public class Producer {
   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
        // 生产参数配置
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        int i=0;
        while (true) {
   
            //生产消息
            Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", "key"+i, "value"+i));
            //获取生产的数据信息
            RecordMetadata recordMetadata = future.get();
            System.out.println("time:"+recordMetadata.timestamp()+" key:"+i+" value:"+i+" partition:"+recordMetadata.partition()+" offset:"+recordMetadata.offset());
            Thread.sleep(1000);
            i+=1;
        }
    }
}

生产者参数配置

// ACK机制,默认为1 (0,1,-1)
properties.setProperty(ProducerConfig.ACKS_CONFIG, "");
// Socket发送消息缓冲区大小,默认为128K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.SEND_BUFFER_CONFIG, ""); 
// Socket接收消息缓冲区大小,默认为32K,设置为-1代表操作系统的默认值
properties.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG, ""); 
// 生产者客户端发送消息的最大值,默认1M
properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ""); 
// 发送消息异常时重试次数,默认为0
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "");   
// 重试间隔时间,默认100
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "");    
// 生产消息自定义分区策略类
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "");
// 开启幂等 ,默认true
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "");

更多配置信息查看ProducerConfig类

生产自定义分区策略

  1. 创建分区策略类,实现org.apache.kafka.clients.producer.Partitioner接口,编写具体策略。
public class PartitionPolicy implements Partitioner {
   

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
   
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
   
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
   
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
   
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
   
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }


    private int nextValue(String topic) {
   
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if (null == counter) {
   
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
   
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    @Override
    public void close() {
   

    }

    @Override
    public void configure(Map<String, ?> map) {
   

    }
}
  1. 参数配置。
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionPolicy.class.getName());

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<String, String>("testtopic", 1, "key"+i, "value"+i));

消费消息

public class Consumer {
   
    public static void main(String[] args) throws InterruptedException {
   
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //约定的编解码
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        //默认为自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //当设置为自动提交时,默认5秒自动提交
        //properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        //
        //properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "5000");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        //订阅topic
        kafkaConsumer.subscribe(Arrays.asList("testtopic"));
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        ConsumerRecords<String, String> records = null;
        while (assignment.size() == 0) {
   
            records = kafkaConsumer.poll(Duration.ofMillis(100));
            assignment = kafkaConsumer.assignment();
        }
        /*//1.根据时间戳获取 offset,设置 offset
        Map<TopicPartition, Long> offsetsForTimes=new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            offsetsForTimes.put(topicPartition,1669972273941L);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = kafkaConsumer.offsetsForTimes(offsetsForTimes);
        offsetAndTimestampMap.forEach((tp,offsettime)->{
            kafkaConsumer.seek(tp,offsettime.offset());
        });*/
        /*//2.指定从头开始消费
        kafkaConsumer.seekToBeginning(assignment);*/
        /*//3.指定从某offset开始消费
        kafkaConsumer.seek(tp,0);*/
        while (true) {
   
            if (records.isEmpty()) {
   
                Thread.sleep(3000);
            } else {
   
                System.out.printf("records count:" + records.count());
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()) {
   
                    ConsumerRecord<String, String> record = iterator.next();
                    System.out.println(" time:" + record.timestamp() + " key:" + record.key() + " value:" + record.value() + " partition:" + record.partition() + " offset:" + record.offset());
                }
                kafkaConsumer.commitSync();
            }
            records = kafkaConsumer.poll(Duration.ofMillis(0));
        }
    }
}

消费参数配置

// 消费者必须指定一个消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "");
// 消费者每次最多POLL的数量
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "");
// 消费者POLL的时间间隔
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_DOC, "");
// 设置是否自动提交,默认为true
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "");  
// 如果是自动提交,默认5s后提交,会发生丢失消息和重复消费情况
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "");   
// 当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。

更多配置信息查看ConsumerConfig类

offset设置方式

如代码所示,设置offset的几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

代码仓库

https://gitee.com/codeWBG/learn_kafka

相关文章
|
9天前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
31 8
|
9天前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
37 7
|
9天前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
37 4
|
9天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
28 3
|
23天前
|
消息中间件 Java Kafka
Java 客户端访问kafka
Java 客户端访问kafka
29 9
|
23天前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
36 4
|
26天前
|
消息中间件 存储 缓存
面试题Kafka问题之Kafka的生产消费基本流程如何解决
面试题Kafka问题之Kafka的生产消费基本流程如何解决
30 1
|
1月前
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
|
1月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
监控 负载均衡 Kubernetes
Java系统线上生产问题排查一把梭(下)
Java系统线上生产问题排查一把梭
197 0