Kafka安装部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
云原生网关 MSE Higress,422元/月
简介: Kafka安装部署
  1. kafka介绍(摘自百度百科)

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

  1. 环境准备
服务器版本:SUSE Linux Enterprise Server 12 (x86_64)
jdk版本:  1.8
kafka版本:2.3.0
版本获取:https://mirrors.cnnic.cn/apache/kafka/2.3.0/

3.部署步骤

  • 解压压缩包
tar zxvf kafka_2.12-2.3.0.tgz  -C /app
  • 目录介绍
/bin 操作kafka的可执行脚本,还包含windows下脚本
/config 配置文件所在目录
/libs 依赖库目录
/logs 日志数据目录,目录kafka把server端日志分为5种类型,分为:server,request,state,log-cleaner,controller
  • 配置zookeeper
vim /app/kafka_2.12-2.3.0/config/server.properties
修改日志路径:log.dirs=/app/kafka_2.12-2.3.0/kafka-logs
监听地址端口:listeners=PLAINTEXT://192.168.96.102:9092
zk连接地址:zookeeper.connect=localhost:2181(kafka自带zookeeper)
启动命令:bin/zookeeper-server-start.sh config/zookeeper.properties &

优化配置:broker.id=0

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

num.partitions=3

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

auto.create.topics.enable=true

delete.topics.enable=true

说明
broker.id:每个broker在集群中的唯一标识,正整数。当该服务器的ip地址发生变更,但broker.id未变,则不会影响consumers的消费情况
listeners:kafka的监听地址与端口,在实际测试中如果写0.0.0.0会报错。
num.network.threads:kafka用于处理网络请求的线程数
num.io.threads:kafka用于处理磁盘io的线程数
socket.send.buffer.bytes:发送数据的缓冲区
socket.receive.buffer.bytes:接收数据的缓冲区
socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
num.partitions:设置新创建的topic的默认分区数
number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
log.retention.check.interval.ms:用于检测数据过期的周期
log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
zookeeper.connection.timeout:kafka连接zk的超时时间
group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true
需要说明的是,多个kafka节点依赖zk实现集群,所以各节点并不需要作特殊配置,只需要broker.id不同,并接入到同一个zk集群即可。
  • 启动
进入bin目录,执行
bin/kafka-server-start.sh -daemon  config/server.properties

启动时我这里报错

[2019-10-10 18:10:48,103] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
  at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
  at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
  at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
  at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
  at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:364)
  at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:387)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:207)
  at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
  at kafka.Kafka$.main(Kafka.scala:84)
  at kafka.Kafka.main(Kafka.scala)
[2019-10-10 18:10:48,105] INFO shutting down (kafka.server.KafkaServer)
这时将配置文件里超时时间延长一些,就可以了
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000(默认为6s)
  • 检查进程

  • 创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 2 --partitions 3 --topic myfirsttopic 
Created topic myfirsttopic.
参数说明:
--create:创建一个topic
--zookeeper: 指定zookeeper集群的主机列表,多个server可使用逗号分隔,这里因为kafka和zk是在同一个server,所以直接连接了本机的2181端口
--replication-factor:指定创建这个topic的副本数
--partitions:指定该topic的分区数
--topic:指定topic的名称```


相关文章
|
16天前
|
消息中间件 Ubuntu Java
|
28天前
|
消息中间件 Ubuntu Java
|
2月前
|
消息中间件 Ubuntu Java
Kafka安装部署
Kafka安装部署
|
消息中间件 Java Kafka
Apache Kafka-CMAK(kafka manager)安装部署使用
Apache Kafka-CMAK(kafka manager)安装部署使用
400 0
|
消息中间件 大数据 Java
环境篇之 kafka 的安装部署|学习笔记
快速学习环境篇之 kafka 的安装部署
140 0
|
消息中间件 Java Kafka
kafka简单安装部署
一、安装、配置  1.下载 kafka是由linkedin开源的,但是已经托管在了apache,所以需要从apache下载,http://kafka.apache.org/downloads.html。
1281 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
268 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3