Kafka 3.x的解压安装 - Linux

简介: Kafka 3.x的解压安装 - Linux

一、Kafka介绍

来自维基百科:Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

1. 应用场景

Kafka可以看作是一个能够处理消息队列的中间件,适用于实时的流数据处理,主要用于平衡好生产者和消费者之间的关系。

  • 生产者

生产者可以看作是数据源,可以来自于日志采集框架,如Flume,也可以来自于其它的流数据服务。当接收到数据后,将根据预设的Topic暂存在Kafka中等待消费。对于接收到的数据将会有额外的标记,用于记录数据的被消费【使用】情况。

  • 消费者

消费者即数据的使用端,可以是一个持久化的存储结构,如Hadoop,也可以直接接入支持流数据计算的各种框架,如Spark - Streaming。消费者可以有多个,通过订阅不同的Topic来获取数据。

2. 版本对比

Kafka的0.x和1.x可以看作是上古版本了,最近的更新也是几年以前,从目前的场景需求来看,也没有什么特别的理由需要使用到这两个版本了。

  • 2.x

在进行版本选择时,通常需要综合考虑整个数据流所设计到的计算框架和存储结构,来确定开发成本以及兼容性。目前2.x版本同样是一个可以用于生产环境的版本,并且保持着对Scala最新版本的编译更新。

  • 3.x

3.x是目前最新的稳定版,需要注意的是,Kafka的每个大版本之间的差异较大,包括命令参数以及API调用,所以在更换版本前需要做好详细的调查与准备,本文以3.x的安装为例。

二、Kafka安装

解压安装的操作方式可以适用于各种主流Linux操作系统,只需要解决好前置环境问题。

1. 前置环境

此前,运行Kafka需要预先安装Zookeeper。在Kafka 2.8.0版本以后,引入了Kraft(Kafka Raft)模式,可以使Kafka在不依赖外部Zookeeper的前提下运行。除此之外Kafka由Scala语言编写,需要JVM的运行环境。

  • 检查已有的JDK环境
  • Ubuntu/Debian:update-java-alternatives --list【需要已安装sudo apt install java-common】
  • CentOS/RedHat:rpm -qa|grep java

检查已经安装的环境也可以使用find来搜索,或者使用java -version命令辅助验证,主要为了保证不会与即将安装的JDK版本产生冲突。

  • 卸载不需要的JDK环境
  • Ubuntu/Debian:sudo apt remove xxx
  • CentOS/RedHat:sudo rpm -e --nodeps xxx

使用已经查到的软件包完整名称替换掉xxx的部分,可以卸载掉系统预装的JDK版本。

  • 安装JDK
  • Ubuntu/Debian:sudo apt install openjdk-8-jdk
  • CentOS/RedHat:sudo yum install java-1.8.0-openjdk

安装完成后可以使用java-version命令验证【可省去环境变量配置】。

2. 软件安装

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz

  • 解压安装
tar -zvxf kafka_2.13-3.4.0.tgz
  • 环境变量配置
  • 系统环境变量:/etc/profile
  • Ubuntu/Debian:~/.bashrc
  • CentOS/RedHat:~/.bash_profile

需要在环境变量中指定Kafka的安装目录以及命令文件所在目录,系统环境变量与用户环境变量配置其中之一即可。

export KAFKA_HOME=/home/parallels/Software/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin

在文件结尾添加以上内容后执行source命令,使其立即生效。

[Ubuntu/Debian]source ~/.bashrc
[CentOS/RedHat]source ~/.bash_profile

执行后可以输入kafka,然后按Tab尝试补全【需要按多次】,如果出现命令列表则证明配置成功。

3. 服务启动

如果使用Kraft模式,则需要先进行集群初始化【即使是单个节点】,以下为操作步骤:

  • 修改配置文件

修改Kafka的server.properties文件,更换其中的log.dirs目录指向,防止默认的/tmp被清空:

# 创建log目录
cd $KAFKA_HOME
mkdir kafka-logs
# 修改配置文件
log.dirs=/home/parallels/Software/kafka_2.13-3.4.0/kafka-logs
  • 创建Kafka的集群ID

调用kafka-storage.sh生成一个UUID,并存储在一个临时变量KAFKA_CLUSTER_ID中。

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

  • 执行格式化操作

进入到Kafka的家目录后,执行以下命令:

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

  • 启动Kafka集群

进入到Kafka的家目录后,执行以下命令

bin/kafka-server-start.sh config/kraft/server.properties

这种方式并不是后台运行,需要保证终端开启,等测试稳定后可以在后台执行或者注册为系统服务。

三、Console测试

1. 基础命令

  • 查看Topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092
  • 创建Topic
  • 默认:分区数为1
  • 默认:副本因子为1
  • 默认:删除策略为delete,即删除后数据不可恢复
  • 默认:压缩类型为producer,即由生产者决定是否压缩消息
  • 默认:清理策略为delete,即按照时间和大小进行日志清理

当创建一个Topic时,如果只填写Topic名称,会使用以上的默认配置

kafka-topics.sh --create --topic <topic-name> --partitions <num-partitions> --replication-factor <replication-factor> --bootstrap-server localhost:9092

  • 查看Topic
kafka-topics.sh --describe --topic <topic-name> --bootstrap-server localhost:9092

  • 删除Topic
kafka-topics.sh --delete --topic <topic-name> --bootstrap-server localhost:9092

2. 启动生产者

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

启动一个基于console的生产者脚本,可以方便的进行数据输入的测试,直接进行数据输入即可。

3. 启动消费者

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

