How Seata AT Mode Works

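    Debugging this project is fairly tedious: you first have to start the registry and the Seata server, and then you need three clients to simulate a distributed transaction. The project structure is shown below (diagram borrowed from elsewhere).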


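        TC: the transaction coordinator provided by Seata.

        TM: the initiator of the global transaction; at the end it tells the TC whether the transaction succeeded or failed.

        RM: an individual microservice, that is, the component that ultimately manages the resources.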

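        Strictly speaking, this diagram is not quite accurate. A typical business flow has no standalone business TM. A closer match is that the global transaction is started when the order is created, and the responses from the other services then decide whether to commit or roll back. The order service is therefore both TM and RM. But I can't draw, so the diagram stays as it is.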

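        To use AT mode, apart from configuring the Seata parameters and the data source proxy, all we need to do is add the @GlobalTransactional annotation to the methods that need transaction control. The annotation is handled through a proxy, and that proxying is set up in GlobalTransactionScanner, so let's look closely at this class.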

        First, its class hierarchy:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean

        It extends AbstractAutoProxyCreator to implement AOP, and implements InitializingBean, ApplicationContextAware, and DisposableBean so that it can run its setup and teardown logic in the Spring bean lifecycle callbacks.

        The initClient method is called from the afterPropertiesSet hook:

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    // Look up the TC address through the registry and initialize the TCP connection to it
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info(
            "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
                + txServiceGroup + "]");
    }
    // Same as above, this time for the RM client
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info(
            "Resource Manager is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup
                + "]");
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
    registerSpringShutdownHook();

}

        Next, the overridden wrapIfNecessary method:

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (disableGlobalTransaction) {
        return bean;
    }
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            // TCC mode
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (interceptor == null) {
                    interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                }
            }

            LOGGER.info(
                "Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
                    + interceptor.getClass().getName() + "]");
            if (!AopUtils.isAopProxy(bean)) {
                // Not proxied yet: let the parent class create the proxy
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // Already an AOP proxy: get the AdvisedSupport of this bean
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                // Wrap the interceptor into advisors and put them at the front of the chain
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
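
To make the "proxy only once, and only if annotated" decision easier to follow, here is a minimal sketch that runs without Spring or Seata. Everything in it is hypothetical: DemoGlobalTx stands in for the real annotation, DemoScanner for the scanner, and a JDK dynamic proxy replaces the Spring AOP machinery. It only mirrors the shape of the logic above (skip beans already in the set, skip beans without the annotation, otherwise wrap), not the actual implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical marker standing in for Seata's @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {}

interface DemoOrderService {
    @DemoGlobalTx
    String create(String item);
}

interface DemoPlainService {
    String ping();
}

class DemoScanner {
    private final Set<String> proxied = new HashSet<>();
    final List<String> calls = new ArrayList<>();

    // True when any public method of the given types carries the marker.
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method m : type.getMethods()) {
                if (m.isAnnotationPresent(DemoGlobalTx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Returns the bean unchanged unless it is annotated and not yet proxied.
    <T> T wrapIfNecessary(T bean, String beanName, Class<T> iface) {
        synchronized (proxied) {
            if (proxied.contains(beanName) || !existsAnnotation(iface)) {
                return bean;
            }
            proxied.add(beanName);
            Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface},
                (p, method, args) -> {
                    // The real interceptor would start the global transaction here.
                    calls.add("intercepted " + method.getName());
                    return method.invoke(bean, args);
                });
            return iface.cast(proxy);
        }
    }

    public static void main(String[] args) {
        DemoScanner scanner = new DemoScanner();
        DemoOrderService target = item -> "order:" + item;
        DemoOrderService wrapped =
            scanner.wrapIfNecessary(target, "orderService", DemoOrderService.class);
        System.out.println(wrapped.create("book") + " " + scanner.calls);
    }
}
```

As in the original method, a second call with the same bean name returns the bean as-is instead of wrapping it again.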

        In postProcessBeforeInitialization, the DataSource is handled:

public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    // If the bean is a DataSource that Seata has not proxied yet (and auto-proxying is enabled)
    if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of  [" + beanName + "]");
        }
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
        // Enhance it with CGLIB
        return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
            Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
            if (null != m) {
                return m.invoke(dataSourceProxy, args);
            } else {
                boolean oldAccessible = method.isAccessible();
                try {
                    method.setAccessible(true);
                    return method.invoke(bean, args);
                } finally {
                    // Restore the original accessibility flag for security reasons
                    method.setAccessible(oldAccessible);
                }
            }
        });
    }
    return bean;
}
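
The routing rule inside that interceptor (use the wrapper's method if it declares one with the same name and parameter types, otherwise fall through to the original bean) can be sketched on its own. The names DemoSource, RawSource, and WrappingSource are made up for this illustration, and a JDK proxy is used in place of CGLIB's Enhancer. It also only looks at methods declared directly on the wrapper class, which is a simplification.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for javax.sql.DataSource.
interface DemoSource {
    String getConnection();
    String describe();
}

class RawSource implements DemoSource {
    public String getConnection() { return "raw-connection"; }
    public String describe() { return "raw-source"; }
}

// Hypothetical wrapper that handles only getConnection, the way a
// data source proxy cares about only some of the original's methods.
class WrappingSource {
    private final DemoSource target;

    WrappingSource(DemoSource target) { this.target = target; }

    public String getConnection() {
        return "proxied(" + target.getConnection() + ")";
    }
}

class DelegationDemo {
    static DemoSource enhance(DemoSource bean) {
        WrappingSource wrapper = new WrappingSource(bean);
        return (DemoSource) Proxy.newProxyInstance(
            DemoSource.class.getClassLoader(), new Class<?>[] {DemoSource.class},
            (proxy, method, args) -> {
                Method m = findDeclared(WrappingSource.class, method);
                // Prefer the wrapper; fall back to the original bean.
                return m != null ? m.invoke(wrapper, args) : method.invoke(bean, args);
            });
    }

    // Returns the method declared on type with the same name and
    // parameter types as wanted, or null when there is none.
    static Method findDeclared(Class<?> type, Method wanted) {
        try {
            return type.getDeclaredMethod(wanted.getName(), wanted.getParameterTypes());
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        DemoSource enhanced = enhance(new RawSource());
        System.out.println(enhanced.getConnection());
        System.out.println(enhanced.describe());
    }
}
```

The point of this design is that callers keep holding something that looks like the original bean, while the calls that matter for transaction control are quietly redirected to the wrapper.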

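        Most of this section is really Spring knowledge. Why go over it again here? Because it has been so long since I read it that I had forgotten, so this is a refresher. The real work is done by GlobalTransactionalInterceptor, so let's analyze it next.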

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    if (globalTransactionalAnnotation != null) {
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}
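
One detail worth noticing in invoke is the order of the checks: a method carrying both annotations is treated as a global transaction, because that branch is tested first. A small sketch of just the dispatch, with hypothetical annotations DemoTransactional and DemoLock in place of the real ones and plain strings in place of the handlers:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-ins for @GlobalTransactional and @GlobalLock.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoTransactional {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoLock {}

class DemoStockService {
    @DemoTransactional
    public void deduct() {}

    @DemoLock
    public void read() {}

    @DemoTransactional
    @DemoLock
    public void both() {}

    public void ping() {}
}

class DemoDispatcher {
    // Same branch order as invoke: transaction first, then lock, then plain call.
    static String route(Method method) {
        if (method.getAnnotation(DemoTransactional.class) != null) {
            return "handleGlobalTransaction";
        }
        if (method.getAnnotation(DemoLock.class) != null) {
            return "handleGlobalLock";
        }
        return "proceed";
    }

    // Convenience lookup for no-arg methods of DemoStockService.
    static String routeByName(String methodName) {
        try {
            return route(DemoStockService.class.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"deduct", "read", "both", "ping"}) {
            System.out.println(name + " -> " + routeByName(name));
        }
    }
}
```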

        The invoke method branches on which annotation is present. We focus on how handleGlobalTransaction executes:

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                       final GlobalTransactional globalTrxAnno) throws Throwable {
    try {
        // 1. transactionalTemplate executes the executor passed in
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            // Build the TransactionInfo from the annotation
            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                transactionInfo.setName(name());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    // 2. Handle transaction failures by category
    } catch (TransactionalExecutor.ExecutionException e) {
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            default:
                throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

        }
    }
}

        The real work at step 1 is done by transactionalTemplate's execute method:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1.1 Get the current global transaction, or create a new one
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.2 Get the transaction info
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {

        // 1.3 Begin the transaction
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // Run the business logic
            rs = business.execute();

        } catch (Throwable ex) {

            // 1.4 Exception handling: roll back (or commit) according to the rollback rules
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // Commit the transaction
        commitTransaction(tx);

        return rs;
    } finally {
        // Run the completion hooks
        triggerAfterCompletion();
        cleanUp();
    }
}
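
Stripped of the Seata types, the control flow of execute is a classic template: begin, run the business code, roll back on an exception or commit on success, and always clean up in finally. The sketch below records the order of those steps in an in-memory list. DemoTransaction and DemoTemplate are hypothetical names, and the sketch always rolls back on an exception, whereas the real code first consults the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical transaction that only records what happened to it.
class DemoTransaction {
    final List<String> events = new ArrayList<>();

    void begin() { events.add("begin"); }
    void commit() { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

class DemoTemplate {
    static <T> T execute(DemoTransaction tx, Supplier<T> business) {
        try {
            tx.begin();
            T result;
            try {
                result = business.get();
            } catch (RuntimeException ex) {
                // Simplified: no rollback rules, every exception rolls back.
                tx.rollback();
                throw ex;
            }
            tx.commit();
            return result;
        } finally {
            // Runs on both paths, like triggerAfterCompletion and cleanUp.
            tx.events.add("cleanup");
        }
    }

    public static void main(String[] args) {
        DemoTransaction ok = new DemoTransaction();
        execute(ok, () -> "done");
        System.out.println(ok.events);

        DemoTransaction bad = new DemoTransaction();
        try {
            execute(bad, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println(bad.events);
        }
    }
}
```

The original exception is rethrown after the rollback, so the caller still sees the business failure rather than a transaction-level error.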

Step 1.1 creates a new GlobalTransaction. At this point the xid has not been generated yet and is null.

DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
    this.transactionManager = TransactionManagerHolder.get();
    this.xid = xid;
    this.status = status;
    this.role = role;
}

Step 1.2 reads the transaction settings from the annotation: rollback rules, timeout, and so on.

public TransactionInfo getTransactionInfo() {
    TransactionInfo transactionInfo = new TransactionInfo();
    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
    transactionInfo.setName(name());
    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
        rollbackRules.add(new RollbackRule(rbRule));
    }
    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
        rollbackRules.add(new RollbackRule(rbRule));
    }
    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
        rollbackRules.add(new NoRollbackRule(rbRule));
    }
    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
        rollbackRules.add(new NoRollbackRule(rbRule));
    }
    transactionInfo.setRollbackRules(rollbackRules);
    return transactionInfo;
}

1.3 Beginning the transaction

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Build a request to begin the global transaction
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Send it synchronously and get the response
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    // Return the xid
    return response.getXid();
}

1.4 Rolling back the transaction

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
    // If transaction info exists and the rollback rules match this exception
    if (txInfo != null && txInfo.rollbackOn(ex)) {
        try {
            // Perform the rollback
            rollbackTransaction(tx, ex);
        } catch (TransactionException txe) {
            // The rollback itself can fail too; the number of rollback retries can be set in the configuration
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);
        }
    } else {
        // Otherwise commit
        commitTransaction(tx);
    }
}
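
The rollbackOn check is where the rollbackFor and noRollbackFor settings collected in step 1.2 finally take effect. I have not pasted its source here, so the following is only a sketch of one plausible matching strategy, modeled on how Spring's rule-based transaction attributes work: the rule whose exception class is closest to the thrown exception in the inheritance chain wins. DemoRule and DemoRules are hypothetical, and the choice to roll back when no rule matches is an assumption of this sketch, not something taken from Seata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rule: an exception type plus whether it means "roll back".
class DemoRule {
    final Class<? extends Throwable> type;
    final boolean rollback;

    DemoRule(Class<? extends Throwable> type, boolean rollback) {
        this.type = type;
        this.rollback = rollback;
    }

    // Number of superclass steps from exClass up to this rule's type,
    // or -1 when the rule's type is not an ancestor of exClass.
    int depth(Class<?> exClass) {
        int d = 0;
        for (Class<?> c = exClass; c != null; c = c.getSuperclass(), d++) {
            if (c == type) {
                return d;
            }
        }
        return -1;
    }
}

class DemoRules {
    private final List<DemoRule> rules = new ArrayList<>();

    DemoRules rollbackFor(Class<? extends Throwable> type) {
        rules.add(new DemoRule(type, true));
        return this;
    }

    DemoRules noRollbackFor(Class<? extends Throwable> type) {
        rules.add(new DemoRule(type, false));
        return this;
    }

    // The nearest matching rule decides; with no match this sketch rolls back.
    boolean rollbackOn(Throwable ex) {
        DemoRule winner = null;
        int nearest = Integer.MAX_VALUE;
        for (DemoRule rule : rules) {
            int d = rule.depth(ex.getClass());
            if (d >= 0 && d < nearest) {
                nearest = d;
                winner = rule;
            }
        }
        return winner == null || winner.rollback;
    }

    public static void main(String[] args) {
        DemoRules rules = new DemoRules()
            .rollbackFor(RuntimeException.class)
            .noRollbackFor(IllegalArgumentException.class);
        System.out.println(rules.rollbackOn(new IllegalStateException()));
        System.out.println(rules.rollbackOn(new IllegalArgumentException()));
    }
}
```

With these two rules, an IllegalArgumentException (and its subclasses) takes the commit branch of completeTransactionAfterThrowing, while other runtime exceptions take the rollback branch.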