大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下内容:

现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。


启动服务、创建主题、查看主题

修改分区副本因子(不允许)、修改分区副本因子(成功)

查看结果

fef323d0c4a1a6a1cb5d00dceeddbb58_6733d1e133de41cd9cad41908290b96b.png 分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。

Kafka中提供了多重分区分配算法(PartitionAssignor):


RangeAssignor

RoundRobinAssignor

StickAssignor

6c87df8410dcb4aa83ded2e05a029435_220d66b90dd4477e87254435deea424a.png RangeAssignor

PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。

消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。

Kafka默认采用的是RangeAssignor的分配算法。

a90379c29b81006808b942904f403436_546b7d12a9c346e1b6cbf962886103a6.png RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。

RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。

对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

0278642221a0bb4aa54db260c20f97d1_dd3e2adbca7f45a0a0665e5a18dd2ddf.png RoundRobinAssignor

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。

如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

5d0f06cccb4a971771f7fb2081336048_f6d58f70bdd94e9ea1d5290cee805c3b.png

对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)

对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

303db0e99c74de9985f49e5d34d33c35_b4bec463acdf4b328d495d3efc27b097.png

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。

更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。

Sticky是“粘性的”,可以理解为分配是带粘性的:


分区的分配尽量的均衡

每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。


假设当前有如下内容:


3个Consumer C0、C1、C2

4个Topic:T0、T1、T2、T3 每个Topic有2个分区

所有Consumer都订阅了4个分区

05b0bb6ec691eeb9cf8a7e3a4959dc07_47fa0f7fd92149aeb2ba2d389379c26b.png ab46a2d50f86157e92ce0c790d1bef7d_708893afa77d4b3084a5087e5596e848.png 自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口

其中定义了两个内部类:


Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。

PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架

Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。


代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

import java.nio.ByteBuffer;
import java.util.*;

public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {

    @Override
    public Subscription subscription(Set<String> topics) {
        // 在这里添加权重信息到 userData
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(getWeight());
        buffer.flip();
        return new Subscription(new ArrayList<>(topics), buffer);
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Map<String, Assignment> assignments = new HashMap<>();
        Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();

        // 遍历所有订阅的topics
        for (String topic : metadata.topics()) {
            List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
            for (TopicPartition partition : partitions) {
                partitionConsumers.putIfAbsent(partition, new ArrayList<>());
            }
        }

        // 根据权重分配分区
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            Subscription subscription = subscriptionEntry.getValue();
            int weight = subscription.userData().getInt();

            for (String topic : subscription.topics()) {
                List<TopicPartition> partitions = metadata.partitionsForTopic(topic);
                for (TopicPartition partition : partitions) {
                    List<String> consumers = partitionConsumers.get(partition);
                    for (int i = 0; i < weight; i++) {
                        consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会
                    }
                }
            }
        }

        // 随机分配分区给消费者
        Random random = new Random();
        for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {
            List<String> consumers = entry.getValue();
            String assignedConsumer = consumers.get(random.nextInt(consumers.size()));
            assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>()))
                       .partitions().add(entry.getKey());
        }

        return assignments;
    }

    @Override
    public void onAssignment(Assignment assignment, Cluster metadata) {
        // 可以在这里处理分配后的逻辑,比如保存当前分配的快照
    }

    @Override
    public String name() {
        return "weighted";
    }

    private int getWeight() {
        // 获取权重,可以从配置文件或环境变量中获取
        return 10; // 默认权重为10
    }
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));


目录
相关文章
|
3月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
81 5
|
3月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
77 0
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
229 0
|
9天前
|
消息中间件 运维 大数据
道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展
阿里云云消息队列 Kafka 版 Serverless 系列凭借其卓越的弹性能力,为道旅科技提供了灵活高效的数据流处理解决方案。无论是应对突发流量还是规划长期资源需求,该方案均能帮助企业实现资源动态调整和成本优化,同时保障业务的高可用性和连续性。
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
57 3
|
3月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
47 2
|
3月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
68 1
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
211 0
|
2月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
435 7
|
2月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
59 2