初始 Kafka Consumer 消费者

简介: 初始 Kafka Consumer 消费者


1、KafkaConsumer 概述


根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征:


  • 在 Kafka 中 KafkaConsumer 是线程不安全的。


  • 2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。


  • 消息偏移量与消费偏移量(消息消费进度)
    Kafka 为分区中的每一条消息维护一个偏移量,即消息偏移量。这个偏移量充当该分区内记录的唯一标识符。消费偏移量(消息消费进度)存储的是消费组当前的处理进度。消息消费进度的提交在 kafka 中可以定时自动提交也可以手动提交。手动提交可以调用 commitSync() 或 commitAsync 方法。


  • 消费组 与 订阅关系
    多个消费这可以同属于一个消费组,消费组内的所有消费者共同消费主题下的所有消息。一个消费组可以订阅多个主题。


  • 队列负载机制
    既然同一个消费组内的消费者共同承担主题下所有队列的消费,那他们如何进行分工呢?默认情况下采取平均分配,例如一个消费组有两个消费者c1、c2,一个 topic 的分区数为6,那 c1 会负责3个分区的消费,同样 c2 会负责另外3个分区的分配。
    那如果其中一个消费者宕机或新增一个消费者,那队列能动态调整吗?
    答案是会重新再次平衡,例如如果新增一个消费者 c3,则c1,c2,c3都会负责2个分区的消息消费,分区重平衡会在后续文章中重点介绍。消费者也可以通过 assign 方法手动指定分区,此时会禁用默认的自动分配机制。


  • 消费者故障检测机制
    当通过 subscribe 方法订阅某些主题时,此时该消费者还未真正加入到订阅组,只有当 consumeer#poll 方法被调用后,并且会向 broker 定时发送心跳包,如果 broker 在 session.timeout.ms 时间内未收到心跳包,则 broker 会任务该消费者已宕机,会将其剔除,并触发消费端的分区重平衡。
    消费者也有可能遇到“活体锁”的情况,即它继续发送心跳,但没有任何进展。在这种情况下,为了防止消费者无限期地占用它的分区,可以使用max.poll.interval.ms 设置提供了一个活性检测机制。基本上,如果您调用轮询的频率低于配置的最大间隔,那么客户机将主动离开组,以便另一个消费者可以接管它的分区。当这种情况发生时,您可能会看到一个偏移提交失败(由调用{@link #commitSync()}抛出的{@link CommitFailedException}表示)。


  • kafka 对 poll loop 行为的控制参数
    Kafka 提供了如下两个参数来控制 poll 的行为:
  • max.poll.interval.ms
    允许 两次调用 poll 方法的最大间隔,即设置每一批任务最大的处理时间。
  • max.poll.records
    每一次 poll 最大拉取的消息条数。


  • 对于消息处理时间不可预测的情况下上述两个参数可能不够用,那将如何是好呢?

  • 通常的建议将消息拉取与消息消费分开,一个线程负责 poll 消息,处理这些消息使用另外的线程,这里就需要手动提交消费进度。为了控制消息拉起的过快,您可能会需要用到 Consumer#pause(Collection) 方法,暂时停止向该分区拉起消息。RocketMQ 的推模式就是采用了这种策略。如果大家有兴趣的话,可以从笔者所著的《RocketMQ技术内幕》一书中详细了解。


2、KafkaConsume 使用示例


2.1 自动提交消费进度


public static void testConsumer1() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072");
    props.setProperty("group.id", "C_ODS_ORDERCONSUME_01");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("TOPIC_ORDER"));
    while (true) {
        ConsumerRecords<String, String>  records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("消息消费中");
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}


2.2 手动提交消费进度


public static void testConsumer2() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                // insertIntoDb(buffer);
                // 省略处理逻辑
                consumer.commitSync();
                buffer.clear();
            }
        }
    }


3、认识 Consumer 接口


要认识 Kafka 的消费者,个人认为最好的办法就是从它的类图着手,下面给出 Consumer 接口的类图。

62b7a0afef4482e03f336b0cec49ce2e.jpg

