Seata
是阿里近期开源的分布式事务框架,地址: github.com/seata/seata 。框架包括了集团的 TXC
(云版本叫 GTS
)和蚂蚁金服的 TCC
两种模式,短短数月 Github
上的 star
数已经接近一万,算是目前唯一有大厂背书的分布式事务解决方案。
TXC
在 Seata
中又叫 AT
模式,意为补偿方法是框架自动生成的,对用户完全屏蔽,用户可以向使用本地事务那样使用分布式事务,缺点是仅支持关系型数据库(目前支持 MySQL
),引入 Seata AT
的服务需要本地建表存储 rollback_info
,隔离级别默认 RU
适用场景有限。
TCC
不算是新概念,很早就有了,用户通过定义 try/confirm/cancel
三个方法在应用层面模拟两阶段提交,区别在于 TCC 中 try
方法也需要操作数据库进行资源锁定,后续两个补偿方法由框架自动调用,分别进行资源提交和回滚,这点同单纯的存储层 2PC
不太一样。蚂蚁金服向 Seata
贡献了自己的 TCC
实现,据说已经演化了十多年,大量应用在在金融、交易、仓储等领域。
分布式事务的诞生背景
早期应用都是单一架构,例如支付服务涉及到的账户、金额、订单系统等都由单一应用负责,底层访问同一个数据库实例,自然事务操作也是本地事务,借助 Spring
可以轻松实现;但是由于量级越来越大,单一服务需要进行职责拆分变为三个独立的服务,通过 RPC
进行调用,数据也存在不同的数据库实例中,由于这时一次业务操作涉及对多个数据库数据的修改,无法再依靠本地事务,只能通过分布式事务框架来解决。
TCC 就是分布式事务的一种解决方案,属于柔性补偿型,优点在于理解简单、仅 try
阶段加锁并发性能较好,缺点在于代码改造成本。
什么是 TCC 本文就不再赘述了,TCC 的概念本身并不复杂
Seata TCC 使用方法
在分析源码之前,我们先简要提及下 Seata TCC
模式的使用方法,有助于后续理解整个 TCC
流程。
Seata TCC 参与方
Seata
中的 TCC
模式要求 TCC
服务的参与方在接口上增加 @TwoPhaseBusinessAction
注解,注明 TCC
接口的名称(全局唯一), TCC
接口的 confirm
和 cancel
方法的名称,用于后续框架反射调用,下面是一个 TCC
接口的案例:
public interface TccAction { @TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel") public boolean try(BusinessActionContext businessActionContext, int a, int b); public boolean confirm(BusinessActionContext businessActionContext); public boolean cancel(BusinessActionContext businessActionContext); } 复制代码
紧接着定义实现类 Impl
实现这个接口,为三个方法提供具体实现。最后将参与方服务进行发布,注册到远端,主要为了后续能让 Seata
框架调用到参与方的 confirm
或者 cancel
方法闭环整个 TCC
事务。
Seata TCC 发起方
Seata TCC
的发起方类似于我们上图中的 payment service
,参与方需要在业务方法上增加 @GlobalTransactional
注解,用于开启切面注册全局事务,业务方法中调用 TCC
参与方的若干 try
方法,一旦业务方法调用成功, Seata
框架会通知 TC
回调这些参与方的 confirm
和 cancel
方法。
源码分析
Seata
中 TCC
模式的源码并不复杂,主要集中于:
module | class | 功能 |
---|---|---|
seata-spring | GlobalTransactionalInterceptor.class | 全局事务切面逻辑,包括注册全局事务,拿到 xid |
seata-spring | TccActionInterceptor.class | TCC 参与方切面逻辑 |
seata-tcc | TCCResourceManager.class | 解析 TCC Bean,保存 TCC Resources,便于后续回调 |
seata-tcc | ActionInterceptorHandler.class | TCC 分支事务注册实现 |
seata-server | DefaultCoordinator.class、FileTransactionStoreManager.class | 主要是 TC 的实现、事务存储等实现 |
注册 TCC Resources
Seata
中一个 TCC
接口被称作一个 TCC Resources
,其结构如下:
public class TCCResource implements Resource { private String resourceGroupId = "DEFAULT"; private String appName; private String actionName; // TCC 接口名称 private Object targetBean; // TCC Bean private Method prepareMethod; // try 方法 private String commitMethodName; private Method commitMethod; // confirm 方法 private String rollbackMethodName; private Method rollbackMethod; // cancel 方法 // …… 省略 } 复制代码
Seata
解析到应用中存在 TCC Bean
,则通过 parserRemotingServiceInfo
方法生成一个 TCCResource
对象,进而调用 TCCResourceManager
类的 registerResource
方法,将 TCCResource
对象保存到本地的 tccResourceCache
中,它是一个 ConcurrentHashMap
结构,同时通过 RmRpcClient
将该 TCCResource
的 resourceId
、 address
等信息注册到服务端,便于后续 TC
通过 RPC
回调到正确的地址。
// 解析 TCCResource 的部分代码 Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass(); Method[] methods = interfaceClass.getMethods(); if(isService(bean, beanName)){ try { // 如果是 TCC service Bean,解析并注册该 resource Object targetBean = remotingBeanDesc.getTargetBean(); for(Method m : methods){ TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class); if(twoPhaseBusinessAction != null){ // 如果有 TCC 参与方注解,定义一个 TCCResource, TCCResource tccResource = new TCCResource(); tccResource.setActionName(twoPhaseBusinessAction.name()); // TCC Bean tccResource.setTargetBean(targetBean); // try 方法 tccResource.setPrepareMethod(m); // confirm 方法名称 tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod()); // confirm 方法对象 tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class})); // cancel 方法名称 tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod()); // cancel 方法对象 tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class})); // 调用到 TCCResourceManager 的 registerResource 方法 DefaultResourceManager.get().registerResource(tccResource); } } }catch (Throwable t){ throw new FrameworkException(t, "parser remting service error"); } } 复制代码
我们看一下 TCCResourceManager
的 registerResource
方法的实现:
// 内存中保存的 resourceId 和 TCCResource 的映射关系 private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>(); @Override public void registerResource(Resource resource) { TCCResource tccResource = (TCCResource) resource; tccResourceCache.put(tccResource.getResourceId(), tccResource); // 调用父类的方法通过 RPC 注册到远端 super.registerResource(tccResource); } 复制代码
我们看下 TCCResource
是如何注册到服务端的:
public void registerResource(Resource resource) { // 拿到 RmRpcClient 实例,调用其 registerResource 方法 RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId()); } public void registerResource(String resourceGroupId, String resourceId) { if (LOGGER.isInfoEnabled()) { LOGGER.info("register to RM resourceId:" + resourceId); } synchronized (channels) { for (Map.Entry<String, Channel> entry : channels.entrySet()) { String serverAddress = entry.getKey(); Channel rmChannel = entry.getValue(); if (LOGGER.isInfoEnabled()) { LOGGER.info("register resource, resourceId:" + resourceId); } // 注册 resourceId,远端将其解析为一个 RpcContext 保存在内存中 sendRegisterMessage(serverAddress, rmChannel, resourceId); } } } 复制代码
GlobalTransaction 注册全局事务
GlobalTransaction
注解是全局事务的入口,其切面逻辑实现在 GlobalTransactionalInterceptor
类中。如果判断进入 @GlobalTransaction
修饰的方法,会调用 handleGlobalTransaction
方法进入切面逻辑,其中关键方法是 transactionalTemplate
的 execute
方法。
public Object execute(TransactionalExecutor business) throws Throwable { // 如果上游已经有 xid 传过来说明自己是下游,直接参与到这个全局事务中就可以,不必新开一个,角色是 Participant // 如果上游没有 xid 传递过来,说明自己是发起方,新开启一个全局事务,角色是 Launcher GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // …… …… 省略 try { // 开启全局事务 beginTransaction(txInfo, tx); Object rs = null; try { // 调用业务方法 rs = business.execute(); } catch (Throwable ex) { // 如果抛异常,通知 TC 回滚全局事务 completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 如果不抛异常,通知 TC 提交全局事务 commitTransaction(tx); return rs; } // …… …… 省略 } 复制代码
beginTransaction
方法调用了 transactionManager
的 begin
方法:
// 客户端 @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); // 发送 RPC,获取 TC 下发的 xid GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); return response.getXid(); } // 服务端 @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 全局事务用 GlobalSession 来表示 GlobalSession session = GlobalSession.createGlobalSession( applicationId, transactionServiceGroup, name, timeout); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // 将 GlobalSession 写入文件存储 session.begin(); // 返回 UUID 作为全局事务 ID return XID.generateXID(session.getTransactionId()); } 复制代码
TwoPhaseBusinessAction 注册分支事务
全局事务调用业务方法时,会进入 TCC
参与方的切面逻辑,主要实现在 TccActionInterceptor
类中,关键方法是 actionInterceptorHandler
的 proceed
方法。
public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable { // …… …… 省略 // 创建分支事务 String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); actionContext.setBranchId(branchId); // 记录方法参数 Class<?>[] types = method.getParameterTypes(); int argIndex = 0; for (Class<?> cls : types) { if (cls.getName().equals(BusinessActionContext.class.getName())) { arguments[argIndex] = actionContext; break; } argIndex++; } // …… …… 省略 } 复制代码
doTccActionLogStore
方法负责注册分支事务:
// 客户端 protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) { String actionName = actionContext.getActionName(); // 拿到全局事务 ID String xid = actionContext.getXid(); // …… …… 省略 try { // resourceManager 通过 RPC 向 TC 注册分支事务 Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null); // 拿到 TC 返回的分支事务 ID return String.valueOf(branchId); } // …… …… 省略 } // 服务端 @Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin); // 分支事务用 BranchSession 表示,新建一个 BranchSession BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId); if (!branchSession.lock()) { throw new TransactionException(LockKeyConflict); } try { // 将分支事务加入全局事务中,也会写文件 globalSession.addBranch(branchSession); } catch (RuntimeException ex) { throw new TransactionException(FailedToAddBranch); } // 返回分支事务 ID return branchSession.getBranchId(); } 复制代码
TC 回调参与方补偿方法
分支事务注册完毕,业务方法调用成功则通知 TC
提交全局事务。
@Override public void commit() throws TransactionException { // 如果是参与者,无需发起提交请求 if (role == GlobalTransactionRole.Participant) { return; } // 由 TM 向 TC 发出提交全局事务的请求 status = transactionManager.commit(xid); } 复制代码
TC
收到客户端 TM
的 commit
请求后:
@Override public GlobalStatus commit(String xid) throws TransactionException { // 根据 xid 找出 GlobalSession GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid)); if (globalSession == null) { return GlobalStatus.Finished; } GlobalStatus status = globalSession.getStatus(); // 关闭这个 GlobalSession,不让后续的分支事务再注册上来 globalSession.closeAndClean(); if (status == GlobalStatus.Begin) { // 修改状态为提交进行中 globalSession.changeStatus(GlobalStatus.Committing); // 一旦分支事务中存在 TCC,做同步提交,其实 TCC 分支也可以异步提交,要求高性能时可以选择异步 if (globalSession.canBeCommittedAsync()) { asyncCommit(globalSession); } else { doGlobalCommit(globalSession, false); } } return globalSession.getStatus(); } 复制代码
doGlobalCommit
是我们关注的关键方法,我们忽略其中的次要逻辑:
@Override public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException { for (BranchSession branchSession : globalSession.getSortedBranches()) { // …… …… 省略 try { // 调用 DefaultCoordinator 的 branchCommit 方法做分支提交 // 参数有分支事务 id,resourceId 用来寻找对应的 TCCResource 和补偿方法参数信息 BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(), XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(), branchSession.getResourceId(), branchSession.getApplicationData()); } } // …… …… 省略 } 复制代码
服务端的 DefaultCoordinator
类中的 branchCommit
方法发出 RPC
请求,调用对应 TCCResource
提供方:
@Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // …… …… 省略 // 获取全局事务和分支事务 GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid)); BranchSession branchSession = globalSession.getBranch(branchId); // 根据 resourceId 找到对应的 channel 和 RpcContext BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId, branchSession.getClientId(), request); // 返回分支事务提交状态 return response.getBranchStatus(); // …… …… 省略 } 复制代码
客户端自然是接收到分支提交的 RPC
请求,然后本地找出之前解析并保持下来的 TCCResource
进行补偿方法的反射调用,下面我们截取其中的关键步骤进行分析。
@Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 根据 resourceId 找出内存中保留的 TCCResource 对象 TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId); if(tccResource == null){ throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId); } // 获取 targetBean 和相应的 method 对象 Object targetTCCBean = tccResource.getTargetBean(); Method commitMethod = tccResource.getCommitMethod(); try { boolean result = false; // 取出补偿方法参数信息 BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData); // 反射调用补偿方法 Object ret = commitMethod.invoke(targetTCCBean, businessActionContext); // 返回状态 return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable; } // …… …… 省略 } 复制代码
事务存储
关于 Seata TC 模块如何进行事务存储,网上有的文章已经讲得很详细,例如 深度剖析一站式分布式事务方案 Seata-Server ,因此这里不再赘述。
需要提及的一点是, TC
有可能成为整个分布式事务服务的性能瓶颈,因此如何做到 高性能
和 高可用
很重要,目前的存储方式是 File
,代码中也有关于 DB Store Mode
的 TODO
项,文件相比于 DB
性能肯定好一些但是可用性会差一点,这块怎么保证要等到后续 HA Cluster
发布之后再看。
总结
整个 Seata
框架中关于 TCC
部分的源码并不复杂,本文只选取了部分类中的关键代码进行展示,忽略了一些判断逻辑和异常处理,笔者认为 Seata TCC
中关于 TCC
异常的封装和自定义处理、还有各种用户扩展埋点的设计也值得一看。
蚂蚁 SOFA Channel
之前做过一个关于 Seata TCC
Seata TCC 分享 的讲解里也提到, TCC
框架的难点不在于本身,而在于如何写好一个 TCC
接口,如果对这部分内容感兴趣,可以点击链接进行详细了解。
写在最后
这是一个不定时更新的、披着 程序员 外衣的文青小号。既分享极客技术,也记录人间烟火。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Spark 源码系列(五)分布式缓存
- 每日一博 | Redission 分布式锁源码解析
- 从源码分析基于Redis的分布式锁
- 分布式消息队列 RocketMQ源码解析:Message存储
- 想通关分布式系统「限流问题」?来一篇源码实战
- 从源码角度看 Redis 分布式锁的 3 种实现方案!
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。