【Kafka】Kafka 新旧消费者的区别

简介: 【4月更文挑战第7天】【Kafka】Kafka 新旧消费者的区别

image.png

Kafka 中的消费者分为新旧两种类型,它们分别是基于不同版本的消费者 API 实现的。本文将详细介绍 Kafka 新旧消费者的区别,包括特性、用法、适用场景等方面,并提供示例代码加以说明。

1. 新旧消费者概述

1.1 旧消费者(Old Consumer)

旧消费者基于 Kafka 0.8.x 版本引入的消费者 API,主要包括 SimpleConsumerConsumerConnector 两个类。旧消费者 API 在设计上较为简单,但功能有限,不支持消费者组等高级特性。

1.2 新消费者(New Consumer)

新消费者是在 Kafka 0.9 版本引入的,使用 KafkaConsumer 类实现,提供了更强大、灵活的功能,并支持消费者组、自动提交位移、动态分区分配等高级特性。

2. 新旧消费者的区别

2.1 API 设计

  • 旧消费者: 使用 SimpleConsumerConsumerConnector 类进行消息消费,较为简单,但功能有限。
  • 新消费者: 使用 KafkaConsumer 类进行消息消费,提供了更丰富、灵活的功能,并支持消费者组等高级特性。

2.2 功能特性

  • 旧消费者:

    • 不支持消费者组。
    • 不支持自动位移提交。
    • 不支持动态分区分配。
    • 不支持自动重平衡。
  • 新消费者:

    • 支持消费者组,可以实现水平扩展和高可用性。
    • 支持自动提交位移,简化了位移管理。
    • 支持动态分区分配,能够自动处理分区的增加和减少。
    • 支持自动重平衡,确保消费者组内各个消费者之间的负载均衡。

2.3 高级特性

  • 旧消费者: 缺乏高级特性,适用于简单的消息消费场景。
  • 新消费者: 提供了丰富的高级特性,适用于复杂的消息处理场景,如实现 Exactly Once 语义等。

2.4 兼容性

  • 旧消费者: 由于基于较旧的消费者 API,可能在后续版本的 Kafka 中逐渐被淘汰。
  • 新消费者: 是 Kafka 推荐的消费者 API,具有较好的兼容性和稳定性。

3. 示例代码

下面分别演示使用旧消费者和新消费者消费 Kafka 消息的示例代码:

3.1 旧消费者示例代码

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class OldConsumerExample {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-group");

        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("test-topic", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("test-topic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
   
   
            MessageAndMetadata<byte[], byte[]> message = it.next();
            System.out.println(new String(message.message()));
        }

        consumer.shutdown();
    }
}

3.2 新消费者示例代码

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class NewConsumerExample {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
   
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
   
   
                System.out.println(record.value());
            });
        }
    }
}
相关文章
|
17天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
48 2
|
16天前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
61 1
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
1月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
70 1
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
130 58
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
32 1
|
3月前
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
83 3
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
75 0
|
4月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
89 8