
References:

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

http://kafka.apache.org/documentation.html#quickstart

http://kafka.apache.org/documentation.html#operations

 

To make things easier to use, Kafka ships with a fairly powerful set of tools. This post collects the ones needed most often.

 

Starting and stopping the Kafka server

bin/kafka-server-start.sh config/server.properties    # start in the foreground
bin/kafka-server-stop.sh                              # stop the broker
JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties &   # run in the background, JMX exposed on 9999

 

Topic management

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe a topic in detail:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
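For reference, --describe prints one summary line per topic plus one line per partition, roughly like this (illustrative output for a single-partition, unreplicated topic; broker ids will differ):

Topic:test  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0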

Alter a topic's partition count (it can only be increased):

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test

Topic deletion is only officially supported from 0.8.2, which is currently a beta release:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
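Note that in 0.8.2 deletion only takes effect if the broker allows it; the server.properties switch below must be set, otherwise the topic is merely marked for deletion:

delete.topic.enable=true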

List problematic (unavailable) partitions:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions --topic test

Per-topic configuration overrides:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 \
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
    --deleteConfig max.message.bytes

 

Cluster expansion

Adding brokers to a cluster is fairly simple, but partitions of existing topics are not migrated automatically;
the migration has to be done by hand, and Kafka provides a convenient tool for it.

--generate: produce a candidate migration plan
Given a list of topics and a list of brokers, the tool proposes a reassignment.

Move topics entirely onto the new brokers:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

This prints both the current assignment and the proposed migration plan.

Save both at the same time: the current assignment can later be used for a rollback.
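One simple way to keep both (file names here are illustrative; expand-cluster-reassignment.json is the file --execute consumes below):

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate > plan.txt
# copy the "Current partition replica assignment" JSON from plan.txt into rollback.json
# copy the "Proposed partition reassignment configuration" JSON into expand-cluster-reassignment.json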

--execute: start the migration

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

--verify: check the current status of the migration

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully 
Reassignment of partition [foo2,2] completed successfully

Migrating selected replicas of selected partitions

This moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
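The same --verify step applies here; once both moves finish it should report:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully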

Decommissioning brokers

The current version has no tool to plan a decommission; that only arrives in 0.8.2. Decommissioning requires first draining all replicas off the broker, as sketched below.
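Until then, the reassignment tool above can do the draining by hand. A minimal sketch, assuming the broker being retired is simply omitted from --broker-list (here brokers 5 and 6 are the survivors):

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
# review and save the proposed plan, then run it with --execute as shown above;
# once --verify reports success, the retired broker holds no replicas and can be stopped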

Increasing the replication factor

Grow partition 0's replica count from 1 to 3. The existing replica lives on broker 5; replicas are added on brokers 6 and 7.

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
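As before, --verify reports when the new replicas have caught up:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully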

 

Producer console

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Anything typed afterwards is sent as messages to the topic on the broker.

 

Consumer console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This reads the topic from the beginning, so all the data can be read again and again.
I wondered why every run can replay everything; it turns out a random group id is generated on each run:
consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))

 

Consumer Offset Checker

This displays the offset status of a consumer group. The --group argument is required; if --topic is omitted, it defaults to all topics.

Displays the:  Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

required argument: [group]
Option         Description
------         -----------
--broker-info  Print broker info
--group        Consumer group.
--help         Print this message.
--topic        Comma-separated list of consumer topics (all topics if absent).
--zkconnect    ZooKeeper connect string. (default: localhost:2181)

Example,

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

Group           Topic                          Pid Offset          logSize         Lag             Owner
pv              page_visits                    0   21              21              0               none
pv              page_visits                    1   19              19              0               none
pv              page_visits                    2   20              20              0               none

 

 

Export Zookeeper Offsets

Dumps the offset information stored in ZK to a file, in the form below.

A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:

/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets

required argument: [zkconnect]
Option         Description
------         -----------
--group        Consumer group.
--help         Print this message.
--output-file  Output file
--zkconnect    ZooKeeper connect string. (default: localhost:2181)
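An example invocation using only the options listed above (the output path is illustrative):

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 --group pv --output-file /tmp/pv-offsets

Its counterpart, kafka.tools.ImportZkOffsets, can load such a file back into ZK, so the pair works for backing up and restoring consumer positions.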

 

Update Offsets In Zookeeper

This one is quite useful for replays. Kafka's documentation is rather unhelpful here; I couldn't tell how to use it from the docs and only figured it out from the source code.

A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

Example,

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

Group           Topic                          Pid Offset          logSize         Lag             Owner
pv              page_visits                    0   0               21              21              none
pv              page_visits                    1   0               19              19              none
pv              page_visits                    2   0               20              20              none

You can see the offsets have been reset to 0, so Lag = logSize.

 

A more direct approach is to look inside ZooKeeper itself.

Connect with zkCli.sh and browse with ls; the layout is listed below, with a short example session after the list.

Broker Node Registry

/brokers/ids/[0...N] --> host:port (ephemeral node)

Broker Topic Registry

/brokers/topics/[topic]/[0...N] --> nPartitions (ephemeral node)

Consumer Id Registry

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Consumer Offset Tracking

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)

Partition Owner registry

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
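A minimal zkCli.sh session sketch (group and topic names follow the examples above; the node name format matches the offset paths just listed):

> bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[zk: localhost:2181(CONNECTED) 1] ls /consumers/pv/offsets/page_visits
[zk: localhost:2181(CONNECTED) 2] get /consumers/pv/offsets/page_visits/1-0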