介绍
Apache Kafka 是一个流行的分布式消息代理,旨在高效处理大量实时数据。Kafka 集群不仅高度可扩展和容错,而且与其他消息代理(如 ActiveMQ 和 RabbitMQ)相比,吞吐量更高。虽然它通常用作 发布/订阅 消息系统,但许多组织也将其用于日志聚合,因为它为发布的消息提供了持久存储。
发布/订阅消息系统允许一个或多个生产者发布消息,而不考虑消费者的数量或它们如何处理消息。订阅的客户端会自动收到关于更新和新消息创建的通知。这个系统比客户端定期轮询以确定是否有新消息可用的系统更高效和可扩展。
在本教程中,您将在 Ubuntu 18.04 上安装和使用 Apache Kafka 2.1.1。
先决条件
要跟随本教程,您需要:
- 一个 Ubuntu 18.04 服务器和一个具有 sudo 权限的非 root 用户。如果您尚未设置非 root 用户,请按照本指南中的步骤进行设置。
- 服务器上至少 4GB 的 RAM。RAM 少于此数量可能导致 Kafka 服务失败,Java 虚拟机(JVM)在启动过程中抛出 “内存不足” 异常。
- 在服务器上安装 OpenJDK 8。要安装此版本,请按照安装特定版本的 OpenJDK 的说明进行操作。Kafka 是用 Java 编写的,因此它需要 JVM;但是,它的启动 shell 脚本存在版本检测错误,导致在 JVM 版本高于 8 时无法启动。
步骤 1 —— 为 Kafka 创建用户
由于 Kafka 可以通过网络处理请求,因此您应该为其创建一个专用用户。这样可以最大程度地减少 Kafka 服务器被入侵对 Ubuntu 机器造成的损害。在本步骤中,我们将创建一个名为 kafka 的专用用户,但是在设置完 Kafka 后,您应该创建一个不同的非 root 用户来执行此服务器上的其他任务。
以非 root sudo 用户身份登录,使用 useradd
命令创建一个名为 kafka 的用户:
sudo useradd kafka -m
-m
标志确保为用户创建一个家目录。这个家目录 /home/kafka
将作为我们在下面各节中执行命令的工作空间目录。
使用 passwd
设置密码:
sudo passwd kafka
使用 adduser
命令将 kafka 用户添加到 sudo
组,以便它具有安装 Kafka 依赖所需的权限:
sudo adduser kafka sudo
您的 kafka 用户现在已准备就绪。使用 su
登录到此帐户:
su -l kafka
现在我们已经创建了 Kafka 专用用户,可以继续下载并提取 Kafka 二进制文件。
步骤 2 —— 下载和提取 Kafka 二进制文件
让我们将 Kafka 二进制文件下载并提取到 kafka 用户的家目录中的专用文件夹中。
首先,在 /home/kafka
中创建一个名为 Downloads
的目录以存储您的下载:
mkdir ~/Downloads
使用 curl
下载 Kafka 二进制文件:
curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
创建一个名为 kafka
的目录并切换到此目录。这将是 Kafka 安装的基本目录:
mkdir ~/kafka && cd ~/kafka
使用 tar
命令提取您下载的存档:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
我们使用 --strip 1
标志确保将存档的内容提取到 ~/kafka/
本身,而不是提取到它内部的另一个目录(例如 ~/kafka/kafka_2.11-2.1.1/
)中。
现在我们已成功下载并提取了二进制文件,可以继续配置 Kafka 以允许主题删除。
步骤 3 —— 配置 Kafka 服务器
Kafka 的默认行为不允许我们删除 主题,即可发布消息的类别、组或源名称。为了修改这一点,让我们编辑配置文件。
Kafka 的配置选项在 server.properties
中指定。使用 nano
或您喜欢的编辑器打开此文件:
nano ~/kafka/config/server.properties
让我们添加一个设置,允许我们删除 Kafka 主题。在文件底部添加以下内容:
delete.topic.enable = true
保存文件并退出 nano
。现在我们已经配置了 Kafka,可以继续创建 systemd 单元文件以运行并在启动时启用它。
步骤 4 —— 创建 Systemd 单元文件并启动 Kafka 服务器
在本节中,我们将为 Kafka 服务创建 systemd 单元文件。这将帮助我们执行常见的服务操作,如以与其他 Linux 服务一致的方式启动、停止和重新启动 Kafka。
Zookeeper 是 Kafka 用于管理其集群状态和配置的服务。它通常作为许多分布式系统中的一个组成部分而被广泛使用。如果您想了解更多信息,请访问官方的 Zookeeper 文档。
创建 zookeeper
的单元文件:
sudo nano /etc/systemd/system/zookeeper.service
将以下单元定义输入到文件中:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
[Unit]
部分指定了 Zookeeper 在启动之前需要网络和文件系统准备就绪。
[Service]
部分指定了 systemd 应该使用 zookeeper-server-start.sh
和 zookeeper-server-stop.sh
shell 文件来启动和停止服务。它还指定了如果 Zookeeper 异常退出,应该自动重新启动。
接下来,为 kafka
创建 systemd 服务文件:
sudo nano /etc/systemd/system/kafka.service
将以下单元定义输入到文件中:
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1' ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
[Unit]
部分指定了此单元文件依赖于 zookeeper.service
。这将确保在 kafka
服务启动时自动启动 zookeeper
。
[Service]
部分指定了 systemd 应该使用 kafka-server-start.sh
和 kafka-server-stop.sh
shell 文件来启动和停止服务。它还指定了如果 Kafka 异常退出,应该自动重新启动。
现在单元已经定义好了,使用以下命令启动 Kafka:
sudo systemctl start kafka
为了确保服务器已成功启动,检查 kafka
单元的日志:
sudo journalctl -u kafka
您应该会看到类似以下的输出:
Jul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
现在您有一个监听端口 9092
的 Kafka 服务器。
虽然我们已经启动了 kafka
服务,但如果我们重新启动服务器,它将不会自动启动。要在服务器启动时启用 kafka
,运行:
sudo systemctl enable kafka
现在我们已经启动并启用了服务,让我们检查安装。
步骤 5 —— 测试安装
让我们发布和消费一个 “Hello World” 消息,以确保 Kafka 服务器的行为正常。在 Kafka 中发布消息需要:
- 一个 生产者,它使记录和数据发布到主题成为可能。
- 一个 消费者,它从主题中读取消息和数据。
首先,通过输入以下命令创建一个名为 TutorialTopic
的主题:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用 kafka-console-producer.sh
脚本从命令行创建一个生产者。它需要 Kafka 服务器的主机名、端口和一个主题名称作为参数。
通过输入以下命令,将字符串 "Hello, World"
发布到 TutorialTopic
主题:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,您可以使用 kafka-console-consumer.sh
脚本创建一个 Kafka 消费者。它需要 ZooKeeper 服务器的主机名和端口,以及一个主题名称作为参数。
以下命令从 TutorialTopic
消费消息。请注意使用了 --from-beginning
标志,它允许消费在消费者启动之前发布的消息:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您应该在终端中看到 Hello, World
:
Hello, World
该脚本将继续运行,等待更多消息发布到主题。随时可以打开一个新的终端并启动一个生产者,以发布更多消息。您应该能够在消费者的输出中看到它们。
测试完成后,按 CTRL+C
停止消费者脚本。现在我们已经测试了安装,让我们继续安装 KafkaT。
步骤 6 — 安装 KafkaT(可选)
KafkaT 是 Airbnb 推出的一个工具,它可以让你更轻松地从命令行查看 Kafka 集群的详细信息并执行某些管理任务。由于它是一个 Ruby gem,所以你需要安装 Ruby 来使用它。你还需要安装 build-essential
软件包以便构建它所依赖的其他 gem。使用 apt
安装它们:
sudo apt install ruby ruby-dev build-essential
现在你可以使用 gem 命令安装 KafkaT:
sudo gem install kafkat
KafkaT 使用 .kafkatcfg
作为配置文件,用于确定你的 Kafka 服务器的安装目录和日志目录。它还应该有一个指向你的 ZooKeeper 实例的条目。
创建一个名为 .kafkatcfg
的新文件:
nano ~/.kafkatcfg
添加以下行以指定有关你的 Kafka 服务器和 Zookeeper 实例的必要信息:
{ "kafka_path": "~/kafka", "log_path": "/tmp/kafka-logs", "zk_path": "localhost:2181" }
现在你已经准备好使用 KafkaT 了。首先,以下是如何使用它查看所有 Kafka 分区的详细信息:
kafkat partitions
你将看到以下输出:
Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...
你将看到 TutorialTopic
,以及 __consumer_offsets
,这是 Kafka 用于存储与客户端相关信息的内部主题。你可以安全地忽略以 __consumer_offsets
开头的行。
要了解更多关于 KafkaT 的信息,请参阅其 GitHub 仓库。
步骤 7 — 设置多节点集群(可选)
如果你想使用更多的 Ubuntu 18.04 机器创建一个多 broker 集群,你应该在每台新机器上重复执行步骤 1、步骤 4 和步骤 5。此外,你还应该对每台机器的 server.properties
文件进行以下更改:
broker.id
属性的值应该被更改,以便在整个集群中是唯一的。该属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。例如,"server1"
,"server2"
等。zookeeper.connect
属性的值应该被更改,以便所有节点指向同一个 ZooKeeper 实例。该属性指定了 Zookeeper 实例的地址,并遵循<主机名/IP_地址>:<端口>
的格式。例如,"203.0.113.0:2181"
,"203.0.113.1:2181"
等。
如果你想为你的集群拥有多个 ZooKeeper 实例,每个节点上 zookeeper.connect
属性的值应该是一个相同的、用逗号分隔的字符串,列出所有 ZooKeeper 实例的 IP 地址和端口号。
步骤 8 — 限制 Kafka 用户
现在所有的安装都已完成,你可以移除 kafka 用户的管理员权限。在这样做之前,请注销并以任何其他非 root sudo 用户登录。如果你仍在运行你开始本教程的相同 shell 会话,只需输入 exit
。
从 sudo 组中移除 kafka 用户:
sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,使用 passwd
命令锁定 kafka 用户的密码。这样可以确保没有人可以直接使用这个帐户登录到服务器:
sudo passwd kafka -l
此时,只有 root 用户或一个 sudo 用户可以通过以下命令登录为 kafka
:
sudo su - kafka
将来,如果你想解锁它,可以使用 passwd
命令并带上 -u
选项:
sudo passwd kafka -u
现在你已成功限制了 kafka 用户的管理员权限。
结论
现在你已经在你的 Ubuntu 服务器上安全地运行 Apache Kafka。你可以通过使用 Kafka 客户端在你的项目中创建 Kafka 生产者和消费者,Kafka 客户端可用于大多数编程语言。要了解更多关于 Kafka 的信息,你也可以参考其文档。