Kafka - 消费接口分析

简介:

1.概述

  在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API。在 《高级消费 API》一文中,介绍了其高级消费的 API 实现。今天给大家介绍另一种消费 API。

2.内容

  在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API。那么,我们首先需要知道低级消费 API 的作用。它能帮助我们去做那些事情:

  • 一个消息进行多次读取
  • 在处理过程中只消费 Partition 其中的某一部分消息
  • 添加事物管理机制以保证消息仅被处理一次

  当然,在使用的过程当中也是有些弊端的,其内容如下:

  • 必须在程序中跟踪 Offset 的值
  • 必须找出指定的 Topic Partition 中的 Lead Broker
  • 必须处理 Broker 的变动

  使用其 API 的思路步骤如下所示:

  • 从所有处于 Active 状态的 Broker 中找出哪个是指定 Topic Partition 中的 Lead Broker
  • 找出指定 Topic Partition 中的所有备份 Broker
  • 构造请求
  • 发送请求并查询数据
  • 处理 Leader Broker 的变动

3.代码实现

3.1 Java Project

  若是使用 Java Project 工程去实现该部分代码,需要添加相关以来 JAR 文件,其内容包含如下:

  • scala-xml_${version}-${version}.jar
  • scala-library-${version}.jar
  • metrics-core-${version}.jar
  • kafka-client-${version}.jar
  • kafka_${version}-${version}.jar

  针对 Java Project 工程,需要自己筛选 JAR 去添加。保证代码的顺利执行。

3.2 Maven Project

  对 Maven 工程,在 pom.xml 文件中添加相应的依赖信息即可,简单方便。让 Maven 去管理相应的依赖 JAR 文件。内容如下所示:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
</dependency>

 这样在 Maven 工程中相应的依赖 JAR 文件就添加完成了。

3.3 代码实现

  在低级消费 API 中,实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
  * @Date Mar 2, 2016
  *
  * @Author dengjie
  *
  * @Note Simple consumer api
  */
