Consumer位移管理-Kafka从入门到精通(十一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Consumer位移管理-Kafka从入门到精通(十一)

上篇文章说了,sesstion.time.out 、max.poll.interval.ms、max.poll.records和auto.offset.reset等参数。

KafkaConsumer-Kafka从入门到精通(十)


订阅topic


订阅consumer直接:

Consumer.subscribe(Arrays.asList(“topic1”,“topic2”));

如果使用独立的consumer(standalone consumer),则可以手动订阅,

TopicPartition p1 = new TopicPartition(“topic-name”,0);
Consumer.assign(Arrays.asList(p1));


不管用哪种方法,consumer订阅都是延迟生效的,订阅的消息只有在下次poll调用的时候才会生效。


消息轮询


Poll原理

consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区消息。若要实现并行读取消息,一种方式使用多线程方式,为每个要读取的分区都要创建一个专有线程去消费(这就是旧版本cousumer采用的方式),另一种方法采用linuxI/O模型的poll或者select等,使用一个线程同时管理多个socket连接,即同时与多个broker通信实现并行读取。

一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调,消费者组的rebalance以及数据的获取会在主逻辑poll方法中一次调用中被执行,这样用户很容易使用一个线程来管理所有的cousumerIO。

对于问题,consumer到底是单线程还是多线程呢?

最新版的kafka是一个多线程或者双线程的java进程,创建kafkaConsumer的称为主线程,同时在后台创建一个心跳线程,该线程被称呼为后台心跳线程。

kafkaConsumer的poll方法在用户主线程中运行,这同时也表明:消费者组的rebalance、消息获取、coordinator管理、异步任务结果的处理、位移提交等操作这些都在主线程中的,因此仔细调优参数至关重要。


Poll使用方法

Consumer订阅topic之后通常以事件循环的方法来获取消息读取,poll方法根据当前consumer的消费位移返回消息集合。当poll首次被调用的时候,新的消费者组会根据位移重设策略(auto.offset.reset)来设定消费者组的位移,一旦consumer开始提交位移,后续的rebalance完成后会将位置设置为上次已提交的位移。传递给poll方法的超时设置参数用于控制consumer等待消息的最大阻塞时间,比如consumer至少需要1M的数据,那么此刻consumer会以阻塞不断累计数据到满足1M,如果不想让consumer一直阻塞,则可以给一个过期时间,一定时间内如果还没有满足,则返回。

1、要么等数据满了。

2、要么等待时间超过了指定时间。

前面我们说了consumer是单线程设计(其实还有一个心跳线程,辅助线程看主线程是否保持心跳,暂不考虑,不承担逻辑),因此consumer应该运行在他的专属线程中。新版本的java consumer不是线程安全的,如果没有显式的同步锁机制保护,kafka会抛出kafkaConsumer is not safe for multi-threaded access

的异常,如果看到了这样的报错,那么说明kafkaConsumer运用在多线程中,对于目前的kafka设计而言,是不被允许的。


我们可以在while条件指定一个布尔变量isRunning来标识是否需要退出consumer消费循环并且结束consumer应用。具体应该是将isRunning标识为volatile,然后其他线程用isRunning=false来控制线程结束。最后千万不要忘记关闭consumer,这不仅会清楚consumer创建的各种socker资源,还会通知消费者coordinator主动离开从而更快的rebalance。比较推荐在finally代码里显式关闭。


位移管理


Consumer位移

Consumer端要为每个它读取的分区保存消费进度,即分区中最新消费消息的位置,该位置就是offset。Consumer需要定期向kafka提交自己的位置信息,实际上,这个信息通常是下一条带消费消息的位置。假设consumer已经读取了某个分区第n条消息,那么他应该提交位移为N,因为位移是从0开始,位移n的位子是n+1条消息。这样conusmer重启时会从第n+1条开始消费。

Offset对于consumer非常重要,因为他们是实现消息交付语义保证(message delivery semantic)的基石,常见的3中消息交付语义保证。

1、最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理。

2、最少一次(at least once)处理语义:消息不会丢失,但可能处理多次。

3、精确一次(exactly)处理语义:消息一定会被处理且只会处理一次。


显然,若consumer在消费之前就提交位移,那么多在位移提交完成之后,消费还未消费就崩溃了,这时候consumer重启,则会从新的位移开始消费,则这个已提交的位移会丢失。相反的,若consumer在消费之后再提交,则可以实现at least once。好消息是这个出现多次处理的情况,已经在kafka0.11.0.0版本得到解决。


上次提交位移(last committed offset):consumer最近一次提交的offset值。

当前位置(current position):consumer已读取但尚未提交的位置。

水位(watermark):也被称为高水位(high watermark),严格来说他不属于conusmer管理范围,而属于分区日志概念,consumer可以读取水位之下的所有消息,水位之上的则不可以读取。


日志终端位移(log end offset,leo):日志的最新位移,同样不属于consumer范畴,而属于分区日志管辖。它表示了某个分区副本当前保存消息对应最大的位移值。值得注意的是,正常情况下leo不会比水位值小。事实上,只有分区所有副本都保存某条消息,该分区leader副本才会向上移动水位值。


版本版consumer位移管理

consumer会在kafka集群所有broker里选一个broker作为consumer group的coordinator,用于实现组成员管理,消费分配方案,位移提交等。和普通的kafka topic相同,该topic配置多个分区,每个分区有多个副本。位移存在的目的就是保存consumer提交的位移。

当消费者组首次启动时,由于没有初识位移信息,coordinator必须为其确定初始位移值,这就是consumer参数auto.offset.reset的作用。通常consumer要么从最早位移开始读取,要么从最新位移开始读取。

Consumer提交位移主要机制通过向所属coordinator发送位移提交请求实现的,每个位移提交都会往_consumer_offsets对应分区上追加一条消息。消息的key是groupid、topic等,而value就是位移值,如果consumer为同一个group的同一个topic分区提交多次位移,那么就会存在多条key相同但value不同的消息,显然我们只关心最新一条。


自动提交和手动提交

位移提交策略对提供消费交付语义至关重要,默认情况下consumer自动提交间隔是5s、这就是说若不做特定设置,consumer可以通过参数auto.commit.interval.ms参数可以控制自动提交间隔。

自动提交位移的优势是降低用户开发成本使得用户不比处理位移提交,劣势用户不能细颗粒度的处理位移提交,特别是强调精确一次处理语义时,这种情况下,用户可以手动位移提交。

典型的consumer场景,用户需要对poll方法返回的消息集合中消息执行业务处理,用户想要保证只有消息被真正处理才去提交位移,如果自动提交则无法保证这种位移时序性,因此这种情况下必须手动提交位移。在构建kafkaConsumer时设置enable.auto.commit=false,然后调用conmmitSync或commitAsync方法即可。


自动提交:默认配置,enable.auto.commit=true。开发简单,无法实现精确控制,位移提交失败后不易处理。可能造成数据丢失,最多实现“最少一次”处理语义。能容忍一定消息丢失。

自动提交:设置enable.autocommit=false。手动调用consumer.commitSync或

consumer.commitAsync位移提交,可以实现“最少一次”处理,依赖外部可以实现“精确一次”处理语义。


手动提交分为异步commitAsync和同步commitSync,如果调用commitSync,用户程序会等待位移提交结束才执行下一条语句。若调用commitAsync则是一个异步阻塞调用,comsumer会在后续poll轮询该位移结果。这里的异步不是指consumer单独的线程进行位移提交,实际上consumer依然会在主线程poll方法中不断轮询这次异步提交结果,只是该提交不会让这个方法阻塞。


当这个无参数的时候,conmmitSync和commitAsync在调用的时候,都会为他订阅的所有分区进行位移提交。其实他还带另外两个参数的重载方式,用户调用这个方法的时候,需要显式指定一个map告诉kafka哪些分区做提交更为合理。实际使用中这种更加合适,因为consumer只对他所拥有的分区进行提交更为合理。

    ConsumerRecords<String, String> records= consumer.poll(lOOO); 
    for (TopicParti tion partition : records.partitions()) { 
    List<ConsumerRecord<String, String partit onRecords = 
                    records.records(partition); 
    for (ConsumerRecord<String, String> record : partitionRecords) { 
            System.out.println(record.offset ()+”:”+ record.value());
        }
    long lastOffset = parti tionRecords. get (partitionRecords. 
        size() - 1) . offset() ; 
        consumer.commitSync(Collections singletonMap(part tion, new 
        OffsetAndMetadata(lastOffset + 1)));

这里特别需要注意的是,提交的位移一定是consumer下一条待读取消息的位移,这也就是为什么offset+1的原因。


相关文章
|
1月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
97 4
|
2月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
51 4
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
2月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
87 4
|
2月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
44 0
|
2月前
|
消息中间件 Java Kafka
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息
【Azure 事件中心】在Windows系统中使用 kafka-consumer-groups.bat 查看Event Hub中kafka的consumer groups信息
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
86 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
|
2月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
69 0
下一篇
无影云桌面