【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)

提示:本文可能已过期,请点击原文查看:【kafka运维】分区副本重分配、数据迁移、副本扩缩容 (附教学视频)


文章目录

脚本参数

1. 脚本的使用介绍

1.1 生成推荐配置脚本

1.2. 执行Json文件

1.3. 验证

2. 副本扩缩

2.1 副本扩容

2.1.1 计算副本分配方式

2.1.2 执行--execute

2.1.2 验证--verify

2.2 副本缩容

3. 分区扩容

4. 分区迁移

5. 副本跨路径迁移

源码解析

脚本参数

参数 描述 例子

--zookeeper 连接zk --zookeeper localhost:2181, localhost:2182

--topics-to-move-json-file 指定json文件,文件内容为topic配置 --topics-to-move-json-file config/move-json-file.json Json文件格式如下:


image.png

image.png

--generate 尝试给出副本重分配的策略,该命令并不实际执行

--broker-list 指定具体的BrokerList,用于尝试给出分配策略,与--generate搭配使用 --broker-list 0,1,2,3

--reassignment-json-file 指定要重分配的json文件,与--execute搭配使用 json文件格式如下例如:

image.png

--execute 开始执行重分配任务,与--reassignment-json-file搭配使用

--verify 验证任务是否执行成功,当有使用--throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响

--throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec -- throttle 500000

--replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上限 单位 bytes/sec --replica-alter-log-dirs-throttle 100000

--disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息

--bootstrap-server 如果是副本跨路径迁移必须有此参数

1. 脚本的使用介绍

该脚本是kafka提供用来重新分配分区的脚本工具;


1.1 生成推荐配置脚本

关键参数--generate


在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件;

编写move-json-file.json文件; 这个文件就是告知想对哪些Topic进行重新分配的计算

{
  "topics": [
    {"topic": "test_create_topic1"}
  ],
  "version": 1
}

然后执行下面的脚本,--broker-list "0,1,2,3" 这个参数是你想要分配的Brokers;


sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate


执行完毕之后会打印

Current partition replica assignment//当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration//期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它

1.2. 执行Json文件

关键参数--execute

将上面得到期望的重新分配方式文件保存在一个json文件里面

reassignment-json-file.json

{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

然后执行


sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute


迁移过程注意流量陡增对集群的影响

Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响、

例如我们上面的迁移操作加一个限流选项-- throttle 50000000

> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000

在后面加上一个—throttle 50000000 参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s

加上参数后你可以看到

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上|--replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;


如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file


1.3. 验证

关键参数--verify

该选项用于检查分区重新分配的状态,同时—throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。


sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify


image.png

image.png

注意: 当你输入的BrokerId不存在时,该副本的操作会失败,但是不会影响其他的;例如

image.png

2. 副本扩缩

kafka并没有提供一个专门的脚本来支持副本的扩缩, 不像kafka-topic.sh脚本一样,是可以扩分区的; 想要对副本进行扩缩,只能是曲线救国了; 利用kafka-reassign-partitions.sh来重新分配副本


2.1 副本扩容

假设我们当前的情况是 3分区1副本,为了提供可用性,我想把副本数升到2;


2.1.1 计算副本分配方式

我们用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0],
        "log_dirs": ["any"]
  }]
}

我们想把所有分区的副本都变成2,那我们只需修改"replicas": []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以我们根据自己想要的分配规则修改一下json文件就变成如下

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2,0],
    "log_dirs": ["any","any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1,2],
    "log_dirs": ["any","any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0,1],
        "log_dirs": ["any","any"]
  }]
}

注意log_dirs里面的数量要和replicas数量匹配;或者直接把log_dirs选项删除掉; 这个log_dirs是副本跨路径迁移时候的绝对路径

2.1.2 执行–execute

image.png

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:

2.1.2 验证–verify

image.png

完事之后,副本数量就增加了;

2.2 副本缩容

副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;

比如刚刚我新增了一个副本,想重新恢复到一个副本

执行下面的json文件

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic1",
    "partition": 2,
    "replicas": [2],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 1,
    "replicas": [1],
    "log_dirs": ["any"]
  }, {
    "topic": "test_create_topic1",
    "partition": 0,
    "replicas": [0],
    "log_dirs": ["any"]
  }]
}

执行之后可以看到其他的副本就被标记为删除了; 一会就会被清理掉

image.png

用这样一种方式我们虽然是实现了副本的扩缩容, 但是副本的分配需要我们自己来把控好, 要做到负载均衡等等; 那肯定是没有kafka自动帮我们分配比较合理一点; 那么我们有什么好的方法来帮我们给出一个合理分配的Json文件吗?PS:


我们之前已经分析过【kafka源码】创建Topic的时候是如何分区和副本的分配规则 那么我们把这样一个分配过程也用同样的规则来分配不就Ok了吗?

--generate本质上也是调用了这个方法,AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)


具体的实现操作请看 【kafka思考】最小成本的扩缩容副本设计方案


自己写一个工程来实现类似的方法,如果觉得很麻烦,可以直接使用

LogIKM 的新增副本功能直接帮你做了这个事情;(未来会实现)


