开发者社区> 问答> 正文

MemoryEventStoreWithBuffer使用ITEMSIZE模式,cleanUntil为

public void cleanUntil(Position position, Long seqId) throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { ...... for (long next = sequence + 1; next <= maxSequence; next++) { Event event = entries[getIndex(next)]; if (localExecTime == 0 && event.getExecuteTime() > 0) { localExecTime = event.getExecuteTime(); } deltaRows += event.getRowsCount(); memsize += calculateSize(event); if ((seqId < 0 || next == seqId) && CanalEventUtils.checkPosition(event, (LogPosition) position)) { // 找到对应的position,更新ack seq hasMatch = true; /////////////////////////////////////////////////////// //这里为什么一定要MENSIZE的时候才进行entries[getIndex(index)] = null操作 ////////////////////////////////////////////////////// if (batchMode.isMemSize()) { ackMemSize.addAndGet(memsize); // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null }

                // 考虑getFirstPosition/getLastPosition会获取最后一次ack的position信息
                // ack清理的时候只处理entry=null,释放内存
                Event lastEvent = entries[getIndex(next)];
                lastEvent.setEntry(null);
                lastEvent.setRawEntry(null);
            }

            if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
                notFull.signal();
                ackTableRows.addAndGet(deltaRows);
                if (localExecTime > 0) {
                    ackExecTime.lazySet(localExecTime);
                }
                return;
            }
        }
    }
    if (!hasMatch) {// 找不到对应需要ack的position
        throw new CanalStoreException("no match ack position" + position.toString());
    }
} finally {
    lock.unlock();
}

}

原提问者GitHub用户tom-tangjp

展开
收起
数据大拿 2023-05-04 11:40:21 62 0
1 条回答
写回答
取消 提交回答
  • 已经有清理消费过的事件,entries[xx] = null

    原回答者GitHub用户agapple

    2023-05-05 10:18:33
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载