Fescar example解析 - TM发送逻辑

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 开篇 这篇文章的目的主要是理清楚Fescar的TM发送部分的逻辑,从时序图和源码两个层面进行分析。 文章中间会解答两个自己阅读代码中遇到的困惑(估计大部分人看代码的时候也会遇到这个困惑),包括TmRpcClient的初始化过程和配置加载过程。

开篇

 这篇文章的目的主要是理清楚Fescar的TM发送部分的逻辑,从时序图和源码两个层面进行分析。

 文章中间会解答两个自己阅读代码中遇到的困惑(估计大部分人看代码的时候也会遇到这个困惑),包括TmRpcClient的初始化过程配置加载过程

 文章的最后会附上GlobalAction相关Request的类关系图,便于理解依赖关系。



Fescar TM发送流程

TM Sender.jpg

说明:

  • 1.DefaultGlobalTransaction执行begin/commit/rollback等调用DefaultTransactionManager。
  • 2.DefaultTransactionManager内部调用syncCall()方法,进而调用TmRpcClient的sendMsgWithResponse()方法。
  • 3.TmRpcClient调用父类AbstractRpcRemoting的sendAsyncRequest()方法构建发送队列。
  • 4.AbstractRpcRemotingClient的MergedSendRunnable线程消费发送队列构建MergedWarpMessage调用sendRequest发送。
  • 5.sendRequest()方法内部调用writeAndFlush完成消息发送。



TmRcpClient
说明:

  • TmRpcClient的类依赖关系图如上。
  • TmRpcClient继承自AbstractRpcRemotingClient类。


Fescar TM发送源码分析

public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    /**
     * Get transaction manager.
     *
     * @return the transaction manager
     */
    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) 
            throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

说明:

  • DefaultTransactionManager的beigin/commit/rollback方法内部最终调用syncCall()方法。
  • syncCall方法内部执行TmRpcClient.getInstance().sendMsgWithResponse(request)调用TmRpcClient方法。


public final class TmRpcClient extends AbstractRpcRemotingClient {
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
    }

    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
        throws TimeoutException {
        return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
    }
}

说明:

  • TmRpcClient内部执行发送sendMsgWithResponse调用sendAsyncRequestWithResponse。
  • sendAsyncRequestWithResponse的实现在父类AbstractRpcRemoting当中。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
        TimeoutException {
        if (timeout <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        return sendAsyncRequest(address, channel, msg, timeout);
    }

    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
        throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }

        // 构建RpcMessage对象
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(RpcMessage.getNextMessageId());
        rpcMessage.setAsync(false);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);

        // 通过MessageFuture包装实现超时
        final MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        futures.put(rpcMessage.getId(), messageFuture);

        // 测试代码走的是这个分支
        if (address != null) {
            // 根据address进行hash放置到不同的Map当中
            ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
            BlockingQueue<RpcMessage> basket = map.get(address);
            if (basket == null) {
                map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = map.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }

            // 发送其实是另外一个线程单独执行发送操作的
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
        } else {
            ChannelFuture future;
            channelWriteableCheck(channel, msg);
            future = channel.writeAndFlush(rpcMessage);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(future.cause());
                        }
                        destroyChannel(future.channel());
                    }
                }
            });
        }

        // 通过Future实现限时超时机制
        if (timeout > 0) {
            try {
                return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException)exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        } else {
            return null;
        }
    }
}

说明:

  • 构建RpcMessage对象,包装Request。
  • 构建MessageFuture对象,包装RpcMessage,实现超时等待功能。
  • 通过basket进行分桶操作,真正执行发送的代码在AbstractRpcRemotingClient类的MergedSendRunnable。
  • Request的发送类似生成消费者模型,上述代码只是生产者部分。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {}
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) { continue; }

                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = connect(address);
                    try {
                        sendRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
                            && address != null) {
                            destroyChannel(address, sendChannel);
                        }
                        LOGGER.error("", "client merge call failed", e);
                    }
                }
                isSending = false;
            }
        }
}

说明:

  • MergedSendRunnable 负责消费待发送消息体并组装成MergedWarpMessage对象。
  • sendRequest()方法内部将MergedWarpMessage再次包装成RpcMessage进行发送。


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected void sendRequest(Channel channel, Object msg) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);
        rpcMessage.setId(RpcMessage.getNextMessageId());
        if (msg instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
        }
        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(rpcMessage);
    }
}

说明:

  • RpcMessage再次包装MergeMessage进行发送。


TmRpcClient初始化

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {

    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                    FailureHandler failureHandlerHook) {
        setOrder(ORDER_NUM);
        setProxyTargetClass(true);
        this.applicationId = applicationId;
        this.txServiceGroup = txServiceGroup;
        this.mode = mode;
        this.failureHandlerHook = failureHandlerHook;
    }

    private void initClient() {

        TMClient.init(applicationId, txServiceGroup);
        if ((AT_MODE & mode) > 0) {
            RMClientAT.init(applicationId, txServiceGroup);
        }
    }

    public void afterPropertiesSet() {
        initClient();
    }
}

说明:

  • GlobalTransactionScanner的构造函数执行后执行afterPropertiesSet并执行initClient()操作。
  • initClient()内部执行TMClient.init(applicationId, txServiceGroup)进行TMClient的初始化。


public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(
                 applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }
}

