这部分内容对了解系统和提高软件性能都有很大的帮助。Kafka官网上也给出了比较详细的配置清单,但我们还是直接从代码来看broker到底有哪些配置需要了解。每个配置都有英文注释,一看就懂,所以这里就不逐一翻译了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
kafka.server
import
java.util.Properties
import
kafka.utils.{Utils, ZKConfig}
import
kafka.message.Message
/**
 * Configuration settings for the kafka server (broker).
 *
 * Every setting below is a `val`, so it is read from `props` exactly once, when
 * the object is constructed. The same `Properties` instance is also passed to
 * the [[ZKConfig]] superclass, which reads the ZooKeeper-related settings.
 *
 * Lookups that pass a default value make the setting optional; the two lookups
 * that pass none (`brokerid`, `log.dir`) have no fallback value here.
 * NOTE(review): what happens when such a key is missing is decided by `Utils`,
 * which is not visible in this file.
 *
 * Declaration order matters: Scala initializes vals top to bottom, and
 * `defaultFlushIntervalMs` uses `flushSchedulerThreadRate` as its default.
 *
 * @param props broker settings, e.g. loaded from config/server.properties
 */
class KafkaConfig(props: Properties) extends ZKConfig(props) {

  /* the port to listen and accept connections on */
  val port: Int = Utils.getInt(props, "port", 6667)

  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost.
   * If there are multiple interfaces getLocalHost may not be what you want. */
  val hostName: String = Utils.getString(props, "hostname", null)

  /* the broker id for this server; no default value is supplied */
  val brokerId: Int = Utils.getInt(props, "brokerid")

  /* the SO_SNDBUF buffer size of the socket server's sockets */
  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100 * 1024)

  /* the SO_RCVBUF buffer size of the socket server's sockets */
  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100 * 1024)

  /* the maximum number of bytes in a socket request */
  val maxSocketRequestSize: Int =
    Utils.getIntInRange(props, "max.socket.request.bytes", 100 * 1024 * 1024, (1, Int.MaxValue))

  /* the maximum size of message that the server can receive */
  val maxMessageSize: Int =
    Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))

  /* the number of worker threads that the server uses for handling all client requests;
   * defaults to the number of available processors */
  val numThreads: Int =
    Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))

  /* the interval, in seconds, in which to measure performance statistics */
  val monitoringPeriodSecs: Int =
    Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))

  /* the default number of log partitions per topic */
  val numPartitions: Int =
    Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))

  /* the directory in which the log data is kept; no default value is supplied */
  val logDir = Utils.getString(props, "log.dir")

  /* the maximum size of a single log file */
  val logFileSize: Int =
    Utils.getIntInRange(props, "log.file.size", 1 * 1024 * 1024 * 1024, (Message.MinHeaderSize, Int.MaxValue))

  /* the maximum size of a single log file for some specific topics */
  val logFileSizeMap =
    Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))

  /* the maximum time, in hours, before a new log segment is rolled out */
  val logRollHours: Int =
    Utils.getIntInRange(props, "log.roll.hours", 24 * 7, (1, Int.MaxValue))

  /* the number of hours before rolling out a new log segment for some specific topics */
  val logRollHoursMap =
    Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))

  /* the number of hours to keep a log file before deleting it */
  val logRetentionHours: Int =
    Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))

  /* the number of hours to keep a log file before deleting it, for some specific topics */
  val logRetentionHoursMap =
    Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

  /* the maximum size of the log before deleting it.
   * NOTE(review): the -1 default presumably means "no size limit" — confirm in the log cleaner. */
  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)

  /* the maximum size of the log for some specific topics before deleting it */
  val logRetentionSizeMap =
    Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))

  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
  val logCleanupIntervalMinutes: Int =
    Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))

  /* enable zookeeper registration in the server */
  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)

  /* the number of messages accumulated on a log partition before messages are flushed to disk */
  val flushInterval: Int =
    Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))

  /* the maximum time in ms that a message in selected topics is kept in memory before being
   * flushed to disk, e.g., topic1:3000,topic2:6000 */
  val flushIntervalMap =
    Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))

  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
  val flushSchedulerThreadRate: Int =
    Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)

  /* the maximum time in ms that a message in any topic is kept in memory before being flushed
   * to disk. Defaults to flushSchedulerThreadRate, which must stay declared above this val:
   * vals are initialized in declaration order, and a forward reference would read 0. */
  val defaultFlushIntervalMs: Int =
    Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)

  /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
  val topicPartitionsMap =
    Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))

  /* the maximum length of topic name */
  val maxTopicNameLength: Int =
    Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}
