开发者社区> 问答> 正文

提出一个关于MemoryMetaManager优化的建议

在项目运行启动订阅时有遇到一个异常(项目采用的是CanalServerWithEmbedded内嵌,关键的配置MetaMode.MIXED & IndexMode.MEMORY_META_FAILBACK),栈输出如下:

dump address /xxx.xx.xx.xxx:3306 has an error, retrying. caused by

java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) at java.util.ArrayList$Itr.next(ArrayList.java:851) at com.alibaba.otter.canal.parse.index.MetaLogPositionManager.getLatestIndexBy(MetaLogPositionManager.java:56) at com.alibaba.otter.canal.parse.index.FailbackLogPositionManager.getLatestIndexBy(FailbackLogPositionManager.java:68) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPositionInternal(MysqlEventParser.java:567) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPosition(MysqlEventParser.java:509) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:167) at java.lang.Thread.run(Thread.java:745)

看代码发现CanalServerWithEmbedded中的subscribe方法中有调用MetaManager的subscribe方法,最终会调用到MemoryMetaManager的subscribe方法,在得到destinations中key对应的list后,修改list(向list里add元素)。代码:

public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
    List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());

    if (clientIdentitys.contains(clientIdentity)) {
        clientIdentitys.remove(clientIdentity);
    }

    clientIdentitys.add(clientIdentity);
}

而MemoryMetaManager类中还有一个方法listAllSubscribeInfo会直接返回destinations中key对应的list。代码:

public synchronized List<ClientIdentity> listAllSubscribeInfo(String destination) throws CanalMetaManagerException {
    return destinations.get(destination);
}

我遇到的问题是在MetaLogPositionManager的getLatestIndexBy方法遍历metaManager.listAllSubscribeInfo调用结果时,业务代码线程调用了subscribe方法修改了遍历的list集合,导致了ConcurrentModificationException。

public LogPosition getLatestIndexBy(String destination) {
    List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
    LogPosition result = null;
    if (!CollectionUtils.isEmpty(clientIdentities)) {
        // 尝试找到一个最小的logPosition
        for (ClientIdentity clientIdentity : clientIdentities) {  //在此处遍历时链表被其它线程修改
            LogPosition position = (LogPosition) metaManager.getCursor(clientIdentity);
            if (position == null) {
                continue;
            }

            if (result == null) {
                result = position;
            } else {
                result = CanalEventUtils.min(result, position);
            }
        }
    }

    return result;
}

这里感觉在MemoryMetaManager中的listAllSubscribeInfo方法有比较大的隐患,外部拿到引用以后可能在不同的线程中做遍历或者修改。 如果这里每次都拷贝一个新list返回的话不知道对效率上的损失是否太大,如果不希望过多拷贝的话可以返回unmodifiableList,阻止其他类对这个集合的修改,但是这样在其它类遍历这个集合时还是可能出现我这种ConcurrentModificationException。

想问一下你对这个问题是怎么考虑的呢

原提问者GitHub用户zwangbo

展开
收起
绿子直子 2023-05-09 10:25:17 43 0
1 条回答
写回答
取消 提交回答
  • listAllSubscribeInfo修改为返回一个拷贝对象

    原回答者GitHub用户agapple

    2023-05-10 09:36:54
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载