kafka消费者那些事儿

简介: kafka消费者那些事儿

一、前言



消息的消费一般有两种模式,推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。kakfa采用的是拉模式,这样可以很好的控制消费速率。那么kafka消费的具体工作流程是什么样的呢?kafka的位移管理又是怎么样的呢?


二、消费者消费规则



kafka是以消费者组进行消费,一个消费者组,由多个consumer组成,他们和topic的消费规则如下:

b9f7cc5fb93e3be91ab73286204c3cf3.jpg

  • topic的一个分区只能被消费组中的一个消费者消费。
  • 消费者组中的一个消费者可以消费topic一个或者多个分区。


通过这种分组、分区的消费方式,可以提高消费者的吞吐量,同时也能够实现消息的发布/订阅模式和点对点两种模式。


三、消费者整体工作流程



消费者消费总体分为两个步骤,第一步是制定消费的方案,就是这个组下哪个消费者消费哪个分区,第二个是建立网络连接,获取消息数据。


1、制定消费方案

ac2e643c227cbec280bab0341c3afbd3.jpg


  1. 消费者consumerAconsumerB, consumerCkafka集群中的协调器coordinator发送JoinGroup的请求。coordinator主要是用来辅助实现消费者组的初始化和分区的分配。


  • coordinator老大节点选择 = groupid的hashcode值 % 50( __consumer_offsets内置主题位移的分区数量)例如: groupid的hashcode值 为1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。


  1. 选出一个 consumer作为消费中的leader,比如上图中的consumerB。
  2. 消费者leader制定出消费方案,比如谁来消费哪个分区等,有Range分区策略、RoundRobin分区策略等。
  3. 把消费方案告诉给coordinator
  4. 最后coordinator就把消费方案下发给各个consumer, 图中只画了一条线,实际上是会下发到各个consumer。


2、消费者消费细节


现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。

21e906fb2fc80f485f7b413143a831b5.jpg


消费者创建一个网络连接客户端ConsumerNetworkClient, 发送消费请求,可以进行如下配置:

  • fetch.min.bytes: 每批次最小抓取大小,默认1字节
  • fetch.max.bytes: 每批次最大抓取大小,默认50M
  • fetch.max.wait.ms:最大超时时间,默认500ms


  1. 发送请求到kafka集群
  2. 获取数据成功,会将数据保存到completedFetches队列中
  3. 消费者从队列中抓取数据,根据配置max.poll.records一次拉取数据返回消息的最大条数,默认500条。
  4. 获取到数据后,经过反序列化器、拦截器后,得到最终的消息。
  5. 最后一步是提交保存消费的位移offset,也就是这个消费者消费到什么位置了,这样下次重启也可以继续从这个位置开始消费,关于offset的管理后面详细介绍。


四、消费者分区策略



前面简单提到了消费者组初始化的时候会对分区进行分配,那么具体的分配策略是什么呢,也就是哪个消费者消费哪个分区数据?


kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。


01、Range 分区策略


  • Range分区 是对每个 topic 而言的。对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。


如上图所示:有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。


这种方式容易造成数据倾斜!如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。


02、RoundRobin 分区策略


RoundRobin 针对集群中所有topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

5ded8083bd764b15325118cc5e5d2838.png03、Sticky 和Cooperative Sticky分区策略


Sticky是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance会尽量保持原有分配的分区不变化,这样可以节省开销。


Cooperative StickySticky类似,但是它会将原来的一次大规模rebalance操作,拆分成了多次小规模的rebalance,直至最终平衡完成,所以体验上会更好。


关于什么是rebalance继续往下看你就知道了。


五、消费者再均衡



上面也提到了rebalance,也就是再均衡。当kafka发生下面的情况会进行在均衡,也就是重新给消费者分配分区:

  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向Group Coordinator发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
  • 有消费者主动退出消费组。
  • 消费组所对应的Group Coorinator节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。


六、消费者位移offset管理



消费者需要保存当前消费到分区的什么位置了,这样哪怕消费者故障,重启后也能继续消费,这就是消费者的维护offset管理。


01、消费者位移offset存储位置


消费者位移offset存储在哪呢?

  • kafka0.9版本之前,consumer默认将offset保存在Zookeeper
  • 从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets,这样可以大量减少和zookeeper的交互。
  • __consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。


如何查看__consumer_offsets主题内容?


  • 在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false
  • 查看消费者消费主题__consumer_offsets
bin/kafka-console-consumer.sh --topic 
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm
atter" --from-beginning
## topic1 1号分区
[offset,topic1,1]::OffsetAndMetadata(offset=7, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)
## topic1 0号分区
[offset,topic1,0]::OffsetAndMetadata(offset=8, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)


