深入理解Kafka核心设计及原理(三):消费者

简介: 深入理解Kafka核心设计及原理(三):消费者

转载请注明出处:https://www.cnblogs.com/zjdxr-up/p/16114877.html

  目录:

   3.1 消费者与消费组

    3.2 消息消费过程及代码

    3.3 消息消费模式

    3.4 位移提交

    3.5 位移提交过程导致重复消费的现象

    3.6 再均衡

    3.7Kafka消费端重要的参数    

 

3.1 消费者与消费组

  消费者(Consumer)负责订阅Kafka 中的主题(Topic), 并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是: 在Kafka 的消费理念中还有一层消费组(Consumer Group)的概念, 每个消费者都有一个对应的消费组。 当消息发布到主题后, 只会被投递给订阅它的每个消费组中的一个消费者。

  每一个分区只能被一个消费组中的一个消费者所消费。

  对于消息中间件而言,一般有两种消息投递模式:点对点(P2P, Point-to-Point)模式和发布/订阅(Pub/ Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向 一个内容节点发布和订阅消息,这个内容节点称为主题(Topic) , 主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支待两种消息投递模式,而这正是得益于消费者与消费组模型的契合:

     如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。

     如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

  消费组是一个逻辑上的概念,它将旗下的消费者归为 一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

3.2 消息消费过程及代码

  一个正常的消费逻辑需要具备以下几个步骤:

    (1) 配置消费者客户端参数及创建相应的消费者实例。

    (2) 订阅主题。

    (3)拉取消息并消费。

    (4) 提交消费位移。

    (5)关闭消费者实例。

public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupid = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);
    public static Properties initConfig () {
        Properties props= new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS—CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP—ID_CONFIG, groupid);
        props. put (ConsumerConfig. CLIENT_ID _ CONFIG, "client. id. demo");
        return props;
    }
    public static void main(String[] args) (
        Properties props= initConfig();
        KafkaConsumer<String, String> consumer= new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
        while (isRunning. get()) {
            ConsumerRecords<String, String> records=
                consumer.poll(Duration.ofMillis(lOOO));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("topic="+record.topic()+ ", partition = "+         record.partition()+ ", offset="+ record.offset());
                System.out.println("key ="+record.key()+ ", value="+ record.value());
            //do something to process record.
        } catch(Exception e) {
            log.error("occur exception", e);
        } finally {
            consumer.close();
        }
    }
}

    通过 subscribe()方法订阅主题具有 消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

    如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer中的partitionsFor ()方法可以用来查询指定主题的元数据信息,partitionsFor()方法的具体定义如下:

public List<Partitioninfo> partitionsFor(String topic)

    其中 Partitionlnfo类型即为主题的分区元数据信息,此类的主要结构如下:

public class Partitioninfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    //这里省略了构造函数、属性提取、toString等方法
}

    Partitioninfo类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offlineReplicas代表分区的OSR集合。

3.3 消息消费模式

    Kafka中的消费是基于 拉模式的。消息的消费一般有两种模式:推模式和 拉模式。推模式是服务端主动将消息推送给消费者, 而 拉模式是消费者主动向服务端发起请求来拉取消息。Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。 对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应 的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息, 那么poll()方法返回为空的消息集合

    消费者消费到 的每条消息的类型为ConsumerRecord(注意与ConsumerRecords 的区别,ConsumerRecords为一次获取到的消息集),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富,具体的结构参考如下代码:

public class ConsumerRecord<K, V> {
    private final Stringtopic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
//省略若干方法

    timestarnpType 有两种类型:CreateTime 和LogAppendTime, 分别代表消息创建的时间戳和消息追加到日志的时间戳。

3.4 位移提交

    对于 Kafka 中的分区而言,它的每条消息都有唯一 的 offset,用来表示消息在分区中对应 的位置 。 对于消费者而言 , 它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

    在每次调用 poll ()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息己经存储在 Kafka 中 了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一 次消费时的消费位移 。 并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移 。

    在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的 。 而在新消费者客户端中,消费位移存储在 Kafka 内 部的主题consumer offsets 中 。 这里把将消费位移存储起来(持久化)的动作称为“提交’,消费者在消费完消息之后需要执行消费位移的提交。

    在Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable. auto. commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto. commit. interval. ms配置,默认值为 5 秒,此参数生效的前提是 enable. auto.commit 参数为 true 。

    在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交 。自动位移提交的动作是在 poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

3.5 位移提交过程导致重复消费的现象

    如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩渍 ,那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

    KafkaConsumer 中的 seek()方法提供了追前消费或 回溯消费。

public void seek(TopicPartition partition ,long offset)

