kafka API consumer

简介: kafka API consumer1.kafka consumer流程1.1.在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers。

kafka API consumer
1.kafka consumer流程
1.1.在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers。在ConsumerMetadataResponse中,它接收消费者对应的消费组所属的协调节点的位置信息。

1.2.消费者连接协调节点,并发送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点已经在初始化。消费者就会停止抓取数据,提交offsets,发送JoinGroupRequest给协调节点。在JoinGroupResponse,它接收消费者应该拥有的topic-partitions列表以及当前消费组的新的generation编号。这个时候消费组管理已经完成,消费者就可以开始抓取数据,并为它拥有的partitions提交offsets。

1.3.如果HeartbeatResponse没有错误返回,消费者会从它上次拥有的partitions列表继续抓取数据,这个过程是不会被中断的。

Coordinator协调节点的工作过程:
1.在稳定状态下,协调节点通过故障检测协议跟踪每个消费组中每个消费者的健康状况。

2.在选举和启动时,协调节点读取它管理的消费组列表,以及从ZK中读取每个消费组的成员信息。如果之前没有成员信息,它不会做任何动作。只有在同一个消费组的第一个消费者注册进来时,协调节点才开始工作(即开始加载消费组的消费者成员信息)。

3.当协调节点完全加载完它所负责的消费组列表的所有组成员之前,它会在以下几种请求的响应中返回CoordinatorStartupNotComplete错误码:HeartbeatRequest,OffsetCommitRequest,JoinGroupRequest。这样消费者就会过段时间重试(直到完全加载,没有错误码返回为止)。

4.在选举或启动时,协调节点会对消费组中的所有消费者进行故障检测。根据故障检测协议被协调节点标记为Dead的消费者会从消费组中移除,这个时候协调节点会为Dead的消费者所属的消费组触发一个平衡操作(消费者Dead之后,这个消费者拥有的partition需要平衡给其他消费者)。

5.当HeartbeatResponse返回IllegalGeneration错误码,就会触发平衡操作。一旦所有存活的消费者通过JoinGroupRequests重新注册到协调节点,协调节点会将最新的partition所有权信息在JoinGroupResponse的每个消费者之间通信(同步),然后就完成了平衡操作。

6.协调节点会跟踪任何一个消费者已经注册的topics的topic-partition的变更。如果它检测到某个topic新增的partition,就会触发平衡操作。当创建一个新的topics也会触发平衡操作,因为消费者可以在topic被创建之前就注册它感兴趣的topics。

2.消费者组的使用场景

Kafka里的消费者组有两个使用的场景:
2.1“队列模式”:在同一组的消费者共同消费一个主题的所有消息,而且确保一条消息只被一个消费者处理。一个主题的所有的分区会和一个消费组的所有消费者做关联:一个分区只会与一个消费者关联,它的消息不会被其它的消费者接收。
最开始只有一个消费者时,所有的分区都分配给了它。当消息的规模增加时,我们就需要扩展消费者的数量,水平扩展处理能力,一直可以达到每个消费者只关联一个分区。大于分区数的消费者是会处在空闲状态,因为没有分配任何的分区。

2.2“发布/订阅模式”: 创建不同的消费者组意味一个主题的消息会发送给所有订阅它的消费者组,然后消费者组依照前面共同协作的场景进行分配。这往往是因为我们有不同的应用需求,比如一批交易数据,资金系统、ERP系统会消费它而风险监控也需要同时消费它。这就实现了数据的透明异步共用。

在两个场景中,消费者组有个重要的功能:rebalancing。当一个新的消费者加入一个组,如果还有有效的分区(消费者数<=主题分区数),会开始一个重新均衡分配的操作,会将一个已关联的分区(它的原消费者仍保有至少一个分区)重新分配给新加入的消费者。同样的,当一个消费者因为各种原因离开这个组,它的所有分区会被分配给剩下的消费者。

Subscribe(自动) assign(手动)
前面所说的自动分配是指在 KafkaConsumer API中的subscribe()方法。这个方法强制要求你为消费者设置一个消费者组,group.id参数不能为空。而你不需要处理分区的分配问题。而对应subscribe()方法。你可以采用手动的方式,指定消费者读取哪个主题分区,则:assign() 方法。当你需要精确地控制消息处理的负载,也能确定哪个分区有哪些消息时,这种手动的方式会很有用

3.自动提交方式api
[hadoop@h201 kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic topic11

[hadoop@h201 kkk]$ vi cc.java
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.Arrays;
public class cc {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "h201:9092,h202:9092,h203:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "g11");
//开启offset自动提交
props.put("enable.auto.commit", "true");
//自动提交时间间隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("topic11"));
//死循环不停的从broker中拿数据
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/javac -classpath /home/hadoop/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar cc.java
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/java cc

解释:
Poll方法用来获取消息 ,poll(拉取)
consumer.poll(100) :100ms内拉取一次数据
Record :为存储的消息,record.value 为消息的内容
原文地址https://www.cnblogs.com/xiguage119/archive/2019/07/24/11241417.html

相关文章
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
5月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
142 58
|
3月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
83 5
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
94 4
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
378 4
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
140 4
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
107 2
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
96 8
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
112 7