添加from-beginning参数来从头消费数据。

4. 验证消费流程

生产者脚本中持续输入数据,然后查看消费者脚本中的消费情况,使用Ctrl + C终止。

四、注册系统服务

为了方便的控制Kafka服务的启动和停止,可以将其注册为系统服务。

1. Systemd服务配置

  • 创建Systemd服务文件
sudo vi /etc/systemd/system/kafka.service

在文件中添加以下内容,需要手动替换User和Group名称以及ExecStartExecStop中关于路径的部分:

[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=parallels
Group=parallels
ExecStart=/path/to/kafka_home/bin/kafka-server-start.sh /path/to/kafka_home/config/kraft/server.properties
ExecStop=/path/to/kafka_home/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

  • 重新加载Systemd配置
sudo systemctl daemon-reload

2. Kafka服务控制

  • 开机自动启动
sudo systemctl enable kafka.service

  • 启动Kafka服务
sudo systemctl start kafka.service
  • 检查Kafka状态
sudo systemctl status kafka.service

  • 停止Kafka服务
sudo systemctl stop kafka.service
  • 重启Kafka服务
sudo systemctl restart kafka.service


目录
相关文章
|
6天前
|
存储 IDE Linux
零基础保姆级教程!手把手教你免费玩转Linux CentOS安装+学习环境搭建(附避坑指南)
本文详细介绍了在VMware虚拟机中安装CentOS 6.8的全过程。首先,需确保已安装VMware并开启V-CPU虚拟化功能,可通过BIOS设置或使用LeoMoon CPU-V工具检测。接着,下载CentOS镜像文件,并在VMware中新建虚拟机,配置CPU、内存、硬盘等参数。最后,加载ISO镜像启动虚拟机,按照提示完成CentOS的安装,包括语言、键盘、存储方式、地区、密码设置及硬盘分区等步骤。安装完成后,以root用户登录即可进入系统桌面,开始学习Linux命令和操作。
51 12
零基础保姆级教程!手把手教你免费玩转Linux CentOS安装+学习环境搭建(附避坑指南)
|
19天前
|
缓存 Ubuntu Linux
Linux中yum、rpm、apt-get、wget的区别,yum、rpm、apt-get常用命令,CentOS、Ubuntu中安装wget
通过本文,我们详细了解了 `yum`、`rpm`、`apt-get`和 `wget`的区别、常用命令以及在CentOS和Ubuntu中安装 `wget`的方法。`yum`和 `apt-get`是高层次的包管理器,分别用于RPM系和Debian系发行版,能够自动解决依赖问题;而 `rpm`是低层次的包管理工具,适合处理单个包;`wget`则是一个功能强大的下载工具,适用于各种下载任务。在实际使用中,根据系统类型和任务需求选择合适的工具,可以大大提高工作效率和系统管理的便利性。
108 25
|
23天前
|
Linux
Linux压缩与解压“助手”
gzip压缩解压 因为都是系统自带的我们不需要安装; ls 看一下目录当前的文件,创建一个 touch 123.txt 文件; 原文件消失,压缩解压 gzip 进行压缩,(“ gzip 123.txt ”),这样就压缩完毕了; gzip 进行解压,(“ gzip -d 123.txt.gz”),这样就解压完毕了; 原文件保留,gzip -k 123.txt、gzip -dk 123.txt.gz ;
39 9
|
1月前
|
消息中间件 Kafka Docker
docker compose 安装 kafka
通过本文的步骤,您可以快速在本地使用 Docker Compose 安装并配置 Kafka 和 Zookeeper。Docker Compose 简化了多容器应用的管理,方便快速搭建和测试分布式系统。
90 2
|
1月前
|
Ubuntu Java Linux
Linux 安装 Qualcomm ® SnapdragonTM Profiler
通过本文的详细介绍,您应该已经成功在 Linux 系统上安装并配置了 Qualcomm® Snapdragon™ Profiler,并能够连接 Android 设备进行性能分析。Snapdragon Profiler 提供了丰富的工具和功能,可以帮助开发者深入了解应用程序的性能瓶颈,从而进行优化。希望本文能对您有所帮助,让您在开发过程中更高效地使用 Snapdragon Profiler 进行性能分析和优化。
78 10
|
1月前
|
Linux
Linux安装svn并启动
Linux安装svn并启动
60 10
|
2月前
|
Oracle 关系型数据库 Linux
linux8安装oracle 11g遇到的问题记录
Oracle 11g在Linux 8上安装时会遇到link编译环节的问题。官方建议忽略安装中的链接错误,安装完成后应用DBPSU 11.2.0.4.240716补丁及一次性补丁33991024,再重新编译二进制文件,并配置监听器和数据库。但因11g已退出服务期,这些补丁需付费获取。网上信息显示22年1月的PSU补丁也可解决问题,找到该补丁后按常规方式打补丁即可。如有需求或疑问可咨询我。
103 20
|
2月前
|
弹性计算 运维 Ubuntu
os-copilot在Alibaba Cloud Linux镜像下的安装与功能测试
我顺利使用了OS Copilot的 -t -f 功能,我的疑惑是在换行的时候就直接进行提问了,每次只能写一个问题,没法连续换行更有逻辑的输入问题。 我认为 -t 管道 功能有用 ,能解决环境问题的连续性操作。 我认为 -f 管道 功能有用 ,可以单独创建可连续性提问的task问题。 我认为 | 对文件直接理解在新的服务器理解有很大的帮助。 此外,我还有建议 可以在非 co 的环境下也能进行连续性的提问。
87 7
|
2月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
5月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
207 1