kafka简单安装部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 一、安装、配置  1.下载 kafka是由linkedin开源的,但是已经托管在了apache,所以需要从apache下载,http://kafka.apache.org/downloads.html。

一、安装、配置

 1.下载

kafka是由linkedin开源的,但是已经托管在了apache,所以需要从apache下载,http://kafka.apache.org/downloads.html。
安装推荐的版本安装就可以了,例如下面0.10.0.0是最新的release,也是推荐稳定的release。
建议下载scala 2.11版本(kafka是scala语言开发的)

Releases
0.10.0.0 is the latest release. The current stable version is 0.10.0.0. 
You can verify your download by following these procedures and using these KEYS. 

0.10.0.0
Release Notes 
Source download: kafka-0.10.0.0-src.tgz (asc, md5) 
Binary downloads: 
Scala 2.10  - kafka_2.10-0.10.0.0.tgz (asc, md5) 
Scala 2.11  - kafka_2.11-0.10.0.0.tgz (asc, md5) 
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.11 is recommended).

 

2. kafka目录结构

解压后:

drwxr-xr-x 3 root root  4096 Sep  3  2015 bin
drwxr-xr-x 2 root root  4096 Sep  3  2015 config
drwxr-xr-x 2 root root  4096 Sep  3  2015 libs
-rw-r--r-- 1 root root 11358 Sep  3  2015 LICENSE
-rw-r--r-- 1 root root   162 Sep  3  2015 NOTICE

 

bin: 可执行文件,例如启动关闭kafka,生产者、消费者客户端,zookeeper启动(kafka依赖zookeeper)等等
config: kafka的相关配置
libs: kafka的类库

3. 配置

kafka有很多配置选项,本次只说明重要的或者必备的一些配置。

[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

 

broker.id broker的唯一标识 0
log.dirs 看着名字是日志目录,其实是kafka的持久化文件目录(可以配置多个用逗号隔开) /tmp/kafka-logs
zookeeper.connect zookeeper集群地址(多个地址用逗号隔开) localhost:2181
host.name broker的hostname,类似于bind,一般绑定内网网卡ip Null
num.partitions 默认每个topic的分片数(如果没有指定的话) 1
auto.create.topics.enable 是否自动创建topic,例如producer publish不存在的topic就会创建出来,类似es自动创建索引(一般线上系统都false) true
default.replication.factor 默认副本数量 1

4. 完整步骤

cd /opt/soft
wget http://apache.fayea.com/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
tar -xvf kafka_2.10-0.8.2.2.tgz
ln -s kafka_2.10-0.8.2.2 kafka
cd kafka
mkdir -p /opt/soft/kafka/data/kafka-logs

 

5. 设置环境变量

export KAFKA_HOME=/opt/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin

 

二、单机单broker(单点)

1. 拓扑

2. 部署

(1). 修改配置文件 ${kafka_home}/config/server.properties

修改了log.dirs、zookeeper地址、num.partitions个数

[@zw_94_190 /opt/soft/kafka/config]# cat server.properties | grep -v "#" | grep -v "^$"
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(2). 启动
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties &

 

&是用守护进程的形式启动。

${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties & 
[1] 31596
[@zw_94_190 /opt/soft/kafka/bin]# [2016-07-02 16:09:32,436] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,475] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.dirs is overridden to /opt/soft/kafka/kafka-logs (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.check.interval.ms is overridden to 300000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property log.segment.bytes is overridden to 1073741824 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,476] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.network.threads is overridden to 3 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property num.recovery.threads.per.data.dir is overridden to 1 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.receive.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,477] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property socket.send.buffer.bytes is overridden to 102400 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connect is overridden to 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,478] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
[2016-07-02 16:09:32,519] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2016-07-02 16:09:32,521] INFO [Kafka Server 0], Connecting to zookeeper on 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 (kafka.server.KafkaServer)
[2016-07-02 16:09:32,534] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-07-02 16:09:32,543] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:host.name=zw_94_190 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.version=1.7.0_45 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.home=/opt/soft/jdk1.7.0_45/jre (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.class.path=:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib:/opt/soft/kafka/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/soft/kafka/bin/../examples/build/libs//kafka-examples*.jar:/opt/soft/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/soft/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/soft/kafka/bin/../clients/build/libs/kafka-clients*.jar:/opt/soft/kafka/bin/../libs/jopt-simple-3.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-javadoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-scaladoc.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-sources.jar:/opt/soft/kafka/bin/../libs/kafka_2.10-0.8.2.2-test.jar:/opt/soft/kafka/bin/../libs/kafka-clients-0.8.2.2.jar:/opt/soft/kafka/bin/../libs/log4j-1.2.16.jar:/opt/soft/kafka/bin/../libs/lz4-1.2.0.jar:/opt/soft/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/soft/kafka/bin/../libs/scala-library-2.10.4.jar:/opt/soft/kafka/bin/../libs/slf4j-api-1.7.6.jar:/opt/soft/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/soft/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/opt/soft/kafka/bin/../libs/zkclient-0.3.jar:/opt/soft/kafka/bin/../libs/zookeeper-3.4.6.jar:/opt/soft/kafka/bin/../core/build/libs/kafka_2.10*.jar (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,543] INFO Client environment:user.dir=/opt/soft/kafka_2.10-0.8.2.2/bin (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,544] INFO Initiating client connection, connectString=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2abd2c1e (org.apache.zookeeper.ZooKeeper)
[2016-07-02 16:09:32,565] INFO Opening socket connection to server 10.10.53.162/10.10.53.162:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,569] INFO Socket connection established to 10.10.53.162/10.10.53.162:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,581] INFO Session establishment complete on server 10.10.53.162/10.10.53.162:2181, sessionid = 0x25380d7e74c9b71, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-07-02 16:09:32,583] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-07-02 16:09:32,772] INFO Loading logs. (kafka.log.LogManager)
[2016-07-02 16:09:32,806] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:09:32,814] INFO Logs loading complete. (kafka.log.LogManager)
[2016-07-02 16:09:32,814] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,817] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-07-02 16:09:32,839] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-07-02 16:09:32,840] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2016-07-02 16:09:32,895] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-07-02 16:09:32,920] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-07-02 16:09:33,089] INFO Registered broker 0 at path /brokers/ids/0 with address zw_94_190:9092. (kafka.utils.ZkUtils$)
[2016-07-02 16:09:33,094] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-07-02 16:09:33,101] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-07-02 16:09:33,248] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:09:33,284] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

 

