Install the JDK
Kafka is written in Scala and runs on the JVM, so a JDK must be installed before installing Kafka.
The details are covered elsewhere; see: Java - installing JDK 8 on CentOS 7 (rpm and tar.gz formats) & setting the default Java version with multiple JDKs.
[root@artisan ~]# java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
[root@artisan ~]#
Install ZooKeeper
Kafka is a distributed messaging system that relies on ZooKeeper for coordination, so ZooKeeper is also required. Kafka ships with a bundled ZooKeeper, but using it is not recommended.
ZooKeeper is also written in Java, so it too depends on the JDK.
Download: https://archive.apache.org/dist/zookeeper/
We use version 3.4.14 here; in fact, any 3.x release works.
[root@artisan zookeeper-3.4.14]# pwd
/usr/local/zookeeper-3.4.14

# Copy the sample configuration file
[root@artisan zookeeper-3.4.14]# cp conf/zoo_sample.cfg conf/zoo.cfg

# Start ZooKeeper
[root@artisan zookeeper-3.4.14]# ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# Check the process (QuorumPeerMain is the ZooKeeper process)
[root@artisan zookeeper-3.4.14]# jps
3409 QuorumPeerMain
3425 Jps

# Check the status
[root@artisan zookeeper-3.4.14]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
[root@artisan zookeeper-3.4.14]#

# Connect with the client
[root@artisan zookeeper-3.4.14]# ./bin/zkCli.sh
Connecting to localhost:2181
2019-11-17 07:34:32,522 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT
.....
2019-11-17 07:34:33,066 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000014c05c0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

# Inspect the ZooKeeper root; by default only the /zookeeper node exists
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1]
Install Kafka
Download and extract
Download: https://kafka.apache.org/downloads
First, a note on Kafka version naming:
kafka_2.11-1.1.0 : 2.11 is the Scala version, 1.1.0 is the Kafka version.
[Updated 2021-01-02]
Configure hosts
When Kafka starts, it uses the IP address associated with the Linux hostname, so the hostname-to-IP mapping must be added to the local hosts file.
[root@artisan local]# cat /etc/hosts
127.0.0.1      localhost localhost.localdomain localhost4 localhost4.localdomain4
::1            localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.18.130 artisan
[root@artisan local]#
The setting this mainly affects lives in server.properties. If the hosts mapping is not configured, the broker falls back to using the plain IP address.
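Concretely, the relevant entries are the listener addresses in server.properties. A sketch of what they might look like on this machine (the hostname artisan and the IP below come from this article's environment; adjust for yours):

```properties
# server.properties - how clients and other brokers reach this broker.
# If a hostname is used here, it must resolve - hence the /etc/hosts entry above.
listeners=PLAINTEXT://artisan:9092
advertised.listeners=PLAINTEXT://192.168.18.130:9092
```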
Start the Kafka server
Startup script syntax: kafka-server-start.sh [-daemon] server.properties
The path to server.properties is a required argument. The -daemon flag runs Kafka as a background process; without it, the broker stops when the SSH session exits.
[root@artisan soft_artisan]# pwd
/usr/local/soft_artisan
[root@artisan soft_artisan]# tar -xvzf kafka_2.11-1.1.0.tgz -C /usr/local/
[root@artisan soft_artisan]# cd /usr/local/kafka_2.11-1.1.0/
[root@artisan kafka_2.11-1.1.0]# cd bin

# Start Kafka in the background
[root@artisan bin]# ./kafka-server-start.sh -daemon ../config/server.properties

# Check the processes
[root@artisan bin]# jps
3409 QuorumPeerMain
11923 Kafka
11942 Jps
[root@artisan bin]#
Inspect the nodes in ZooKeeper
[root@artisan bin]# pwd
/usr/local/zookeeper-3.4.14/bin
[root@artisan bin]# ./zkCli.sh
Connecting to localhost:2181
2019-11-17 10:05:52,083 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.14
.....

# Under the root node, everything except /zookeeper was created by Kafka
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 3]
server.properties core configuration
Official reference: https://kafka.apache.org/documentation/#configuration
Here are a few commonly used settings:
Parameter | Default | Description |
broker.id | 0 | Unique non-negative ID of this broker within the cluster. If the server's IP address changes but broker.id stays the same, consumers are unaffected. |
log.dirs | /tmp/kafka-logs | Directory where Kafka stores its data; multiple comma-separated paths are allowed, e.g. /data/kafka-logs-1,/data/kafka-logs-2. Spreading them across different disks improves read/write performance. |
listeners | 9092 | The address and port on which the broker accepts client connections. |
zookeeper.connect | localhost:2181 | ZooKeeper ensemble address; multiple hosts are comma-separated: hostname1:port1,hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | How long each log file is kept before deletion. The default retention applies to all topics. |
min.insync.replicas | 1 | When the producer sets acks to -1 (all), min.insync.replicas specifies the minimum number of in-sync replicas that must acknowledge a write. |
delete.topic.enable | false | Whether deleting topics is allowed. |
See also: the server.properties parameter reference in the Apache Kafka series.
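Put together, a minimal single-node server.properties using the settings above might look like the following sketch (the values are illustrative; the IP matches the machine used throughout this article, and the log directory is an arbitrary choice):

```properties
broker.id=0
listeners=PLAINTEXT://192.168.18.130:9092
log.dirs=/usr/local/kafka-logs
zookeeper.connect=192.168.18.130:2181
log.retention.hours=168
min.insync.replicas=1
# Allow topic deletion so the --delete examples below actually remove the topic
delete.topic.enable=true
```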
Basic commands
Official guide: https://kafka.apache.org/quickstart
Create a topic
Topics are managed with the kafka-topics.sh script bundled with Kafka.
Let's first see how to use this shell script: run kafka-topics.sh with no arguments and press Enter to print its usage.
[root@artisan bin]# pwd
/usr/local/kafka_2.11-1.1.0/bin
[root@artisan bin]# ./kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, max.message.bytes, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic.
--delete-config <String: name>           A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment.
--force                                  Suppress console prompts.
--help                                   Print usage information.
--if-exists                              If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists                          If set when creating topics, the action will only execute if the topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).
--replica-assignment <String: ...>       A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer>           The replication factor for each partition in the topic being created.
--topic <String: topic>                  The topic to be created, altered or described. Can also accept a regular expression, except for the --create option.
--topics-with-overrides                  If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                 If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions            If set when describing topics, only show under-replicated partitions.
--zookeeper <String: hosts>              REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
[root@artisan bin]#
Following that usage, let's create a topic.
--create                                 Create a new topic.
Create a topic named "artisan" with a single partition and a replication factor of 1:

./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic artisan
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic artisan
Created topic "artisan".
[root@artisan bin]#
We can use the --list option to see the topics that currently exist in Kafka:
[root@artisan bin]# ./kafka-topics.sh --list --zookeeper 192.168.18.130:2181
artisan
[root@artisan bin]#
Besides creating topics manually, a topic is also created automatically when a producer publishes a message to a topic that does not yet exist.
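For the record, this behavior is controlled by the broker setting auto.create.topics.enable (true by default); auto-created topics use the broker's num.partitions and default.replication.factor settings. A server.properties sketch to turn it off so that only explicitly created topics exist:

```properties
# Reject writes to nonexistent topics instead of creating them implicitly
auto.create.topics.enable=false
```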
Delete a topic
# Delete
[root@artisan bin]# ./kafka-topics.sh --delete --topic artisan --zookeeper 192.168.18.130:2181
Topic artisan is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# List
[root@artisan bin]# ./kafka-topics.sh --list --zookeeper 192.168.18.130:2181

# Re-create
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic artisan
Created topic "artisan".
[root@artisan bin]#
Send messages
Kafka ships with a console producer client that can read content from a local file, or take input typed directly on the command line, and send it to the Kafka cluster as messages.
By default, each line of input is treated as a separate message.
First run the producer script, then type the message content at the prompt:
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>This is a message
>This is another message
>
Consume messages
For the consumer side, Kafka likewise ships with a console client that prints whatever it fetches to the terminal; by default it consumes only the latest messages:
./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --topic artisan
To consume earlier messages as well, pass the --from-beginning option, as in the following command:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --from-beginning --topic artisan
This is a message
This is another message
This is artisan
this is anothhhh
this is artisan
If you run the commands above in different terminal windows, text typed into the producer terminal appears in the consumer terminal almost immediately.
All of these commands accept additional options; running a command without any arguments prints its detailed usage.
List consumer groups
[root@artisan bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.18.130:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-81551
console-consumer-72540
console-consumer-23504
testGroup
[root@artisan bin]#
Inspect a consumer group's offsets
[root@artisan bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.18.130:9092 --describe --group testGroup
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'testGroup' has no active members.

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
artisan  0          3               5               2    -            -     -
[root@artisan bin]#
Here current-offset is the group's current consumed offset, log-end-offset is the end of the log (the high watermark, HW), and lag is how far consumption is behind.
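The lag column is simply the difference between the two offsets. A quick sanity check in Python, using the numbers from the --describe output above:

```python
def consumer_lag(current_offset: int, log_end_offset: int) -> int:
    """Lag = how many messages the group still has left to consume."""
    return log_end_offset - current_offset

# From the output above: CURRENT-OFFSET=3, LOG-END-OFFSET=5
print(consumer_lag(3, 5))  # -> 2
```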
Consume multiple topics
First create another topic, xiaogongjiang:
[root@artisan bin]# ./kafka-topics.sh --create --zookeeper 192.168.18.130:2181 --replication-factor 1 --partitions 1 --topic xiaogongjiang
Created topic "xiaogongjiang".
[root@artisan bin]#
Start two producers:
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic xiaogongjiang
>send from xiaogongjiang
>

[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>send from artisan
>
Then consume both topics at once:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --whitelist "artisan|xiaogongjiang"
send from xiaogongjiang
send from artisan
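The --whitelist argument is interpreted as a regular expression matched against topic names. As a rough preview of which topics a pattern would pick up, here is a Python sketch (Python's re syntax is close enough to Java's for a simple alternation like this; the topic list is made up for illustration):

```python
import re

def matching_topics(pattern: str, topics: list[str]) -> list[str]:
    """Return the topics whose full name matches the whitelist pattern."""
    return [t for t in topics if re.fullmatch(pattern, t)]

existing = ["artisan", "xiaogongjiang", "__consumer_offsets"]
print(matching_topics("artisan|xiaogongjiang", existing))
# -> ['artisan', 'xiaogongjiang']
```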
Unicast consumption
In this mode each message is consumed by only one consumer, similar to a queue; to get it, simply put all consumers in the same consumer group.
Run the following consumer command in two separate terminals, then send messages to the topic; only one of the two clients receives each message.
Note that the producer knows nothing about consumer groups; the group is specified only on the consumer side.
Producer:
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>queue model test
>queue model the second message
Consumer 1:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
Consumer 2:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
queue model test
queue model the second message
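The delivery rule can be sketched with a toy dispatcher in Python. This only illustrates the semantics, not Kafka's real assignment protocol (Kafka assigns whole partitions to group members rather than round-robining individual messages):

```python
import itertools

def dispatch(messages, groups):
    """Toy model of Kafka delivery: every group sees each message once,
    and inside a group exactly one member receives it (round-robin here)."""
    received = {member: [] for members in groups.values() for member in members}
    pickers = {g: itertools.cycle(members) for g, members in groups.items()}
    for msg in messages:
        for g in groups:
            received[next(pickers[g])].append(msg)
    return received

# One group with two consumers: messages are split, never duplicated
print(dispatch(["m1", "m2"], {"artisanGroup": ["c1", "c2"]}))
# -> {'c1': ['m1'], 'c2': ['m2']}
```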
Multicast consumption
In this mode one message can be consumed by multiple consumers, similar to publish-subscribe. Since Kafka delivers a given message to only one consumer within a consumer group, multicast is achieved simply by putting the consumers in different groups. Add another consumer belonging to a second consumer group, and both clients receive the messages.
Producer:
[root@artisan bin]# ./kafka-console-producer.sh --broker-list 192.168.18.130:9092 --topic artisan
>messge artisan jajaja
>
Consumer 1, in the anotherArtisanGroup consumer group:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=anotherArtisanGroup --topic artisan
messge artisan jajaja
Consumer 2, in the artisanGroup consumer group:
[root@artisan bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.18.130:9092 --consumer-property group.id=artisanGroup --topic artisan
messge artisan jajaja
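In the same toy spirit (an illustration of the semantics only, not Kafka's protocol), multicast is just the message fanning out once per consumer group:

```python
def fan_out(message, groups):
    """Each consumer group receives its own copy of the message; with one
    member per group, as in the demo above, that member gets it."""
    return {group: message for group in groups}

print(fan_out("messge artisan jajaja", ["artisanGroup", "anotherArtisanGroup"]))
# -> {'artisanGroup': 'messge artisan jajaja', 'anotherArtisanGroup': 'messge artisan jajaja'}
```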
Summary
At this point we have set up a single-node Kafka environment and walked through its basic usage. Next, let's build a three-node Kafka cluster.