kafka官方文档学习笔记2--QuickStart-阿里云开发者社区

开发者社区> 微服务> 正文
登录阅读全文

kafka官方文档学习笔记2--QuickStart

简介: 1)kafka的下载&安装; 2)kafka中bin目录中脚本和config目录下配置文件用途介绍; 3)topic的创建、删除;

下载kafka

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压安装包

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0/bin

查看bin目录下主要几个脚本功能如下:

脚本 功能
kafka-server-start.sh 启动kafka服务器;
kafka-server-stop.sh 停止kafka服务器;
kafka-topics.sh topic管理;
kafka-console-producer.sh 基于命令行的生产者;
kafka-console-consumer.sh 基于命令行的消费者;
kafka-run-class.sh 运行java类的脚本,由kafka-server-start.sh和kafka-server-stop.sh、kafka-topics.sh等脚本调用;
zookeeper-server-start.sh 启动kafka自带的zookeeper服务器;
zookeeper-server-stop.sh 停止kafka自带的zookeeper服务器;
zookeeper-shell.sh 在命令行连接zookeeper的客户端工具;
connect-standalone.sh 在命令行启动单点的connector;
connect-distributed.sh 在命令行启动基于集群connector;

注:kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。但一般仅限于测试环境使用;

config目录下存放的是kafka服务、自带zk服务以及基于命令行的生产者、消费者工具对应的配置文件,常用如下:

脚本 功能
server.properties kafka实例的配置文件,配置kafka最重要的配置文件;
log4j.properties kafka日志配置;
zookeeper.properties 自带zk的配置文件;
producer.properties 基于命令行的生产者工具配置文件;(测试用)
consumer.properties 基于命令行的消费者工具配置文件;(测试用)
connect-standalone.properties 自带单点connector的配置文件,存放connector的序列化方式、监听broker的地址端口等通用配置;(测试用)
connect-file-source.properties 配置文件读取connector,用于逐行读取文件,导入入topic;(测试用)
connect-file-sink.properties 配置文件写入connector,用于将topic中的数据导出到topic中;(测试用)

启动zk服务,默认端口:2181

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-01-16 20:22:52,327] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动kafka服务,默认端口:9092

> bin/kafka-server-start.sh config/server.properties
[2018-01-16 20:23:52,758] INFO KafkaConfig values:
...

经过如上两步,我们就启动了一个简易的kafka集群(具有1个zookeeper实例和1个kafka实例的集群)

查看zookeeper中存放的kafka信息

> bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
ls /brokers
[ids, topics, seqid]
ls /brokers/topics
[test]
ls /brokers/ids
[0]

"ls /"命令列出了zk根节点下的所有元素,可以看到kafka在zk中存放了集群(cluster)、实例(brokers)、消费者(consumers)等信息;zookeeper服务作为kafka的元数据管理服务,因而每次对kafka服务操作都需要指定zookeeper服务的地址,以便于获取kafka的元数据,连接到正确的kafka集群;

创建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

创建一个名为test的topic,包含1个复本,1个分区;

查看集群中的所有topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

启动生产者,并写入测试消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hello World1
> I'm a programer

启动消费者,接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello World1
I'm a programer

可以看到生产者写入的消息,都能够立刻被消费者接收并打印出来。需要注意的是,生产者和消费者通过topic这个概念来建立联系,只有消费者指定与生产者相同的topic,才能够消费其产生的消费;

删除topic

> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

建立多个kafka实例的集群

拷贝配置文件,修改实例ID、日志目录、监听端口:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

修改配置项如下:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

启动实例:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

新建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

新建一个名为my-replicated-topic的topic,有3个副本和1个分区;

查看topic状态描述

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

topic上有几个partition,就会展示几行记录;字段含义如下:

  • leader:标识当前partition的leader节点是那个,通过broker.id标识;一个partition只有一个leader节点,负责接收和处理读写请求;
  • replicas:标识当前partition的所有副本所在的节点,无论节点是否是leader节点,也无论节点是否"存活",通过broker.id标识;
  • isr:标识存活且与leader节点同步的节点,即可用的副本(包括leader借点);通过broker.id标识;

查看最初创建的test状态描述:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

可以看到,因为test只有1个副本、1个partition,所以只能分布在一个实例上;

模拟leader切换

对于包含多个副本的topic而言,当一个副本所在的实例不可用时,将会从其它可用副本中选择一个作为leader;
在集群节点都正常的情况下,查看topic的状态:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

关掉broker.id=0的实例,再次查看,发现leader节点已经切换,同时isr中不包含"不可用"节点0:

>  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 0,1,2    Isr: 1,2

重新启动broker.id=0的实例,再次查看,发现isr中包括了节点0,说明可用。

使用kafka connect导入/导出数据

kafka connect是kafka与外部系统交互的工具,通过运行不同的connector,实现与不同外部系统的交互,包括数据的导入/导出。如下模拟从文件导入数据到kafka,以及从kafka导出数据到文件;

  1. 首先,创建文件,写入测试数据:
> cd kafka_2.11-1.0.0
> echo "Hello World" > test.txt

注:一定是在kafka根目录中创建名为test.txt的文件,否则不会读取;

2.启动2个单点的connector,这两个connector都是kafka自带的,一个用于读取文件写入topic,另一个用于将topic中数据导出到文件;

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2018-01-17 10:37:32,568] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)

connect-console-source.properties文件内容:

name=local-console-source
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# connector关联的topic
topic=connect-test

connect-console-sink.properties文件内容:

name=local-console-sink
# connector入口
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
# connector关联的topic
topics=connect-test

在kafka根目录可以看到生成了名为test.sink.txt的文件,其中的内容即为test.txt中的内容,持续向test.txt中append内容,test.sink.txt中的内容也随之append;

注:因为同步过程是监听文件的增量变化,如果改变test.txt中旧有内容,则旧数据不发生变化,覆盖同一行旧数据,貌似会产生一个空行;

整个同步过程是:

test.txt ->   FileStreamSourceConnector -> connect-test(topic) -> FileStreamSinkConnector -> test.sink.txt

由于是通过topic存放过往数据,因此在topic中也可以看到相应的数据:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello World"}
{"schema":{"type":"string","optional":false},"payload":""}
{"schema":{"type":"string","optional":false},"payload":"Hello World1"}
{"schema":{"type":"string","optional":false},"payload":"Hello World2"}

使用kafka stream处理数据

参考官方文档:http://kafka.apache.org/10/documentation/streams/quickstart

kafka生态

kafka周边包含很多组件,参看wiki:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
微服务
使用钉钉扫一扫加入圈子
+ 订阅

构建可靠、高效、易扩展的技术基石

其他文章