玩转kafka中的消费者

简介:

上一篇介绍了如何使用Kafka的生产者,这一篇将介绍在实际生产中如何合理地使用Kafka的消费者API。

Kafka中消费者API分为新版和旧版,本章只介绍新版,旧版的就不做介绍了。

首发于我的个人博客:http://www.janti.cn/article/kafkaconsumer

准备工作

kafka版本:2.11-1.1.1

操作系统:centos7

java:jdk1.8

有了以上这些条件就OK了,具体怎么安装和启动Kafka这里就不强调了,可以看上一篇文章。

新建一个maven工程,需要的依赖如下:

 
<dependency>  
    <groupId>org.apache.kafkagroupId>  
    <artifactId>kafka_2.11artifactId>  
    <version>1.1.1version>  
dependency>  

<dependency>  
    <groupId>org.apache.kafkagroupId>  
    <artifactId>kafka-clientsartifactId> <version>1.1.1version> dependency>

简单的消费者例子

Kafka中是封装了KafkaConsumer类,消息接受都是通过该类来进行的。与生产者一样,在实例化消费者之前都是要进行配置的。

首先介绍这里面的配置:
  • bootstrap.servers:配置连接代理列表,不必包含Kafka集群的所有代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上Kafka集群,在多代理集群的情况下,建议至少配置两个代理。(由于电脑配置有限,本文实验的是单机情况)
  • key.deserializer : 用于反序列化消息Key的类
  • value.deserializer :用于反序列化消息值(Value)的类
  • group.id:指定消费者所在的组
  • client.id:指定客户端所在组的ID
  • enable.auto.commit:设置是否自动提交。在没有指定消费偏移量提交方式时,默认是每隔1S发送一次
  • auto.commit.interval.ms:自动提交偏移值的时间间
订阅消息的流程分为以下:   1.消费者参数配置   2.实例化消费者   3.订阅主题   4.从Kafka中拉取消息   按照这样的流程代码如下:
 
package kafka.consumer;  

import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  

import java.util.Arrays;  
import java.util.Properties;  

/**  
 * 简单的消费者  
  *  
 * @author tangj  
 */ public class KafkaSimpleDemo {  
    static Properties properties = new Properties();  

    private static String topic = "MyOrder";  

    //poll超时时间  
  private static long pollTimeout = 2000;  

    // 1.消费者配置  
  static {  
        // bootstarp server 地址  
  properties.put("bootstrap.servers", "10.0.90.53:9092");  
        // group.id指定消费者,所在的组  
  properties.put("group.id", "order");  
        // 组中client ID名称  
  properties.put("client.id", "consumer");  

        // 在没有指定消费偏移量提交方式时,默认是每个1s提交一次偏移量,可以通过auto.commit.interval.ms参数指定提交间隔  
  // 自动提交要设置成true  
 // 手动提交设置成false  
  properties.put("enable.auto.commit", true);  
        // 自动提交偏移值的 时间间隔  
  properties.put("auto.commit.interval.ms", 2000);  

        // key序列化  
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
        // value序列化  
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
    }  

    public static void main(String args[]) {  
        consumeAutoCommit();  
    }  

    /**  
 * 消费消息自动提交偏移值  
  */  
  public static void consumeAutoCommit() {  
        //2\. 实例化消费者  
  KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);  
        // 3.订阅主题  
  kafkaConsumer.subscribe(Arrays.asList(topic));  

        try {  
            // 消费者是一个长期的过程,所以使用永久循环,  
  while (true) {  
                // 4.拉取消息  
  ConsumerRecords records = kafkaConsumer.poll(pollTimeout);  
                for (ConsumerRecord record : records) {  
                    System.out.println("消息总数为: " + records.count());  
                    System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",  
                            record.partition(), record.offset(), record.key(), record.value()));  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            kafkaConsumer.close();  
        }  
    }  
}  
看了简单的消费者模式,来看看消费者中的其他知识:消费模型,分区均衡,偏移量的提交,多线程消费者。

消费模型