    seek()方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll()方法的调用过程中实现的 。 也就是说,在执行 seek()方法之前需要先执行一次 poll ()方法 ,等到分配到分区之后才可以重置消费位置 。

KafkaConsumer <String ,String> consumer= new KafkaConsumer<> (props);
consumer.subscribe(Arrays.asList(topic));
consumer . poll(Duration.ofMillis(lOOOO));
Set<TopicPartition> assignment = consumer.assignment();
for(TopicPartition tp : assignment) {
    consumer.seek(tp , 10) ;
    while(true) {
    ConsumerRecords<String , String> records = consumer.poll(Duration.ofMillis (1000)) ;
    //consume the record .
}

    timeout参数用来设置等待获取的超时时间。如果没有指 定timeout参数的值, 那么endOffsets() 方 法 的 等 待时 间由客户端参 数request.timeout.ms来设置,默认值为30000。

    seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息, 也可以通过这个方法来向后回溯若干消息, 这样为消息的消费提供了很大的灵活性。seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

3.6 再均衡

    再均衡是指分区的所属权从一个消费者转移到另一消费者的行为, 它为消费组具备高可用性和伸缩性提供保障, 使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。 不过在再均衡发生期间, 消费组内的消费者是无法读取消息的。 也就是说, 在再均衡发生期间的这一小段时间内, 消费组会变得不可用。 另外, 当 一个分区被重新分配给另 一个消费者时, 消费者当前的状态也会丢失。 比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作, 之后这个分区又被分配给了消费组内的另 一个消费者,原来被消费完的那部分消息又被重新消费 一遍, 也就是发生了重复消费。 一般情况下, 应尽量避免不必要的再均衡的发生。

    subscribe()方法中有再均衡监听器ConsumerRebalanceListener, 在subscribe(Collection<String> topics, ConsumerRebalanceListener listener)和subscribe(Pattem pattern, ConsumerRebalanceListener listener)方法中都有它的身影。再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作。 ConsumerRebalanceListener是 一个接口.

3.7Kafka消费端重要的参数

参数名称 默认值 参数释义
bootstrap.servers “” 指定连接 Kafka 集群所需的 broker 地址清单
key.deserializer   消息key对应的反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer接口
value.deserializer   消息key 所对应的反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer接口
group.id "" 消费者所隶属的消费组的唯一标识,即消费组的名称
session. timeout.ms 10000 组管理协议中用来检测消费者是否失效的超时时间
max.poll.interval.ms 300000 消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时 间间 隔还没有发起 poll 操作,则消费组认为该消费者己离开了消费组 ,将进行再均衡操作
auto.offset.reset latest 有效值为“ earliest ”" latest ” “ none”
enable.auto.commit true 是否开启自动提交消费位移的功能,默认开启
auto.commit.interval.ms 5000 当 enable.auto.commit 参数设置为 true 时才生效 ,表示开启自动提交消费位移功能 时自 动提交消费位移的时间间 隔
partition.assignment. strategy   消费者的分区分配策略
fetch .min.bytes 1( B ) Consumer 在一次拉取中从 Kafka 中拉取的最小数据量
fetch .max.bytes 50MB Consumer 在一次拉取中从 Kafka 中拉取的最大数据量
max.poll.records 500条 Consumer 在一次拉取请求中拉取的最大消息数
connections.max.idle.ms 9分钟 用来指定在多久之后关闭限制的连接
isolation.level read_ uncommitted 事务隔离级别。字符串类型,有效值为“ read_uncommitted ,和“ read committed ",表示消费者所消费到的位置,可以消费到 HW (High Watermark )处的位置

 

深入理解Kafka核心设计及原理(一):初始Kafka

深入理解Kafka核心设计及原理(二):生产者

深入理解Kafka核心设计及原理(三):消费者

深入理解Kafka核心设计及原理(四):主题管理

深入理解Kafka核心设计及原理(五):消息存储

深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制

标签: kafka

目录
相关文章
|
6天前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
18天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
49 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
1月前
|
消息中间件 缓存 分布式计算
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
27 2
|
1月前
|
消息中间件 缓存 大数据
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析
40 3
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
130 58
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
32 1
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
129 0
|
3月前
|
消息中间件 Kafka 数据库
深入理解Kafka的数据一致性原理及其与传统数据库的对比
【8月更文挑战第24天】在分布式系统中,确保数据一致性至关重要。传统数据库利用ACID原则保障事务完整性;相比之下,Kafka作为高性能消息队列,采用副本机制与日志结构确保数据一致性。通过同步所有副本上的数据、维护消息顺序以及支持生产者的幂等性操作,Kafka在不牺牲性能的前提下实现了高可用性和数据可靠性。这些特性使Kafka成为处理大规模数据流的理想工具。
81 6
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
86 3