02、消费者位移offset提交保存模式


消费者是如何提交保存位移offset呢?


1、自动提交


为了使我们能够专注于自己的业务逻辑,kafka默认提供了自动提交offset的功能。这个由消费者客户端参数 enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒。

f3689d1874f70bd6be10c27d1e8c15c4.png

  • 消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。
  • 自动位移提交 的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。


自动提交会带来什么问题?


自动提交消费位移的方式非常简便,但会带来是重复消费的问题。

2e4efed4a97f2535edbb0de51b877530.png

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象。


我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样 并不能避免重复消费的发送,而且也会使位移提交更加频繁。


2、手动提交


很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更 加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费。手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。

// 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()commitAsync()两种类型的方法。


  • 同步提交方式

同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),它必须等待offset提交完毕,再去消费下一批数据。

// 同步提交 offset
consumer.commitSync();


  • 异步提交方式

异步提交则没有失败重试机制,故有可能提交失败。它发送完提交offset请求后,就开始消费下一批数据了。

1. // 异步提交 offset
2. consumer.commitAsync();


那么手动提交会带来什么问题呢?可能会出现"漏消息"的情况。

e6d94cc7c4913bf97a4742514874b49e.png

设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。


我们可以通过消费者事物来解决这样的问题。


其实无论是手动提交还是自动提交,都有可能出现消息重复和是漏消息,与我们的编程模型有关,需要我们开发的时候根据消息的重要程度来选择合适的消费方案。


七、消费者API



一个正常的消费逻辑需要具备以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例;

(2)订阅主题;

(3)拉取消息并消费;

(4)提交消费位移 offset

(5)关闭消费者实例。

public class MyConsumer { 
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上 
        props.put("bootstrap.servers", "doitedu01:9092"); 
        // 制定 consumer group 
        props.put("group.id", "g1"); 
        // 是否自动提交 offset 
        props.put("enable.auto.commit", "true"); 
        // 自动提交 offset 的时间间隔 
        props.put("auto.commit.interval.ms", "1000");
        // key 的反序列化类 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // value 的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none 
        props.put("auto.offset.reset","earliest");
      // 定义 consumer 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
        // 消费者订阅的 topic, 可同时订阅多个 
        consumer.subscribe(Arrays.asList("first", "test","test1"));
        while (true) { 
            // 读取数据,读取超时时间为 100ms 
            ConsumerRecords<String, String> records = consumer.poll(100); 
            for (ConsumerRecord<String, String> record : records) 
              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
        } 
  } 
}


1、订阅主题

  • 指定集合方式订阅主题
1. consumer.subscribe(Arrays.asList(topicl )); 
2. consumer subscribe(Arrays.asList(topic2))


  • 正则方式订阅主题

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果 有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到 新添加的主题中的消息。

consumer.subscribe(Pattern.compile ("topic.*" ));


  • 订阅主题指定分区

消费者不仅可以通过 KafkaConsumer.subscribe()方法订阅主题,还可直接订阅某些主题的指定分区。

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;


2、取消订阅

通过unsubscribe()方法采取消主题的订阅。

consumer.unsubscribe();


3、poll()拉取消息

kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法, poll()方法返回的是所订阅的主题(分区)上的一组消息。


对于 poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。

public ConsumerRecords<K, V> poll(final Duration timeout)


超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。


4、指定位移消费


有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek( 方法正好提供了这个功能,让我们可以追前消费或回溯消费。

public void seek(TopicPartiton partition,long offset)


八、消费者重要参数



最后我们总结一下消费者中重要的参数配置。


image.png


九、总结



kafka消费是很重要的一个环节,本文总结kafka消费者的一些重要机制,包括消费者的整个流程,消费的分区策略,消费的再平衡以及消费的位移管理。在明白这些机制以后,简单讲解了如何使用消费者consumer的API以及消费者中重要的参数。


本文来自博客园,作者:JAVA旭阳

原文链接:https://www.cnblogs.com/alvinscript/p/17448122.html

相关文章
|
23天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
54 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
133 58
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
35 1
|
5月前
|
消息中间件 分布式计算 Kafka
Kafka(四)【Kafka 消费者】(4)
Kafka(四)【Kafka 消费者】
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
93 3
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
82 0
|
4月前
|
消息中间件 存储 负载均衡
深入理解Kafka核心设计及原理(三):消费者
深入理解Kafka核心设计及原理(三):消费者
90 8
|
4月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
58 0
|
4月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。