可以看到在配置的时候,配置了消费者组这个变量。这里主要讲讲消费模型。消费模型主要分为两种:消费者组模型,发布订阅模型   消费者组中有多个消费者,组内的消费者共享一个group.id,并且有一个单独的client.id。消费者组用来消费一个主题下的所有分区,但是每个分区只能由该组内的一个消费者消费,不会被重复消费。   发布订阅模型就是所有的消费者都可以通过订阅来获取Kafka中的消息。

分区再平衡

介绍一种情况,消费者并不是越多越好的,当消费者大于分区数时,就会有部分消费者一直空闲着。   分区再平衡,即:parition rebanlance。总的来说分区再平衡就是一个消费者原来消费的分区变成由其他消费者消费,它只发生在消费者组中。   再均衡的作用就是为了保证消费者组的高可用和伸缩性,但是再均衡期间消费者会无法读取消息,有短暂的暂停时间。

偏移量的处理

偏移量的左右就是记录已经消费的消息。之前的一个简单的demo演示了自动提交的方式,接下来介绍手动提交。   在实际生产中,消费者拉取到消息之后会进行一些业务处理,比如存到数据库,写入缓存,网络请求等,这些都会有失败的可能,所以要对偏移值进行更精细的控制。   手动提交有两种方式: 第一种是同步提交,同步提交是阻塞的,对于提交失败的处理,它会一直提交,直到提交成功。   第二种是异步提交,异步提交是非阻塞的,对于提交失败,也不会重新提交。   当然,对于手动提交的业务设计,还是要结合具体业务进行考虑和设计。手动提交之后,需要设置 enable.auto.commit为false.并且不需要设置 auto.commit.interval.ms。

下面给出一个自动提交的例子,设置没消费5次,提交一次偏移值:



public static void consumeHandleCommit() {
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
try {
int maxcount = 5;
int count = 0;
kafkaConsumer.subscribe(Arrays.asList(topic));
for (; ; ) {
// 拉取消息
ConsumerRecords records = kafkaConsumer.poll(polltimeout);
for (ConsumerRecord record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
count++;
}
// 业务逻辑完成后,提交偏移量
if (count >= maxcount) {
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map offsets, Exception exception) {
if (null != exception) {
exception.printStackTrace();
} else {
System.out.println("偏移值提交成功");
}
}
});
count = 0;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}




多线程消费者

单线程的消费者效率肯定是低于多线程的消费者的,但是消费者的多线程设计与生产者不同,KafkaConsumer是非线程安全的。

保证线程安全,每个线程,各自实例化一个KafkaConsumer对象,并且多个消费者线程只消费同一个主题,不考虑多个消费者线程消费同一个分区。

线程实体类:

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumerThread implements Runnable {

//每个线程私有一个consumer实例

private KafkaConsumer<String,String> consumer;

public KafkaConsumerThread(Map<String,Object> configMap,String topic) {
Properties properties = new Properties();
properties.putAll(configMap);
this.consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {

try {
for (; ; ) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("收到消息: " + String.format("partition = %d, offset= %d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

线程启动类:

package kafka.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerExcutor {


/**
* kafkaConsumer是非线程安全的,处理好多线程同步的方案是
* 每个线程实例化一个kafkaConsumer对象
*/
public static void main(String args[]) {

String topic = "hello";

Map<String, Object> configMap = new HashMap<>();
configMap.put("bootstrap.servers", "10.0.90.53:9092");
//group.id指定消费者,所在的组,保证所有线程都在一个消费者组
configMap.put("group.id", "test");
configMap.put("enable.auto.commit", true);
configMap.put("auto.commit.interval.ms", 1000);

// key序列化
configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value序列化
configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

ExecutorService service = Executors.newFixedThreadPool(6);
// 该主题总共有6个分区,那么保证6个线程
for (int i = 0; i < 6; i++) {
service.submit(new KafkaConsumerThread(configMap, topic));
}
}
}


总结

本文介绍了kafka中的消费者,包括多线程消费者,偏移量的处理,以及分区再平衡。

切记一点,实际的生产中消费者需要根据实际业务来进行设计。

相关文章
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
85 2
|
4月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
143 62
|
4月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
140 58
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
47 1
|
6月前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
4月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
139 3
|
5月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
101 8
|
4月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
111 0
|
5月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
63 0
|
5月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章