Kafka 消费者解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka 消费者解析

一、消费者相关概念
1.1 消费组&消费者
消费者:

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是__consumer_offsets的主题中
消费者还可以将⾃⼰的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper
推荐使⽤Kafka存储消费者的偏移量。因为Zookeeper不适合⾼并发。
消费组:

多个从同⼀个主题消费的消费者可以加⼊到⼀个消费组中
消费组中的消费者共享group_id。配置方法:configs.put("group.id", "xxx");
group_id⼀般设置为应⽤的逻辑名称。⽐如多个订单处理程序组成⼀个消费组,可以设置group_id为"order_process"
group_id通过消费者的配置指定:group.id=xxxxx
消费组均衡地给消费者分配分区,每个分区只由消费组中⼀个消费者消费
⼀个拥有四个分区的主题,包含⼀个消费者的消费组
此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。

image.png

如果在消费组中添加⼀个消费者2,则每个消费者分别从两个分区接收消息
image.png

如果消费组有四个消费者,则每个消费者可以分配到⼀个分区
image.png

如果向消费组中添加更多的消费者,超过主题分区数量,则有⼀部分消费者就会闲置,不会接收任何消息
image.png

向消费组添加消费者是横向扩展消费能⼒的主要⽅式。
必要时,需要为主题创建⼤量分区,在负载增⻓时可以加⼊更多的消费者。但是不要让消费者的数量超过主题分区的数量。

image.png

除了通过增加消费者来横向扩展单个应⽤的消费能⼒之外,经常出现多个应⽤程序从同⼀个主题消费的情况。
此时,每个应⽤都可以获取到所有的消息。只要保证每个应⽤都有⾃⼰的消费组,就可以让它们获取到主题所有的消息。
横向扩展消费者和消费组不会对性能造成负⾯影响。

为每个需要获取⼀个或多个主题全部消息的应⽤创建⼀个消费组,然后向消费组添加消费者来横向扩展消费能⼒和应⽤的处理能⼒,则每个消费者只处理⼀部分消息。

1.2 心跳机制
初始的消费者消费分区:

image.png

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区
image.png
由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置

image.png

Kafka 的⼼跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送⼼跳。

Consumer 和 Rebalance 相关的 2 个配置参数:

参数 字段
session.timeout.ms MemberMetadata.sessionTimeoutMs
max.poll.interval.ms MemberMetadata.rebalanceTimeoutMs

broker 端,sessionTimeoutMs 参数
broker 处理⼼跳的逻辑在 GroupCoordinator类中。如果⼼跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。
可以看看源码的kafka.coordinator.group.GroupCoordinator#completeAndScheduleNextHeartbeatExpiration方法。

如果客户端发现⼼跳超期,客户端会标记 coordinator 为不可⽤,并阻塞⼼跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance
可以看看源码的org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 内部类

二、消息接收相关
2.1 常用参数配置
1653921425(1).png

2.2 订阅
Topic:Kafka⽤于分类管理消息的逻辑单元,类似与MySQL的数据库。
*Partition:是Kafka下数据存储的基本单元,这个是物理上的概念。同⼀个topic的数据,会被分散的存储到多个partition中,这些partition可以在同⼀台机器上,也可以是在多台机器上。优势在于:有利于⽔平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提⾼容灾能⼒。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。
Consumer Group:同样是逻辑上的概念,是Kafka实现单播和⼴播两种消息模型的⼿段**。保证⼀个消费组获取到特定主题的全部的消息。在消费组内部,若⼲个消费者消费主题分区的消息,消费组可以保证⼀个主题的每个分区只被消费组中的⼀个消费者消费。
image.png

consumer 采⽤ pull 模式从 broker 中读取数据。

采⽤ pull 模式,consumer 可⾃主控制消费消息的速率, 可以⾃⼰控制消费⽅式(批量消费/逐条消费),还可以选择不同的提交⽅式从⽽实现不同的传输语义。

订阅主题:consumer.subscribe("tp_demo_01,tp_demo_02")

2.3 反序列化
2.3.1 Kafka 自带反序列化器
Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进⾏反序列化处理,然后才能交给⽤户程序消费处理。

