Apache Kafka是开源分布式高并发消息中间件,支持每秒百万级消息并发,在互联网高并发架构:双11、电商秒杀抢购、网络直播、IOT大数据采集、聊天App、导航等高并发架构中大量使用。
生产环境一般要求搭建Kafka集群。Java开发Kafka集群需要注意参数的详细配置,Kafka参数的含义在配置集群的时候非常重要,尤其是关系性能和集群的参数。下面我们一起来看看Kafka的详细参数。
Kafka服务器基础配置
broker的身份ID。必须为每个代理设置一个唯一的整数。集群配置时候非常重要。不能重复。
broker.id = 1
Kafka套接字服务器配置
Kafka服务器套接字服务器侦听的地址。 如果未配置,就会使用java.net.InetAddress.getCanonicalHostName()地址。格式:
#listeners = listener_name:// host_name:port
例如:
listeners = PLAINTEXT://localhost:9092
listeners = PLAINTEXT://localhost:9091
Broker向生产者和消费者通告的主机名和端口。如果没有设定,它将使用“侦听器”的值。否则,它将使用该值从java.net.InetAddress.getCanonicalHostName()返回的地址。
#advertised.listeners = PLAINTEXT://host.name:9092
Kafka侦听器名称映射到安全协议,默认为它们是相同名称
#listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
服务器用于从网络接收请求并向网络发送响应消息的线程数
num.network.threads = 3
服务器用于处理请求的线程数,可能包括磁盘I / O需要的线程.
num.io.threads = 8
套接字服务器使用的发送缓冲区(SO_SNDBUF)大小,字节。
socket.send.buffer.bytes = 102400
套接字服务器使用的接收缓冲区(SO_RCVBUF)大小,字节。
socket.receive.buffer.bytes = 102400
套接字服务器将接受的请求的最大大小(防止OOM内存溢出)大小,字节。
socket.request.max.bytes = 104857600
Kafka日志配置
逗号分隔的目录列表,用于存储日志文件
log.dirs =。/日志
每个主题的默认日志分区数。更多分区允许更大并行处理消息,但这也会导致更多的文件
num.partitions = 1
在启动时用于日志恢复和在关闭时刷新的每个数据目录文件夹需要的线程数。
#对于数据目录文件夹位于RAID阵列中的情况,建议增加此线程数值。
num.recovery.threads.per.data.dir = 1
Kafka内部主题配置
Topic主题的消费者的group元数据复制因子,"__consumer_offsets" 和"__transaction_state" 。
offsets.topic.replication.factor = 1
transaction.state.log.replication.factor = 1
transaction.state.log.min.isr = 1
Kafka日志刷新政策配置
Kafka消息立即写入文件系统,但默认情况下我们只有fsync()来缓慢延迟地同步操作系统缓存消息到磁盘上。 以下配置参数控制将消息数据刷新到磁盘过程。这里有一些重要的权衡:
1、持久性:如果Kafka不使用复制,则可能会丢失未刷新的数据。
2、延迟:当Kafka刷新确实发生时,非常大的刷新间隔可能会导致延迟峰值,因为会有大量数据需要刷新到磁盘,间隔太久缓冲消息越多。
3、吞吐量:Flush通常是最昂贵的操作,并且小的Flush间隔可能导致过多的磁盘IO操作搜索。
下面以下设置允许配置刷新策略以在一段时间后或每N条消息(或两者)刷新数据。这可以在全局范围内配置完成,也可以针对每个主题的单独配置。
#强制刷新数据到磁盘之前要接受的消息数,10000消息时批量刷盘
#log.flush.interval.messages = 10000
#强制刷新之前消息可以在日志中停留的最长时间 1000毫秒
#log.flush.interval.ms = 1000
Kafka日志保存策略
以下配置参数控制Kafka日志段的处理策略,我们可以设置为在一段时间后或在累积给定大小后删除日志数据段。只要满足一下任意条件,就会删除一个日志段。删除总是从日志的末尾开始。
log log的最小时间长度,超过删除日志。
#由于时间可以删除的日志文件的最小年龄 168小时
log.retention.hours = 168
基于大小的日志保留策略。 除非剩余的段大小在log.retention.bytes之下,否则将从日志中删除段。 功能独立于log.retention.hours限制。
#log.retention.bytes = 1073741824
#日志段文件的最大大小。达到此大小时,将创建新的日志段。
log.segment.bytes = 1073741824
#检查日志段以查看是否可以删除日志段的时间间隔
#保留策略
log.retention.check.interval.ms = 300000
Zookeeper参数设置
Zookeeper连接字符串。默认单台设置,集群需要多台,逗号分隔的主机:端口,每个对应一个Zookeeper。例如“127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。
#所有kafka znodes的根目录。集群设置模式
zookeeper.connect =localhost:2181,localhost:2182,localhost:2183
连接到zookeeper的超时时间(以毫秒为单位)
zookeeper.connection.timeout.ms = 6000
kafka Group协调配置
以下配置指定GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。
#当新成员加入组时,重新平衡延迟是group.initial.rebalance.delay.ms的值,最多max.poll.interval.ms。
默认值为3秒。我们将此参数设置为0,方便开发和测试。但是生产环境中默认值推荐3秒更合适,因为这有助于避免在应用程序启动期间不必要且可能很昂贵的重新平衡过程,减少系统资源的消耗。
group.initial.rebalance.delay.ms = 0
在集群和优化情况下需要了解每个参数的确切含义,对于Kafka集群的设置,需要配置多个Zookeeper地址。默认的日志清理、垃圾回收、连接池、线程模型都是非常重要的因素。
使用最新的Java Spring Boot 2.x版本连接Kafka需要在配置文件中修改地址参数:
spring.kafka.consumer.group-id=myGroup
spring.kafka.bootstrap-servers=localhost:9091,localhost:9092,localhost:9093
参考文档 http://kafka.apache.org/
8、阿里巴巴Java群超过2900人
直播地址:Java技术进阶群
进群方式:钉钉扫码入群
阿里巴巴MongoDB群