Consumer位移管理-Kafka从入门到精通(十一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Consumer位移管理-Kafka从入门到精通(十一)

上篇文章说了,sesstion.time.out 、max.poll.interval.ms、max.poll.records和auto.offset.reset等参数。

KafkaConsumer-Kafka从入门到精通(十)


订阅topic


订阅consumer直接:

Consumer.subscribe(Arrays.asList(“topic1”,“topic2”));

如果使用独立的consumer(standalone consumer),则可以手动订阅,

TopicPartition p1 = new TopicPartition(“topic-name”,0);
Consumer.assign(Arrays.asList(p1));


不管用哪种方法,consumer订阅都是延迟生效的,订阅的消息只有在下次poll调用的时候才会生效。


消息轮询


Poll原理

consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区消息。若要实现并行读取消息,一种方式使用多线程方式,为每个要读取的分区都要创建一个专有线程去消费(这就是旧版本cousumer采用的方式),另一种方法采用linuxI/O模型的poll或者select等,使用一个线程同时管理多个socket连接,即同时与多个broker通信实现并行读取。

一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调,消费者组的rebalance以及数据的获取会在主逻辑poll方法中一次调用中被执行,这样用户很容易使用一个线程来管理所有的cousumerIO。

对于问题,consumer到底是单线程还是多线程呢?

最新版的kafka是一个多线程或者双线程的java进程,创建kafkaConsumer的称为主线程,同时在后台创建一个心跳线程,该线程被称呼为后台心跳线程。

kafkaConsumer的poll方法在用户主线程中运行,这同时也表明:消费者组的rebalance、消息获取、coordinator管理、异步任务结果的处理、位移提交等操作这些都在主线程中的,因此仔细调优参数至关重要。


Poll使用方法

Consumer订阅topic之后通常以事件循环的方法来获取消息读取,poll方法根据当前consumer的消费位移返回消息集合。当poll首次被调用的时候,新的消费者组会根据位移重设策略(auto.offset.reset)来设定消费者组的位移,一旦consumer开始提交位移,后续的rebalance完成后会将位置设置为上次已提交的位移。传递给poll方法的超时设置参数用于控制consumer等待消息的最大阻塞时间,比如consumer至少需要1M的数据,那么此刻consumer会以阻塞不断累计数据到满足1M,如果不想让consumer一直阻塞,则可以给一个过期时间,一定时间内如果还没有满足,则返回。

1、要么等数据满了。

2、要么等待时间超过了指定时间。

前面我们说了consumer是单线程设计(其实还有一个心跳线程,辅助线程看主线程是否保持心跳,暂不考虑,不承担逻辑),因此consumer应该运行在他的专属线程中。新版本的java consumer不是线程安全的,如果没有显式的同步锁机制保护,kafka会抛出kafkaConsumer is not safe for multi-threaded access

的异常,如果看到了这样的报错,那么说明kafkaConsumer运用在多线程中,对于目前的kafka设计而言,是不被允许的。


我们可以在while条件指定一个布尔变量isRunning来标识是否需要退出consumer消费循环并且结束consumer应用。具体应该是将isRunning标识为volatile,然后其他线程用isRunning=false来控制线程结束。最后千万不要忘记关闭consumer,这不仅会清楚consumer创建的各种socker资源,还会通知消费者coordinator主动离开从而更快的rebalance。比较推荐在finally代码里显式关闭。


位移管理


Consumer位移

Consumer端要为每个它读取的分区保存消费进度,即分区中最新消费消息的位置,该位置就是offset。Consumer需要定期向kafka提交自己的位置信息,实际上,这个信息通常是下一条带消费消息的位置。假设consumer已经读取了某个分区第n条消息,那么他应该提交位移为N,因为位移是从0开始,位移n的位子是n+1条消息。这样conusmer重启时会从第n+1条开始消费。

Offset对于consumer非常重要,因为他们是实现消息交付语义保证(message delivery semantic)的基石,常见的3中消息交付语义保证。

1、最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理。

2、最少一次(at least once)处理语义:消息不会丢失,但可能处理多次。

3、精确一次(exactly)处理语义:消息一定会被处理且只会处理一次。


显然,若consumer在消费之前就提交位移,那么多在位移提交完成之后,消费还未消费就崩溃了,这时候consumer重启,则会从新的位移开始消费,则这个已提交的位移会丢失。相反的,若consumer在消费之后再提交,则可以实现at least once。好消息是这个出现多次处理的情况,已经在kafka0.11.0.0版本得到解决。


上次提交位移(last committed offset):consumer最近一次提交的offset值。

当前位置(current position):consumer已读取但尚未提交的位置。

水位(watermark):也被称为高水位(high watermark),严格来说他不属于conusmer管理范围,而属于分区日志概念,consumer可以读取水位之下的所有消息,水位之上的则不可以读取。


日志终端位移(log end offset,leo):日志的最新位移,同样不属于consumer范畴,而属于分区日志管辖。它表示了某个分区副本当前保存消息对应最大的位移值。值得注意的是,正常情况下leo不会比水位值小。事实上,只有分区所有副本都保存某条消息,该分区leader副本才会向上移动水位值。


版本版consumer位移管理

consumer会在kafka集群所有broker里选一个broker作为consumer group的coordinator,用于实现组成员管理,消费分配方案,位移提交等。和普通的kafka topic相同,该topic配置多个分区,每个分区有多个副本。位移存在的目的就是保存consumer提交的位移。

当消费者组首次启动时,由于没有初识位移信息,coordinator必须为其确定初始位移值,这就是consumer参数auto.offset.reset的作用。通常consumer要么从最早位移开始读取,要么从最新位移开始读取。

Consumer提交位移主要机制通过向所属coordinator发送位移提交请求实现的,每个位移提交都会往_consumer_offsets对应分区上追加一条消息。消息的key是groupid、topic等,而value就是位移值,如果consumer为同一个group的同一个topic分区提交多次位移,那么就会存在多条key相同但value不同的消息,显然我们只关心最新一条。


自动提交和手动提交

位移提交策略对提供消费交付语义至关重要,默认情况下consumer自动提交间隔是5s、这就是说若不做特定设置,consumer可以通过参数auto.commit.interval.ms参数可以控制自动提交间隔。

自动提交位移的优势是降低用户开发成本使得用户不比处理位移提交,劣势用户不能细颗粒度的处理位移提交,特别是强调精确一次处理语义时,这种情况下,用户可以手动位移提交。

典型的consumer场景,用户需要对poll方法返回的消息集合中消息执行业务处理,用户想要保证只有消息被真正处理才去提交位移,如果自动提交则无法保证这种位移时序性,因此这种情况下必须手动提交位移。在构建kafkaConsumer时设置enable.auto.commit=false,然后调用conmmitSync或commitAsync方法即可。


自动提交:默认配置,enable.auto.commit=true。开发简单,无法实现精确控制,位移提交失败后不易处理。可能造成数据丢失,最多实现“最少一次”处理语义。能容忍一定消息丢失。

自动提交:设置enable.autocommit=false。手动调用consumer.commitSync或

consumer.commitAsync位移提交,可以实现“最少一次”处理,依赖外部可以实现“精确一次”处理语义。


手动提交分为异步commitAsync和同步commitSync,如果调用commitSync,用户程序会等待位移提交结束才执行下一条语句。若调用commitAsync则是一个异步阻塞调用,comsumer会在后续poll轮询该位移结果。这里的异步不是指consumer单独的线程进行位移提交,实际上consumer依然会在主线程poll方法中不断轮询这次异步提交结果,只是该提交不会让这个方法阻塞。


当这个无参数的时候,conmmitSync和commitAsync在调用的时候,都会为他订阅的所有分区进行位移提交。其实他还带另外两个参数的重载方式,用户调用这个方法的时候,需要显式指定一个map告诉kafka哪些分区做提交更为合理。实际使用中这种更加合适,因为consumer只对他所拥有的分区进行提交更为合理。

    ConsumerRecords<String, String> records= consumer.poll(lOOO); 
    for (TopicParti tion partition : records.partitions()) { 
    List<ConsumerRecord<String, String partit onRecords = 
                    records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
            System.out.println(record.offset ()+”:”+ record.value());
        }
    long lastOffset = parti tionRecords. get (partitionRecords. 
        size() - 1) . offset() ; 
        consumer.commitSync(Collections singletonMap(part tion, new 
        OffsetAndMetadata(lastOffset + 1)));

这里特别需要注意的是,提交的位移一定是consumer下一条待读取消息的位移,这也就是为什么offset+1的原因。


相关文章
|
21天前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
2月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
81 4
|
3月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
334 4
|
4月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
116 4
|
4月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
116 0
|
4月前
|
消息中间件 Java Kafka
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
54 1
|
4月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
338 9
下一篇
DataWorks