开发者社区 > 云原生 > 正文

xa模式获取channel导致的资源悬挂问题

问题描述

xa模式,获取channel导致的资源悬挂问题

发生的情况

XA模式下,TC日志分布式事务回滚成功,实际上数据库中的分支事务未回滚,导致资源永远无法处理。

期待的结果

TC日志记录的分布式事务回滚成功,数据库中的分支事务也应该回滚成功。

复制回顾

order,account,storage三个服务,resource_id是相同的,也就是相同数据库,不同应用的场景下

1.开启了一个全局事务 2.然后order和tm的channel断了 3.tm会选择一个resource_id相同的服务,作为替代,比如account 4.发生xaer_note的异常,最终会发生资源悬挂的问题。

附加信息

public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;

    String[] clientIdInfo = readClientId(clientId);

    if (clientIdInfo == null || clientIdInfo.length != 3) {
        throw new FrameworkException("Invalid Client ID: " + clientId);
    }

    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);

    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
        RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);

    if (targetApplicationId == null || applicationIdMap == null ||  applicationIdMap.isEmpty()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No channel is available for resource[{}]", resourceId);
        }
        return null;
    }

    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);

    if (ipMap != null && !ipMap.isEmpty()) {
        // Firstly, try to find the original channel through which the branch was registered.
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Just got exactly the one {} for {}", channel, clientId);
                    }
                } else {
                    if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Removed inactive {}", channel);
                        }
                    }
                }
            }

            // The original channel was broken, try another one.
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP
                    .entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info(
                                "Choose {} on the same IP[{}] as alternative of {}", channel, targetIP, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
                            portMapOnTargetIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }
            }
        }

        // No channel on the this app node, try another one.
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
                .entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) { continue; }

                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }

                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();

                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Choose {} on the same application[{}] as alternative of {}", channel, targetApplicationId, clientId);
                        }
                        break;
                    } else {
                        if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(),
                            portMapOnOtherIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive {}", channel);
                            }
                        }
                    }
                }
                if (resultChannel != null) { break; }
            }
        }
    }

    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);

        if (resultChannel == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("No channel is available for resource[{}] as alternative of {}", resourceId, clientId);
            }
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Choose {} on the same resource[{}] as alternative of {}", resultChannel, resourceId, clientId);
            }
        }
    }

    return resultChannel;

}

环境信息

JDK version : 1.8 Seata version: 1.4.2

原提问者GitHub用户lcmvs

展开
收起
学习娃 2023-06-14 17:05:26 70 0
1 条回答
写回答
取消 提交回答
  • 1.支持resourceid的自定义,来区分多个表

    2.升级上次的方案,用分布式锁和分布式缓存,串行化branch的提交和二阶段下发的行为 你觉得哪个更合适,如果自定义resourceid的话,改动小,感觉也能接受,个人觉得可以支持前者,这样也不耦合其他组件

    原回答者GitHub用户a364176773

    2023-06-14 17:33:02
    赞同 展开评论 打赏
问答分类:
问答地址:

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载