Kafka - 消费接口分析

简介:

1.概述

  在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API。在 《高级消费 API》一文中,介绍了其高级消费的 API 实现。今天给大家介绍另一种消费 API。

2.内容

  在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API。那么,我们首先需要知道低级消费 API 的作用。它能帮助我们去做那些事情:

  • 一个消息进行多次读取
  • 在处理过程中只消费 Partition 其中的某一部分消息
  • 添加事物管理机制以保证消息仅被处理一次

  当然,在使用的过程当中也是有些弊端的,其内容如下:

  • 必须在程序中跟踪 Offset 的值
  • 必须找出指定的 Topic Partition 中的 Lead Broker
  • 必须处理 Broker 的变动

  使用其 API 的思路步骤如下所示:

  • 从所有处于 Active 状态的 Broker 中找出哪个是指定 Topic Partition 中的 Lead Broker
  • 找出指定 Topic Partition 中的所有备份 Broker
  • 构造请求
  • 发送请求并查询数据
  • 处理 Leader Broker 的变动

3.代码实现

3.1 Java Project

  若是使用 Java Project 工程去实现该部分代码,需要添加相关以来 JAR 文件,其内容包含如下:

  • scala-xml_${version}-${version}.jar
  • scala-library-${version}.jar
  • metrics-core-${version}.jar
  • kafka-client-${version}.jar
  • kafka_${version}-${version}.jar

  针对 Java Project 工程,需要自己筛选 JAR 去添加。保证代码的顺利执行。

3.2 Maven Project

  对 Maven 工程,在 pom.xml 文件中添加相应的依赖信息即可,简单方便。让 Maven 去管理相应的依赖 JAR 文件。内容如下所示:

复制代码
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
</dependency>
复制代码

  这样在 Maven 工程中相应的依赖 JAR 文件就添加完成了。

3.3 代码实现

  在低级消费 API 中,实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
  * @Date Mar 2, 2016
  *
  * @Author dengjie
  *
  * @Note Simple consumer api
  */
public  class  SimpleKafkaConsumer {
     private  static  Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer. class );
     private  List<String> m_replicaBrokers =  new  ArrayList<String>();
 
     public  SimpleKafkaConsumer() {
         m_replicaBrokers =  new  ArrayList<String>();
     }
 
     public  static  void  main(String[] args) {
         SimpleKafkaConsumer example =  new  SimpleKafkaConsumer();
         // Max read number
         long  maxReads = SystemConfig.getIntProperty( "kafka.read.max" );
         // To subscribe to the topic
         String topic = SystemConfig.getProperty( "kafka.topic" );
         // Find partition
         int  partition = SystemConfig.getIntProperty( "kafka.partition" );
         // Broker node's ip
         List<String> seeds =  new  ArrayList<String>();
         String[] hosts = SystemConfig.getPropertyArray( "kafka.server.host" "," );
         for  (String host : hosts) {
             seeds.add(host);
         }
         int  port = SystemConfig.getIntProperty( "kafka.server.port" );
         try  {
             example.run(maxReads, topic, partition, seeds, port);
         catch  (Exception e) {
             log.error( "Oops:"  + e);
             e.printStackTrace();
         }
     }
 
     public  void  run( long  a_maxReads, String a_topic,  int  a_partition, List<String> a_seedBrokers,  int  a_port)
             throws  Exception {
         // Get point topic partition's meta
         PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
         if  (metadata ==  null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting" );
             return ;
         }
         if  (metadata.leader() ==  null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting" );
             return ;
         }
         String leadBroker = metadata.leader().host();
         String clientName =  "Client_"  + a_topic +  "_"  + a_partition;
 
         SimpleConsumer consumer =  new  SimpleConsumer(leadBroker, a_port,  100000 64  1024 , clientName);
         long  readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
                 clientName);
         int  numErrors =  0 ;
         while  (a_maxReads >  0 ) {
             if  (consumer ==  null ) {
                 consumer =  new  SimpleConsumer(leadBroker, a_port,  100000 64  1024 , clientName);
             }
             FetchRequest req =  new  FetchRequestBuilder().clientId(clientName)
                     .addFetch(a_topic, a_partition, readOffset,  100000 ).build();
             FetchResponse fetchResponse = consumer.fetch(req);
 
             if  (fetchResponse.hasError()) {
                 numErrors++;
                 // Something went wrong!
                 short  code = fetchResponse.errorCode(a_topic, a_partition);
                 log.info( "[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:"  + leadBroker
                         " Reason: "  + code);
                 if  (numErrors >  5 )
                     break ;
                 if  (code == ErrorMapping.OffsetOutOfRangeCode()) {
                     // We asked for an invalid offset. For simple case ask for
                     // the last element to reset
                     readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
                             clientName);
                     continue ;
                 }
                 consumer.close();
                 consumer =  null ;
                 leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                 continue ;
             }
             numErrors =  0 ;
 
             long  numRead =  0 ;
             for  (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                 long  currentOffset = messageAndOffset.offset();
                 if  (currentOffset < readOffset) {
                     log.info( "[SimpleKafkaConsumer.run()] - Found an old offset: "  + currentOffset +  " Expecting: "
                             + readOffset);
                     continue ;
                 }
 
                 readOffset = messageAndOffset.nextOffset();
                 ByteBuffer payload = messageAndOffset.message().payload();
 
                 byte [] bytes =  new  byte [payload.limit()];
                 payload.get(bytes);
                 System.out.println(String.valueOf(messageAndOffset.offset()) +  ": "  new  String(bytes,  "UTF-8" ));  // Message deal enter
                 numRead++;
                 a_maxReads--;
             }
 
             if  (numRead ==  0 ) {
                 try  {
                     Thread.sleep( 1000 );
                 catch  (InterruptedException ie) {
                 }
             }
         }
         if  (consumer !=  null )
             consumer.close();
     }
 
     public  static  long  getLastOffset(SimpleConsumer consumer, String topic,  int  partition,  long  whichTime,
             String clientName) {
         TopicAndPartition topicAndPartition =  new  TopicAndPartition(topic, partition);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =  new  HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition,  new  PartitionOffsetRequestInfo(whichTime,  1 ));
         kafka.javaapi.OffsetRequest request =  new  kafka.javaapi.OffsetRequest(requestInfo,
                 kafka.api.OffsetRequest.CurrentVersion(), clientName);
         OffsetResponse response = consumer.getOffsetsBefore(request);
 
         if  (response.hasError()) {
             log.info( "[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
                     + response.errorCode(topic, partition));
             return  0 ;
         }
         long [] offsets = response.offsets(topic, partition);
         return  offsets[ 0 ];
     }
 
     /**
      * @param a_oldLeader
      * @param a_topic
      * @param a_partition
      * @param a_port
      * @return String
      * @throws Exception
      *             find next leader broker
      */
     private  String findNewLeader(String a_oldLeader, String a_topic,  int  a_partition,  int  a_port)  throws  Exception {
         for  ( int  i =  0 ; i <  3 ; i++) {
             boolean  goToSleep =  false ;
             PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
             if  (metadata ==  null ) {
                 goToSleep =  true ;
             else  if  (metadata.leader() ==  null ) {
                 goToSleep =  true ;
             else  if  (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i ==  0 ) {
                 // first time through if the leader hasn't changed give
                 // ZooKeeper a second to recover
                 // second time, assume the broker did recover before failover,
                 // or it was a non-Broker issue
                 //
                 goToSleep =  true ;
             else  {
                 return  metadata.leader().host();
             }
             if  (goToSleep) {
                 try  {
                     Thread.sleep( 1000 );
                 catch  (InterruptedException ie) {
                 }
             }
         }
         throw  new  Exception( "Unable to find new leader after Broker failure. Exiting" );
     }
 
     private  PartitionMetadata findLeader(List<String> a_seedBrokers,  int  a_port, String a_topic,  int  a_partition) {
         PartitionMetadata returnMetaData =  null ;
         loop:  for  (String seed : a_seedBrokers) {
             SimpleConsumer consumer =  null ;
             try  {
                 consumer =  new  SimpleConsumer(seed, a_port,  100000 64  1024 "leaderLookup" );
                 List<String> topics = Collections.singletonList(a_topic);
                 TopicMetadataRequest req =  new  TopicMetadataRequest(topics);
                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                 List<TopicMetadata> metaData = resp.topicsMetadata();
                 for  (TopicMetadata item : metaData) {
                     for  (PartitionMetadata part : item.partitionsMetadata()) {
                         if  (part.partitionId() == a_partition) {
                             returnMetaData = part;
                             break  loop;
                         }
                     }
                 }
             catch  (Exception e) {
                 log.error( "Error communicating with Broker ["  + seed +  "] to find Leader for ["  + a_topic +  ", "
                         + a_partition +  "] Reason: "  + e);
             finally  {
                 if  (consumer !=  null )
                     consumer.close();
             }
         }
         if  (returnMetaData !=  null ) {
             m_replicaBrokers.clear();
             for  (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                 m_replicaBrokers.add(replica.host());
             }
         }
         return  returnMetaData;
     }
}

4.总结

  在使用 Kafka 低级消费 API 时,要明确我们所使用的业务场景,一般建议还是使用高级消费 API,除非遇到特殊需要。另外,在使用过程中,注意 Leader Broker 的处理,和 Offset 的管理。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
321 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
155 11
|
4月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
10月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
473 5
|
11月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
203 4
|
11月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
137 2
|
11月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
170 1
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
154 3
|
11月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
227 0
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
370 0