上篇文章说了,sesstion.time.out 、max.poll.interval.ms、max.poll.records和auto.offset.reset等参数。
订阅topic
订阅consumer直接:
Consumer.subscribe(Arrays.asList(“topic1”,“topic2”));
如果使用独立的consumer(standalone consumer),则可以手动订阅,
TopicPartition p1 = new TopicPartition(“topic-name”,0); Consumer.assign(Arrays.asList(p1));
不管用哪种方法,consumer订阅都是延迟生效的,订阅的消息只有在下次poll调用的时候才会生效。
消息轮询
Poll原理
consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区消息。若要实现并行读取消息,一种方式使用多线程方式,为每个要读取的分区都要创建一个专有线程去消费(这就是旧版本cousumer采用的方式),另一种方法采用linuxI/O模型的poll或者select等,使用一个线程同时管理多个socket连接,即同时与多个broker通信实现并行读取。
一旦consumer订阅了topic,所有的消费逻辑包括coordinator的协调,消费者组的rebalance以及数据的获取会在主逻辑poll方法中一次调用中被执行,这样用户很容易使用一个线程来管理所有的cousumerIO。
对于问题,consumer到底是单线程还是多线程呢?
最新版的kafka是一个多线程或者双线程的java进程,创建kafkaConsumer的称为主线程,同时在后台创建一个心跳线程,该线程被称呼为后台心跳线程。
kafkaConsumer的poll方法在用户主线程中运行,这同时也表明:消费者组的rebalance、消息获取、coordinator管理、异步任务结果的处理、位移提交等操作这些都在主线程中的,因此仔细调优参数至关重要。
Poll使用方法
Consumer订阅topic之后通常以事件循环的方法来获取消息读取,poll方法根据当前consumer的消费位移返回消息集合。当poll首次被调用的时候,新的消费者组会根据位移重设策略(auto.offset.reset)来设定消费者组的位移,一旦consumer开始提交位移,后续的rebalance完成后会将位置设置为上次已提交的位移。传递给poll方法的超时设置参数用于控制consumer等待消息的最大阻塞时间,比如consumer至少需要1M的数据,那么此刻consumer会以阻塞不断累计数据到满足1M,如果不想让consumer一直阻塞,则可以给一个过期时间,一定时间内如果还没有满足,则返回。
1、要么等数据满了。
2、要么等待时间超过了指定时间。
前面我们说了consumer是单线程设计(其实还有一个心跳线程,辅助线程看主线程是否保持心跳,暂不考虑,不承担逻辑),因此consumer应该运行在他的专属线程中。新版本的java consumer不是线程安全的,如果没有显式的同步锁机制保护,kafka会抛出kafkaConsumer is not safe for multi-threaded access
的异常,如果看到了这样的报错,那么说明kafkaConsumer运用在多线程中,对于目前的kafka设计而言,是不被允许的。
我们可以在while条件指定一个布尔变量isRunning来标识是否需要退出consumer消费循环并且结束consumer应用。具体应该是将isRunning标识为volatile,然后其他线程用isRunning=false来控制线程结束。最后千万不要忘记关闭consumer,这不仅会清楚consumer创建的各种socker资源,还会通知消费者coordinator主动离开从而更快的rebalance。比较推荐在finally代码里显式关闭。
位移管理
Consumer位移
Consumer端要为每个它读取的分区保存消费进度,即分区中最新消费消息的位置,该位置就是offset。Consumer需要定期向kafka提交自己的位置信息,实际上,这个信息通常是下一条带消费消息的位置。假设consumer已经读取了某个分区第n条消息,那么他应该提交位移为N,因为位移是从0开始,位移n的位子是n+1条消息。这样conusmer重启时会从第n+1条开始消费。
Offset对于consumer非常重要,因为他们是实现消息交付语义保证(message delivery semantic)的基石,常见的3中消息交付语义保证。
1、最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理。
2、最少一次(at least once)处理语义:消息不会丢失,但可能处理多次。
3、精确一次(exactly)处理语义:消息一定会被处理且只会处理一次。
显然,若consumer在消费之前就提交位移,那么多在位移提交完成之后,消费还未消费就崩溃了,这时候consumer重启,则会从新的位移开始消费,则这个已提交的位移会丢失。相反的,若consumer在消费之后再提交,则可以实现at least once。好消息是这个出现多次处理的情况,已经在kafka0.11.0.0版本得到解决。
上次提交位移(last committed offset):consumer最近一次提交的offset值。
当前位置(current position):consumer已读取但尚未提交的位置。
水位(watermark):也被称为高水位(high watermark),严格来说他不属于conusmer管理范围,而属于分区日志概念,consumer可以读取水位之下的所有消息,水位之上的则不可以读取。
日志终端位移(log end offset,leo):日志的最新位移,同样不属于consumer范畴,而属于分区日志管辖。它表示了某个分区副本当前保存消息对应最大的位移值。值得注意的是,正常情况下leo不会比水位值小。事实上,只有分区所有副本都保存某条消息,该分区leader副本才会向上移动水位值。
版本版consumer位移管理
consumer会在kafka集群所有broker里选一个broker作为consumer group的coordinator,用于实现组成员管理,消费分配方案,位移提交等。和普通的kafka topic相同,该topic配置多个分区,每个分区有多个副本。位移存在的目的就是保存consumer提交的位移。
当消费者组首次启动时,由于没有初识位移信息,coordinator必须为其确定初始位移值,这就是consumer参数auto.offset.reset的作用。通常consumer要么从最早位移开始读取,要么从最新位移开始读取。
Consumer提交位移主要机制通过向所属coordinator发送位移提交请求实现的,每个位移提交都会往_consumer_offsets对应分区上追加一条消息。消息的key是groupid、topic等,而value就是位移值,如果consumer为同一个group的同一个topic分区提交多次位移,那么就会存在多条key相同但value不同的消息,显然我们只关心最新一条。
自动提交和手动提交
位移提交策略对提供消费交付语义至关重要,默认情况下consumer自动提交间隔是5s、这就是说若不做特定设置,consumer可以通过参数auto.commit.interval.ms参数可以控制自动提交间隔。
自动提交位移的优势是降低用户开发成本使得用户不比处理位移提交,劣势用户不能细颗粒度的处理位移提交,特别是强调精确一次处理语义时,这种情况下,用户可以手动位移提交。
典型的consumer场景,用户需要对poll方法返回的消息集合中消息执行业务处理,用户想要保证只有消息被真正处理才去提交位移,如果自动提交则无法保证这种位移时序性,因此这种情况下必须手动提交位移。在构建kafkaConsumer时设置enable.auto.commit=false,然后调用conmmitSync或commitAsync方法即可。
自动提交:默认配置,enable.auto.commit=true。开发简单,无法实现精确控制,位移提交失败后不易处理。可能造成数据丢失,最多实现“最少一次”处理语义。能容忍一定消息丢失。
自动提交:设置enable.autocommit=false。手动调用consumer.commitSync或
consumer.commitAsync位移提交,可以实现“最少一次”处理,依赖外部可以实现“精确一次”处理语义。
手动提交分为异步commitAsync和同步commitSync,如果调用commitSync,用户程序会等待位移提交结束才执行下一条语句。若调用commitAsync则是一个异步阻塞调用,comsumer会在后续poll轮询该位移结果。这里的异步不是指consumer单独的线程进行位移提交,实际上consumer依然会在主线程poll方法中不断轮询这次异步提交结果,只是该提交不会让这个方法阻塞。
当这个无参数的时候,conmmitSync和commitAsync在调用的时候,都会为他订阅的所有分区进行位移提交。其实他还带另外两个参数的重载方式,用户调用这个方法的时候,需要显式指定一个map告诉kafka哪些分区做提交更为合理。实际使用中这种更加合适,因为consumer只对他所拥有的分区进行提交更为合理。
ConsumerRecords<String, String> records= consumer.poll(lOOO); for (TopicParti tion partition : records.partitions()) { List<ConsumerRecord<String, String partit onRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset ()+”:”+ record.value()); } long lastOffset = parti tionRecords. get (partitionRecords. size() - 1) . offset() ; consumer.commitSync(Collections singletonMap(part tion, new OffsetAndMetadata(lastOffset + 1)));
这里特别需要注意的是,提交的位移一定是consumer下一条待读取消息的位移,这也就是为什么offset+1的原因。