接下来对起重点方法进行一个初步的介绍,从下篇文章开始将对其进行详细设计。


  • Set< TopicPartition> assignment()
    获取该消费者的队列分配列表。
  • Set< String> subscription()
    获取该消费者的订阅信息。
  • void subscribe(Collection< String> topics)
    订阅主题。
  • void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)
    订阅主题,并指定队列重平衡的监听器。
  • void assign(Collection< TopicPartition> partitions)
    取代 subscription,手动指定消费哪些队列。
  • void unsubscribe()
    取消订阅关系。
  • ConsumerRecords poll(Duration timeout)
    拉取消息,是 KafkaConsumer 的核心方法,将在下文详细介绍。
  • void commitSync()
    同步提交消费进度,为本批次的消费提交,将在后续文章中详细介绍。
  • void commitSync(Duration timeout)
    同步提交消费进度,可设置超时时间。
  • void commitSync(Map offsets)
    显示同步提交消费进度, offsets 指明需要提交消费进度的信息。
  • void commitSync(final Map offsets, final Duration timeout)
    显示同步提交消费进度,带超时间。
  • void seek(TopicPartition partition, long offset)
    重置 consumer#poll 方法下一次拉消息的偏移量。
  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
    seek 方法重载方法。
  • void seekToBeginning(Collection< TopicPartition> partitions)
    将 poll 方法下一次的拉取偏移量设置为队列的初始偏移量。
  • void seekToEnd(Collection< TopicPartition> partitions)
    将 poll 方法下一次的拉取偏移量设置为队列的最大偏移量。
  • long position(TopicPartition partition)
    获取将被拉取的偏移量。
  • long position(TopicPartition partition, final Duration timeout)
    同上。
  • OffsetAndMetadata committed(TopicPartition partition)
    获取指定分区已提交的偏移量。
  • OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)
    同上。
  • Map metrics()
    统计指标。
  • List< PartitionInfo> partitionsFor(String topic)
    获取主题的路由信息。
  • List< PartitionInfo> partitionsFor(String topic, Duration timeout)
    同上。
  • Map> listTopics()
    获取所有 topic 的路由信息。
  • Map> listTopics(Duration timeout)
    同上。
  • Set< TopicPartition> paused()
    获取已挂起的分区信息。
  • void pause(Collection< TopicPartition> partitions)
    挂起分区,下一次 poll 方法将不会返回这些分区的消息。
  • void resume(Collection< TopicPartition> partitions)
    恢复挂起的分区。
  • Map offsetsForTimes(Map timestampsToSearch)
    根据时间戳查找最近的一条消息的偏移量。
  • Map offsetsForTimes(Map timestampsToSearch, Duration timeout)
    同上。
  • Map beginningOffsets(Collection< TopicPartition> partitions)
    查询指定分区当前最小的偏移量。
  • Map beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)
    同上。
  • Map endOffsets(Collection< TopicPartition> partitions)
    查询指定分区当前最大的偏移量。
  • Map endOffsets(Collection< TopicPartition> partitions, Duration timeout)
    同上。
  • void close()
    关闭消费者。
  • void close(Duration timeout)
    关闭消费者。
  • void wakeup()
    唤醒消费者。


4、初始 KafkaConsumer


82007e144a40b494330d75cb491447f7.jpg

接下来笔者根据其构造函数,对一一介绍其核心属性的含义,为接下来讲解其核心方法打下基础。


  • String groupId
    消费组ID。同一个消费组内的多个消费者共同消费一个主题下的消息。
  • String clientId
    发出请求时传递给服务器的id字符串。设置该值的目的是方便在服务器端请求日志中包含逻辑应用程序名称,从而能够跟踪ip/端口之外的请求源。该值可以设置为应用名称。
  • ConsumerCoordinator coordinator
    消费协调器,后续会详细介绍。
  • Deserializer< K> keyDeserializer
    key 序列化器。
  • Deserializer< V> valueDeserializer
    值序列化器。
  • ConsumerNetworkClient client
    网络通讯客户端。
  • SubscriptionState subscriptions
    用于管理订阅状态的类,用于跟踪 topics, partitions, offsets 等信息。后续会详细介绍。
  • ConsumerMetadata metadata
    消费者元数据信息,包含路由信息。
  • long retryBackoffMs
    如果向 broker 发送请求失败后,发起重试之前需要等待的间隔时间,通过属性 retry.backoff.ms 指定。
  • long requestTimeoutMs
    一次请求的超时时间。
  • int defaultApiTimeoutMs
    为所有可能阻塞的API设置一个默认的超时时间。
  • List< PartitionAssignor> assignors
    分区分配算法(分区负载算法)。


Kafka Consumer 消费者就介绍到这里了,从下篇文章开始将开始详细介绍 Kafka 关于消息消费的方方面面。

相关文章
|
23天前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
79 2
|
4月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
142 62
|
4月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
136 58
|
2月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
81 4
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
44 1
|
3月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
335 4
|
4月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
117 4
|
4月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
132 3
下一篇
DataWorks