下载kafka
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
解压安装包
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0/bin
查看bin目录下主要几个脚本功能如下:
脚本 | 功能 |
---|---|
kafka-server-start.sh | 启动kafka服务器; |
kafka-server-stop.sh | 停止kafka服务器; |
kafka-topics.sh | topic管理; |
kafka-console-producer.sh | 基于命令行的生产者; |
kafka-console-consumer.sh | 基于命令行的消费者; |
kafka-run-class.sh | 运行java类的脚本,由kafka-server-start.sh和kafka-server-stop.sh、kafka-topics.sh等脚本调用; |
zookeeper-server-start.sh | 启动kafka自带的zookeeper服务器; |
zookeeper-server-stop.sh | 停止kafka自带的zookeeper服务器; |
zookeeper-shell.sh | 在命令行连接zookeeper的客户端工具; |
connect-standalone.sh | 在命令行启动单点的connector; |
connect-distributed.sh | 在命令行启动基于集群connector; |
注:kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。但一般仅限于测试环境使用;
config目录下存放的是kafka服务、自带zk服务以及基于命令行的生产者、消费者工具对应的配置文件,常用如下:
脚本 | 功能 |
---|---|
server.properties | kafka实例的配置文件,配置kafka最重要的配置文件; |
log4j.properties | kafka日志配置; |
zookeeper.properties | 自带zk的配置文件; |
producer.properties | 基于命令行的生产者工具配置文件;(测试用) |
consumer.properties | 基于命令行的消费者工具配置文件;(测试用) |
connect-standalone.properties | 自带单点connector的配置文件,存放connector的序列化方式、监听broker的地址端口等通用配置;(测试用) |
connect-file-source.properties | 配置文件读取connector,用于逐行读取文件,导入入topic;(测试用) |
connect-file-sink.properties | 配置文件写入connector,用于将topic中的数据导出到topic中;(测试用) |
启动zk服务,默认端口:2181
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-01-16 20:22:52,327] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
启动kafka服务,默认端口:9092
> bin/kafka-server-start.sh config/server.properties
[2018-01-16 20:23:52,758] INFO KafkaConfig values:
...
经过如上两步,我们就启动了一个简易的kafka集群(具有1个zookeeper实例和1个kafka实例的集群)
查看zookeeper中存放的kafka信息
> bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test]
ls /brokers/ids
[0]
"ls /"命令列出了zk根节点下的所有元素,可以看到kafka在zk中存放了集群(cluster)、实例(brokers)、消费者(consumers)等信息;zookeeper服务作为kafka的元数据管理服务,因而每次对kafka服务操作都需要指定zookeeper服务的地址,以便于获取kafka的元数据,连接到正确的kafka集群;
创建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
创建一个名为test的topic,包含1个复本,1个分区;
查看集群中的所有topic
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
启动生产者,并写入测试消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hello World1
> I'm a programer
启动消费者,接收消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello World1
I'm a programer
可以看到生产者写入的消息,都能够立刻被消费者接收并打印出来。需要注意的是,生产者和消费者通过topic这个概念来建立联系,只有消费者指定与生产者相同的topic,才能够消费其产生的消费;
删除topic
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
建立多个kafka实例的集群
拷贝配置文件,修改实例ID、日志目录、监听端口:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
修改配置项如下:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
启动实例:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
新建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
新建一个名为my-replicated-topic的topic,有3个副本和1个分区;
查看topic状态描述
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
topic上有几个partition,就会展示几行记录;字段含义如下:
- leader:标识当前partition的leader节点是那个,通过broker.id标识;一个partition只有一个leader节点,负责接收和处理读写请求;
- replicas:标识当前partition的所有副本所在的节点,无论节点是否是leader节点,也无论节点是否"存活",通过broker.id标识;
- isr:标识存活且与leader节点同步的节点,即可用的副本(包括leader借点);通过broker.id标识;
查看最初创建的test状态描述:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
可以看到,因为test只有1个副本、1个partition,所以只能分布在一个实例上;
模拟leader切换
对于包含多个副本的topic而言,当一个副本所在的实例不可用时,将会从其它可用副本中选择一个作为leader;
在集群节点都正常的情况下,查看topic的状态:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
关掉broker.id=0的实例,再次查看,发现leader节点已经切换,同时isr中不包含"不可用"节点0:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
重新启动broker.id=0的实例,再次查看,发现isr中包括了节点0,说明可用。
使用kafka connect导入/导出数据
kafka connect是kafka与外部系统交互的工具,通过运行不同的connector,实现与不同外部系统的交互,包括数据的导入/导出。如下模拟从文件导入数据到kafka,以及从kafka导出数据到文件;
- 首先,创建文件,写入测试数据:
> cd kafka_2.11-1.0.0
> echo "Hello World" > test.txt
注:一定是在kafka根目录中创建名为test.txt的文件,否则不会读取;
2.启动2个单点的connector,这两个connector都是kafka自带的,一个用于读取文件写入topic,另一个用于将topic中数据导出到文件;
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2018-01-17 10:37:32,568] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
connect-console-source.properties文件内容:
name=local-console-source
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# connector关联的topic
topic=connect-test
connect-console-sink.properties文件内容:
name=local-console-sink
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# connector关联的topic
topics=connect-test
在kafka根目录可以看到生成了名为test.sink.txt的文件,其中的内容即为test.txt中的内容,持续向test.txt中append内容,test.sink.txt中的内容也随之append;
注:因为同步过程是监听文件的增量变化,如果改变test.txt中旧有内容,则旧数据不发生变化,覆盖同一行旧数据,貌似会产生一个空行;
整个同步过程是:
test.txt -> FileStreamSourceConnector -> connect-test(topic) -> FileStreamSinkConnector -> test.sink.txt
由于是通过topic存放过往数据,因此在topic中也可以看到相应的数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello World"}
{"schema":{"type":"string","optional":false},"payload":""}
{"schema":{"type":"string","optional":false},"payload":"Hello World1"}
{"schema":{"type":"string","optional":false},"payload":"Hello World2"}
使用kafka stream处理数据
参考官方文档:http://kafka.apache.org/10/documentation/streams/quickstart
kafka生态
kafka周边包含很多组件,参看wiki:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem