一、背景
随着目前业务复杂度的增加,项目中经常需要有大量的跨系统异步任务需要处理。在这种情况下, 我们选择了kafka作为了我们的消息中间件, 选择kafka主要基于以下几点:
- 支持分布式,避免单点问题
- 技术方案成熟,公司内部有上线项目
- 性能优异,能够持久化消息
通常情况下,我们会采取轮询或者随机的方式,通过Kafka的producer向Kafka集群生产数据,来尽可能保证Kafk分区之间的数据是均匀分布的。在分区数据均匀分布的前提下,如果我们针对要处理的topic数据量等因素,设计出合理的Kafka分区数量。对于一些实时任务,比如Spark Streaming/Structured-Streaming、Flink和Kafka集成的应用,消费端不存在长时间"挂掉"的情况即数据一直在持续被消费,那么一般不会产生Kafka数据积压的情况。但是这些都是有前提的,当一些意外或者不合理的分区数设置情况的发生,积压问题就不可避免。
二、遇到的问题及剖析
问题描述
第一次发现问题是在联调的时候,任务执行方发现consumer会打印出错误日志,重复消费,并且陷入循环。当时很快定位到问题:由于Consumer长时间没有发送心跳包,导致触发Rebalance操作, Consumer被踢下线了。
问题剖析
对于这个问题,需要详细讲述一下Kafka Consumer相关的机制。Kafka为了保证Partition分配的高效率, 使用了如下机制:
- 所有的Consumer都要和Coordinator连接;
- Coordinator选出一个Consumer作为Leader来分配Partition;
- Leader分配完以后通知Coordinator, 由Coordinator来通知给其他Consumer;
- 如果一个consumer不能工作了,Coordinator会触发Rebalance机制,重新分配Partition。
Coordinator判定一个Consumer不能工作,底层依靠的就是Heartbeat机制。
Consumer的配置里面有一项是Session_timeout,如果Heartbeat不能在Session_timeout时间内发出一次请求,Coordinator就会触发一次Rebalance操作,重新分配Partition。
三、消息积压典型场景及解决方案
Kafka消息积压的典型场景主要包括以下三种:
1.消费任务宕机
问题描述
比如,我们写的实时应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知相关负责人,负责人又没有写自动拉起任务的脚本进行重启。那么在我们重新启动这个实时应用进行消费之前,这段时间的消息就会被滞后处理,如果数据量很大,可就不是简单重启应用直接消费就能解决的。
解决方案
1)任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。此外,建议将任务纳入监控体系,当任务出现问题时,及时通知相关负责人处理。当然任务重启脚本也是要有的,还要求实时框架异常处理能力要强,避免数据不规范导致的不能重新拉起任务。
2)任务启动从上次提交offset处开始消费处理如果积压的数据量很大,需要增加任务的处理能力,比如增加资源,让任务能尽可能地快速消费处理,并赶上消费最新的消息
2.Kafka分区不合理(太少)导致消费能力不足
问题描述
Kafka单分区生产消息的速度qps通常很高,如果消费者因为某些原因(比如受业务逻辑复杂度影响,消费时间会有所不同),就会出现消费滞后的情况。此外,Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka consumer消费的吞吐量。
解决方案
如果数据量很大,合理的增加Kafka分区数是关键。如果利用的是Spark流和Kafka direct approach方式,也可以对KafkaRDD进行repartition重分区,增加并行度处理。
3.Kafka消息的key不均匀,导致分区间数据不均衡
问题描述
在使用Kafka producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。
解决方案
可以在Kafka producer处,给key加随机后缀,使其均衡。
四、额外需要关注的点
额外需要关注的点:在消费速度不一致的情况下,offset提交处理问题。
Consumer的offset提交是按照TopicPartition作为提交单元的。在Consumer消费过程中,可能会发生Reblance事件,如果当前Consumer分配到的Partition数量大于1个,可能这个Partition会被分配给其他的Consumer。在这个过程中,Consumer已经消费了该条数据,那么在提交Offset的时候,就会遇到CommitOffsetError,因为这个Partition已经不属于自己了。由于我们使用poll函数一次性返回多个数据,加上在消费速度不均衡的情况下offset管理的问题。所以我们必须要手动管理offset, 保存我们上次提交offset的时间和未提交offset的数量,一旦其中某一个达到阈值,就真正地提交offset。