【Kafka】Kafka 新旧消费者的区别

简介: 【4月更文挑战第7天】【Kafka】Kafka 新旧消费者的区别

image.png

Kafka 中的消费者分为新旧两种类型,它们分别是基于不同版本的消费者 API 实现的。本文将详细介绍 Kafka 新旧消费者的区别,包括特性、用法、适用场景等方面,并提供示例代码加以说明。

1. 新旧消费者概述

1.1 旧消费者(Old Consumer)

旧消费者基于 Kafka 0.8.x 版本引入的消费者 API,主要包括 SimpleConsumerConsumerConnector 两个类。旧消费者 API 在设计上较为简单,但功能有限,不支持消费者组等高级特性。

1.2 新消费者(New Consumer)

新消费者是在 Kafka 0.9 版本引入的,使用 KafkaConsumer 类实现,提供了更强大、灵活的功能,并支持消费者组、自动提交位移、动态分区分配等高级特性。

2. 新旧消费者的区别

2.1 API 设计

  • 旧消费者: 使用 SimpleConsumerConsumerConnector 类进行消息消费,较为简单,但功能有限。
  • 新消费者: 使用 KafkaConsumer 类进行消息消费,提供了更丰富、灵活的功能,并支持消费者组等高级特性。

2.2 功能特性

  • 旧消费者:

    • 不支持消费者组。
    • 不支持自动位移提交。
    • 不支持动态分区分配。
    • 不支持自动重平衡。
  • 新消费者:

    • 支持消费者组,可以实现水平扩展和高可用性。
    • 支持自动提交位移,简化了位移管理。
    • 支持动态分区分配,能够自动处理分区的增加和减少。
    • 支持自动重平衡,确保消费者组内各个消费者之间的负载均衡。

2.3 高级特性

  • 旧消费者: 缺乏高级特性,适用于简单的消息消费场景。
  • 新消费者: 提供了丰富的高级特性,适用于复杂的消息处理场景,如实现 Exactly Once 语义等。

2.4 兼容性

  • 旧消费者: 由于基于较旧的消费者 API,可能在后续版本的 Kafka 中逐渐被淘汰。
  • 新消费者: 是 Kafka 推荐的消费者 API,具有较好的兼容性和稳定性。

3. 示例代码

下面分别演示使用旧消费者和新消费者消费 Kafka 消息的示例代码:

3.1 旧消费者示例代码

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class OldConsumerExample {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-group");

        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("test-topic", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("test-topic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
   
   
            MessageAndMetadata<byte[], byte[]> message = it.next();
            System.out.println(new String(message.message()));
        }

        consumer.shutdown();
    }
}

3.2 新消费者示例代码

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class NewConsumerExample {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
   
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
   
   
                System.out.println(record.value());
            });
        }
    }
}
相关文章
|
5月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
262 16
|
7月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
304 10
|
8月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
169 1
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
209 62
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
255 58
|
11月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
363 2
|
11月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
742 1
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
488 1
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
150 1
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
532 3

热门文章

最新文章