Saturday, November 3, 2012

Distributed Transactions with WSO2 ESB

A transaction is a set of operations that is executed as a single unit. A distributed transaction involves two or more networked computers, often using multiple databases.

A transaction manager is required to handle these distributed transactions. The WSO2 Carbon platform integrates the "Atomikos" transaction manager, which is an implementation of the Java Transaction API (JTA). Products such as WSO2 DSS and WSO2 ESB ship this transaction manager by default, so you don't need to provide an external JTA provider to use distributed transactions inside these products.

WSO2 ESB contains a Synapse mediator called the 'Transaction Mediator', which supports distributed transactions using the Java Transaction API. In this post we are going to write a sample sequence that uses the Transaction Mediator and the built-in transaction manager that comes with WSO2 ESB to handle a distributed transaction.

Sample Scenario
The scenario used in this sample is the same as the one used in the WSO2 ESB Transaction Mediator Sample. (That sample uses JBoss Application Server to create the XA data-sources, and it also uses the transaction manager provided by JBoss.) In this sample, however, we are going to use the built-in transaction manager that comes with WSO2 ESB and create the XA data-sources using the Carbon data-source feature, without JBoss Application Server.

We use the following scenario to show how the Transaction Mediator works. Assume we have a record in one database and we want to delete that record from the first database and add it to the second database (the two databases can run on the same server or on two remote servers). The database tables are defined in such a way that the same entry cannot be added twice. So, in the successful scenario, the record will be deleted from the first table (of the first database) and will be added to the second table (of the second database). In a failure scenario (the record is already in the second database), no record will be deleted from the first table and no record will be added to the second database.
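The expected outcome of both cases can be sketched outside the ESB. The following is only a minimal simulation, assuming two in-memory SQLite databases in place of the MySQL ones and a hypothetical helper named move_record; it rolls back both connections by hand when the insert fails, which imitates the all-or-nothing result but is not a real XA two-phase commit (there is no prepare phase and no transaction manager).

```python
import sqlite3

# Same table layout as the sample; the primary key on "name" is what makes
# a duplicate insert fail.
SCHEMA = "CREATE TABLE company_x(name VARCHAR(10) PRIMARY KEY, id VARCHAR(10), price DOUBLE)"


def make_db(rows):
    """Create an in-memory database holding the given company_x rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany("INSERT INTO company_x VALUES (?, ?, ?)", rows)
    conn.commit()
    return conn


def move_record(db1, db2, symbol, price):
    """Delete symbol from db1 and insert it into db2; undo both on failure.

    Hypothetical helper: a stand-in for the delete + insert pair that the
    sample wraps in one transaction.
    """
    try:
        db1.execute("DELETE FROM company_x WHERE name = ?", (symbol,))
        db2.execute("INSERT INTO company_x VALUES (?, 'c4', ?)", (symbol, price))
    except sqlite3.IntegrityError:
        # Duplicate key in the second database: discard the pending delete too.
        db1.rollback()
        db2.rollback()
        return False
    db1.commit()
    db2.commit()
    return True


def names(conn):
    """Return the sorted list of names currently stored in company_x."""
    return sorted(row[0] for row in conn.execute("SELECT name FROM company_x"))


db1 = make_db([("IBM", "c1", 0.0), ("SUN", "c2", 0.0)])
db2 = make_db([("SUN", "c2", 0.0), ("MSFT", "c3", 0.0)])

ok_ibm = move_record(db1, db2, "IBM", 100.0)  # IBM is not in db2 yet
ok_sun = move_record(db1, db2, "SUN", 100.0)  # SUN already exists in db2

print(ok_ibm, ok_sun)
print(names(db1), names(db2))
```

The price value 100.0 is a placeholder. In this sketch the IBM move goes through, while the SUN move is refused and leaves both tables untouched.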

Step 1:
Create the sample databases and tables using MySQL server.

CREATE DATABASE DB1;
USE DB1;
CREATE TABLE company_x(name varchar(10) primary key, id varchar(10), price double);
INSERT INTO company_x VALUES ('IBM','c1',0.0);
INSERT INTO company_x VALUES ('SUN','c2',0.0);

CREATE DATABASE DB2;
USE DB2;
CREATE TABLE company_x(name varchar(10) primary key, id varchar(10), price double);
INSERT INTO company_x VALUES ('SUN','c2',0.0);
INSERT INTO company_x VALUES ('MSFT','c3',0.0);

Step 2:
(i) Add the MySQL JDBC driver jar to {ESB_HOME}/repository/components/lib.

(ii) Create two XA data-sources for the two databases above. To do so, add the following XML configuration to the master-datasources.xml file located in the {ESB_HOME}/repository/conf/datasources/ directory. (These data-sources can also be created using the ESB UI rather than the master-datasources.xml config file.) Note that these data-sources use the data-source provider class from the Atomikos transaction manager, "com.atomikos.jdbc.AtomikosDataSourceBean". In addition, you can provide the driver-specific XA data-source properties under <dataSourceProps>. Restart the server after modifying master-datasources.xml.
<datasource>
    <name>DS1</name>
    <jndiConfig>
        <name>DS1</name>
    </jndiConfig>
    <definition type="RDBMS">
        <configuration>
            <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean</dataSourceClassName>
            <dataSourceProps>
                <property name="xaDataSourceClassName">com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</property>
                <property name="uniqueResourceName">TXDB1</property>
                <property name="xaProperties.user">root</property>
                <property name="xaProperties.password">root</property>
                <property name="xaProperties.URL">jdbc:mysql://localhost:3306/DB1</property>
                <property name="poolSize">10</property>
            </dataSourceProps>
        </configuration>
    </definition>
</datasource>

<datasource>
    <name>DS2</name>
    <jndiConfig>
        <name>DS2</name>
    </jndiConfig>
    <definition type="RDBMS">
        <configuration>
            <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean</dataSourceClassName>
            <dataSourceProps>
                <property name="xaDataSourceClassName">com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</property>
                <property name="uniqueResourceName">TXDB2</property>
                <property name="xaProperties.user">root</property>
                <property name="xaProperties.password">root</property>
                <property name="xaProperties.URL">jdbc:mysql://localhost:3306/DB2</property>
                <property name="poolSize">10</property>
            </dataSourceProps>
        </configuration>
    </definition>
</datasource>
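
Before restarting the server, it can help to sanity-check the entries just added. The following is only a sketch of such a check, not a WSO2 tool: the function name check_xa_datasources and the <datasources> wrapper element in the sample text are assumptions made for illustration, and the script assumes the datasource elements carry no XML namespace. It verifies that each named data-source uses AtomikosDataSourceBean and that the uniqueResourceName values differ, since Atomikos expects each resource name to be unique.

```python
import xml.etree.ElementTree as ET

ATOMIKOS_BEAN = "com.atomikos.jdbc.AtomikosDataSourceBean"


def check_xa_datasources(xml_text, names):
    """Return a list of problems found in the named <datasource> entries."""
    root = ET.fromstring(xml_text)
    problems, seen = [], set()
    for ds in root.iter("datasource"):
        name = (ds.findtext("name") or "").strip()
        if name not in names:
            continue  # ignore data-sources that are not part of this sample
        cls = (ds.findtext(".//dataSourceClassName") or "").strip()
        if cls != ATOMIKOS_BEAN:
            problems.append(f"{name}: dataSourceClassName is not {ATOMIKOS_BEAN}")
        props = {p.get("name"): (p.text or "").strip() for p in ds.iter("property")}
        resource = props.get("uniqueResourceName")
        if not resource:
            problems.append(f"{name}: uniqueResourceName is missing")
        elif resource in seen:
            problems.append(f"{name}: uniqueResourceName {resource} is already used")
        else:
            seen.add(resource)
    return problems


def entry(name, resource):
    """Build a trimmed-down datasource entry for the demonstration below."""
    return f"""
    <datasource>
        <name>{name}</name>
        <definition type="RDBMS">
            <configuration>
                <dataSourceClassName>{ATOMIKOS_BEAN}</dataSourceClassName>
                <dataSourceProps>
                    <property name="uniqueResourceName">{resource}</property>
                </dataSourceProps>
            </configuration>
        </definition>
    </datasource>"""


# The <datasources> wrapper is an assumption of this sketch.
good = "<datasources>" + entry("DS1", "TXDB1") + entry("DS2", "TXDB2") + "</datasources>"
bad = "<datasources>" + entry("DS1", "TXDB1") + entry("DS2", "TXDB1") + "</datasources>"

good_problems = check_xa_datasources(good, names=("DS1", "DS2"))
bad_problems = check_xa_datasources(bad, names=("DS1", "DS2"))

print(good_problems)
print(bad_problems)
```

To run it against a real file, read the contents of master-datasources.xml into a string and pass that in place of the sample text.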

Step 3:
Create the ESB sequence that implements the scenario described above using the two data-sources.
<sequence xmlns="http://ws.apache.org/ns/synapse" name="main">
   <in>
      <send>
         <endpoint>
            <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
         </endpoint>
      </send>
   </in>
   <out>
      <transaction action="new"/>
      <log level="custom">
         <property name="text" value="** Reporting to the Database DB1**"/>
      </log>
      <dbreport useTransaction="true">
         <connection>
            <pool>
               <dsName>DS1</dsName>
            </pool>
         </connection>
         <statement>
            <sql><![CDATA[delete from company_x where name =?]]></sql>
            <parameter xmlns:m1="http://services.samples/xsd"
                       xmlns:ns="http://org.apache.synapse/xsd"
                       xmlns:m0="http://services.samples"
                       expression="//m0:return/m1:symbol/child::text()" type="VARCHAR"/>
         </statement>
      </dbreport>
      <log level="custom">
         <property name="text" value="** Reporting to the Database DB2**"/>
      </log>
      <dbreport useTransaction="true">
         <connection>
            <pool>
               <dsName>DS2</dsName>
            </pool>
         </connection>
         <statement>
            <sql><![CDATA[INSERT into company_x values (?,'c4',?)]]></sql>
            <parameter xmlns:m1="http://services.samples/xsd"
                       xmlns:ns="http://org.apache.synapse/xsd"
                       xmlns:m0="http://services.samples"
                       expression="//m0:return/m1:symbol/child::text()" type="VARCHAR"/>
            <parameter xmlns:m1="http://services.samples/xsd"
                       xmlns:ns="http://org.apache.synapse/xsd"
                       xmlns:m0="http://services.samples"
                       expression="//m0:return/m1:last/child::text()" type="DOUBLE"/>
         </statement>
      </dbreport>
      <transaction action="commit"/>
      <send/>
   </out>
</sequence>


Testing
Successful Scenario

1. To remove the IBM record from the first database and add it to the second database, run the sample with the following options.
ant stockquote -Daddurl=http://localhost:9000/services/SimpleStockQuoteService -Dtrpurl=http://localhost:8280/ -Dsymbol=IBM

2. Check both databases to see how the record is deleted from the first database and added to the second database.

Failure Scenario

1. Try to add an entry that already exists in the second database. This time use the symbol SUN.
ant stockquote -Daddurl=http://localhost:9000/services/SimpleStockQuoteService -Dtrpurl=http://localhost:8280/ -Dsymbol=SUN

2. You will see that the whole transaction has been rolled back. Check both databases again; no record has been deleted from the first database and no record has been added to the second database.

10 comments:

  1. Hi Dinusha,

    I have one Query
    Here i am sending the my GIT URL which contain my Query

    https://gist.github.com/anonymous/5300686

    Could you please help me.
    Thanks in Advance
    Anil

    Replies
    1. Hi Anil,

      I went through your query. First check whether the data service that you have created can be called directly, without using an ESB proxy service (i.e. invoke the 'selectallusergroups' operation using the try-it tool in DSS or any other client and check whether it returns the expected result; in the same way, invoke the 'select_id_columns' operation and check that it works as expected).

      If both operations work fine, then adding them to an ESB proxy service is simple. The pass-through proxy configuration is the simplest proxy that you can create using ESB. You just need to provide the endpoint of the data service that you created above and, in the publish WSDL section, provide the WSDL URL of your data service. The following guide will help you to create the proxy service.

      [1]. http://docs.wso2.org/wiki/display/ESB460/Pass+Through+Proxy+Template

      Regards,
      Dinusha.

  2. Hi,

    Is it an error to use directly:
    com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

    instead of
    com.atomikos.jdbc.AtomikosDataSourceBean

    com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

    ?

    Thank you,
    Claudio

    Replies
    1. Yes, you need to use the com.atomikos.jdbc.AtomikosDataSourceBean class. When you use the Atomikos transaction manager to handle JDBC transactions, that data-source class takes care of the XA resource enlistment [1]. Unless your application handles the resource enlistment itself, you need to use the AtomikosDataSourceBean class. For the given ESB sample, you cannot do without the AtomikosDataSourceBean class; the transactions will not be handled if you don't use it.

      [1]. http://www.atomikos.com/Documentation/ConfiguringJdbc#Enlistment_delistment_and_recove

  3. Hi Dinusha, Thank you and it's working for My sql, But how can i implement same for "PostgreSQL 9.1.4". Please guide me.

    Replies
    1. Hi Anil,

      I haven't specifically tested with PostgreSQL. But it doesn't matter which RDBMS we use, as long as it supports XA data-sources. The only thing we need to change is the data-source properties defined when creating the datasource, so that they match PostgreSQL. The datasource configuration for PostgreSQL would be something like the following:

      <configuration>
          <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean</dataSourceClassName>
          <dataSourceProps>
              <property name="xaDataSourceClassName">org.postgresql.xa.PGXADataSource</property>
              <property name="uniqueResourceName">TXDB1</property>
              <property name="xaProperties.user">root</property>
              <property name="xaProperties.password">root</property>
              <property name="xaProperties.serverName">localhost</property>
              <property name="xaProperties.portNumber">5432</property>
              <property name="xaProperties.databaseName">DB1</property>
              <property name="poolSize">10</property>
          </dataSourceProps>
      </configuration>

      Also note that, to enable XA in PostgreSQL, you need to set the "max_prepared_transactions" parameter in postgresql.conf. Its value should be at least as large as the "max_connections" value. You could refer to [1] as well.

      [1]. http://www.atomikos.com/Documentation/ConfiguringPostgreSQL

  4. Hi Dinusha,

    I created the XA datasource for an Oracle database, but I am not able to test the XA datasource using the Test Connection button. Were you able to test the connection from the UI? I am using ESB version 4.8.0.



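    <datasource>
        <name>DS1</name>
        <jndiConfig>
            <name>DS1</name>
        </jndiConfig>
        <definition type="RDBMS">
            <configuration>
                <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean</dataSourceClassName>
                <dataSourceProps>
                    <property name="xaDataSourceClassName">oracle.jdbc.xa.client.OracleXADataSource</property>
                    <property name="uniqueResourceName">EbsXAData</property>
                    <property name="xaProperties.user">....</property>
                    <property name="xaProperties.password">....</property>
                    <property name="xaProperties.URL">jdbc:oracle:thin@chelsqlora01-vip.karmalab.net:1533:PRJ2DEV1</property>
                    <property name="poolSize">10</property>
                </dataSourceProps>
            </configuration>
        </definition>
    </datasource>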





    Am I missing anything?
    thanks,
    Ravi

    Replies
    1. Hi Ravi,

      This is a known issue (reported in [1]) when using the AtomikosDataSourceBean class.

      The reason is a limitation at the Atomikos level: the connection does not get closed once it has been initialized for 'Test Connection'. So if you first try the 'Test Connection' option and then invoke a proxy service that uses the same XA datasource, the second invocation will fail.

      Hence, please don't use the 'Test Connection' option before invoking the proxy service when you are using an Atomikos-based datasource.

      [1]. https://wso2.org/jira/browse/ESBJAVA-2828

      Regards,
      Dinusha.

  5. hi Dinusha,
    can you please let me know how transaction work in one data source.

    if i have 2 operation(insert). first insertion is successful and second is not.
    in that case i need to perform roll back.

    i used transaction but its not working.

    you can find the snippet here.

    http://stackoverflow.com/questions/25719098/how-to-use-transaction-in-wso2

    thanks in advance,
    Vikas

