public void cleanUntil(Position position, Long seqId) throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { ...... for (long next = sequence + 1; next <= maxSequence; next++) { Event event = entries[getIndex(next)]; if (localExecTime == 0 && event.getExecuteTime() > 0) { localExecTime = event.getExecuteTime(); } deltaRows += event.getRowsCount(); memsize += calculateSize(event); if ((seqId < 0 || next == seqId) && CanalEventUtils.checkPosition(event, (LogPosition) position)) { // 找到对应的position,更新ack seq hasMatch = true; /////////////////////////////////////////////////////// //这里为什么一定要MENSIZE的时候才进行entries[getIndex(index)] = null操作 ////////////////////////////////////////////////////// if (batchMode.isMemSize()) { ackMemSize.addAndGet(memsize); // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null }
// 考虑getFirstPosition/getLastPosition会获取最后一次ack的position信息
// ack清理的时候只处理entry=null,释放内存
Event lastEvent = entries[getIndex(next)];
lastEvent.setEntry(null);
lastEvent.setRawEntry(null);
}
if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
notFull.signal();
ackTableRows.addAndGet(deltaRows);
if (localExecTime > 0) {
ackExecTime.lazySet(localExecTime);
}
return;
}
}
}
if (!hasMatch) {// 找不到对应需要ack的position
throw new CanalStoreException("no match ack position" + position.toString());
}
} finally {
lock.unlock();
}
}
原提问者GitHub用户tom-tangjp
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。