Saturday, November 3, 2012

Distributed Transactions with WSO2 ESB

A transaction is a set of operations that executed as a single unit. When it comes to distributed transactions, it involves two or more networked computers, often using multiple databases. 

It's required to have transaction manager to handle these  distributed transactions. WSO2 carbon platform has integrated the "Atomikos" transaction manager which is a implementation of Java Transaction API (JTA). Also some products like WSO2DSS, WSO2ESB shipped this transaction manager by default and hence you don't need to provide an external JTA provider to use distributed transactions inside these products. 

WSO2ESB contains a synapse mediator called 'Transaction Mediator' which support the distributed transactions using Java Transaction API. In this post we are going to write some sample proxy service that uses the Transaction Mediator and inbuilt transaction manager that comes with WSO2 ESB to handle distributed transaction.

Sample Scenario
The scenario going to use in this sample is same as the scenario used in WSO2 ESB Transaction Mediator Sample (Sample describes above has used JBoss Application Server support to create XA data-sources and also transaction manager has been used the one provided from JBoss). But in this sample we are going to use the in built transaction manager that comes with WSO2 ESB and also XA data-sources are going to create using the Carbon data-source feature without using JBoss Application server.

Use the following scenario to show how the Transaction Mediator works. Assume we have a record in one database and we want to delete that record from the first database and add it to the second database (these two databases can be run on the same server or they can be in two remote servers). The database tables are defined in such a way that the same entry cannot be added twice. So, in the successful scenario, the record will be deleted from the first table (of the first database) and will be added to the second table (of the second database). In a failure scenario (the record is already in the second database), no record will be deleted from first table and no record will be added into the second database.

Step 1:
Create the sample Database and tables using MySQL server.

Create database DB1;
CREATE table company_x(name varchar(10) primary key, id varchar(10), price double);
INSERT into company_x values ('IBM','c1',0.0);
INSERT into company_x values ('SUN','c2',0.0);

Create database DB2;
CREATE table company_x(name varchar(10) primary key, id varchar(10), price double);
INSERT into company_x values ('SUN','c2',0.0);
INSERT into company_x values ('MSFT','c3',0.0);

Step 2:
(i) Add the mysql jdbc driver jar to {ESB_HOME}/repository/components/lib.

(ii) Create two XA data-sources for above two tables. For that, add the following xml configuration to master-datasources.xml file located in {ESB_HOME}/repository/conf/datasources/ directory. (These data-sources can be created using ESB UI as well rather using master-datasources.xml config file). In this data-sources, note that we have used the data-source provider class that provided from Atomikos transaction manager "com.atomikos.jdbc.AtomikosDataSourceBean". Other than that, you can provide the XA data-source properties specific to driver under <dataSourceProps>. Restart the server after modifying master-datasources.xml.  
<datasource>
    <name>DS1</name>
    <jndiConfig>
        <name>DS1</name>
    </jndiConfig>
 <definition type="RDBMS">
 <configuration>
    <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean
</dataSourceClassName>
    <dataSourceProps>
       <property name="xaDataSourceClassName">com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
</property>
       <property name="uniqueResourceName">TXDB1</property>
       <property name="xaProperties.user">root</property>
       <property name="xaProperties.password">root</property>
       <property name="xaProperties.URL">jdbc:mysql://localhost:3306/DB1</property>
       <property name="poolSize">10</property>
    </dataSourceProps>
 </configuration>
 </definition>
</datasource>

<datasource>
 <name>DS2</name>
 <jndiConfig>
    <name>DS2</name>
 </jndiConfig>
 <definition type="RDBMS">
 <configuration>
    <dataSourceClassName>com.atomikos.jdbc.AtomikosDataSourceBean
</dataSourceClassName>
    <dataSourceProps>
       <property name="xaDataSourceClassName">com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
</property>
       <property name="uniqueResourceName">TXDB2</property>
       <property name="xaProperties.user">root</property>
       <property name="xaProperties.password">root</property>
       <property name="xaProperties.URL">jdbc:mysql://localhost:3306/DB2</property>
       <property name="poolSize">10</property>
     </dataSourceProps>
 </configuration>
 </definition>
</datasource>

Step 3:
Create the ESB sequnce to implement the mentioned scenario using above two data-sources.
<sequence xmlns="http://ws.apache.org/ns/synapse" name="main">
  <in>
   <send>
    <endpoint>
      <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
         </endpoint>
    </send>
   </in>
   <out>
      <transaction action="new"/>
      <log level="custom">
         <property name="text" value="** Reporting to the Database DB1**"/>
      </log>
      <dbreport useTransaction="true">
         <connection>
            <pool>
               <dsName>DS1</dsName>
            </pool>
         </connection>
         <statement>
           <sql>
             <![CDATA[delete from company_x where name =?]]></sql>
               <parameter xmlns:m1="http://services.samples/xsd" 
     xmlns:ns="http://org.apache.synapse/xsd" 
     xmlns:m0="http://services.samples" 
     expression="//m0:return/m1:symbol/child::text()" type="VARCHAR"/>
         </statement>
         </dbreport>
         <log level="custom">
            <property name="text" value="** Reporting to the Database DB2**"/>
         </log>
         <dbreport useTransaction="true">
            <connection>
               <pool>
                  <dsName>DS2</dsName>
               </pool>
            </connection>
            <statement>
               <sql>
                  <![CDATA[INSERT into company_x values (?,'c4',?)]]></sql>
                  <parameter xmlns:m1="http://services.samples/xsd" 
        xmlns:ns="http://org.apache.synapse/xsd" 
        xmlns:m0="http://services.samples" 
        expression="//m0:return/m1:symbol/child::text()" type="VARCHAR"/>
                  <parameter xmlns:m1="http://services.samples/xsd" 
        xmlns:ns="http://org.apache.synapse/xsd" 
        xmlns:m0="http://services.samples" 
        expression="//m0:return/m1:last/child::text()" type="DOUBLE"/>
               </statement>
            </dbreport>
            <transaction action="commit"/>
            <send/>
         </out>
      </sequence>


Testing
Successful Scenario

1. To remove the IBM record from the first database and add it to the second database, run the sample with the following options.
ant stockquote -Daddurl=http://localhost:9000/services/SimpleStockQuoteService -Dtrpurl=http://localhost:8280/ -Dsymbol=IBM

2. Check both databases to see how the record is deleted from the first database and added to the second database.

Failure Scenario

1. Try to add an entry which is already there in the second database. This time use Symbol SUN.
ant stockquote -Daddurl=http://localhost:9000/services/SimpleStockQuoteService -Dtrpurl=http://localhost:8280/ -Dsymbol=SUN

2. You will see that whole transaction has rollback. Check both databases again; there is no record deleted from the first database and no record added into the second database.