Kafka - 消费接口分析

简介:

1.概述

  在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API。在 《高级消费 API》一文中,介绍了其高级消费的 API 实现。今天给大家介绍另一种消费 API。

2.内容

  在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API。那么,我们首先需要知道低级消费 API 的作用。它能帮助我们去做那些事情:

  • 一个消息进行多次读取
  • 在处理过程中只消费 Partition 其中的某一部分消息
  • 添加事物管理机制以保证消息仅被处理一次

  当然,在使用的过程当中也是有些弊端的,其内容如下:

  • 必须在程序中跟踪 Offset 的值
  • 必须找出指定的 Topic Partition 中的 Lead Broker
  • 必须处理 Broker 的变动

  使用其 API 的思路步骤如下所示:

  • 从所有处于 Active 状态的 Broker 中找出哪个是指定 Topic Partition 中的 Lead Broker
  • 找出指定 Topic Partition 中的所有备份 Broker
  • 构造请求
  • 发送请求并查询数据
  • 处理 Leader Broker 的变动

3.代码实现

3.1 Java Project

  若是使用 Java Project 工程去实现该部分代码,需要添加相关以来 JAR 文件,其内容包含如下:

  • scala-xml_${version}-${version}.jar
  • scala-library-${version}.jar
  • metrics-core-${version}.jar
  • kafka-client-${version}.jar
  • kafka_${version}-${version}.jar

  针对 Java Project 工程,需要自己筛选 JAR 去添加。保证代码的顺利执行。

3.2 Maven Project

  对 Maven 工程,在 pom.xml 文件中添加相应的依赖信息即可,简单方便。让 Maven 去管理相应的依赖 JAR 文件。内容如下所示:

复制代码
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
    </exclusion>
    </exclusions>
</dependency>
复制代码

  这样在 Maven 工程中相应的依赖 JAR 文件就添加完成了。

3.3 代码实现

  在低级消费 API 中,实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
  * @Date Mar 2, 2016
  *
  * @Author dengjie
  *
  * @Note Simple consumer api
  */
public  class  SimpleKafkaConsumer {
     private  static  Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer. class );
     private  List<String> m_replicaBrokers =  new  ArrayList<String>();
 
     public  SimpleKafkaConsumer() {
         m_replicaBrokers =  new  ArrayList<String>();
     }
 
     public  static  void  main(String[] args) {
         SimpleKafkaConsumer example =  new  SimpleKafkaConsumer();
         // Max read number
         long  maxReads = SystemConfig.getIntProperty( "kafka.read.max" );
         // To subscribe to the topic
         String topic = SystemConfig.getProperty( "kafka.topic" );
         // Find partition
         int  partition = SystemConfig.getIntProperty( "kafka.partition" );
         // Broker node's ip
         List<String> seeds =  new  ArrayList<String>();
         String[] hosts = SystemConfig.getPropertyArray( "kafka.server.host" "," );
         for  (String host : hosts) {
             seeds.add(host);
         }
         int  port = SystemConfig.getIntProperty( "kafka.server.port" );
         try  {
             example.run(maxReads, topic, partition, seeds, port);
         catch  (Exception e) {
             log.error( "Oops:"  + e);
             e.printStackTrace();
         }
     }
 
     public  void  run( long  a_maxReads, String a_topic,  int  a_partition, List<String> a_seedBrokers,  int  a_port)
             throws  Exception {
         // Get point topic partition's meta
         PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
         if  (metadata ==  null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting" );
             return ;
         }
         if  (metadata.leader() ==  null ) {
             log.info( "[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting" );
             return ;
         }
         String leadBroker = metadata.leader().host();
         String clientName =  "Client_"  + a_topic +  "_"  + a_partition;
 
         SimpleConsumer consumer =  new  SimpleConsumer(leadBroker, a_port,  100000 64  1024 , clientName);
         long  readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
                 clientName);
         int  numErrors =  0 ;
         while  (a_maxReads >  0 ) {
             if  (consumer ==  null ) {
                 consumer =  new  SimpleConsumer(leadBroker, a_port,  100000 64  1024 , clientName);
             }
             FetchRequest req =  new  FetchRequestBuilder().clientId(clientName)
                     .addFetch(a_topic, a_partition, readOffset,  100000 ).build();
             FetchResponse fetchResponse = consumer.fetch(req);
 
             if  (fetchResponse.hasError()) {
                 numErrors++;
                 // Something went wrong!
                 short  code = fetchResponse.errorCode(a_topic, a_partition);
                 log.info( "[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:"  + leadBroker
                         " Reason: "  + code);
                 if  (numErrors >  5 )
                     break ;
                 if  (code == ErrorMapping.OffsetOutOfRangeCode()) {
                     // We asked for an invalid offset. For simple case ask for
                     // the last element to reset
                     readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
                             clientName);
                     continue ;
                 }
                 consumer.close();
                 consumer =  null ;
                 leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                 continue ;
             }
             numErrors =  0 ;
 
             long  numRead =  0 ;
             for  (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                 long  currentOffset = messageAndOffset.offset();
                 if  (currentOffset < readOffset) {
                     log.info( "[SimpleKafkaConsumer.run()] - Found an old offset: "  + currentOffset +  " Expecting: "
                             + readOffset);
                     continue ;
                 }
 
                 readOffset = messageAndOffset.nextOffset();
                 ByteBuffer payload = messageAndOffset.message().payload();
 
                 byte [] bytes =  new  byte [payload.limit()];
                 payload.get(bytes);
                 System.out.println(String.valueOf(messageAndOffset.offset()) +  ": "  new  String(bytes,  "UTF-8" ));  // Message deal enter
                 numRead++;
                 a_maxReads--;
             }
 
             if  (numRead ==  0 ) {
                 try  {
                     Thread.sleep( 1000 );
                 catch  (InterruptedException ie) {
                 }
             }
         }
         if  (consumer !=  null )
             consumer.close();
     }
 
     public  static  long  getLastOffset(SimpleConsumer consumer, String topic,  int  partition,  long  whichTime,
             String clientName) {
         TopicAndPartition topicAndPartition =  new  TopicAndPartition(topic, partition);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =  new  HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition,  new  PartitionOffsetRequestInfo(whichTime,  1 ));
         kafka.javaapi.OffsetRequest request =  new  kafka.javaapi.OffsetRequest(requestInfo,
                 kafka.api.OffsetRequest.CurrentVersion(), clientName);
         OffsetResponse response = consumer.getOffsetsBefore(request);
 
         if  (response.hasError()) {
             log.info( "[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
                     + response.errorCode(topic, partition));
             return  0 ;
         }
         long [] offsets = response.offsets(topic, partition);
         return  offsets[ 0 ];
     }
 
     /**
      * @param a_oldLeader
      * @param a_topic
      * @param a_partition
      * @param a_port
      * @return String
      * @throws Exception
      *             find next leader broker
      */
     private  String findNewLeader(String a_oldLeader, String a_topic,  int  a_partition,  int  a_port)  throws  Exception {
         for  ( int  i =  0 ; i <  3 ; i++) {
             boolean  goToSleep =  false ;
             PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
             if  (metadata ==  null ) {
                 goToSleep =  true ;
             else  if  (metadata.leader() ==  null ) {
                 goToSleep =  true ;
             else  if  (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i ==  0 ) {
                 // first time through if the leader hasn't changed give
                 // ZooKeeper a second to recover
                 // second time, assume the broker did recover before failover,
                 // or it was a non-Broker issue
                 //
                 goToSleep =  true ;
             else  {
                 return  metadata.leader().host();
             }
             if  (goToSleep) {
                 try  {
                     Thread.sleep( 1000 );
                 catch  (InterruptedException ie) {
                 }
             }
         }
         throw  new  Exception( "Unable to find new leader after Broker failure. Exiting" );
     }
 
     private  PartitionMetadata findLeader(List<String> a_seedBrokers,  int  a_port, String a_topic,  int  a_partition) {
         PartitionMetadata returnMetaData =  null ;
         loop:  for  (String seed : a_seedBrokers) {
             SimpleConsumer consumer =  null ;
             try  {
                 consumer =  new  SimpleConsumer(seed, a_port,  100000 64  1024 "leaderLookup" );
                 List<String> topics = Collections.singletonList(a_topic);
                 TopicMetadataRequest req =  new  TopicMetadataRequest(topics);
                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                 List<TopicMetadata> metaData = resp.topicsMetadata();
                 for  (TopicMetadata item : metaData) {
                     for  (PartitionMetadata part : item.partitionsMetadata()) {
                         if  (part.partitionId() == a_partition) {
                             returnMetaData = part;
                             break  loop;
                         }
                     }
                 }
             catch  (Exception e) {
                 log.error( "Error communicating with Broker ["  + seed +  "] to find Leader for ["  + a_topic +  ", "
                         + a_partition +  "] Reason: "  + e);
             finally  {
                 if  (consumer !=  null )
                     consumer.close();
             }
         }
         if  (returnMetaData !=  null ) {
             m_replicaBrokers.clear();
             for  (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                 m_replicaBrokers.add(replica.host());
             }
         }
         return  returnMetaData;
     }
}

4.总结

  在使用 Kafka 低级消费 API 时,要明确我们所使用的业务场景,一般建议还是使用高级消费 API,除非遇到特殊需要。另外,在使用过程中,注意 Leader Broker 的处理,和 Offset 的管理。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
13天前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
20 4
|
13天前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
22 2
|
13天前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
37 1
|
17天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
43 0
|
1月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
60 3
|
2月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
55 2
|
2月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
130 4
|
2月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【8月更文挑战第9天】利用Databricks与Confluent打造实时数据处理方案。Confluent的Kafka负责数据采集,通过主题接收IoT及应用数据;Databricks运用Structured Streaming处理Kafka数据,并以Delta Lake存储,支持ACID事务。这套组合实现了从数据采集、存储到分析的全流程自动化,满足企业对大数据实时处理的需求。
39 3
|
2月前
|
消息中间件 数据挖掘 Kafka
揭秘数据洪流中的救世主:Confluent与Flink的实时分析奇迹!
【8月更文挑战第9天】在现代数据处理中,实时数据分析至关重要。Confluent Platform与Apache Flink的组合提供了一套高效的数据流处理方案。Confluent Platform基于Apache Kafka构建,负责数据的收集、传输及管理;而Flink则擅长实时处理这些数据流,进行复杂分析。首先需配置Confluent Platform,包括设置Zookeeper、Kafka brokers及相关服务。
53 1
|
2月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