public final class TmRpcClient extends AbstractRpcRemotingClient {
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
        }
    }


    public void init(long healthCheckDelay, long healthCheckPeriod) {
        // 注意initVars()方法
        initVars();

        ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
           MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
           KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, 
           new LinkedBlockingQueue<Runnable>(),
           new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), 
                        MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    reconnect();
                } catch (Exception ignore) {
                    LOGGER.error(ignore.getMessage());
                }
            }
        }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
    }


    private void initVars() {
        enableDegrade = CONFIG.getBoolean(
        ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
        super.init();
    }
}

说明:

  • 核心在于关注initVars()方法。


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        // 核心构建发送的对象的连接池
        nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig());
        serviceManager = new ServiceManagerStaticConfigImpl();
        super.init();
    }
}


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
    public void init() {
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

                for (MessageFuture future : futures.values()) {
                    if (future.isTimeout()) {
                        timeoutMessageFutures.add(future);
                    }
                }

                for (MessageFuture messageFuture : timeoutMessageFutures) {
                    futures.remove(messageFuture.getRequestMessage().getId());
                    messageFuture.setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                    }
                }
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }
}

说明:

  • AbstractRpcRemotingClient的init()方法核心构建nettyClientKeyPool工厂。
  • nettyClientKeyPool用于获取连接TC的对象的工厂池。


配置加载分析

Config

public class FileConfiguration implements Configuration {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);

    private static final Config CONFIG = ConfigFactory.load();
}


package com.typesafe.config;
public final class ConfigFactory {
    private ConfigFactory() {
    }

    public static Config load() {
        return load(ConfigParseOptions.defaults());
    }
}

说明:

  • 配置加载使用了JAVA 配置管理库 typesafe.config
  • 默认加载classpath下的application.conf,application.json和application.properties文件。通过ConfigFactory.load()加载。

Request的类关系图

GlobalActionRequest.png

目录
相关文章
|
Web App开发 前端开发 JavaScript
Dapp技术开发全解析丨附DAPP源码的逻辑实例解析
智能合约是Dapp的核心组成部分,它是一种基于区块链技术的自动化合约,可以执行预定义的操作和条件。智能合约的执行是去中心化的,可以在区块链上自动执行,无需中间方参与。智能合约的开发需要熟悉Solidity等编程语言,同时需要了解智能合约的安全性问题。
|
存储 区块链 数据安全/隐私保护
DApp互助预约排单系统开发设计规则逻辑解析
DApp互助预约排单系统开发设计规则逻辑解析
|
SQL 安全 网络安全
交易所开发测试版丨交易所系统开发规则玩法/架构设计/项目步骤/方案逻辑/案例解析/源码部署
The development process of the exchange system involves multiple steps and links. The following is the detailed process and steps for the development of the exchange system:
|
2月前
|
图形学 开发者 UED
Unity游戏开发必备技巧:深度解析事件系统运用之道,从生命周期回调到自定义事件,打造高效逻辑与流畅交互的全方位指南
【8月更文挑战第31天】在游戏开发中,事件系统是连接游戏逻辑与用户交互的关键。Unity提供了多种机制处理事件,如MonoBehaviour生命周期回调、事件系统组件及自定义事件。本文介绍如何有效利用这些机制,包括创建自定义事件和使用Unity内置事件系统提升游戏体验。通过合理安排代码执行时机,如在Awake、Start等方法中初始化组件,以及使用委托和事件处理复杂逻辑,可以使游戏更加高效且逻辑清晰。掌握这些技巧有助于开发者更好地应对游戏开发挑战。
129 0
|
3月前
|
JSON 数据格式 Java
化繁为简的魔法:Struts 2 与 JSON 联手打造超流畅数据交换体验,让应用飞起来!
【8月更文挑战第31天】在现代 Web 开发中,JSON 成为数据交换的主流格式,以其轻量、易读和易解析的特点受到青睐。Struts 2 内置对 JSON 的支持,结合 Jackson 库可便捷实现数据传输。本文通过具体示例展示了如何在 Struts 2 中进行 JSON 数据的序列化与反序列化,并结合 AJAX 技术提升 Web 应用的响应速度和用户体验。
118 0
|
3月前
|
开发者 iOS开发 C#
Uno Platform 入门超详细指南:从零开始教你打造兼容 Web、Windows、iOS 和 Android 的跨平台应用,轻松掌握 XAML 与 C# 开发技巧,快速上手示例代码助你迈出第一步
【8月更文挑战第31天】Uno Platform 是一个基于 Microsoft .NET 的开源框架,支持使用 C# 和 XAML 构建跨平台应用,适用于 Web(WebAssembly)、Windows、Linux、macOS、iOS 和 Android。它允许开发者共享几乎全部的业务逻辑和 UI 代码,同时保持原生性能。选择 Uno Platform 可以统一开发体验,减少代码重复,降低开发成本。安装时需先配置好 Visual Studio 或 Visual Studio for Mac,并通过 NuGet 或官网下载工具包。
325 0
|
5月前
|
自然语言处理 JavaScript 前端开发
【JavaScript】JavaScript基础知识强化:变量提升、作用域逻辑及TDZ的全面解析
【JavaScript】JavaScript基础知识强化:变量提升、作用域逻辑及TDZ的全面解析
65 3
|
6月前
|
存储 SQL 关系型数据库
drds逻辑表与物理解析
drds逻辑表与物理解析
116 5
|
6月前
|
编解码 计算机视觉 Python
IPC机制在jetson中实现硬解码视频流数据通信的逻辑解析
IPC机制在jetson中实现硬解码视频流数据通信的逻辑解析
192 0
|
6月前
|
测试技术 数据库
深入解析MyBatis-Plus中的逻辑删除功能及实例
深入解析MyBatis-Plus中的逻辑删除功能及实例
960 0

推荐镜像

更多
下一篇
无影云桌面