开发者社区> 问答> 正文

Memory Event Store ack 内存释放不及时

环境信息

canal version 1.1.3 mysql version

问题描述

MemoryEventStoreWithBuffer.cleanUntil函数,在对entries内存释放时,已经ack的entries的最后一个数组元素没有设置为null。

步骤重现

1.每次写入一个Event

2.每次读取并一个Event

3.每次Ack一个position

测试case代码,使用debug方式查看MemoryEventStoreWithBuffer.entries元素:

int bufferSize = 64; MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer(); eventStore.setBufferSize(bufferSize); eventStore.setBatchMode(BatchMode.MEMSIZE); eventStore.start();

boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L)); sleep(100L); Assert.assertTrue(result);

sleep(50L); Position first = eventStore.getFirstPosition();

System.out.println("start get"); Events entrys1 = eventStore.tryGet(first, 10); System.out.println("1 get size : " + entrys1.getEvents().size());

eventStore.ack(entrys1.getPositionRange().getEnd());

result = eventStore.tryPut(buildEvent("1", 1L, 2L)); sleep(100L); Assert.assertTrue(result);

entrys1 = eventStore.tryGet(first, 1); System.out.println("2 get size : " + entrys1.getEvents().size());

eventStore.ack(entrys1.getPositionRange().getEnd());

result = eventStore.tryPut(buildEvent("1", 1L, 3L)); sleep(100L); Assert.assertTrue(result);

entrys1 = eventStore.tryGet(first, 1); System.out.println("3 get size : " + entrys1.getEvents().size());

eventStore.ack(entrys1.getPositionRange().getEnd());

result = eventStore.tryPut(buildEvent("1", 1L, 4L)); sleep(100L); Assert.assertTrue(result);

entrys1 = eventStore.tryGet(first, 1); System.out.println("4 get size : " + entrys1.getEvents().size());

eventStore.ack(entrys1.getPositionRange().getEnd());

result = eventStore.tryPut(buildEvent("1", 1L, 5L)); sleep(100L); Assert.assertTrue(result);

entrys1 = eventStore.tryGet(first, 1); System.out.println("5 get size : " + entrys1.getEvents().size());

eventStore.ack(entrys1.getPositionRange().getEnd());

result = eventStore.tryPut(buildEvent("1", 1L, 6L)); sleep(100L); Assert.assertTrue(result);

eventStore.stop();

bug代码位置: MemoryEventStoreWithBuffer.cleanUntil函数:

if (match) {// 找到对应的position,更新ack seq hasMatch = true;

if (batchMode.isMemSize()) {
    ackMemSize.addAndGet(memsize);
    // 尝试清空buffer中的内存,将ack之前的内存全部释放掉
    for (long index = sequence + 1; index < next; index++) {
        entries[getIndex(index)] = null;// 设置为null
    }
}

if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
    notFull.signal();
    ackTableRows.addAndGet(deltaRows);
    if (localExecTime > 0) {
        ackExecTime.lazySet(localExecTime);
    }
    return;
}

}

// 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null }

应该为 index <= next: // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index <= next; index++) { entries[getIndex(index)] = null;// 设置为null }

原提问者GitHub用户tom-tangjp

展开
收起
云上静思 2023-05-04 12:55:15 95 0
1 条回答
写回答
取消 提交回答
  • 中间尝试改过把最后的一个entry也设置为null,但上线之后遇到NPE的情况. 初步怀疑和position对象的include属性有关,暂时未定位具体NPE的代码触发分支

    原回答者GitHub用户agapple

    2023-05-05 10:38:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
ACK 云原生弹性方案—云原生时代的加速器 立即下载
ACK集群类型选择最佳实践 立即下载
企业运维之云原生和Kubernetes 实战 立即下载

相关镜像