1 下载或复制Kafka安装包
http://archive.apache.org/dist/kafka/0.10.1.0/
2 解压缩Kafka安装包
tar -zxvf kafka_2.11-0.10.1.0.gz
3 配置Kafka集群
配置Kafka集群时只需要修改broker.id和zookeeper.connect。
- 在Master上的配置
在Server Basics代码段中,在“broker.id=0”下面增加“host.name=master”。
broker.id=0 host.name=master
在该文件的ZooKeeper代码段中,将已有的“zookeeper.connect= localhost:2181”替换成如下的代码(需要根据实际集群情况配置):
zookeeper.connect=master:2181,slave0:2181,slave1:2181
完成了Master上的配置后,需要将Master上的Kafka安装目录复制到Slave。我们这里需要复制两次,其中一次。
scp -r kafka_2.11-0.10.1.0 slave0:~/
- 在Slave上的配置
Kafka集群还需要在Slave上进行必要的配置。对于这里的Slave0,将其server.properties配置文件中的broker.id设置为1,host.name设置为slave0,ZooKeeper代码段保持不变。
4 Kafka的初步应用
启动ZooKeeper服务,集群的话启动集群
./kafka-server-start.sh ~/kafka_2.11-0.10.1.0/config/server.properties
由于Kafka是作为守护进程加载的,执行上述命令后终端也会出现停顿状态,这表示系统已经处于后台运行状态,所以也不需要关闭该终端窗口,只要保持当前状态即可。
实际上,用户可以另外开启一个终端,通过“jps”命令来查看当前系统进程列表,可以看到的几个进程名称。
注意事项
1.有一些用户在启动Kafka自带的ZooKeeper服务时可能会失败,或者即使启动了也不能正常工作,例如无法支持主题创建。
2.这时可以先关闭刚才启动的ZooKeeper服务(可以简单地通过“kill-9 PID”命令来终止QuorumPeerMain服务,其中PID是QuorumPeerMain的进程ID,如上面示例中的4775),然后通过HBase来启动其自带的ZooKeeper服务,或者启动独立安装的ZooKeeper。只要启动成功,也同样可以为Kafka的工作提供支持。
3.上述实践说明,在Hadoop集群应用中,只要能够启动任何一个组件自带的ZooKeeper服务,或者启动独立安装的ZooKeeper,就可以为其他任何需要ZooKeeper服务的组件提供支持,并不需要每个组件都启动自带的ZooKeeper。
4.1 创建主题
要使用Kafka,一定需要创建主题。主题(Topic)是消息中间件的基本概念,相当于文件系统的目录,其实就是用于保存消息内容的计算实体,通过主题名称可标识消息,就如同通过目录名标识目录一样。
我们在Master上创建一个名为test的主题。注意,为了创建主题,请在Master上另外开启一个终端,并进入Kafka安装目录,并执行如下命令:
./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-topics.sh --list --zookeeper master:2181
执行成功后会出现“Created topic‘test’”的提示,我们也可以通过执行下面的命令查看已经创建的主题。
4.2 发送消息
消息中间件是一个用于接收消息并转发消息的服务。为了检验Kafka是否能够正常工作,需要创建一个消息生产者(Producer)来产生消息。请重新开启一个终端,然后执行如下的命令:
./kafka-console-producer.sh --broker-list master:9092 --topic test
作为Producer,上面的终端一直处于产生消息的状态,其任务就是等待用户的输入,并保存到主题中。这时需要在另一个终端上创建消息消费者(Consumer),才能接收这些消息。
4.3 消费消息
要创建消息消费者并接收消息,需要在一个新的终端上执行如下的命令:
./kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
按下Enter键后,即可接收到从消息生产者发送到test主题中的消息,如图7-21所示。
./kafka-topics.sh --describe --zookeeper master:2181 --topic test
可以看到,消息生产者和消息消费者通过Kafka的消息中间件联系起来了,消息生产者是产生消息的一方,而消息消费者只需要从主题中接收消息。这种应用模式在很多数据处理系统中都可以发挥积极作用。例如,在一些实时大数据应用中,Kafka可以保存从数据源产生的数据,接收者(消息消费者)则可以按照自己的数据传输速率接收数据,因此,Kafka起到了一个缓冲作用。
由于Kafka是一个分布式的消息分发系统,所以也可以在集群中的任何节点接收消息,例如,在Slave0上通过执行“bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning”命令能够接收到从Master发送的消息。同样,还可以在Slave上创建消息生产者向Kafka服务器(Server)发送消息,使得集群的任何节点都可以接收消息。