kafka consumer基本使用及 ConsumerIterator如何遍历message

简介: kafka简单使用//1、consumer基本配置Properties props = new Properties();props.put("zookeeper.

kafka简单使用

//1、consumer基本配置
Properties props = new Properties();
props.put("zookeeper.connect", zk);
props.put("group.id", groupid);
props.put("autooffset.reset", "largest");
props.put("autocommit.enable", "true");
props.put("client.id", "test");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

//描述读取哪个topic,需要几个线程读,一个线程对应着一个KafkaStream
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);

KafkaStream<byte[], byte[]> stream1 = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it1 = stream1.iterator();

//遍历消息
while (it1.hasNext()) {  //阻塞直到有消息
    MessageAndMetadata<byte[], byte[]> messageAndMetadata = it1.next();
    String message =
            String.format("Consumer ID:%s, Topic:%s, GroupID:%s, PartitionID:%s, Offset:%s, Message Key:%s, Message Payload: %s",
                    consumerid,
                    messageAndMetadata.topic(), groupid, messageAndMetadata.partition(),
                    messageAndMetadata.offset(), new String(messageAndMetadata.key()), new String(messageAndMetadata.message()));
    System.out.println(message);
}

问题

使用kafka consumer时候,让我比较好奇的一点是,consumer一旦开启后,可以不停地消费消息。

一般使用迭代器时候,比如list的迭代

while(it.hasNext()){
    String text = it.next();
    System.out.println("text:"+text);
    //...
}

在迭代完已有数据之后,就会停止迭代了。那么kafka是如何做到迭代“未来”的数据呢。

阻塞

consumer启动之后,服务不会停止,而是会不断地消费数据,猜测consumer在某个地方阻塞住了。追下ConsumerIterator.hasNext()的源码

img_e3b9c6cebb9f49dfdd51840f46b2f382.png
it.hasNext

state变量初始化时候,赋值为NOT_READY,


img_f5228c37ba2dacd583af4349f251a2b4.png
state

每次调用ConsumerIterator.next()时候,会将其再次重置为NOT_READY


img_8efac2fc1a4a8a163799795ee90d0ed6.png
next

所以每次进行模式匹配时候,都会走到case _分支

img_3dbf35db4fe2a7e7905812043621b248.png
makeNext

consumer超时配置

val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)

上文主要注意的有两点,超时时间的配置。(默认为-1)当配置为负数时候,会一直阻塞住,直到收到消息。
当配置大于0,比如100,会在100ms之后抛出ConsumerTimeoutException异常。

总结下

1)kafka consumer在ConsumerIterator.hasNext()阻塞获取消息,以实现消费“今后”的消息的功能。
2)当consumer.timeout.ms 配置大于0,这个阻塞会有个超时,比如配置100表示,如果100ms内没有收到消息,会抛出一个ConsumerTimeoutException异常。
默认会一直阻塞下去,直到收到一条消息。

目录
相关文章
|
8月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
103 4
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
416 4
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
173 4
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
138 2
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
104 8
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
127 7
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
217 4

热门文章

最新文章