java – 在一个全局事务的范围内使用JTA同时向不同的数据源调用几个查询

栏目: Java · 发布时间: 5年前

内容简介:翻译自:https://stackoverflow.com/questions/18847256/invoking-few-queries-to-different-data-sources-concurrently-using-jta-in-scope

我有一个带有3个分布式dataSources的应用程序(com.atomikos.jdbc.AtomikosDataSourceBean).我正在使用Atomikos事务管理器作为JTA实现.每个dataSource都与PostgreSQL数据库一起使用.

现在,我正在调用我对每个dataSource的查询,一切正常.

我想知道,如果有可能,使用JTA,并行调用我的查询(多线程,并发)?

我尝试使用jdbcTemplate(Spring)简单地在新创建的线程中调用查询.首先,我遇到了一个春天问题. Spring将事务上下文存储在ThreadLocal字段中,因此在我的新线程( Spring transaction manager and multithreading )中无法正确解析.我已经通过将相同的事务上下文设置为新创建的线程的ThreadLocal来解决了这个问题.

但是我在Atomikos代码中面临同样的问题.它们还将CompositeTransactionImp存储在线程范围映射(BaseTrancationManager#getCurrentTx)中.但在Atomikos案例中,不可能为新线程设置值.

所以我不能同时执行我的查询,因为似乎Atomikos不支持这种方法.

但我也查看了JTA规范并发现了以下内容:“多个线程可能同时与同一个全局事务关联.” (“3.2 TransactionManager Interface”, http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download )

问题:如何在一个全局事务的范围内使用JTA(2阶段提交)同时向不同的dataSource调用两个或多个查询?

在tomcat上下文中配置DataSources:

<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db1"
          uniqueResourceName="jdbc/db1"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db2"
          uniqueResourceName="jdbc/db2"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db3"
          uniqueResourceName="jdbc/db3"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

Spring环境中的事务管理器配置:

<bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
  init-method="init" destroy-method="close" lazy-init="true">
  <property name="forceShutdown" value="false" />
 </bean>

码:

final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);

    // Solving Spring's ThreadLocal issue: saving thread local params
    final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
    final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();

    // Running query in a separate thread.
    final Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
                for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
                    TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
                }
                if (synchronizations != null && !synchronizations.isEmpty()) {
                    TransactionSynchronizationManager.initSynchronization();
                    for (TransactionSynchronization synchronization : synchronizations) {
                        TransactionSynchronizationManager.registerSynchronization(synchronization);
                    }
                }
                TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
                TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);

                // Executing query.
                final String query = "insert into ...";
                NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);

                template.update(query, parameters);
            } catch (final Throwable ex) {
                exceptionHolder.set(ex);
            }
        }
    });
    thread.start();

    // ... same code as above for other dataSources.

    // allThreds.join(); - joining to all threads.

我认为你的想法是解决TransactionSynchronizationManager必须使用单线程事务这一事实很有意思,但可能很危险.

在TransactionSynchronizationManager中,事务资源存储在ThreadLocal Map中,其中密钥是资源工厂,我想知道当您使用相同的资源工厂执行此多变线程的解决方法时会追加什么 – 它可能不适用于您的因为你有3个数据源 – . (乍一看,我会说你的一个交易资源将被另一个替换,但也许我错过了一些东西……).

无论如何,我认为你可以尝试使用 javax.transaction.TransactionManager.resume() 来实现你想要做的事情.

这个想法是直接使用JTA api,因此绕过单线程Spring事务支持.

这里有一些代码来说明我的想法:

@Autowired
JtaTransactionManager txManager;  //from Spring

javax.transaction.TransactionManager jtaTransactionManager;

public void parallelInserts() {
    jtaTransactionManager = txManager.getTransactionManager();  //we are getting the underlying implementation
    jtaTransactionManager.begin();
    final Transaction jtaTransaction  = jtaTransactionManager.getTransaction();
    try {
      Thread t1 = new Thread(){
        @Override
        public void run() {
            try {
                jtaTransactionManager.resume(jtaTransaction);
                //... do the insert
            } catch (InvalidTransactionException e) {
                try {
                    jtaTransaction.setRollbackOnly();
                } catch (SystemException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            } catch (SystemException e) {
                e.printStackTrace();
            }
        }
      };
      t1.start();
      //same with t2 and t3
    } catch (Exception ex) {
        jtaTransactionManager.setRollbackOnly();
        throw ex;
    }
    //join threads and commit
    jtaTransactionManager.commit();
}

我认为这个解决方案可能有用(我必须说我没有尝试过).我现在看到的唯一限制是你不能重新使用线程,因为resume()调用没有反向部分,第二次调用resume()你可能会有一个IllegalStateException.

翻译自:https://stackoverflow.com/questions/18847256/invoking-few-queries-to-different-data-sources-concurrently-using-jta-in-scope


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

统计思维

统计思维

[美] Allen B. Downey / 金迎 / 人民邮电出版社 / 2015-9 / 49.00元

现实工作中,人们常常需要用数据说话。可是,数据自己不会说话,需要人对它进行分析和挖掘才能找到有价值的信息。概率统计是数据分析的通用语言,是大数据时代预测未来的根基。如果你有编程背景,就能以概率和统计学为工具,将数据转化为有用的信息和知识,让数据说话。本书介绍了如何借助计算而非数学方法,使用Python语言对数据进行统计分析。 通过书中有趣的案例,你可以学到探索性数据分析的整个过程,从数据收集......一起来看看 《统计思维》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具