Distributed Transactions

Error formatting macro: composition-setup: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil


GemFire provides distributed transactions to increase reliability and consistency when reading and writing multiple Entry operations, potentially across multiple Regions. Reliability is improved by guaranteeing distribution of data despite VM failure. Consistency is improved by adding conflict detection and tighter Entry operation atomicity. The API for distributed transactions is the familiar (for those using relational databases) begin, commit, rollback methods. For example:

    Cache c = CacheFactory.create(DistributedSystem.connect(null));
    AttributesFactory af = new AttributesFactory();
    Region<String, Integer> cash = c.createRegion("cash", af.create());
    Region<String, Integer> trades = c.createRegion("trades", af.create());
    CacheTransactionManager txmgr = c.getCacheTransactionManager();
    try {
      final String customer = "Customer1";
      final Integer purchase = Integer.valueOf(1000);

      // Decrement cash
      Integer cashBalance = cash.get(customer);
      Integer newBalance = Integer.valueOf((cashBalance != null ? cashBalance : 0) - purchase);
      cash.put(customer, newBalance);

      // Increment trades
      Integer tradeBalance = trades.get(customer);
      newBalance = Integer.valueOf((tradeBalance != null ? tradeBalance : 0) + purchase);
      trades.put(customer, newBalance);

    } catch (CommitConflictException conflict) {

The above code attempts to emulate the typical relational database accounts example. It creates a Distributed System from which a Cache is built from which Regions are built. A transaction is started using the begin method on the CacheTransactionManager. The commit method on the CacheTransactionManager throw an exception if there have been changes to the customer key at which time the code discards the transactional state using the rollback method.

As a high performance distributed key value storage system, GemFire implements transactions differently than a traditional relational database such as PostgreSQL or MySQL, as a result there are key differences which will be described in traditional ACID terms.

When deciding whether or not to use distributed transactions it's important to understand the cost/benefit trade-off specifically with respect to performance and data scale. For example, a GemFire transaction batches up all the its operations into a single message, improving distribution efficiency for transactions with many operations, however the more operations a transaction has the more likely it is to conflict with other transactions.

Relational database comparison

The ACID comparison

Below is a breakdown of the differences between GemFire and relational database transactions using the ACID database transaction model.


A common database technique to achieve atomicity is to use Two-phase locking on rows to co-ordinate intersecting transactions, those with changes overlapping with the current transaction. Holding write locks prevents reads of transactional changes until all changes are complete, providing all or none behavior. If problems occurred during the transaction, perhaps due relational constraints or storage limitations, again the locks protect from reads while the problems are resolved often by reverting data back to the original pre-transaction state. Rolling back a database transaction in this example would requires releasing write locks.

A common distributed technique to implement all or none behavior is the Two Phase Commit Protocol which requires all processes participating in the transaction to agree or abort the transaction.

The all or none behavior of GemFire transactions is implemented using a per thread transaction state which collects all transactional changes. The set of Entry changes are atomically reserved by a reservation system. The obtained reservation prevents other intersecting transactions from proceeding (See section on Optimism) allowing the commit to check for conflicts and reserve resources in an all-or-none fashion prior to making changes to the data. Since GemFire is distributed, the transaction state is also sent to the appropriate VMs in a batch and processed to protect against VM failure during the application of the transaction state. After all the changes have been made, both locally and remotely, then the reservation is released. Rolling back the transaction is as simple as discarding the transaction state. Reads do not use the reservation system and as a result are very fast. This system has similar traits to the Muliversion Concurrency Control algorithm in that each thread can write new versions of Entry data without impacting read performance. Also similar in that reads view only the most recent committed version of an Entry.


Database consistency is often described in terms of Referential integrity and references between tables is often described in terms of primary and foreign key constraints. A database transaction which updates multiple colums, potentially in different tables, must ensure that all the updates are completed to maintain consistency again in an atomic fashion.

For GemFire, the application controls the amount and type of consistency between Regions and their Entries. GemFire transactions, during commit use the reservation system to ensure that changes to all Entries for every Region are allowed to proceed at commit time. Once a reservation is obtained a conflict check is performed to ensure that the entries scheduled for change are in the same state as they were when the transaction made the changes (See Conflict checking).


GemFire transaction isolation occurs at the processes and thread level, meaning that transactional changes made by a processes' thread are not visible until the commit phase and other threads (in the same or other processes) do not share transaction data until it is committed. Relational database transactions often isolate changes per JDBC connection.

GemFire reads e.g. get, getEntry, etc. are volatile, meaning they do not use locks. This makes reads very fast with respect to other entry operations, it also allows transactional writes to proceed without being impeded by in progress reads.

Visibility, related to isolation, is the atomic nature in which committed writes are viewable by read operations. Reads during a GemFire transaction offer Repeatable Read isolation, meaning that once the committed value reference is read for a given key, it will always return the same reference. If the transaction writes e.g. put, invalidate, destroy a value for a key that has been read, subsequent reads will return the transactional reference.

Reads return a reference to the value which has been committed to the Region (aka committed state). Direct modifications to a value stored in a Region, say assigning a member variable of the value, are strongly discouraged particularly if they are made during a transaction because they will be immediately visible (before commit!) since they change committed state. The following example illustrates this anti-pattern:

    Region<Integer, StringBuilder> r1 = ...;
    CacheTransactionManager txmgr = ...;
    try {
      Integer key = Integer.valueOf(1);
      StringBuilder v = r1.get(key);
      v.append(" added stuff");  // BAD!!!  This change is not isolated from other transactions!
      r1.put(key, v);
    } catch (CommitConflictException conflict) {

The following is one possible fix to the above example:

    Region<Integer, StringBuilder> r1 = ...;
    CacheTransactionManager txmgr = ...;
    try {
      Integer key = Integer.valueOf(1);
      StringBuilder v = r1.get(key);  // assume v is not null
      StringBuilder newV;
      synchronized (v) {
        newV = new StringBuilder(v.toString());
      newV.append(" added stuff");
      r1.put(key, newV);
    } catch (CommitConflictException conflict) {

There are two important improvements:

  1. A new value is created to put into the Region, which will be staged in transactional context and committed if there are no conflicts
  2. StringBuilders are not synchronized and require synchronization to ensure a consistent view of their contents when making a copy. This code chooses to synchronize on the instance for protection which requires writes to observe the same usage.

Another option is to have GemFire create a copy of value to be modified, either automatically by setting copyOnRead on the Cache or manually creating a copy using the CopyHelper

Another important aspect is atomic visibility of committed data; viewing transactional changes before all of them have been applied or "in transition" commits. For example say key "1" and "2" are written to in a transaction such that their values, change from "A" to "B". In another thread, is it possible to read key "1" with value "B" and key "2" with value "A", while the transaction is committed? This is possible, because of the nature of GemFire reads. This choice sacrifices atomic visibility in favor of performance; reads don't block writes, writes don't block reads.

As mentioned before, the Two-phase locking technique on relational database rows requires extra overhead for both reads and writes, sacrificing performance for isolation purposes. Using such a technique, releasing write locks after all rows have been written, it would not be possible to read "in transition" commits.


Depending on the storage implementation MySQL or Postgress database transactions are more durable than GemFire. Such databases implement durability using disk storage for recovery and transaction logging.

GemFire transactions currently are only as durable as the availability of non-persistent Regions, essentially the more copies of the data on different members the higher the durability. Without persistence, the durability becomes an issue of RAM, network and VM reliability. Unfortunately the more resilient parts of GemFire, persistent Regions and Partitioned Regions do not support transactions.

Other important differences


A relational database transaction typically uses SQL to define the operations which in the transaction. GemFire transactions are written in Java, are composed of Region entry operations e.g. create, put, invalidate, destroy, which occur between the CacheTransactionManager.begin and commit or rollback method calls. SQL is typically limited to the basic relational data command, SELECT, INSERT, UPDATE, and DESTROY, where as Java is a much broader language, however only the GemFire operations are transactional, and even then not all Region operations are transactional, e.g. Region.destroyRegion.


GemFire transactions perform data locking and consistency checks during the commit operation, assuming optimistically that there will be little intersection (aka conflict) between transactions. Relational databases when writing to rows typically locks rows early in the transaction, assuming frequent conflicts, using locks to prevent them. The trade-off between the two is mostly performance, where pessimistic transactions often block other operations and optimistic ones do less blocking. Knowing transactions are optimistic, they should be crafted to prevent conflict to get the best results; one way is to to shrinking the number of operations that a transaction performs, another would be to assign transactional owners to ranges of non-intersecting data.

Conflict checking

GemFire offers write-write conflict checking, ensuring that no changes have occurred to entries which are scheduled for transactional update. An entry is initially considered for update when it is read e.g. Region.get and finally scheduled when a write is performed e.g. Region.put. Changes are determined by checking identity (aka ==).

Sharing or Nesting transactions

In some database systems, placing a transaction within another transaction, in other words "nesting", is allowed. Nesting is not allowed in GemFire, specifically calling CacheTransactionManager.begin two or more times before calling commit.

Sharing GemFire transaction state between threads is not allowed, mainly because there is no exposed object which represents the transactional state.


Notification of transactional Entry operations are available through the CacheListener, CacheWriter and TransactionListener interfaces. CacheListener and CacheWriter deliver events per Entry and are made aware of the current transaction, if any, with the getTransaction method. A TransactionListener delivers events per transaction, in a condensed view, reflecting all operations performed in the transaction across Regions, operations conflated on a per key basis.

Commit processing sequence

So far much of the commit process has been mentioned. GemFire places the bulk of the work for a transaction into the commit method after processing the commit set, the set of Entries changed by the thread between the begin and commit method calls. The following is a sequential view of what occurs during commit, as of GemFire 6.0.

  1. Acquire distributed reservation
    A message is sent to the reservation VM for all the keys (by Region) in the commit set. The entire reservation phase is atomic and fails fast if other committing transactions intersect with the request. The reservation is only granted if and only if the entire set is avialable. A failure to obtain a reservation results in a CommitConflictException. Upon failure, configured transaction listeners are notified of the failed commit set.
    A reservation manager is created in the first peer which performs a commit. Avoiding the reservation message can be done by sending all of the transactions from the same node. This technique will result in improved throughput for your transactions.
  2. Local conflict checks
    A conflict check is performed to ensure there were no changes to the Entries in the commit set. If so, the transaction fails with a CommitConflictException. This check along with the previous step guarantees that none of the objects were updated within the scope of the transaction. The previous reservation step has been crafted to allow the conflict check to rely on local identity to reflect distributed state, making the check very fast. It is important to note that identity comparisons, while fast, fall prey to the ABA problem.
  3. Commit set message
    The commit set is distributed to each member node involved in the transaction. GemFire computes the member set based on the Regions affected and the members that have those Regions defined. Using this information, a message is crafted which batches all transaction operations into a single send operation, optimized for each member by removing un-necessary Region data.
    Each recipient of the commit set fully deserializes the message. How GemFire distributes this commit set is based on the distribution scope configuration used. For commit changes to Scope.DISTRIBUTED_NO_ACK Regions, the commit set is immediately processed, otherwise it is placed on hold for a second commit message.
  4. Commit message
    Once all the commit set messages have been sent, a final commit message is sent to all the members with Scope.DISTRIBUTED_ACK Regions. Each receiving member processes the commit set at this time - the entries are all applied to the cache and any CacheListeners associated with the Regions are invoked in the same order as they occurred in the transaction. All recipients of the commit message reply with an acknowledgment. From a performance standpoint, it's important to note that all messages are sent before waiting for acknowledgment.
    For a strong transactional guarantee, it is important to use Scope.DISTRIBUTED_ACK, since the commit operation will wait for each node to process the transactional changes and the sending thread can be made aware of any problems while applying the transaction. Any Region with Scope.DISTRIBUTED_NO_ACK weakens the transactional guarantee, because the commit set is immediately applied and the sending thread will not be made aware of problems during processing.
  5. Applying to the local cache
    Once distribution has successfully occurred, the commit set is applied to the local cache. Any pending expiration or eviction operations that would have caused conflicts with the current transaction are processed.
  6. Completion
    The configured transaction listeners are invoked. The commit method returns.

GemFire and JTA transactions

A Java EE Container or J2EE application server typically implements distributed transactions (also called XA) using the JTA API. GemFire participates in JTA transactions by implementing the javax.transaction.Synchronization interface.

When GemFire classes are loaded, a search for an existing JTA javax.transaction.TransactionManager is performed. If one exists and there is an ongoing javax.transaction.Transaction, a GemFire transaction is started for that thread, the GemFire transaction is registered as a Synchronization with the TransactionManager and each Region.Entry operation is collected with the GemFire transaction.

When the javax.transaction.Transaction is committed, the GemFire transactional state is processed in the two methods defined on the Synchronization interface. The Syncronization.beforeCompletion() call creates the reservation and checks for conflicts. The Syncronization.afterCompletion(int status) call applies the transaction state if the status is STATUS_COMMITTED. If the provided status is STATUS_ROLLEDBACK then the GemFire transaction is rolledback. For all other status codes an exception is thrown with the message "Unknown JTA Synchronization status".

Dealing with Failures

The following are failure cases and their consequences using GemFire 6.0

Case (1): Committing VM fails during commit

If the commit set was never dispatched to the other members, the transaction is not applied and the reservation is released. If the failure occurs after one or more commit set messages are dispatched, the transaction is completed on all recipient members. The reservation is held in place until all recipients have processed the transaction to guarantee atomicity.

Case (2): A serialization error occurs during sending or receiving of the commit set message

Say due to a programming mistake, one of the values in the transaction fails to serialize. Serialization of data occurs during commit before sending any data, any exception thrown during serialization will be thrown from the commit method. Under these conditions, the reservation is released, the local committed state is not effected, and TransactionListener callbacks are not invoked.

Exceptions caught during de-serialization on the receiving VMs are logged to the recipient's log file. The may cause the commit to fail if there are Scope.DISTRIBUTED_ACK Regions in the commit set. The commit method will throw a CommitIncompleteException with a message text indicating the failing VM's member and the message text of the exception thrown during de-serialization. In this situation, the commit fails to apply the commit set on the committing VM and the VM which threw the de-serialization exception. The commit set will be successfully applied to all other recipients.

Case (3): Recipient VM fails while processing commit set.

Say a VM fails while processing the commit set for one or more Scope.DISTRIBUTED_ACK Regions, recall the commit set is processed after the commit message is sent. The failure could of many different categories: perhaps an OutOfMemoryException, or a VM crash, or possibly a network failure causing a split in the DistributedSystem membership aka "Split Brain". The sender will wait to receive acknowledgment and will stop waiting when the GemFire membership manager removes the failed member from the distributed system. Appropriate membership departure information is logged and the commit process continues on the remaining members. GemFire does not fail the transaction. When a replacement for the failed member is started, it will acquire the latest copy of data (including the failed transaction through the Region's 'Get Initial Image' process. The best chance for complete recovery of a transaction is to ensure the survival of a VM with the Region configured using DataPolicy.REPLICATE.

Case (4): A CacheRuntimeException occurs while processing the commit set.

Once the commit set is received and begins processing, CacheRuntimeExceptions thrown during processing are handled in this way:

  • A RegionDestroyedException can cause commit to throw a CommitDistributionException if the Region which was destroyed during processing has MembershipAttributes configured on the committing VM. In this case the commit set on the remote VM which threw the RegionDestroyedException would be applied to all Entries except the ones for the destroyed Region. The commit set would not be applied locally, but it would be applied to all other recipient VMs. If the commit set's Regions do not have MembershipAttributes, then RegionDestroyedExceptions are ignored, the commit set is applied locally and on all remote VMs where the commit set Regions are not destroyed.
  • Any subclass of a CancelException, such as a CacheClosedException thrown during processing can also cause commit to throw a CommitDistributionException, depending on the MembershipAttributes of the Regions in the transaction. This case has the same commit behavior as RegionDestroyedException situation. If the commit set's Regions do not have MembershipAttributes, then CancelExceptions are ignored, the commit set is applied locally and on all remote VMs where the cache was available.
  • All other subclasses of CacheRuntimeException caught while processing the commit set cause commit to throw a CommitIncompleteException. This behavior does not depend on Region MembershipAttributes. The commit set is not applied locally, partially applied on the throwing VM, and completely applied on recipient VMs which did not throw any exceptions.

Case (5): A VirtualMachineError occurs while processing the commit set

When a VirtualMachineError is received, a shutdown hook closes the DistributedSystem instance. If this occurs on the committing VM, then Case(1) is the result, otherwise If this occurs on a recipient VM then Case(3) is the result.

Case (6): An IOException occurs as a result of overflowing part of the transaction data

This is a special case of Case (4).

Say one of the Regions in the transaction has EvictionAttributes with EvictionAction.OVERFLOW_TO_DISK and the filesystem on which the overflow occurs has a problem, perhaps running out of storage or perhaps a disk failure. The Java VM throws a java.io.IOException under these conditions, which could be caught during processing a transaction commit set. When such an IOException is caught, all further operations on that Region are in doubt, transactional or otherwise, and so the Region is closed. Closing the Region causes the transaction set processing to terminate and causes a reply to be sent back to the sender noting the member that destroyed the Region. The transaction continues without error for the rest of the members and will complete on the sending node, unless the failing Region on the sender has MembershipAttributes configured. In that situation, if there are matching configured roles, the transaction commit method will throw a CommitDistributionException, resulting in a partial transaction: having changed remote state, but leaving local committed state unchanged.

Case (7): The peer hosting the reservation system crashes

The reservation system uses many parts of the Distributed locking system. Specifically it has the same recovery mechanisms. A new reservation peer is created and it recovers all existing "in flight" reservations by querying the existing peers which are in the process of committing transactional data. During the recovery period, no new reservations are issued - thus no new transactions start - until the recovery is complete. In flight commit operations complete without pause.

Transaction Statistics

Transactions related statistics can be found under the CachePerfStats name and type. They are broken down into two general catagories, time and count based. Time based stats are useful for performance tuning when used in conjunction with count based stats. Count based stats are useful for determining system activity. In conjunction with system logging, both types are very useful for debugging.

Statistics Overview

See Management and Monitoring for instructions on how to record statistics and tools that can analyze statistics.

Count based statistics:

statistic description
txCommits Total number times a transaction commit has succeeded.
txCommitChanges Total number of changes made by committed transactions.
txFailures Total number times a transaction commit has failed.
txFailureChanges Total number of changes lost by failed transactions.
txRollbacks Total number times a transaction has been explicitly rolled back.
txRollbackChanges Total number of changes lost by explicit transaction rollbacks.

Time based statistics:

statistic description
txCommitTime The total amount of time, in nanoseconds, spent doing successful transaction commits.
txSuccessLifeTime The total amount of time, in nanoseconds, spent in a transaction before a successful commit. The time measured starts at transaction begin and ends when commit is called.
txFailureTime The total amount of time, in nanoseconds, spent doing failed transaction commits.
txFailedLifeTime The total amount of time, in nanoseconds, spent in a transaction before a failed commit. The time measured starts at transaction begin and ends when commit is called.
txRollbackTime The total amount of time, in nanoseconds, spent doing explicit transaction rollbacks.
txRollbackLifeTime The total amount of time, in nanoseconds, spent in a transaction before an explicit rollback. The time measured starts at transaction begin and ends when rollback is called.
txConflictCheckTime The total amount of time, in nanoseconds, spent doing conflict checks during transaction commit.

Related Links


transaction transaction Delete
jta jta Delete
listener listener Delete
Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.