Kafka 中的消费者分为新旧两种类型,它们分别是基于不同版本的消费者 API 实现的。本文将详细介绍 Kafka 新旧消费者的区别,包括特性、用法、适用场景等方面,并提供示例代码加以说明。
1. 新旧消费者概述
1.1 旧消费者(Old Consumer)
旧消费者基于 Kafka 0.8.x 版本引入的消费者 API,主要包括 SimpleConsumer
和 ConsumerConnector
两个类。旧消费者 API 在设计上较为简单,但功能有限,不支持消费者组等高级特性。
1.2 新消费者(New Consumer)
新消费者是在 Kafka 0.9 版本引入的,使用 KafkaConsumer
类实现,提供了更强大、灵活的功能,并支持消费者组、自动提交位移、动态分区分配等高级特性。
2. 新旧消费者的区别
2.1 API 设计
- 旧消费者: 使用
SimpleConsumer
和ConsumerConnector
类进行消息消费,较为简单,但功能有限。 - 新消费者: 使用
KafkaConsumer
类进行消息消费,提供了更丰富、灵活的功能,并支持消费者组等高级特性。
2.2 功能特性
旧消费者:
- 不支持消费者组。
- 不支持自动位移提交。
- 不支持动态分区分配。
- 不支持自动重平衡。
新消费者:
- 支持消费者组,可以实现水平扩展和高可用性。
- 支持自动提交位移,简化了位移管理。
- 支持动态分区分配,能够自动处理分区的增加和减少。
- 支持自动重平衡,确保消费者组内各个消费者之间的负载均衡。
2.3 高级特性
- 旧消费者: 缺乏高级特性,适用于简单的消息消费场景。
- 新消费者: 提供了丰富的高级特性,适用于复杂的消息处理场景,如实现 Exactly Once 语义等。
2.4 兼容性
- 旧消费者: 由于基于较旧的消费者 API,可能在后续版本的 Kafka 中逐渐被淘汰。
- 新消费者: 是 Kafka 推荐的消费者 API,具有较好的兼容性和稳定性。
3. 示例代码
下面分别演示使用旧消费者和新消费者消费 Kafka 消息的示例代码:
3.1 旧消费者示例代码
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class OldConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "test-group");
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("test-topic", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get("test-topic").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> message = it.next();
System.out.println(new String(message.message()));
}
consumer.shutdown();
}
}
3.2 新消费者示例代码
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class NewConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
System.out.println(record.value());
});
}
}
}