|
上面这段代码来自kafka.server包下的KafkaConfig类。之前我们说过,broker就是Kafka中的server,所以将配置放在这个包中也不奇怪。这里我们顺着代码往下读,也顺便看看Scala的语法。和Java一样,Scala也要import相关的包;不同的是,kafka把同一个包内的两个类写在一对大括号中,用一条import语句同时导入:
1
|
import
kafka.utils.{Utils, ZKConfig}
|
然后我们看类的写法:
1
|
class
KafkaConfig(props
:
Properties)
extends
ZKConfig(props)
|
我们看到,构造KafkaConfig时需要传入一个Properties对象,而同一个对象也会传给父类ZKConfig,用来读取有关zookeeper的配置。这时我们可以回忆一下之前启动kafka broker的命令:
1. 启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties &(在kafka根目录下执行;末尾的&让进程在后台运行,这样命令行还能继续使用)
2. 启动kafka server:bin/kafka-server-start.sh config/server.properties &
由此可见,初始化kafka broker时,程序加载的就是启动命令中指定的properties文件(这里是config文件夹下的server.properties),这一点和Java程序读取配置文件没有区别。当然,properties也可以直接在程序中给出,这个我们后面再说,先继续看代码。既然找到了对应的properties文件,我们就结合代码和properties一起来看。
Kafka broker的配置文件server.properties将配置分为以下六类:
- Server Basics:brokerid、hostname等基本配置。
- Socket Server Settings:网络传输相关的配置,如端口、socket缓冲区大小等。
- Log Basics:配置log的存放位置和partition的数量。
- Log Flush Policy:这部分是kafka配置中最重要的部分,决定了数据flush到磁盘的策略。
- Log Retention Policy:配置日志的保留策略,即旧日志在什么条件下会被删除。
- Zookeeper:配置zookeeper的相关信息。
properties文件中的配置项都能在KafkaConfig这个类中找到对应的字段。我们再看看KafkaConfig中的代码:
1
2
3
4
5
|
// Excerpt from KafkaConfig.

/* the broker id for this server; no default value is supplied */
val brokerId: Int = Utils.getInt(props, "brokerid")

/* the SO_SNDBUF buffer size of the socket server's sockets; defaults to 100 * 1024 */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100 * 1024)
|
凡是在props和配置名之外还传了第三个参数的,这个第三个参数就是默认值(getIntInRange还多一个第四个参数,表示允许的取值范围);只传了props和配置名两个参数的,例如brokerid和log.dir,则要求你一定要配置,否则会报错。当然,这么多参数中肯定有一些经验参数,至于怎么配置,我确实没有特别的推荐,需要在不断的测试中慢慢磨合出来。
当然你也可以将配置写在程序里,然后通过程序去启动broker,这样kafka的配置就可以像下面一样写:
1
2
3
|
// Build the broker configuration in code instead of reading server.properties.
Properties props = new Properties();
// KafkaConfig reads "brokerid" without a default value, so it has to be set here;
// every broker in the cluster needs its own distinct id.
props.setProperty("brokerid", "0");
props.setProperty("port", "9093");
props.setProperty("log.dir", "/home/kafka/data1");
// NOTE(review): enable.zookeeper defaults to true, so the ZooKeeper settings read by
// ZKConfig are probably required as well — confirm against ZKConfig.
|
我倒是觉得配置还是直接写在配置文件中比较好:需要修改时不必改动代码,也不会影响正在运行的服务;写在程序里总会有些不方便的地方。所以还是建议大家都使用配置文件,后面讲到的producer和consumer也一样。
这里再强调两个参数。一个是brokerid,每个broker的id必须互不相同;另一个是hostname,它是broker与producer、consumer建立联系的关键。一定要把hostname改成你机器的实际地址(端口由port参数单独配置);如果不设置,broker会使用getLocalHost()返回的地址,往往就是localhost,这样客户端连的永远都是localhost。
--------------------------------------------------------
下一篇将写producer和consumer的配置。涉及这部分就要开始编程了,写着写着又会往源码里看进去。下篇会先讲如何搭建开发环境,然后再写两个简单的例子来熟悉这些配置。