public class SimpleKafkaConsumer {
     private static Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer. class );
     private List<String> m_replicaBrokers = new ArrayList<String>();
 
     public SimpleKafkaConsumer() {
         m_replicaBrokers = new ArrayList<String>();
     }
 
     public static void main(String[] args) {
         SimpleKafkaConsumer example = new SimpleKafkaConsumer();
         // Max read number
         long maxReads = SystemConfig.getIntProperty( "kafka.read.max" );
         // To subscribe to the topic
         String topic = SystemConfig.getProperty( "kafka.topic" );
         // Find partition
         int partition = SystemConfig.getIntProperty( "kafka.partition" );
         // Broker node's ip
         List<String> seeds = new ArrayList<String>();
         String[] hosts = SystemConfig.getPropertyArray( "kafka.server.host" , "," );
         for (String host : hosts) {
             seeds.add(host);
         }
         int port = SystemConfig.getIntProperty( "kafka.server.port" );
         try {
             example.run(maxReads, topic, partition, seeds, port);
         } catch (Exception e) {
             log.error( "Oops:" + e);
             e.printStackTrace();
         }
     }
 
     public void run( long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)
             throws Exception {
         // Get point topic partition's meta
         PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
         if (metadata == null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting" );
             return ;
         }
         if (metadata.leader() == null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting" );
             return ;
         }
         String leadBroker = metadata.leader().host();
         String clientName = "Client_" + a_topic + "_" + a_partition;
 
         SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000 , 64 * 1024 , clientName);
         long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
                 clientName);
         int numErrors = 0 ;
         while (a_maxReads > 0 ) {
             if (consumer == null ) {
                 consumer = new SimpleConsumer(leadBroker, a_port, 100000 , 64 * 1024 , clientName);
             }
             FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                     .addFetch(a_topic, a_partition, readOffset, 100000 ).build();
             FetchResponse fetchResponse = consumer.fetch(req);
 
             if (fetchResponse.hasError()) {
                 numErrors++;
                 // Something went wrong!
                 short code = fetchResponse.errorCode(a_topic, a_partition);
                 log.info( "[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:" + leadBroker
                         + " Reason: " + code);
                 if (numErrors > 5 )
                     break ;
                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                     // We asked for an invalid offset. For simple case ask for
                     // the last element to reset
                     readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
                             clientName);
                     continue ;
                 }
                 consumer.close();
                 consumer = null ;
                 leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                 continue ;
             }
             numErrors = 0 ;
 
             long numRead = 0 ;
             for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                 long currentOffset = messageAndOffset.offset();
                 if (currentOffset < readOffset) {
                     log.info( "[SimpleKafkaConsumer.run()] - Found an old offset: " + currentOffset + " Expecting: "
                             + readOffset);
                     continue ;
                 }
 
                 readOffset = messageAndOffset.nextOffset();
                 ByteBuffer payload = messageAndOffset.message().payload();
 
                 byte [] bytes = new byte [payload.limit()];
                 payload.get(bytes);
                 System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8" )); // Message deal enter
                 numRead++;
                 a_maxReads--;
             }
 
             if (numRead == 0 ) {
                 try {
                     Thread.sleep( 1000 );
                 } catch (InterruptedException ie) {
                 }
             }
         }
         if (consumer != null )
             consumer.close();
     }
 
     public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
             String clientName) {
         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1 ));
         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                 kafka.api.OffsetRequest.CurrentVersion(), clientName);
         OffsetResponse response = consumer.getOffsetsBefore(request);
 
         if (response.hasError()) {
             log.info( "[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
                     + response.errorCode(topic, partition));
             return 0 ;
         }
         long [] offsets = response.offsets(topic, partition);
         return offsets[ 0 ];
     }
 
     /**
      * @param a_oldLeader
      * @param a_topic
      * @param a_partition
      * @param a_port
      * @return String
      * @throws Exception
      *             find next leader broker
      */
     private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
         for ( int i = 0 ; i < 3 ; i++) {
             boolean goToSleep = false ;
             PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
             if (metadata == null ) {
                 goToSleep = true ;
             } else if (metadata.leader() == null ) {
                 goToSleep = true ;
             } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0 ) {
                 // first time through if the leader hasn't changed give
                 // ZooKeeper a second to recover
                 // second time, assume the broker did recover before failover,
                 // or it was a non-Broker issue
                 //
                 goToSleep = true ;
             } else {
                 return metadata.leader().host();
             }
             if (goToSleep) {
                 try {
                     Thread.sleep( 1000 );
                 } catch (InterruptedException ie) {
                 }
             }
         }
         throw new Exception( "Unable to find new leader after Broker failure. Exiting" );
     }
 
     private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
         PartitionMetadata returnMetaData = null ;
         loop: for (String seed : a_seedBrokers) {
             SimpleConsumer consumer = null ;
             try {
                 consumer = new SimpleConsumer(seed, a_port, 100000 , 64 * 1024 , "leaderLookup" );
                 List<String> topics = Collections.singletonList(a_topic);
                 TopicMetadataRequest req = new TopicMetadataRequest(topics);
                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                 List<TopicMetadata> metaData = resp.topicsMetadata();
                 for (TopicMetadata item : metaData) {
                     for (PartitionMetadata part : item.partitionsMetadata()) {
                         if (part.partitionId() == a_partition) {
                             returnMetaData = part;
                             break loop;
                         }
                     }
                 }
             } catch (Exception e) {
                 log.error( "Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", "
                         + a_partition + "] Reason: " + e);
             } finally {
                 if (consumer != null )
                     consumer.close();
             }
         }
         if (returnMetaData != null ) {
             m_replicaBrokers.clear();
             for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                 m_replicaBrokers.add(replica.host());
             }
         }
         return returnMetaData;
     }
}

4.总结

  在使用 Kafka 低级消费 API 时,要明确我们所使用的业务场景,一般建议还是使用高级消费 API,除非遇到特殊需要。另外,在使用过程中,注意 Leader Broker 的处理,和 Offset 的管理。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

目录
相关文章
|
消息中间件 Java Kafka
|
消息中间件 Kafka 编解码
|
消息中间件 Kafka
|
6天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
15天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
19 1
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
159 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
63 3
|
2月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
99 0
|
2月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。