kafka consumer基本使用及 ConsumerIterator如何遍历message

简介: kafka简单使用//1、consumer基本配置Properties props = new Properties();props.put("zookeeper.

kafka简单使用

//1、consumer基本配置
Properties props = new Properties();
props.put("zookeeper.connect", zk);
props.put("group.id", groupid);
props.put("autooffset.reset", "largest");
props.put("autocommit.enable", "true");
props.put("client.id", "test");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

//描述读取哪个topic,需要几个线程读,一个线程对应着一个KafkaStream
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);

KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it1 = stream1.iterator();

//遍历消息
while (it1.hasNext()) {  //阻塞直到有消息
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = it1.next();
    String message =
            String.format("Consumer ID:%s, Topic:%s, GroupID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                    consumerid,
                    messageAndMetadata.topic(), groupid, messageAndMetadata.partition(),
                    messageAndMetadata.offset(), new String(messageAndMetadata.key()), new String(messageAndMetadata.message()));
    System.out.println(message);
}

问题

使用kafka consumer时候,让我比较好奇的一点是,consumer一旦开启后,可以不停地消费消息。

一般使用迭代器时候,比如list的迭代

while(it.hasNext()){
    String text = it.next();
    System.out.println("text:"+text);
    //...
}

在迭代完已有数据之后,就会停止迭代了。那么kafka是如何做到迭代“未来”的数据呢。

阻塞

consumer启动之后,服务不会停止,而是会不断地消费数据,猜测consumer在某个地方阻塞住了。追下ConsumerIterator.hasNext()的源码

img_e3b9c6cebb9f49dfdd51840f46b2f382.png
it.hasNext

state变量初始化时候,赋值为NOT_READY,


img_f5228c37ba2dacd583af4349f251a2b4.png
state

每次调用ConsumerIterator.next()时候,会将其再次重置为NOT_READY


img_8efac2fc1a4a8a163799795ee90d0ed6.png
next

所以每次进行模式匹配时候,都会走到case _分支

img_3dbf35db4fe2a7e7905812043621b248.png
makeNext

consumer超时配置

val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)

上文主要注意的有两点,超时时间的配置。(默认为-1)当配置为负数时候,会一直阻塞住,直到收到消息。
当配置大于0,比如100,会在100ms之后抛出ConsumerTimeoutException异常。

总结下

1)kafka consumer在ConsumerIterator.hasNext()阻塞获取消息,以实现消费“今后”的消息的功能。
2)当consumer.timeout.ms 配置大于0,这个阻塞会有个超时,比如配置100表示,如果100ms内没有收到消息,会抛出一个ConsumerTimeoutException异常。
默认会一直阻塞下去,直到收到一条消息。

目录
相关文章
|
2月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
10月前
|
消息中间件 存储 算法
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group
574 0
|
12天前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
29 2
|
5天前
|
消息中间件 存储 Kafka
Kafka基本使用
Kafka基本使用
|
2月前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
71 2
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
9月前
|
消息中间件 Arthas Java
线上kafka消息堆积,consumer掉线,怎么办?
线上kafka消息堆积,consumer掉线,怎么办?
130 0
|
2月前
|
消息中间件 缓存 Java
Kafka Consumer java api 配置
Kafka Consumer java api 配置
|
10月前
|
消息中间件 设计模式 Java
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
聊聊 Kafka: Consumer 源码解析之 Rebalance 机制
288 0
|
2月前
|
消息中间件 Kafka Apache
Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)
Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。
|
8月前
|
消息中间件 Kafka
132 Kafka 查找message
132 Kafka 查找message
32 0