Starting with this article, I'll walk through Seata's source code. Let's look again at this diagram from Seata's official documentation for TCC mode:
The responsibilities of the RM and the TC are as follows:
Transaction Coordinator (TC): Maintain status of global and branch transactions, drive the global commit or rollback.
Resource Manager (RM): Manage resources that branch transactions work on, talk to TC for registering branch transactions and reporting status of branch transactions, and drive the branch transaction commit or rollback.
In short: the RM manages branch transactions, while the TC maintains the status of both global and branch transactions.
Note: this article is based on the latest Seata code, with the TC (seata-server) registered via the file-based registry.
Initialization Flow
When we start a service that integrates Seata, the service itself is an RM. The first thing it does after startup is register with the TC. Here is the startup log:
2020-08-30 16:51:32.946  INFO 1108808 --- [eoutChecker_1_1] i.s.c.r.netty.NettyClientChannelManager  : will connect to 192.168.59.132:8091
2020-08-30 16:51:32.950  INFO 1108808 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:TMROLE,address:192.168.59.132:8091,msg:< RegisterTMRequest{applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >
2020-08-30 16:51:33.276  INFO 1108808 --- [eoutChecker_1_1] i.s.c.rpc.netty.TmNettyRemotingClient    : register TM success. client version:1.3.0, server version:1.3.0,channel:[id: 0xfabece13, L:/192.168.59.1:63274 - R:/192.168.59.132:8091]
2020-08-30 16:51:33.277  INFO 1108808 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 261 ms, version:1.3.0,role:TMROLE,channel:[id: 0xfabece13, L:/192.168.59.1:63274 - R:/192.168.59.132:8091]
The RM registers as a client of the TC, so let's start from the client initialization. The logic lives in GlobalTransactionScanner; its UML class diagram is shown below:
GlobalTransactionScanner is initialized in GlobalTransactionAutoConfiguration, a Spring @Configuration class that defines the GlobalTransactionScanner bean:
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
    private final ApplicationContext applicationContext;
    private final SeataProperties seataProperties;

    public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext,
                                              SeataProperties seataProperties) {
        this.applicationContext = applicationContext;
        this.seataProperties = seataProperties;
    }

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        // comes from application.yml (see the note below)
        String applicationName = applicationContext.getEnvironment()
            .getProperty("spring.application.name");
        // SeataProperties defines txServiceGroup, which also maps to application.yml (see the note below)
        String txServiceGroup = seataProperties.getTxServiceGroup();
        if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            seataProperties.setTxServiceGroup(txServiceGroup);
        }
        return new GlobalTransactionScanner(applicationName, txServiceGroup);
    }
}
Note: the two parameters in the method above come from our service's application.yml:
spring:
  application:
    name: storage-server
  cloud:
    alibaba:
      seata:
        tx-service-group: my_test_tx_group
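For reference, SeataProperties is essentially a @ConfigurationProperties binding for that yml section. Here is a reduced sketch of it (the real class ships with the Seata Spring Boot support and carries more fields; only the part relevant here is shown):

import org.springframework.boot.context.properties.ConfigurationProperties;

// Reduced sketch: tx-service-group in the yml above binds to the
// txServiceGroup field through this prefix.
@ConfigurationProperties(prefix = "spring.cloud.alibaba.seata")
public class SeataProperties {

    private String txServiceGroup;

    public String getTxServiceGroup() {
        return txServiceGroup;
    }

    public void setTxServiceGroup(String txServiceGroup) {
        this.txServiceGroup = txServiceGroup;
    }
}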
The globalTransactionScanner bean above implements Spring's InitializingBean interface, so once its properties are set it registers the client with the TC:
public void afterPropertiesSet() {
    if (disableGlobalTransaction) { // configured in Seata's file.conf
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}

private void initClient() {
    // part of the source omitted
    // init TM
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]",
            applicationId, txServiceGroup);
    }
    // init RM
    RMClient.init(applicationId, txServiceGroup);
    // part of the source omitted
    registerSpringShutdownHook();
}
The code above has two parts, TM initialization and RM initialization, and both are essentially the process of connecting to the TC. Before walking through them, let's revisit the microservice architecture diagram from the previous article, which uses Eureka as the registry:
As the diagram shows, the order service (order-server) is a TM and, at the same time, an RM.
TM Initialization
Let's look at the TM initialization code:
public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmNettyRemotingClient tmNettyRemotingClient =
            TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        tmNettyRemotingClient.init();
    }
}
With this call, the TM client goes off to connect to the seata-server (the TC). TmNettyRemotingClient's init method is shown below:
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}
The registerProcessor method below does two things: it registers processors for the response message types the TM expects from the TC, and it registers a heartbeat message processor:
private void registerProcessor() {
    // 1.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
Next, the init method connects to the TC. This logic lives in AbstractNettyRemotingClient:
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // transactionServiceGroup is passed in from GlobalTransactionScanner; its value here is my_test_tx_group
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // file.conf sets enableClientBatchSendRequest = true; see part 3 below
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}
This code is the core logic; let's read it in the following three steps.
1. First, look at the clientChannelManager.reconnect method above. It runs inside a scheduled executor, so it is invoked periodically. The code lives in NettyClientChannelManager:
void reconnect(String transactionServiceGroup) {
    List<String> availList = null;
    try {
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        String serviceGroup = RegistryFactory.getInstance().getServiceGroup(transactionServiceGroup);
        LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
        return;
    }
    for (String serverAddress : availList) {
        try {
            acquireChannel(serverAddress);
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                serverAddress, e.getMessage(), e);
        }
    }
}
The getAvailServerList call above looks up the seata-server cluster address list by the transactionServiceGroup property (my_test_tx_group in this article). The file.conf snippet below makes the logic easy to see: concatenate my_test_tx_group into vgroupMapping.my_test_tx_group, read that property's value (here, seata-server), then concatenate the value into seata-server.grouplist and read the cluster address list. I won't paste the detailed code; a simplified sketch follows the configuration below.
service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  seata-server.grouplist = "192.168.59.132:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}
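To make the two-step lookup concrete, here is a minimal sketch. This is not the actual Seata code; getConfig is an assumed helper standing in for reading file.conf, hard-coded here so the sketch runs on its own:

public class GrouplistLookupSketch {

    // assumed helper: in real code this would read a key from file.conf
    static String getConfig(String key) {
        if ("service.vgroupMapping.my_test_tx_group".equals(key)) return "seata-server";
        if ("service.seata-server.grouplist".equals(key)) return "192.168.59.132:8091";
        return null;
    }

    public static void main(String[] args) {
        String txServiceGroup = "my_test_tx_group";
        // step 1: service group -> cluster name
        String clusterName = getConfig("service.vgroupMapping." + txServiceGroup);
        // step 2: cluster name -> address list
        String[] serverList = getConfig("service." + clusterName + ".grouplist").split(",");
        for (String address : serverList) {
            System.out.println("TC address: " + address); // 192.168.59.132:8091
        }
    }
}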
If the availList obtained above (the seata-server cluster address list) is not empty, acquireChannel is called for each address. acquireChannel first checks whether a connection already exists, and creates one if it does not:
Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) { // a connection to this address already exists, return it directly
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    channelLocks.putIfAbsent(serverAddress, new Object());
    synchronized (channelLocks.get(serverAddress)) {
        return doConnect(serverAddress);
    }
}

private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;

private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) { // a connection to this address already exists, return it directly
        return channelToServer;
    }
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        // both TM and RM initialization go through this code; for an RM the resourceIds must be set.
        // Remember resourceIds? See the RM section below.
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}
The nettyClientKeyPool.borrowObject call above fetches a connection from the pool. The pool Seata uses here is commons-pool, which will look familiar to anyone who has read pooling source code before, so I won't cover it again here. If you are interested, the article 《lettuce连接池很香,撸撸它的源代码》 introduces it.
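If you haven't used it before, here is a minimal self-contained sketch of the keyed-pool pattern, written against commons-pool2 for brevity (the Conn type is made up for illustration; in Seata, the factory role is played by NettyPoolableFactory, which opens and registers a Netty channel per NettyPoolKey):

import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;

public class KeyedPoolSketch {

    // stand-in for a Netty Channel; made up for illustration
    static class Conn {
        final String address;
        Conn(String address) { this.address = address; }
    }

    // the factory tells the pool how to create an object for a given key
    static class ConnFactory extends BaseKeyedPooledObjectFactory<String, Conn> {
        @Override
        public Conn create(String address) {
            return new Conn(address); // Seata's factory would open a channel and send the register request here
        }
        @Override
        public PooledObject<Conn> wrap(Conn conn) {
            return new DefaultPooledObject<>(conn);
        }
    }

    public static void main(String[] args) throws Exception {
        GenericKeyedObjectPool<String, Conn> pool = new GenericKeyedObjectPool<>(new ConnFactory());
        // analogous to nettyClientKeyPool.borrowObject(poolKey)
        Conn conn = pool.borrowObject("192.168.59.132:8091");
        System.out.println("connected to " + conn.address);
        pool.returnObject("192.168.59.132:8091", conn);
    }
}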
2. Next, the super.init() method, defined in the parent class AbstractNettyRemoting:
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                if (entry.getValue().isTimeout()) {
                    futures.remove(entry.getKey());
                    entry.getValue().setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
This method is very simple: a scheduled task keeps checking the outcome of sent messages, and any message whose future has timed out is removed, with its result set to null.
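As a standalone illustration of this request-future pattern (all names below are assumed for the sketch, not Seata's), pending requests are registered in a map keyed by message id, and a timer sweeps out the timed-out ones:

import java.util.Map;
import java.util.concurrent.*;

public class FutureSweeperSketch {

    // stand-in for Seata's MessageFuture
    static class PendingRequest {
        final long createdAt = System.currentTimeMillis();
        final long timeoutMillis;
        final CompletableFuture<Object> result = new CompletableFuture<>();
        PendingRequest(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }
        boolean isTimeout() { return System.currentTimeMillis() - createdAt > timeoutMillis; }
    }

    static final Map<Integer, PendingRequest> futures = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        futures.put(1, new PendingRequest(100)); // a request that will never be answered
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // same idea as AbstractNettyRemoting.init: periodically evict timed-out futures
        timer.scheduleAtFixedRate(() -> futures.entrySet().removeIf(e -> {
            if (e.getValue().isTimeout()) {
                e.getValue().result.complete(null); // counterpart of setResultMessage(null)
                return true;
            }
            return false;
        }), 200, 200, TimeUnit.MILLISECONDS);
    }
}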
3. Finally, the clientBootstrap.start() method:
public void start() {
    if (this.defaultEventExecutorGroup == null) { // initialize defaultEventExecutorGroup
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                nettyClientConfig.getClientWorkerThreads()));
    }
    // channel options for the connection
    this.bootstrap.group(this.eventLoopGroupWorker)
        .channel(nettyClientConfig.getClientChannelClazz())
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    if (nettyClientConfig.enableNative()) {
        if (PlatformDependent.isOsx()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        } else { // on non-macOS systems, use edge-triggered epoll and TCP quick ack
            bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                .option(EpollChannelOption.TCP_QUICKACK, true);
        }
    }
    // add handlers
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                    nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                    nettyClientConfig.getChannelMaxAllIdleSeconds()))
                .addLast(new ProtocolV1Decoder())
                .addLast(new ProtocolV1Encoder());
            if (channelHandlers != null) {
                addChannelPipelineLast(ch, channelHandlers);
            }
        }
    });
    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}
This code is also straightforward: it initializes the Netty bootstrap from local configuration, which lives in file.conf.
RM Initialization
RM initialization is almost identical to TM initialization; we start from RMClient.init(applicationId, txServiceGroup):
public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient rmNettyRemotingClient =
        RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init();
}
Below is the init method in RmNettyRemotingClient, identical to the TM's:
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}
The RM registers many more message types: branch commit, branch rollback, undo_log deletion, the TC registration response, and the heartbeat to the TC. Compared with the TM, the handling here is more complex. As the earlier articles on Seata's AT and TCC modes showed, whichever mode you use, the RM carries the most processing logic.
private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    RmBranchCommitProcessor rmBranchCommitProcessor =
        new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch rollback processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor =
        new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
The super.init() call above invokes the method in the parent class AbstractNettyRemotingClient, the same parent TmNettyRemotingClient has, so I won't repeat that code. One question left open earlier is the resourceId; its source is also in RmNettyRemotingClient:
protected Function<String, NettyPoolKey> getPoolKeyFunction() {
    return (serverAddress) -> {
        String resourceIds = getMergedResourceKeys();
        if (resourceIds != null && LOGGER.isInfoEnabled()) {
            LOGGER.info("RM will register :{}", resourceIds);
        }
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        message.setResourceIds(resourceIds);
        return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
    };
}

public String getMergedResourceKeys() {
    Map<String, Resource> managedResources = resourceManager.getManagedResources();
    Set<String> resourceIds = managedResources.keySet();
    if (!resourceIds.isEmpty()) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String resourceId : resourceIds) {
            if (first) {
                first = false;
            } else {
                sb.append(DBKEYS_SPLIT_CHAR);
            }
            sb.append(resourceId);
        }
        return sb.toString();
    }
    return null;
}
In TCC mode, tracing the code reveals where the resourceId is assigned: it comes from the two-phase commit transaction annotation, for example:
@TwoPhaseBusinessAction(name = "storageApi", commitMethod = "commit", rollbackMethod = "rollback")
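To see the annotation in context, here is a minimal sketch of a TCC interface (StorageApi and its method names are made up for illustration); the name attribute is exactly what becomes the resourceId:

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageApi {

    // phase 1: try; the name below ends up as the TCCResource's actionName, i.e. the resourceId
    @TwoPhaseBusinessAction(name = "storageApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean decrease(BusinessActionContext actionContext,
                     @BusinessActionContextParameter(paramName = "count") int count);

    // phase 2: commit
    boolean commit(BusinessActionContext actionContext);

    // phase 2: rollback
    boolean rollback(BusinessActionContext actionContext);
}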
The assignment code is below. As you can see, in TCC mode the resourceId is simply the name value from the two-phase commit annotation (other modes are out of scope for now). This code is in DefaultRemotingParser:
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    // the bean obtained here is one whose interface carries the @LocalTCC annotation
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);
    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name()); // this is where the resourceId is assigned
                    // part of the code omitted
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    // part of the code omitted
    return remotingBeanDesc;
}
From here on, the RM flow is the same as the TM's.
Batch Request Sending
In the TM initialization flow earlier, the init method of AbstractNettyRemotingClient contains a batch-send snippet:
if (NettyClientConfig.isEnableClientBatchSendRequest()) { // file.conf sets enableClientBatchSendRequest = true
    mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
        KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(),
        new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
    mergeSendExecutorService.submit(new MergedSendRunnable());
}
Because the configuration allows batch sending, this branch is taken. It creates a thread pool holding a single thread and submits the task that performs the batched sends:
private class MergedSendRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            for (String address : basketMap.keySet()) {
                BlockingQueue<RpcMessage> basket = basketMap.get(address);
                if (basket.isEmpty()) {
                    continue;
                }
                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // send batch message is sync request, but there is no need to get the return value.
                    // Since the messageFuture has been created before the message is placed in basketMap,
                    // the return value will be obtained in ClientOnResponseProcessor.
                    sendChannel = clientChannelManager.acquireChannel(address);
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(null);
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            }
            isSending = false;
        }
    }
    // printMergeMessageLog method omitted
}
The flow above is very simple: in an infinite loop that wakes up at most every MAX_MERGE_SEND_MILLS, each per-address message queue is drained from basketMap, all queued messages are packed into one mergeMessage, and that mergeMessage is sent in a single request. In other words, a batch send.
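To isolate the wait/notify handshake between senders and this merge thread, here is a self-contained mini version of the pattern (names simplified, not Seata's code): producers enqueue per address and nudge the merger, which drains each queue into one batch:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class MiniBatcher {
    static final Map<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();
    static final Object mergeLock = new Object();

    static void send(String address, String msg) {
        basketMap.computeIfAbsent(address, k -> new LinkedBlockingQueue<>()).offer(msg);
        synchronized (mergeLock) {
            mergeLock.notifyAll(); // wake the merge thread early, like Seata's sender does
        }
    }

    public static void main(String[] args) throws Exception {
        Thread merger = new Thread(() -> {
            while (true) {
                synchronized (mergeLock) {
                    try { mergeLock.wait(1000); } catch (InterruptedException e) { return; }
                }
                basketMap.forEach((address, basket) -> {
                    List<String> batch = new ArrayList<>();
                    basket.drainTo(batch); // merge everything queued for this address
                    if (!batch.isEmpty()) {
                        System.out.println("one request to " + address + " carrying " + batch);
                    }
                });
            }
        });
        merger.setDaemon(true);
        merger.start();
        send("192.168.59.132:8091", "branchRegister");
        send("192.168.59.132:8091", "globalBegin");
        Thread.sleep(1500); // give the merger a chance to drain before exit
    }
}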
Summary
The TM and RM initialization flows in Seata are basically the same, differing in two points: the response message types they register, and the RM additionally needing a resourceId and branch transaction handlers.
The TM and RM clients use Netty to connect to the TC (seata-server), and the connection pool used is commons-pool.