Kafka broker配置介绍 (四)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介:

这部分内容对了解系统和提高软件性能都有很大的帮助,kafka官网上也给出了比较详细的配置详单,但是我们还是直接从代码来看broker到底有哪些配置需要我们去了解的,配置都有英文注释,所以每一部分是干什么的就不翻译了,都能看懂:

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
package  kafka.server
 
import  java.util.Properties
import  kafka.utils.{Utils, ZKConfig}
import  kafka.message.Message
 
/**
  * Configuration settings for the kafka server
  */
class  KafkaConfig(props :  Properties) extends  ZKConfig(props) {
   /* the port to listen and accept connections on */
   val  port :  Int =  Utils.getInt(props, "port" , 6667 )
 
   /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
   val  hostName :  String =  Utils.getString(props, "hostname" , null )
 
   /* the broker id for this server */
   val  brokerId :  Int =  Utils.getInt(props, "brokerid" )
   
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val  socketSendBuffer :  Int =  Utils.getInt(props, "socket.send.buffer" , 100 * 1024 )
   
   /* the SO_RCVBUFF buffer of the socket sever sockets */
   val  socketReceiveBuffer :  Int =  Utils.getInt(props, "socket.receive.buffer" , 100 * 1024 )
   
   /* the maximum number of bytes in a socket request */
   val  maxSocketRequestSize :  Int =  Utils.getIntInRange(props, "max.socket.request.bytes" , 100 * 1024 * 1024 , ( 1 , Int.MaxValue))
 
   /* the maximum size of message that the server can receive */
   val  maxMessageSize =  Utils.getIntInRange(props, "max.message.size" , 1000000 , ( 0 , Int.MaxValue))
 
   /* the number of worker threads that the server uses for handling all client requests*/
   val  numThreads =  Utils.getIntInRange(props, "num.threads" , Runtime.getRuntime().availableProcessors, ( 1 , Int.MaxValue))
   
   /* the interval in which to measure performance statistics */
   val  monitoringPeriodSecs =  Utils.getIntInRange(props, "monitoring.period.secs" , 600 , ( 1 , Int.MaxValue))
   
   /* the default number of log partitions per topic */
   val  numPartitions =  Utils.getIntInRange(props, "num.partitions" , 1 , ( 1 , Int.MaxValue))
   
   /* the directory in which the log data is kept */
   val  logDir =  Utils.getString(props, "log.dir" )
   
