1.概述
在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API。在 《高级消费 API》一文中,介绍了其高级消费的 API 实现。今天给大家介绍另一种消费 API。
2.内容
在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API。那么,我们首先需要知道低级消费 API 的作用。它能帮助我们去做那些事情:
- 一个消息进行多次读取
- 在处理过程中只消费 Partition 其中的某一部分消息
- 添加事物管理机制以保证消息仅被处理一次
当然,在使用的过程当中也是有些弊端的,其内容如下:
- 必须在程序中跟踪 Offset 的值
- 必须找出指定的 Topic Partition 中的 Lead Broker
- 必须处理 Broker 的变动
使用其 API 的思路步骤如下所示:
- 从所有处于 Active 状态的 Broker 中找出哪个是指定 Topic Partition 中的 Lead Broker
- 找出指定 Topic Partition 中的所有备份 Broker
- 构造请求
- 发送请求并查询数据
- 处理 Leader Broker 的变动
3.代码实现
3.1 Java Project
若是使用 Java Project 工程去实现该部分代码,需要添加相关以来 JAR 文件,其内容包含如下:
- scala-xml_${version}-${version}.jar
- scala-library-${version}.jar
- metrics-core-${version}.jar
- kafka-client-${version}.jar
- kafka_${version}-${version}.jar
针对 Java Project 工程,需要自己筛选 JAR 去添加。保证代码的顺利执行。
3.2 Maven Project
对 Maven 工程,在 pom.xml 文件中添加相应的依赖信息即可,简单方便。让 Maven 去管理相应的依赖 JAR 文件。内容如下所示:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
这样在 Maven 工程中相应的依赖 JAR 文件就添加完成了。
3.3 代码实现
在低级消费 API 中,实现代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
/**
* @Date Mar 2, 2016
*
* @Author dengjie
*
* @Note Simple consumer api
*/
public
class
SimpleKafkaConsumer {
private
static
Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.
class
);
private
List<String> m_replicaBrokers =
new
ArrayList<String>();
public
SimpleKafkaConsumer() {
m_replicaBrokers =
new
ArrayList<String>();
}
public
static
void
main(String[] args) {
SimpleKafkaConsumer example =
new
SimpleKafkaConsumer();
// Max read number
long
maxReads = SystemConfig.getIntProperty(
"kafka.read.max"
);
// To subscribe to the topic
String topic = SystemConfig.getProperty(
"kafka.topic"
);
// Find partition
int
partition = SystemConfig.getIntProperty(
"kafka.partition"
);
// Broker node's ip
List<String> seeds =
new
ArrayList<String>();
String[] hosts = SystemConfig.getPropertyArray(
"kafka.server.host"
,
","
);
for
(String host : hosts) {
seeds.add(host);
}
int
port = SystemConfig.getIntProperty(
"kafka.server.port"
);
try
{
example.run(maxReads, topic, partition, seeds, port);
}
catch
(Exception e) {
log.error(
"Oops:"
+ e);
e.printStackTrace();
}
}
public
void
run(
long
a_maxReads, String a_topic,
int
a_partition, List<String> a_seedBrokers,
int
a_port)
throws
Exception {
// Get point topic partition's meta
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if
(metadata ==
null
) {
log.info(
"[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting"
);
return
;
}
if
(metadata.leader() ==
null
) {
log.info(
"[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting"
);
return
;
}
String leadBroker = metadata.leader().host();
String clientName =
"Client_"
+ a_topic +
"_"
+ a_partition;
SimpleConsumer consumer =
new
SimpleConsumer(leadBroker, a_port,
100000
,
64
*
1024
, clientName);
long
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
clientName);
int
numErrors =
0
;
while
(a_maxReads >
0
) {
if
(consumer ==
null
) {
consumer =
new
SimpleConsumer(leadBroker, a_port,
100000
,
64
*
1024
, clientName);
}
FetchRequest req =
new
FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset,
100000
).build();
FetchResponse fetchResponse = consumer.fetch(req);
if
(fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short
code = fetchResponse.errorCode(a_topic, a_partition);
log.info(
"[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:"
+ leadBroker
+
" Reason: "
+ code);
if
(numErrors >
5
)
break
;
if
(code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
clientName);
continue
;
}
consumer.close();
consumer =
null
;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
continue
;
}
numErrors =
0
;
long
numRead =
0
;
for
(MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long
currentOffset = messageAndOffset.offset();
if
(currentOffset < readOffset) {
log.info(
"[SimpleKafkaConsumer.run()] - Found an old offset: "
+ currentOffset +
" Expecting: "
+ readOffset);
continue
;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte
[] bytes =
new
byte
[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) +
": "
+
new
String(bytes,
"UTF-8"
));
// Message deal enter
numRead++;
a_maxReads--;
}
if
(numRead ==
0
) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException ie) {
}
}
}
if
(consumer !=
null
)
consumer.close();
}
public
static
long
getLastOffset(SimpleConsumer consumer, String topic,
int
partition,
long
whichTime,
String clientName) {
TopicAndPartition topicAndPartition =
new
TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
new
HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition,
new
PartitionOffsetRequestInfo(whichTime,
1
));
kafka.javaapi.OffsetRequest request =
new
kafka.javaapi.OffsetRequest(requestInfo,
kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if
(response.hasError()) {
log.info(
"[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
return
0
;
}
long
[] offsets = response.offsets(topic, partition);
return
offsets[
0
];
}
/**
* @param a_oldLeader
* @param a_topic
* @param a_partition
* @param a_port
* @return String
* @throws Exception
* find next leader broker
*/
private
String findNewLeader(String a_oldLeader, String a_topic,
int
a_partition,
int
a_port)
throws
Exception {
for
(
int
i =
0
; i <
3
; i++) {
boolean
goToSleep =
false
;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if
(metadata ==
null
) {
goToSleep =
true
;
}
else
if
(metadata.leader() ==
null
) {
goToSleep =
true
;
}
else
if
(a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i ==
0
) {
// first time through if the leader hasn't changed give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
//
goToSleep =
true
;
}
else
{
return
metadata.leader().host();
}
if
(goToSleep) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException ie) {
}
}
}
throw
new
Exception(
"Unable to find new leader after Broker failure. Exiting"
);
}
private
PartitionMetadata findLeader(List<String> a_seedBrokers,
int
a_port, String a_topic,
int
a_partition) {
PartitionMetadata returnMetaData =
null
;
loop:
for
(String seed : a_seedBrokers) {
SimpleConsumer consumer =
null
;
try
{
consumer =
new
SimpleConsumer(seed, a_port,
100000
,
64
*
1024
,
"leaderLookup"
);
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req =
new
TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for
(TopicMetadata item : metaData) {
for
(PartitionMetadata part : item.partitionsMetadata()) {
if
(part.partitionId() == a_partition) {
returnMetaData = part;
break
loop;
}
}
}
}
catch
(Exception e) {
log.error(
"Error communicating with Broker ["
+ seed +
"] to find Leader for ["
+ a_topic +
", "
+ a_partition +
"] Reason: "
+ e);
}
finally
{
if
(consumer !=
null
)
consumer.close();
}
}
if
(returnMetaData !=
null
) {
m_replicaBrokers.clear();
for
(kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return
returnMetaData;
}
}
|
4.总结
在使用 Kafka 低级消费 API 时,要明确我们所使用的业务场景,一般建议还是使用高级消费 API,除非遇到特殊需要。另外,在使用过程中,注意 Leader Broker 的处理,和 Offset 的管理。
5.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!