3. 分区扩容

kafka的分区扩容是 kafka-topis.sh脚本实现的;不支持缩容

分区扩容请看 【kafka源码】TopicCommand之alter源码解析(分区扩容)


4. 分区迁移

分区迁移跟上面同理, 请看 1.1,1.2,1.3 部分;


5. 副本跨路径迁移

为什么线上Kafka机器各个磁盘间的占用不均匀,经常出现“一边倒”的情形? 这是因为Kafka只保证分区数量在各个磁盘上均匀分布,但它无法知晓每个分区实际占用空间,故很有可能出现某些分区消息数量巨大导致占用大量磁盘空间的情况。在1.1版本之前,用户对此毫无办法,因为1.1之前Kafka只支持分区数据在不同broker间的重分配,而无法做到在同一个broker下的不同磁盘间做重分配。1.1版本正式支持副本在不同路径间的迁移


怎么在一台Broker上用多个路径存放分区呢?


只需要在配置上接多个文件夹就行了

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8

注意同一个Broker上不同路径只会存放不同的分区,而不会将副本存放在同一个Broker; 不然那副本就没有意义了(容灾)


怎么针对跨路径迁移呢?


迁移的json文件有一个参数是log_dirs; 默认请求不传的话 它是"log_dirs": ["any"] (这个数组的数量要跟副本保持一致)

但是你想实现跨路径迁移,只需要在这里填入绝对路径就行了,例如下面


迁移的json文件示例

{
  "version": 1,
  "partitions": [{
    "topic": "test_create_topic4",
    "partition": 2,
    "replicas": [0],
    "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"]
  }, {
    "topic": "test_create_topic4",
    "partition": 1,
    "replicas": [0],
    "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"]
  }]
}

然后执行脚本

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server
xxxxx:9092 --replica-alter-log-dirs-throttle 10000

注意 --bootstrap-server 在跨路径迁移的情况下,必须传入此参数


如果需要限流的话 加上参数|--replica-alter-log-dirs-throttle ; 跟--throttle不一样的是 --replica-alter-log-dirs-throttle限制的是Broker内不同路径的迁移流量;


相关文章
|
5月前
|
消息中间件 SQL 分布式计算
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
210 7
|
7月前
|
消息中间件 负载均衡 Kafka
Kafka分区分配策略大揭秘:RoundRobin、Range、Sticky,你真的了解它们吗?
【8月更文挑战第24天】Kafka是一款突出高吞吐量、可扩展性和数据持久性的分布式流处理平台。其核心特性之一是分区分配策略,对于实现系统的负载均衡和高可用性至关重要。Kafka支持三种主要的分区分配策略:RoundRobin(轮询)、Range(范围)和Sticky(粘性)。RoundRobin策略通过轮询方式均衡分配分区;Range策略根据主题分区数和消费者数量分配;而Sticky策略则在保持原有分配的基础上动态调整,以确保各消费者负载均衡。理解这些策略有助于优化Kafka性能并满足不同业务场景需求。
504 59
|
5月前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
91 3
|
6月前
|
消息中间件 运维 Linux
linux之centos运维kafka
linux之centos运维kafka
|
8月前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
8月前
|
消息中间件 存储 Kafka
如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移
随着大数据技术的飞速发展,Apache Kafka 作为一种高吞吐量、低延迟的分布式消息系统,已经成为企业实时数据处理的核心组件。然而,随着业务的扩展和技术的发展,企业面临着不断增加的存储成本和运维复杂性问题。为了更好地优化系统性能和降低运营成本,企业开始寻找更具优势的消息系统解决方案。其中,AutoMQ [1] 作为一种基于云重新设计的消息系统,凭借其显著的成本优势和弹性能力,成为了企业的理想选择。
54 0
如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移
|
8月前
|
消息中间件 算法 Kafka
从零开始掌握Kafka Rebalance和分区分配
**Kafka Rebalance详解:**当消费者组成员、订阅主题或分区变化时,集群需重新分配任务。涉及关键点:成员增减、主题数量及分区数变更。Rebalance包括Leader选举、RangeAssignor算法的分区分配,以及创建、删除、修改和查询Topic的基本操作。了解这些有助于优化Kafka集群管理。关注“软件求生”获取更多技术内容!
446 0
|
8月前
|
运维
会员系统功能售卖之一之学习资源,余老师的优点PTF做的好,公众号做的好,自媒体运维的好,教学视频类网站,学习自媒体运维可以多看看别人视频,讲课不会讲可以用音频转文字看看人家是怎样讲的,可以在网站上视频
会员系统功能售卖之一之学习资源,余老师的优点PTF做的好,公众号做的好,自媒体运维的好,教学视频类网站,学习自媒体运维可以多看看别人视频,讲课不会讲可以用音频转文字看看人家是怎样讲的,可以在网站上视频
|
10月前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 的分区分配策略分析
【4月更文挑战第7天】【Kafka】Kafka 的分区分配策略分析
|
10月前
|
消息中间件 存储 缓存
分布式实时消息队列Kafka(四)消费分配策略与存储机制
分布式实时消息队列Kafka(四)消费分配策略与存储机制
431 1