常用的Kafka提供的,反序列化器包括key的和value的反序列化器:

key.deserializer:IntegerDeserializer
value.deserializer:StringDeserializer
消费者从订阅的主题拉取消息:consumer.poll(3_000);

在Fetcher类中,对拉取到的消息⾸先进⾏反序列化处理:

private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {    try {        long offset = record.offset();        long timestamp = record.timestamp();        Optional<Integer> leaderEpoch = this.maybeLeaderEpoch(batch.partitionLeaderEpoch());        TimestampType timestampType = batch.timestampType();        Headers headers = new RecordHeaders(record.headers());        ByteBuffer keyBytes = record.key();        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);        ByteBuffer valueBytes = record.value();        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);        return new ConsumerRecord(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? -1 : keyByteArray.length, valueByteArray == null ? -1 : valueByteArray.length, key, value, headers, leaderEpoch);    } catch (RuntimeException var17) {        throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", var17);    }}

Kafka默认提供了⼏个反序列化的实现:

org.apache.kafka.common.serialization.ByteArrayDeserializer

org.apache.kafka.common.serialization.ByteBufferDeserializer

org.apache.kafka.common.serialization.BytesDeserializer

org.apache.kafka.common.serialization.DoubleDeserializer

org.apache.kafka.common.serialization.FloatDeserializer

org.apache.kafka.common.serialization.IntegerDeserializer

org.apache.kafka.common.serialization.LongDeserializer

org.apache.kafka.common.serialization.ShortDeserializer

org.apache.kafka.common.serialization.StringDeserializer

2.3.2 自定义反序列化器
反序列化器都需要实现org.apache.kafka.common.serialization.Deserializer接⼝:

这里根据前面自定义的序列化器,再自定义一个反序列化器。

先回顾一下前面的序列化器,添加了一个 User 对象,需要序列化User对象:

public class User {    private Integer userId;    private String username;    // set、get、toString、全参构造函数、无参构造函数 方法省略,} /** * User对象的序列化器 */public class UserSerializer implements Serializer<User> {    @Override    public void configure(Map<String, ?> map, boolean b) {        // do Nothing    }     @Override    public byte[] serialize(String topic, User user) {        try {            // 如果数据是null,则返回null            if (user == null) return null;            Integer userId = user.getUserId();            String username = user.getUsername();            int length = 0;            byte[] bytes = null;            if (null != username) {                bytes = username.getBytes("utf-8");                length = bytes.length;            }            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);            buffer.putInt(userId);            buffer.putInt(length);            buffer.put(bytes);            return buffer.array();        } catch (UnsupportedEncodingException e) {            throw new SerializationException("序列化数据异常");        }    }     @Override    public void close() {        // do Nothing    }}

这里再自定义一个反序列化器:

public class UserDeserializer implements Deserializer<User> {    @Override    public void configure(Map<String, ?> configs, boolean isKey) {        // do Nothing    }     @Override    public User deserialize(String topic, byte[] data) {        ByteBuffer allocate = ByteBuffer.allocate(data.length);        allocate.put(data);        allocate.flip();        int userId = allocate.getInt();        int length = allocate.getInt();        String userName = new String(data, 8, length);        return new User(userId, userName);    }     @Override    public void close() {        // do Nothing    }}

消费者使用自定义反序列化器:
image.png

2.4 拦截器
消费者在拉取了分区消息之后,要⾸先经过反序列化器对key和value进⾏反序列化处理。

处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应⽤程序进⾏处理。

消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V>接⼝。

⼀个可插拔接⼝,允许拦截甚⾄更改消费者接收到的消息。⾸要的⽤例在于将第三⽅组件引⼊消费者应⽤程序,⽤于定制的监控、⽇志处理等.
该接⼝的实现类通过configre⽅法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer⽣成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产⽣冲突。
ConsumerInterceptor⽅法抛出的异常会被捕获、记录,但是不会向下传播。如果⽤户配置了错误的key或value类型参数,消费者不会抛出异常,⽽仅仅是记录下来。
ConsumerInterceptor回调发⽣在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)⽅法同⼀个线程
该接⼝中有如下⽅法:

public interface ConsumerInterceptor<K, V> extends Configurable {     /**     * 该⽅法在poll⽅法返回之前调⽤。调⽤结束后poll⽅法就返回消息了。     *      * 该⽅法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或⽣成新的消息。     * 如果有多个拦截器,则该⽅法按照KafkaConsumer的configs中配置的顺序调⽤。     *      * @param records 由上个拦截器返回的由客户端消费的消息。     */    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);     /**     * 当消费者提交偏移量时,调⽤该⽅法     * 该⽅法抛出的任何异常调⽤者都会忽略。     */    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);     /**     * This is called when interceptor is closed     */    public void close();}

代码实现

自定义一个消费者拦截器:

public class OneInterceptor implements ConsumerInterceptor<String, String> {    @Override    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {        // poll⽅法返回结果之前最后要调⽤的⽅法        System.out.println("One -- 开始");        // 消息不做处理,直接返回        return records;     }        @Override    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {        // 消费者提交偏移量的时候,经过该⽅法        System.out.println("One -- 结束");     }        @Override    public void close() {        // ⽤于关闭该拦截器⽤到的资源,如打开的⽂件,连接的数据库等    }        @Override    public void configure(Map<String, ?> configs) {        // ⽤于获取消费者的设置参数        configs.forEach((k, v) -> {             System.out.println(k + "\t" + v);            });    }}

按照 OneInterceptor 拦截器复制两个拦截器,更名为 TwoInterceptor、ThreeInterceptor

消费者使用自定义拦截器:
image.png

2.5 位移提交&位移管理
位移提交介绍:

Consumer需要向Kafka记录⾃⼰的位移数据,这个汇报过程称为提交位移(Committing Offsets)
Consumer 需要为分配给它的每个分区提交各⾃的位移数据
位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets
位移提交分为⾃动提交和⼿动提交
位移提交分为同步提交和异步提交
2.5.1 位移自动提交
Kafka Consumer 后台提交

开启⾃动提交:enable.auto.commit=true
配置⾃动提交间隔:Consumer端:auto.commit.interval.ms,默认 5s
在消费者中设置自动提交和自动提交间隔:

Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "192.168.0.102:9092");configs.put("group.id", "mygrp");// 设置偏移量⾃动提交。⾃动提交是默认值。这⾥做示例。configs.put("enable.auto.commit", "true");// 偏移量⾃动提交的时间间隔configs.put("auto.commit.interval.ms", "3000");configs.put("key.deserializer", StringDeserializer.class);configs.put("value.deserializer", StringDeserializer.class);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);

⾃动提交位移的顺序:

配置 enable.auto.commit = true
Kafka会保证在开始调⽤poll⽅法时,提交上次poll返回的所有消息
因此⾃动提交不会出现消息丢失,但会重复消费
重复消费举例:

Consumer 每 5s 提交 offset
假设提交 offset 后的 3s 发⽣了 Rebalance
Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
因此 Rebalance 发⽣前 3s 的消息会被重复消费
2.5.2 位移手动同步提交
使⽤ KafkaConsumer#commitSync():会提交 KafkaConsumer#poll()返回的最新 offset

该⽅法为同步操作,等待直到 offset 被成功提交才返回

while (true) {     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));    process(records); // 处理消息     try {         consumer.commitSync();     } catch (CommitFailedException e) {        handle(e); // 处理提交失败异常     }}

commitSync 在处理完所有消息之后

⼿动同步提交可以控制offset提交的时机和频率

⼿动同步提交会:

调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
会影响 TPS
可以选择拉⻓提交间隔,但有以下问题
会导致 Consumer 的提交频率下降
Consumer 重启后,会有更多的消息被消费
2.5.3 位移手动异步提交
KafkaConsumer#commitAsync()

while (true) {     ConsumerRecords<String, String> records = consumer.poll(3_000);             process(records);     // 处理消息     consumer.commitAsync((offsets, exception) -> {         if (exception != null) {            handle(exception);        }    });}

commitAsync出现问题不会⾃动重试

手动异步提交不会自动重试的解决方案:

try {     while(true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));         process(records); // 处理消息         commitAysnc(); // 使⽤异步提交规避阻塞     }} catch(Exception e) {     handle(e); // 处理异常} finally {     try {         consumer.commitSync(); // 最后⼀次提交使⽤同步阻塞式提交    } finally {         consumer.close();     }}

2.5.3 消费者位移管理
Kafka中,消费者根据消息的位移顺序消费消息。

消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。

Kafka提供了消费者API,让消费者可以管理⾃⼰的位移。

KafkaConsumer<K, V> 的 API如下:

public void assign(Collection partitions)

说明:

给当前消费者⼿动分配⼀系列主题分区。

⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。

如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。

⼿动分配主题分区的⽅法不使⽤消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。

⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。

如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进⾏异步提交。

public Set assignment()

说明:

获取给当前消费者分配的分区集合。如果订阅是通过调⽤assign⽅法直接分配主题分区,则返回相同的集合。如果使⽤了主题订阅,该⽅法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进⾏分区分配,或者正在重新分配分区,则会返回none。

public Map<String, List> listTopics()

说明:

获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤。

public List partitionsFor(String topic)

说明:

获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调⽤。

public Map<TopicPartition, Long> beginningOffsets(Collection partitions)

说明:

对于给定的主题分区,列出它们第⼀个消息的偏移量。

注意,如果指定的分区不存在,该⽅法可能会永远阻塞。

该⽅法不改变分区的当前消费者偏移量。

public void seekToEnd(Collection partitions)

说明:

将偏移量移动到每个给定分区的最后⼀个。

该⽅法延迟执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。

如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。

如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后⼀个稳定的偏移量,即下⼀个要消费的消息现在还是未提交状态的事务消息。

public void seek(TopicPartition partition, long offset)

说明:

将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。

若该⽅法多次调⽤,则最后⼀次的覆盖前⾯的。

如果在消费中间随意使⽤,可能会丢失数据。

public long position(TopicPartition partition)

说明:

检查指定主题分区的消费偏移量

public void seekToBeginning(Collection partitions)

说明:

将给定每个分区的消费者偏移量移动到它们的起始偏移量。该⽅法懒执⾏,只有当调⽤过poll⽅法或position⽅法之后才会执⾏。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。

准备数据

# ⽣成消息⽂件[root@node1 ~]# for i in `seq 60`; do echo "hello $i" >> nm.txt; done# 创建主题,三个分区,每个分区⼀个副本[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1# 将消息⽣产到主题中[root@node1 ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic tp_demo_01 < nm.txt

API 实战:

/** * 消费者位移管理 */public class MyConsumer2 {    public static void main(String[] args) {        Map<String, Object> config = new HashMap<>();        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092");        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);         // 给当前消费者⼿动分配⼀系列主题分区        consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 1)));         // 获取给当前消费者分配的分区集合        Set<TopicPartition> assignment = consumer.assignment();        assignment.forEach(topicPartition -> System.out.println(topicPartition));         // 获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤        Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();        stringListMap.forEach((k, v) -> {            System.out.println("主题:" + k);            v.forEach(info -> System.out.println(info));        });         Set<String> strings = consumer.listTopics().keySet();        strings.forEach(topicName -> System.out.println(topicName));         // 获取指定主题的分区元数据        List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01");        for (PartitionInfo partitionInfo : partitionInfos) {            Node leader = partitionInfo.leader();            System.out.println(leader);            System.out.println(partitionInfo);            // 当前分区在线副本            Node[] nodes = partitionInfo.inSyncReplicas();            // 当前分区下线副本            Node[] nodes1 = partitionInfo.offlineReplicas();        }         // 对于给定的主题分区,列出它们第⼀个消息的偏移量。        // 注意,如果指定的分区不存在,该⽅法可能会永远阻塞。        // 该⽅法不改变分区的当前消费者偏移量。        Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment());        topicPartitionLongMap.forEach((k, v) -> {            System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v);        });        // 将偏移量移动到每个给定分区的最后⼀个。        consumer.seekToEnd(consumer.assignment());        //将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。        consumer.seek(new TopicPartition("tp_demo_01", 1), 10);        // 检查指定主题分区的消费偏移量        long position = consumer.position(new TopicPartition("tp_demo_01", 1));        System.out.println(position);        // 将偏移量移动到每个给定分区的最后⼀个。        consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1)));         // 关闭⽣产者        consumer.close();     }}

2.6 再平衡
2.6.1 再平衡介绍
重平衡可以说是kafka为⼈诟病最多的⼀个点了。

重平衡其实就是⼀个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每⼀个分区。⽐如⼀个topic有100个分区,⼀个消费者组内有20个消费者,在协调者的控制下让组内每⼀个消费者分配到5个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

消费者组内成员发⽣变更,这个变更包括了增加和减少消费者,⽐如消费者宕机退出消费组。
主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发重平衡
订阅的主题发⽣变化,当消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发重平衡

image.png

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
image.png

由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
image.png

主题增加分区,需要主题分区和消费组进⾏再均衡。
image.png
由于使⽤正则表达式订阅主题,当增加的主题匹配正则表达式的时候,也要进⾏再均衡。
image.png
为什么说重平衡为⼈诟病呢?因为重平衡过程中,消费者⽆法从kafka消费消息,这对kafka的TPS影响极⼤,⽽如果kafka集内节点较多,⽐如数百个,那重平衡可能会耗时极多。数分钟到数⼩时都有可能,⽽这段时间kafka基本处于不可⽤状态。所以在实际环境中,应该尽量避免重平衡发⽣。

2.6.2 避免再平衡
不可能完全避免重平衡,因为你⽆法完全保证消费者不会故障。⽽消费者故障其实也是最常⻅的引发重平衡的地⽅,所以我们需要保证尽⼒避免消费者故障。

⽽其他⼏种触发重平衡的⽅式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。

如果消费者真正挂掉了,就没办法了,但实际中,会有⼀些情况,kafka错误地认为⼀个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

⾸先要知道哪些情况会出现错误判断挂掉的情况。

在分布式系统中,通常是通过⼼跳来维持分布式系统的,kafka也不例外。

在分布式系统中,由于⽹络问题你不清楚没接收到⼼跳,是因为对⽅真正挂了还是只是因为负载过重没来得及发⽣⼼跳或是⽹络堵塞。所以⼀般会约定⼀个时间,超时即判定对⽅挂了。⽽在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。

还有⼀个参数,heartbeat.interval.ms,这个参数控制发送⼼跳的频率,频率越⾼越不容易被误判,但也会消耗更多资源。

此外,还有最后⼀个参数,max.poll.interval.ms,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过max.poll.interval.ms这个参数的值。这个参数的默认值是5分钟,⽽如果消费者接收到数据后会执⾏耗时的操作,则应该将其设置得⼤⼀些。

总结:

session.timout.ms控制⼼跳超时时间。
heartbeat.interval.ms控制⼼跳发送频率。
max.poll.interval.ms控制poll的间隔。
这⾥给出⼀个相对较为合理的配置,如下:

session.timout.ms:设置为6s
heartbeat.interval.ms:设置2s
max.poll.interval.ms:推荐为消费者处理消息最⻓耗时再加1分钟
2.7 其他消费者参数配置
1653921667(1).png

1653921682(1).png

1653921699(1).png
image.png

三、 消费组管理
3.1 消费者组的概念
consumer group是kafka提供的可扩展且具有容错性的消费者机制。

三个特性:

消费组有⼀个或多个消费者,消费者可以是⼀个进程,也可以是⼀个线程
group.id 是⼀个字符串,唯⼀标识⼀个消费组
消费组订阅的主题每个分区只能分配给消费组⼀个消费者。
3.2 消费者位移(consumer position)
消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。

每个消费组保存⾃⼰的位移信息,那么只需要简单的⼀个整数表示位置就够了;同时可以引⼊checkpoint机制定期持久化。

3.3 位移管理(offset management)
⾃动VS⼿动

Kafka默认定期⾃动提交位移(enable.auto.commit = true),也⼿动提交位移。另外kafka会定期把group消费情况保存起来,做成⼀个offset map,如下图所示:

image.png

位移提交

位移是提交到Kafka中的__consumer_offsets主题。__consumer_offsets中的消息保存了每个消费组某⼀时刻提交的offset信息。

[root@localhost kafka_2.12-1.0.2]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /usr/src/kafka_2.12-1.0.2/config/consumer.properties --from-beginning | head

image.png

上图中,标出来的,表示消费组为console-consumer-46068,消费的主题为topic_1,消费的分区是0,偏移量为5。

__consumers_offsets主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的⽇志容量,也能实现保存最新offset的⽬的。

3.4 再谈再平衡
什么是再平衡:

再均衡(Rebalance)本质上是⼀种协议,规定了⼀个消费组中所有消费者如何达成⼀致来分配订阅主题的每个分区。

⽐如某个消费组有20个消费组,订阅了⼀个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。

什么时候再平衡:

再均衡的触发条件:

组成员发⽣变更(新消费者加⼊消费组组、已有消费者主动离开或崩溃了)
订阅主题数发⽣变更。如果正则表达式进⾏订阅,则新建匹配正则表达式的主题触发再均衡。
订阅主题的分区数发⽣变更
如何进行组内分区分配:

三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor

谁来执⾏再均衡和消费组管理:

Kafka提供了⼀个⻆⾊:Group Coordinator来执⾏对于消费组的管理。

Group Coordinator——每个消费组分配⼀个消费组协调器⽤于组管理和位移管理。当消费组的第⼀个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

如何确定coordinator:

确定消费组位移信息写⼊__consumers_offsets的哪个分区。具体计算公式:

_consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。

该分区leader所在的broker就是组协调器。

Rebalance Generation:

它表示Rebalance之后主题分区到消费组中消费者映射关系的⼀个版本,主要是⽤于保护消费组,隔离⽆效偏移量提交的。如上⼀个版本的消费者⽆法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进⾏Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了⼀个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进⼊Generation 2,之后成员4加⼊,再次触发Rebalance,消费组进⼊Generation 3.
image.png

协议(protocol)

kafka提供了5个协议来处理与消费组协调相关的问题:

Heartbeat请求:consumer需要定期给组协调器发送⼼跳来表明⾃⼰还活着
LeaveGroup请求:主动告诉组协调器我要离开消费组
SyncGroup请求:消费组Leader把分配⽅案告诉组内所有成员
JoinGroup请求:成员请求加⼊组
DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配⽅案,订阅信息等。通常该请求是给管理员使⽤
组协调器在再均衡的时候主要⽤到了前⾯4种请求。

liveness

消费者如何向消费组协调器证明⾃⼰还活着?

通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。⼀旦协调器认为某个消费者挂了,那么它就会开启新⼀轮再均衡,并且在当前其他消费者的⼼跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。

再均衡过程

再均衡分为2步:Join和Sync

Join, 加⼊组。所有成员都向消费组协调器发送JoinGroup请求,请求加⼊消费组。⼀旦所有成员都发送了JoinGroup请求,协调i器从中选择⼀个消费者担任Leader的⻆⾊,并把组成员信息以及订阅信息发给Leader。
Sync,Leader开始分配消费⽅案,即哪个消费者负责消费哪些主题的哪些分区。⼀旦完成分配,Leader会将这个⽅案封装进SyncGroup请求中发给消费组协调器,⾮Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配⽅案之后会把⽅案塞进SyncGroup的response中发给各个消费者。

image.png

注意:在协调器收集到所有成员请求前,它会把已收到请求放⼊⼀个叫purgatory(炼狱)的地⽅。然后是分发分配⽅案的过程,即SyncGroup请求:
image.png

消费组状态机

消费组组协调器根据状态机对消费组做不同的处理:
image.png

说明:

Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是⼀个response: UNKNOWN_MEMBER_ID
Empty:组内⽆成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
PreparingRebalance:组准备开启新的rebalance,等待成员加⼊
AwaitingSync:正在等待leader consumer将分配⽅案传给各个成员
Stable:再均衡完成,可以开始消费。

相关文章
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
99 2
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
28 1
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
55 1
|
4月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
339 0
|
5月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
133 0
|
5月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
127 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
66 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
385 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
89 3

推荐镜像

更多