   /* the maximum size of a single log file */
   val  logFileSize =  Utils.getIntInRange(props, "log.file.size" , 1 * 1024 * 1024 * 1024 , (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
   val  logFileSizeMap =  Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size" , "" ))
 
   /* the maximum time before a new log segment is rolled out */
   val  logRollHours =  Utils.getIntInRange(props, "log.roll.hours" , 24 * 7 , ( 1 , Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
   val  logRollHoursMap =  Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours" , "" ))
 
   /* the number of hours to keep a log file before deleting it */
   val  logRetentionHours =  Utils.getIntInRange(props, "log.retention.hours" , 24 * 7 , ( 1 , Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val  logRetentionHoursMap =  Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours" , "" ))
   
   /* the maximum size of the log before deleting it */
   val  logRetentionSize =  Utils.getLong(props, "log.retention.size" , - 1 )
 
   /* the maximum size of the log for some specific topic before deleting it */
   val  logRetentionSizeMap =  Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size" , "" ))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val  logCleanupIntervalMinutes =  Utils.getIntInRange(props, "log.cleanup.interval.mins" , 10 , ( 1 , Int.MaxValue))
   
   /* enable zookeeper registration in the server */
   val  enableZookeeper =  Utils.getBoolean(props, "enable.zookeeper" , true )
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val  flushInterval =  Utils.getIntInRange(props, "log.flush.interval" , 500 , ( 1 , Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val  flushIntervalMap =  Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms" , "" ))
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
   val  flushSchedulerThreadRate =  Utils.getInt(props, "log.default.flush.scheduler.interval.ms" 3000 )
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
   val  defaultFlushIntervalMs =  Utils.getInt(props, "log.default.flush.interval.ms" , flushSchedulerThreadRate)
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val  topicPartitionsMap =  Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map" , "" ))
 
   /* the maximum length of topic name*/
   val  maxTopicNameLength =  Utils.getIntInRange(props, "max.topic.name.length" , 255 , ( 1 , Int.MaxValue))
}

上面这段代码来自kafka.server包下的KafkaConfig类,之前我们就说过,broker就是kafka中的server,所以讲配置放在这个包中也不奇怪。这里我们顺着代码往下读,也顺便看看scala的语法。和java一样也要import相关的包,kafka将同一包内的两个类写在大括号中:

1
import  kafka.utils.{Utils, ZKConfig}

然后我们看类的写法:

1
class  KafkaConfig(props :  Properties) extends  ZKConfig(props)

我们看到在加载kafkaConfig的时候会加载一个properties对象,同时也会加载有关zookeeper的properties,这个时候我们可以回忆一下,之前我们启动kafka broker的命令:

1.  启动zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties  & (用&是为了能退出命令行)

2.  启动kafka server:  bin/kafka-server-start.sh ../config/server.properties  &

所以你能明白,初始化kafka broker的时候程序一定是去加载位于config文件夹下的properties,这个和java都一样没有区别。当然properties我们也可以通过程序来给出,这个我们后面再说,继续看我们的代码。既然找到了对应的properties文件,我们就结合代码和properties一起来看。

Kafka broker的properties中,将配置分为以下六类:

l  Server Basics:关于brokerid,hostname等配置

l  Socket Server Settings:关于传输的配置,端口、buffer的区间等。

l  Log Basics:配置log的位置和partition的数量。

l  Log Flush Policy:这部分是kafka配置中最重要的部分,决定了数据flush到disk的策略。

l  Log Retention Policy:这部分主要配置日志处理时的策略。

l  Zookeeper:配置zookeeper的相关信息。

在文件properties中的配置均出现在kafkaConfig这个类中,我们再看看kafkaConfig中的代码:

1
2
3
4
5
/* the broker id for this server */
   val  brokerId :  Int =  Utils.getInt(props, "brokerid" )
   
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val  socketSendBuffer :  Int =  Utils.getInt(props, "socket.send.buffer" , 100 * 1024 )

凡是参数中有三个的,最后一个是default,而参数只有两个的则要求你一定要配置,否则的话则报错。当然在这么多参数中肯定是有一些经验参数的,至于这些参数怎么配置我确实没有一个特别的推荐,需要在不断的测试中才能磨合出来。

当然你也可以将配置写在程序里,然后通过程序去启动broker,这样kafka的配置就可以像下面一样写:

1
2
3
Properties props = new Properties();
props.setProperty( "port" , "9093" );
props.setProperty( "log.dir" , "/home/kafka/data1" );

我倒是觉得配置还是直接写在配置文件中比较好,如果需要修改也不会影响正在运行的服务,写在内存中,总是会有些不方便的地方。所以还是建议大家都写配置好了,后面讲到的producer和consumer都一样。

这里再提两个参数一个是brokerid,每个broker的id必须要区分;第二个参数是hostname,这个是broker和producer、consumer联系的关键,这里记住一定要改成你的地址和端口,否则永远连得都是localhost。

 

--------------------------------------------------------

下一篇将写producer和consumer的配置了,涉及到这部分就要开始编程了,写着写着又往源码里看进去了,下篇会先讲如何搭建开发环境,然后再写两个简单那的例子去熟悉配置。


本文转自快乐就好博客园博客,原文链接:http://www.cnblogs.com/happyday56/p/4210934.html,如需转载请自行联系原作者
相关文章
|
5月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 存储 缓存
Kafka(三)【Broker 存储】(1)
Kafka(三)【Broker 存储】
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
2月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
70 7
|
3月前
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
53 1
|
3月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 存储 缓存
微服务数据问题之Kafka的默认复制配置如何解决
微服务数据问题之Kafka的默认复制配置如何解决
|
4月前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
167 7
|
3月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 NoSQL Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)

热门文章

最新文章

下一篇
无影云桌面