flink任务上线运行有一段时间了一直很稳定,这两天忽然发现有些“丢数据”的现象。这是怎么回事呢?用这篇文章记录一下踩坑之路。
任务要求:
我们这个任务要达到的效果是汇总每月的业务量,需要按照具体分类分组。业务量绝对不会出现负数的问题,但是现在汇总出来的居然存在负数!肯定是某个环节出现了问题。
排查思路:
我这个任务使用的是flink的table api操作比较简单。最先发现数据不正常是发现flink汇总出来的数据比实际要少,然后就开始去跟踪中间件kafka中的消息。意外的发现有些分类下的数据居然是负数!当时看到非常震惊。然后开始排查flink代码,这块计算就是简单分组后进行sum。sum怎么会出现负数呢?百思不得其解,最终经过大半天的天马行空的猜想得出个结论-玄学!当然了肯定不是玄学,所谓玄学往往出问题就在我们认为这个东西绝对不会有问题,但是现在有问题了。我们就将其归类为玄学。于是按照这个想法我开始慢慢回归问题本身。
问题本身就在于SUM出现了负数,我觉得很难理解。所以那就干脆不理解,就认为SUM不可能是负数,那出现负数的原因只能是因为在做减法的时候减到负数。有了这个思路之后我开始联想到flink的状态保存有个失效机制。如果该状态迟迟没有更新直到超过我们设置的过期时间,这个状态就会过期。当flink真正监听到该状态需要被删除时又会进行删除。如此一来一个状态被删了两次就会出现负数。
解决办法:
setIdleStateRetentionTime()使用此方法设定过期时间。将该时间设置大于任务所需时间即可解决负数问题。但是这样写有弊端就是会保存很多冗余的状态。不会实时的清除。这个我们后续在讨论。
希望本篇文章可以对同是flink小白的你有所帮助。
转载请注明出处。