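I. Installation and Configuration
1. Download
Kafka was open-sourced by LinkedIn and is now hosted by Apache, so download it from the Apache site: http://kafka.apache.org/downloads.html.
Installing the recommended version is fine. For example, 0.10.0.0 in the excerpt below is the latest release and also the recommended stable release.
The Scala 2.11 build is recommended (Kafka is written largely in Scala).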
Releases
0.10.0.0 is the latest release. The current stable version is 0.10.0.0.
You can verify your download by following these procedures and using these KEYS.
0.10.0.0
Release Notes
Source download: kafka-0.10.0.0-src.tgz (asc, md5)
Binary downloads:
Scala 2.10 - kafka_2.10-0.10.0.0.tgz (asc, md5)
Scala 2.11 - kafka_2.11-0.10.0.0.tgz (asc, md5)
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).
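2. Kafka directory layout
After unpacking:
drwxr-xr-x 3 root root 4096 Sep 3 2015 bin
drwxr-xr-x 2 root root 4096 Sep 3 2015 config
drwxr-xr-x 2 root root 4096 Sep 3 2015 libs
-rw-r--r-- 1 root root 11358 Sep 3 2015 LICENSE
-rw-r--r-- 1 root root 162 Sep 3 2015 NOTICE
bin: executables, e.g. scripts to start and stop Kafka, the producer and consumer clients, the ZooKeeper start script (Kafka depends on ZooKeeper), and so on
config: Kafka configuration files
libs: Kafka libraries
3. Configuration
Kafka has many configuration options; only the important or required ones are covered here.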
[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
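Parameter | Description | Default |
broker.id | Unique identifier of the broker | 0 |
log.dirs | Despite the name, this is not an application log directory: it is where Kafka persists message data (several directories can be given, separated by commas) | /tmp/kafka-logs |
zookeeper.connect | ZooKeeper ensemble address (multiple addresses separated by commas) | localhost:2181 |
host.name | Hostname of the broker, similar to a bind address; usually bound to the IP of the internal network interface | null |
num.partitions | Default number of partitions per topic (used when none is specified) | 1 |
auto.create.topics.enable | Whether topics are created automatically, e.g. when a producer publishes to a topic that does not exist, much like Elasticsearch auto-creating an index (usually set to false on production systems) | true |
default.replication.factor | Default number of replicas for automatically created topics | 1 |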
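The grep -v "#" filter used above drops every line that contains a # anywhere, which would also hide a setting whose value happens to include one. The following is a minimal sketch of a slightly stricter filter, plus a lookup for a single key. The sample file contents and the helper names effective_props and get_prop are made up for illustration; they are not part of Kafka.

```shell
#!/bin/sh
# Build a small sample file so the sketch runs without a Kafka install.
# (Illustrative contents; point CONF at your real server.properties.)
CONF="$(mktemp)"
cat > "$CONF" <<'EOF'
# The id of the broker.
broker.id=0

log.dirs=/tmp/kafka-logs
#host.name=localhost
zookeeper.connect=localhost:2181
EOF

# Print only effective settings: skip blank lines and lines whose first
# non-blank character is '#'.
effective_props() {
    grep -Ev '^[[:space:]]*(#|$)' "$1"
}

# Hypothetical helper: print the value of one key. If the key appears
# more than once, the last occurrence wins.
get_prop() {
    effective_props "$1" |
        awk -F= -v key="$2" '$1 == key { sub(/^[^=]*=/, ""); val = $0 } END { print val }'
}

effective_props "$CONF"
get_prop "$CONF" log.dirs
```

A key that is only present in commented-out form (host.name in the sample) yields an empty value, which matches the "null" default in the table.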
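4. Complete steps
Note that the commands below install kafka_2.10-0.8.2.2 (the version used in the rest of this walkthrough), not the 0.10.0.0 release shown on the download page above.
cd /opt/soft
wget http://apache.fayea.com/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xvf kafka_2.10-0.8.2.2.tgz
ln -s kafka_2.10-0.8.2.2 kafka
cd kafka
mkdir -p /opt/soft/kafka/data/kafka-logs
5. Set environment variables
export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin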
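The two exports only last for the current shell session. One way to make them permanent is to append them to a profile file exactly once. The sketch below assumes a scratch file for PROFILE so that it is safe to run as-is; the helper name add_line_once is hypothetical, and which real profile file to use depends on your system.

```shell
#!/bin/sh
# Scratch file so the sketch is safe to run; in practice this might be
# ~/.bash_profile or a file under /etc/profile.d/ (assumption).
PROFILE="${PROFILE:-$(mktemp)}"

# Append a line to the profile only if the exact line is not already
# there, so running this twice does not duplicate entries.
add_line_once() {
    grep -qxF "$1" "$PROFILE" 2>/dev/null || printf '%s\n' "$1" >> "$PROFILE"
}

add_line_once 'export KAFKA_HOME=/opt/soft/kafka'
add_line_once 'export PATH=$PATH:$KAFKA_HOME/bin'

# Load the settings into the current shell and confirm them.
. "$PROFILE"
echo "KAFKA_HOME=$KAFKA_HOME"
```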
II. Single machine, single broker (standalone)
1. Topology
2. Deployment
(1) Edit the configuration file ${KAFKA_HOME}/config/server.properties
Three settings were changed: log.dirs, the ZooKeeper address (zookeeper.connect), and num.partitions.
[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
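The three changes can also be applied from the command line instead of a text editor. This is a rough sketch that works on a scratch copy with a shortened sample file; in practice CONF would be ${KAFKA_HOME}/config/server.properties, and it is worth keeping a backup of the original first.

```shell
#!/bin/sh
# Work on a scratch copy so the sketch runs anywhere (assumption: the
# real target is ${KAFKA_HOME}/config/server.properties).
CONF="$(mktemp)"
cat > "$CONF" <<'EOF'
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs
num.partitions=1
zookeeper.connect=localhost:2181
EOF

ZK="10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181"

# Rewrite the three settings in one pass. '|' is used as the sed
# delimiter because the values contain '/'.
sed -e "s|^log\.dirs=.*|log.dirs=/opt/soft/kafka/data/kafka-logs|" \
    -e "s|^num\.partitions=.*|num.partitions=2|" \
    -e "s|^zookeeper\.connect=.*|zookeeper.connect=${ZK}|" \
    "$CONF" > "$CONF.new" && mv "$CONF.new" "$CONF"

cat "$CONF"
```

Writing to a temporary file and renaming it avoids relying on sed -i, whose syntax differs between platforms.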
(2) Start
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties &
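The trailing & runs the broker as a background job of the current shell. This is not the same as a daemon: the output still goes to the terminal (as shown below), and the process may be stopped when the shell exits. For a long-running broker, nohup or the start script's -daemon option (if your version provides it) is a safer choice.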
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties &
[1] 31596
[@zw_94_190 /opt/soft/kafka/bin]# [2016-07-02 16:09:32,436] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,475] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.dirs is overridden to /opt/soft/kafka/kafka-logs (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connect is overridden to 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,519] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2016-07-02 16:09:32,521] INFO [Kafka Server 0], Connecting to zookeeper on 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.server.KafkaServer)
[2016-07-02 16:09:32,534] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-07-02 16:09:32,543] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:host.name=zw_94_190 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.version=1.7.0_45 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.home=/opt/soft/jdk1.7.0_45/jre (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.class.path=:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/opt/soft/kafka/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/soft/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/soft/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/soft/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/soft/kafka/bin/../clients/build/libs/kafka-clients*.jar:/opt/soft/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-javadoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-scaladoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-sources.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-test.jar:/opt/soft/kafka/bin/../libs/kafka-clients-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/log4j-1.2.16.jar:/opt/soft/kafka/bin/../libs/lz4-1.2.0.jar:/opt/soft/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/soft/kafka/bin/../libs/scala-library-2.10.4.jar:/opt/soft/kafka/bin/../libs/slf4j-api-1.7.6.jar:/opt/soft/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/soft/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/opt/soft/kafka/bin/../libs/zkclient-0.3.jar:/opt/soft/kafka/bin/../libs/zookeeper-3.4.6.jar:/opt/soft/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.dir=/opt/soft/kafka_2.10-0.8.2.2/bin (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,544] INFO Initiating client connection, connectString=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2abd2c1e (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,565] INFO Opening socket connection to server 10.10.53.162/10.10.53.162:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,569] INFO Socket connection established to 10.10.53.162/10.10.53.162:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,581] INFO Session establishment complete on server 10.10.53.162/10.10.53.162:2181, sessionid = 0x25380d7e74c9b71, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,583] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-07-02 16:09:32,772] INFO Loading logs. (kafka.log.LogManager)
[2016-07-02 16:09:32,806] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:09:32,814] INFO Logs loading complete. (kafka.log.LogManager)
[2016-07-02 16:09:32,814] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,817] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,839] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-07-02 16:09:32,840] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2016-07-02 16:09:32,895] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-07-02 16:09:32,920] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-07-02 16:09:33,089] INFO Registered broker 0 at path /brokers/ids/0 with address zw_94_190:9092. (kafka.utils.ZkUtils$)
[2016-07-02 16:09:33,094] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-07-02 16:09:33,101] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-02 16:09:33,248] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:09:33,284] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
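In the output above, the line that signals a successful start is "[Kafka Server 0], started (kafka.server.KafkaServer)". If the broker output is redirected to a file, a script can wait for that line instead of a person watching the terminal. The sketch below is one possible way to do it; the function name wait_for_started and the log file location are assumptions, and the demo uses a line copied from the output above so that no broker is needed to try it.

```shell
#!/bin/sh
# wait_for_started LOGFILE [TIMEOUT_SECONDS]
# Poll LOGFILE once per second until the broker's "started" line shows
# up. Returns 0 when found, 1 on timeout. -F makes grep treat the
# pattern as a fixed string, so "starting" lines do not match.
wait_for_started() {
    log="$1"
    remaining="${2:-30}"
    while [ "$remaining" -gt 0 ]; do
        if grep -qF '], started (kafka.server.KafkaServer)' "$log" 2>/dev/null; then
            return 0
        fi
        sleep 1
        remaining=$((remaining - 1))
    done
    return 1
}

# Demo with a line captured from the output above.
LOG="$(mktemp)"
echo '[2016-07-02 16:09:33,101] INFO [Kafka Server 0], started (kafka.server.KafkaServer)' > "$LOG"
wait_for_started "$LOG" 5 && echo "broker is up"

# Real use might look like this (the log path is hypothetical):
#   ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /tmp/kafka-broker.log 2>&1 &
#   wait_for_started /tmp/kafka-broker.log 60 || echo "broker did not start in time"
```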
(3) Create a topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 1 --partitions 1 --topic test_topic
Created topic "test_topic".
[2016-07-02 16:13:54,131] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test_topic,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:13:54,136] INFO Completed load of log test_topic-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:13:54,140] INFO Created log for partition [test_topic,0] in /opt/soft/kafka/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
[2016-07-02 16:13:54,141] WARN Partition [test_topic,0] on broker 0: No checkpointed highwatermark is found for partition [test_topic,0] (kafka.cluster.Partition)
(4) List topics
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
test_topic
(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
[2016-07-02 16:17:34,545] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
fff
[2016-07-02 16:17:41,705] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
fff
[2016-07-02 16:19:27,978] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
hello world
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic test_topic --from-beginning
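fff
fff
hello world
A consumer started after the messages were produced can still consume them, which shows that messages are persisted. The internal implementation may be covered in a later article.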
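The producer and consumer steps above are interactive. For a quick scripted check of the same round trip, the console tools can be driven through pipes. This is only a sketch under several assumptions: KAFKA_HOME points at the install from Chapter I, a broker listens on localhost:9092, and the console consumer of your version accepts --max-messages (check its --help output). The function names produce_lines and consume_first are made up for illustration.

```shell
#!/bin/sh
# Assumed locations; override them from the environment if yours differ.
KAFKA_HOME="${KAFKA_HOME:-/opt/soft/kafka}"
ZK="${ZK:-10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181}"

# produce_lines TOPIC
# Send every line read from stdin to TOPIC as one message.
produce_lines() {
    "${KAFKA_HOME}/bin/kafka-console-producer.sh" \
        --broker-list localhost:9092 --topic "$1"
}

# consume_first TOPIC COUNT
# Read COUNT messages of TOPIC from the beginning, then exit.
# (--max-messages is assumed to be supported by the installed version.)
consume_first() {
    "${KAFKA_HOME}/bin/kafka-console-consumer.sh" \
        --zookeeper "$ZK" --topic "$1" --from-beginning --max-messages "$2"
}

# Only touch the cluster when the scripts are actually installed.
if [ -x "${KAFKA_HOME}/bin/kafka-console-producer.sh" ]; then
    printf 'fff\nhello world\n' | produce_lines test_topic
    consume_first test_topic 2
fi
```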
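III. Single machine, multiple brokers (pseudo-distributed)
1. Topology
2. Deployment
Add three configuration files. They differ mainly in broker.id, port and log.dirs; everything else is the same as in Chapter II.
(1) Configuration
(a) server-9093.properties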
broker.id=1
port=9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-0-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
(b) server-9094.properties
broker.id=2
port=9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-1-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
(c) server-9095.properties
broker.id=3
port=9095
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-2-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
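Since the three files differ only in broker.id, port and log.dirs, they can be generated with a small loop rather than copied by hand. The sketch below writes them into a scratch directory (OUT_DIR is an assumption; in practice it would be ${KAFKA_HOME}/config) and reproduces the same id, port and directory numbering as the files listed above.

```shell
#!/bin/sh
# Scratch output directory so the sketch is safe to run as-is.
OUT_DIR="${OUT_DIR:-$(mktemp -d)}"
ZK="10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181"

# Broker ids 1..3 map to ports 9093..9095 and to data directories
# kafka-0-logs..kafka-2-logs, matching the three files above.
for id in 1 2 3; do
    port=$((9092 + id))
    idx=$((id - 1))
    cat > "${OUT_DIR}/server-${port}.properties" <<EOF
broker.id=${id}
port=${port}
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-${idx}-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=${ZK}
EOF
done

ls "$OUT_DIR"
```

Each broker must have its own broker.id, port and log.dirs when several brokers share one machine; the loop makes that invariant hard to break by accident.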
(2) Start the three brokers
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9093.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9094.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9095.properties &
(3) Create a topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 3 --partitions 1 --topic hello_topic
(4) List topics
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181
(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic hello_topic
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic hello_topic --from-beginning
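IV. Multiple machines, multiple brokers (distributed)
1. Topology
2. Deployment
Much the same as Chapter III, except that the brokers run on several machines. It is best to also set host.name on each broker.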
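As a rough illustration of the per-machine change, the sketch below sets host.name (and broker.id) in a properties file, replacing the key if it is already present and appending it otherwise. The helper name set_prop is made up, and the address 192.0.2.11 is a placeholder from the documentation range, not one of the hosts used in this article. Commented-out keys are left alone.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE
# Replace the line for KEY if one exists, otherwise append KEY=VALUE.
set_prop() {
    file="$1"; key="$2"; value="$3"
    if grep -q "^${key}=" "$file"; then
        awk -F= -v k="$key" -v v="$value" \
            '$1 == k { print k "=" v; next } { print }' "$file" > "$file.tmp" &&
            mv "$file.tmp" "$file"
    else
        printf '%s=%s\n' "$key" "$value" >> "$file"
    fi
}

# Scratch file standing in for one machine's server.properties.
CONF="$(mktemp)"
printf 'broker.id=0\nport=9092\n' > "$CONF"

set_prop "$CONF" broker.id 2            # must be unique per broker
set_prop "$CONF" host.name 192.0.2.11   # placeholder internal IP

cat "$CONF"
```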
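V. Summary
1. Each Kafka cluster (one set of brokers) is configured against one ZooKeeper ensemble.
2. Use multiple brokers on multiple machines in production.
3. Preferably do not rely on automatic topic creation.
4. The producer needs the broker list (this refers to the command-line tool under bin).
5. The consumer needs the ZooKeeper address list (again, the command-line tool under bin, in the 0.8.x version used here).
6. Cluster coordination, such as broker registration and leader election, relies heavily on ZooKeeper. Brokers do still talk to each other directly, though: with a replication factor above 1, follower replicas fetch data from the partition leader, which is what the ReplicaFetcherManager lines in the startup log refer to.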