(3) 创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 1 --partitions 1 --topic test_topic
Created topic "test_topic".
[2016-07-02 16:13:54,131] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test_topic,0] (kafka.server.ReplicaFetcherManager)
[2016-07-02 16:13:54,136] INFO Completed load of log test_topic-0 with log end offset 0 (kafka.log.Log)
[2016-07-02 16:13:54,140] INFO Created log for partition [test_topic,0] in /opt/soft/kafka/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
[2016-07-02 16:13:54,141] WARN Partition [test_topic,0] on broker 0: No checkpointed highwatermark is found for partition [test_topic,0] (kafka.cluster.Partition

 

(4) 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

test_topic

 

(5) producer
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

 

[2016-07-02 16:17:34,545] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
fff
[2016-07-02 16:17:41,705] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
fff
[2016-07-02 16:19:27,978] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
hello
world
(6) consumer
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic test_topic --from-beginning
fff
fff
hello
world

后启动的consumer依然能消费到消息,证明消息可以持久化,有关内部的实现原理以后的文章可能会介绍。

三、单机多broker(伪分布式)

1. 拓扑

2. 部署

添加三个配置文件(主要区别在broker.id,port,log.dirs,剩下的和第二章没有任何区别)

(1) 配置

(a) server-9093.properties

broker.id=1
port=9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-0-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(b) server-9094.properties

broker.id=2
port=9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-1-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(c) server-9095.properties

broker.id=3
port=9095
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/soft/kafka/data/kafka-2-logs
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(2) 启动三个broker
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9093.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9094.properties &
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server-9095.properties &
(3) 创建topic

 

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --replication-factor 3 --partitions 1 --topic hello_topic

 

(4) 查看topic列表

 

${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181

 

(5) producer

 

${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic hello_topic

 

(6) consumer

 

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper 10.10.53.159:2181,10.10.53.162:2181,10.16.14.182:2181 --topic hello_topic --from-beginning
 

 

四、多机多broker(分布式)

1. 拓扑

2. 部署

和第三章差不多,只不过用了多台机器,最好加上一个host.name

五、 一些总结

1.一套zookeper配置一套broker。
2.生产环境要使用多机多实例。
3.最好不要使用自动创建topic功能。
4.producer依赖的资源是broker-list。(对应的bin下的可执行文件)
5.consumer依赖的资源是zookeeper-list。(对应的bin下的可执行文件)
6.目前看broker之间不会通信,分布式的实现比较依赖于zookeeper。

相关文章
|
6月前
|
消息中间件 监控 数据可视化
Linux安装Kafka图形化界面
Linux安装Kafka图形化界面
274 4
|
2天前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
4月前
|
消息中间件 Ubuntu Java
在Ubuntu 18.04上安装Apache Kafka的方法
在Ubuntu 18.04上安装Apache Kafka的方法
213 0
|
1月前
|
消息中间件 Ubuntu Java
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
2月前
|
消息中间件 Ubuntu Java
|
3月前
|
消息中间件 Ubuntu Java
Kafka安装部署
Kafka安装部署
|
3月前
|
消息中间件 Java Linux
linux 之centos7安装kafka;;;;;待补充,未完成
linux 之centos7安装kafka;;;;;待补充,未完成
|
4月前
|
消息中间件 监控 Java
【一键解锁!】Kafka Manager 部署与测试终极指南 —— 从菜鸟到高手的必经之路!
【8月更文挑战第9天】随着大数据技术的发展,Apache Kafka 成为核心组件,用于处理实时数据流。Kafka Manager 提供了简洁的 Web 界面来管理和监控 Kafka 集群。本文介绍部署步骤及示例代码,助您快速上手。首先确认已安装 Java 和 Kafka。
668 4
|
5月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
122 4