线上是同事写的flume-kafkaplugin来实现对kafka消费到其他终端的,不过最近遇到几个莫名的case.
现象:flume消费延迟,因为当时比较紧急,同事想把延迟的数据丢掉从新的点开始追,就清了offset,重启就开始消费新的地方了,不过我记得0.7.2kafka的默认autooffset.reset是smallest应该没用啊,后来才知道同事hard code的一些参数:
props.put("zk.sessiontimeout.ms","60000");
props.put("fetch.size",String.valueOf(Integer.parseInt((getBatchSize(context))) * 300 * 1024));
props.put("autocommit.enable","false");
props.put("queuedchunks.max","100000");
props.put("autooffset.reset","largest");
props.put("socket.buffersize","102400000");
props.put("socket.timeout.ms","60000");
怪不得设置了batchsize会报message too big的exception=.=
不过即使丢弃了消息还是没解决问题,我们又把offset修改了回去.主要是flume的消费能力是平时的一半,必须要找到影响这个的因素.
因为比较紧急,临时增加了一倍consumer“解决了”消费能力的问题.
后来发现,当时这个consumergroup连接的zk connect string的zk fd超过了ulimit所设置的,不过连接数才到3K而已,这是为什么呢?
查了一下issue list发现kafka 0.7.2依赖的zk 3.3.4(KAFKA-318)有file descriptor泄漏的bug:ZOOKEEPER-1309(3.3.4 bug太多了,还有升级回滚问题1149)
后来同事测试了一下3.4.5基本上是兼容了,升级解决.
最后再插一个kafka的bug,就是即使autocommit被关闭了,这个版本的kafka在rebalance时还是会flush offset,也多亏这个Bug(KAKFA-919)了,否则不知道有多少重复数据了=.=
PS:不过线上使用的话还是把autocommit打开吧,可以将interval调大一点否则会造成不少的麻烦.不过打开这个还要注意另外一个Bug(KAFKA-601),ZK对于KAFKA还是很关键的,而下一篇就是和ZK相关了.
本文转自MIKE老毕 51CTO博客,原文链接:http://blog.51cto.com/boylook/1335673,如需转载请自行联系原作者