版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/82752988
文章目录
- kafka指令
- 1.常用指令
- 1.1 [ Adding and removing topics](http://kafka.apache.org/090/documentation.html#basic_ops_add_topic)
- 1.2 [Modifying topics](http://kafka.apache.org/090/documentation.html#basic_ops_modify_topic)
- 1.3 balancing leadership
- 1.4 checking consumer position
- 1.5 [Expanding your cluster(水平扩展集群)](http://kafka.apache.org/090/documentation.html#basic_ops_cluster_expansion)
- 1.6 [自定义分区分配和迁移](http://kafka.apache.org/090/documentation.html#basic_ops_partitionassignment)
- 1.7 [Increasing replication factor(新增副本数)](http://kafka.apache.org/090/documentation.html#basic_ops_increase_replication_factor)
- 1.常用指令
kafka指令
1.常用指令
1.1 Adding and removing topics
add:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
remove:
1.2 Modifying topics
add partition:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
--partitions 40
add configes:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
To remove a config:
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
delete config (不使用):
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
1.3 balancing leadership
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
前提是需要加上配置
auto.leader.rebalance.enable=true
1.4 checking consumer position
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
1.5 Expanding your cluster(水平扩展集群)
- –generate: 在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移动到新代理。此选项仅提供了一种方便的方法,可在给定主题和目标代理列表的情况下生成分区重新分配计划。
- –execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。这可以是由管理员手工制作的自定义重新分配计划,也可以使用–generate选项提供
- –verify: 在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行中
1.由于该工具接受主题的输入列表作为json文件,因此首先需要确定要移动的主题并创建json文件,如下所示 -
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
2.一旦json文件准备就绪,使用分区重新分配工具生成候选分配 -
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
3.该工具生成一个候选分配,将所有分区从主题foo1,foo2移动到代理5,6。 但请注意,此时分区移动尚未开始,它只是告诉您当前的分配和建议的新分配。 应保存当前分配,以防您想要回滚它。 新的分配应保存在json文件中(例如expand-cluster-reassignment.json),以便使用–execute选项输入工具,如下所示 -
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
4.检查状态
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
1.6 自定义分区分配和迁移
1.创建分区文件 custom-reassignment.json
cat custom-reassignment.json
{
"version": 1,
"partitions": [{
"topic": "foo1",
"partition": 0,
"replicas": [5, 6]
}, {
"topic": "foo2",
"partition": 1,
"replicas": [2, 3]
}]
}
2.使用json文件执行分区操作
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
3.确认状态
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
1.7 Increasing replication factor(新增副本数)
逻辑和上两个一致 通过json文件 手动修改副本数据位置
提供json文件:
> cat increase-replication-factor.json
{
"version": 1,
"partitions": [{
"topic": "foo",
"partition": 0,
"replicas": [5, 6, 7]
}]
}
1.8 启动 消费 等指令
1.启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
2.启动kafka
bin/kafka-server-start.sh config/server.properties &
3.停止kafka
bin/kafka-server-stop.sh
4.停止zookeeper
bin/zookeeper-server-stop.sh
5.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6.展示topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
7.描述topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
8.生产者:
bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic
9.消费者:
bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin