环境信息
canal version 1.1.3 mysql version
问题描述
MemoryEventStoreWithBuffer.cleanUntil函数,在对entries内存释放时,已经ack的entries的最后一个数组元素没有设置为null。
步骤重现
1.每次写入一个Event
2.每次读取并一个Event
3.每次Ack一个position
测试case代码,使用debug方式查看MemoryEventStoreWithBuffer.entries元素:
int bufferSize = 64; MemoryEventStoreWithBuffer eventStore = new MemoryEventStoreWithBuffer(); eventStore.setBufferSize(bufferSize); eventStore.setBatchMode(BatchMode.MEMSIZE); eventStore.start();
boolean result = eventStore.tryPut(buildEvent("1", 1L, 1L)); sleep(100L); Assert.assertTrue(result);
sleep(50L); Position first = eventStore.getFirstPosition();
System.out.println("start get"); Events entrys1 = eventStore.tryGet(first, 10); System.out.println("1 get size : " + entrys1.getEvents().size());
eventStore.ack(entrys1.getPositionRange().getEnd());
result = eventStore.tryPut(buildEvent("1", 1L, 2L)); sleep(100L); Assert.assertTrue(result);
entrys1 = eventStore.tryGet(first, 1); System.out.println("2 get size : " + entrys1.getEvents().size());
eventStore.ack(entrys1.getPositionRange().getEnd());
result = eventStore.tryPut(buildEvent("1", 1L, 3L)); sleep(100L); Assert.assertTrue(result);
entrys1 = eventStore.tryGet(first, 1); System.out.println("3 get size : " + entrys1.getEvents().size());
eventStore.ack(entrys1.getPositionRange().getEnd());
result = eventStore.tryPut(buildEvent("1", 1L, 4L)); sleep(100L); Assert.assertTrue(result);
entrys1 = eventStore.tryGet(first, 1); System.out.println("4 get size : " + entrys1.getEvents().size());
eventStore.ack(entrys1.getPositionRange().getEnd());
result = eventStore.tryPut(buildEvent("1", 1L, 5L)); sleep(100L); Assert.assertTrue(result);
entrys1 = eventStore.tryGet(first, 1); System.out.println("5 get size : " + entrys1.getEvents().size());
eventStore.ack(entrys1.getPositionRange().getEnd());
result = eventStore.tryPut(buildEvent("1", 1L, 6L)); sleep(100L); Assert.assertTrue(result);
eventStore.stop();
bug代码位置: MemoryEventStoreWithBuffer.cleanUntil函数:
if (match) {// 找到对应的position,更新ack seq hasMatch = true;
if (batchMode.isMemSize()) {
ackMemSize.addAndGet(memsize);
// 尝试清空buffer中的内存,将ack之前的内存全部释放掉
for (long index = sequence + 1; index < next; index++) {
entries[getIndex(index)] = null;// 设置为null
}
}
if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
notFull.signal();
ackTableRows.addAndGet(deltaRows);
if (localExecTime > 0) {
ackExecTime.lazySet(localExecTime);
}
return;
}
}
// 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null }
应该为 index <= next: // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index <= next; index++) { entries[getIndex(index)] = null;// 设置为null }
原提问者GitHub用户tom-tangjp
中间尝试改过把最后的一个entry也设置为null,但上线之后遇到NPE的情况. 初步怀疑和position对象的include属性有关,暂时未定位具体NPE的代码触发分支
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。