Delta Propagation Example

GemFire delta propagation lets you send deltas (changes) instead of full values when you propagate events from one cache to another. In distributed data management systems, most data is created once and then updated frequently. The updates are sent to other members for event propagation, redundancy management, and cache consistency in general. With delta propagation, you track only the changes in an updated object and send only the parts that have changed: the deltas. This lowers network transmission and object serialization/deserialization costs. The savings are greatest when changes to an object instance are small compared to the overall size of the instance. Read more about delta propagation.
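
To make the size argument concrete, here is a minimal sketch that compares the bytes needed for a full value with the bytes needed for a delta. It uses only plain java.io streams, not GemFire's DataSerializer, so the byte counts are illustrative rather than what GemFire puts on the wire. The class and method names (DeltaSizeSketch, fullSize, deltaSize) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class DeltaSizeSketch {

  /** Bytes needed to write the array length followed by every element. */
  static int fullSize(long[] values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.length);
      for (long v : values) {
        out.writeLong(v);
      }
      out.flush();
      return bytes.size();
    } catch (IOException e) {
      throw new IllegalStateException(e); // not expected for an in-memory stream
    }
  }

  /** Bytes needed to write a change count followed by (index, value) pairs. */
  static int deltaSize(long[] values, boolean[] changed) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      int count = 0;
      for (boolean c : changed) {
        if (c) {
          count++;
        }
      }
      out.writeInt(count);
      for (int i = 0; i < values.length; i++) {
        if (changed[i]) {
          out.writeInt(i);          // which element changed
          out.writeLong(values[i]); // its new value
        }
      }
      out.flush();
      return bytes.size();
    } catch (IOException e) {
      throw new IllegalStateException(e); // not expected for an in-memory stream
    }
  }

  public static void main(String[] args) {
    long[] values = new long[1000];
    boolean[] changed = new boolean[1000];
    values[7] = 42L;
    changed[7] = true;
    values[512] = 43L;
    changed[512] = true;
    System.out.println("full value: " + fullSize(values) + " bytes");
    System.out.println("delta:      " + deltaSize(values, changed) + " bytes");
  }
}
```

With these sample values the full array takes 8004 bytes and the delta takes 28. Each changed element costs 12 bytes in this encoding instead of 8, so a delta only pays off when a modest fraction of the elements change.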

Running the Example

The example shows a very simple client/server installation with one feeder client, which gets and updates data in its cache, and one receiver client, which receives data events from the server. Both are connected to a single cacheserver. The feeder client requests data from the server and then updates the data in its cache. Each update to the feeder client's cache also updates the server's cache. The server automatically forwards these data events to the receiver client, so the receiver's cache is updated as well. Each client has an asynchronous listener that reports local cache activity to standard output.

The key to delta propagation is the coding of the object stored in the region entry value. The entry value object used in this example, DeltaObj, implements GemFire's com.gemstone.gemfire.Delta interface, so it sends and receives only the object deltas rather than the full object. When the feeder client changes the object, the object tracks what is being changed. When the feeder client puts the object into its cache and GemFire distributes the put event to the server, the object provides only the delta for the distribution. The server receives the delta, uses the same object implementation to read the delta and update its cached copy, then forwards the delta to the receiver client, which does the same.
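
The flow described above can be sketched without GemFire. In the sketch below, the Quote class is a hypothetical value type whose hasDelta, toDelta, and fromDelta methods mirror the three methods of the Delta interface, and the propagate helper is an assumed stand-in for the distribution that GemFire performs between the feeder, the server, and the receiver. It is an illustration of the idea, not GemFire's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

final class DeltaFlowSketch {

  /** Hypothetical entry value: a rarely changing description plus one hot field. */
  static final class Quote {
    final String description;
    long price;
    private boolean priceChanged;

    Quote(String description, long price) {
      this.description = description;
      this.price = price;
    }

    /** Changes the value and records that a delta is pending. */
    void setPrice(long newPrice) {
      this.price = newPrice;
      this.priceChanged = true;
    }

    boolean hasDelta() {
      return priceChanged;
    }

    /** Writes only the changed field. */
    void toDelta(DataOutput out) throws IOException {
      out.writeLong(price);
    }

    /** Reads the changed field and updates this copy in place. */
    void fromDelta(DataInput in) throws IOException {
      price = in.readLong();
    }

    void clearDelta() {
      priceChanged = false;
    }
  }

  /** Applies the feeder's pending change to the other copies; returns bytes sent per hop. */
  static int propagate(Quote feeder, Quote server, Quote receiver) {
    if (!feeder.hasDelta()) {
      return 0; // nothing changed, so this sketch sends nothing
    }
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      feeder.toDelta(out);
      out.flush();
      byte[] delta = bytes.toByteArray();
      // The server updates its cached copy, then the same delta goes to the receiver.
      server.fromDelta(new DataInputStream(new ByteArrayInputStream(delta)));
      receiver.fromDelta(new DataInputStream(new ByteArrayInputStream(delta)));
      feeder.clearDelta();
      return delta.length;
    } catch (IOException e) {
      throw new IllegalStateException(e); // not expected for in-memory streams
    }
  }

  public static void main(String[] args) {
    Quote feeder = new Quote("a long, rarely changing description", 100L);
    Quote server = new Quote("a long, rarely changing description", 100L);
    Quote receiver = new Quote("a long, rarely changing description", 100L);
    feeder.setPrice(105L);
    int sent = propagate(feeder, server, receiver);
    System.out.println("bytes per hop: " + sent);
    System.out.println("server price: " + server.price + ", receiver price: " + receiver.price);
  }
}
```

The description field never travels after the initial create. Only the 8-byte price does, and all three copies end up with the same value.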

To run this example, you must have terminal sessions configured for the QuickStart examples, as described in Setting Up the Environment.
  1. In one session, start the GemFire cacheserver:
    $ cacheserver start cache-xml-file=xml/DeltaServer.xml
    

    The server starts in the background, sending information to the screen:

    Starting CacheServer with pid: 11283
    CacheServer pid: 11283 status: running
    $
    
  2. In the same session, start the feeder client:
    $ java quickstart.DeltaPropagationClientFeeder
    
  3. The feeder prompts you to press Enter. Before you do, in another session, start the receiver client:
    $ java quickstart.DeltaPropagationClientReceiver
    
  4. Go back to the feeder session and press Enter.
  5. Follow the instructions on the screens, noting the listener output from each client.
  6. Stop the receiver by pressing Enter.
  7. Stop the cacheserver:
    $ cacheserver stop
    

Example Listings

Program and cache configuration file listings for the clients and the server, including the listeners declared in the DeltaServer.xml, DeltaClient1.xml, and DeltaClient2.xml files. (The server is a GemFire cacheserver process and does not have a Java listing.)

DeltaObj.java

package quickstart;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.gemstone.bp.edu.emory.mathcs.backport.java.util.Arrays;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.Delta;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
/**
 * Sample Object class which implements Delta.
 * @since 6.1
 */
public class DeltaObj implements DataSerializable, Delta {
  
  public static final int ARRAY_SIZE = 10;

  private long [] longObj;
  
  private boolean[] offset;
  
  private int numberOfChangedElements;
  
  public DeltaObj(){
    this.longObj = new long[ARRAY_SIZE];
    this.offset = new boolean[ARRAY_SIZE];
    this.numberOfChangedElements = 0;
  }

  public void setObj(long newVal) {
    reset();
    this.numberOfChangedElements = 5;
    this.longObj[1] = newVal;
    this.offset[1] = true;
    this.longObj[3] = newVal;
    this.offset[3] = true;
    this.longObj[5] = newVal;
    this.offset[5] = true;
    this.longObj[7] = newVal;
    this.offset[7] = true;
    this.longObj[9] = newVal;
    this.offset[9] = true;
  }
  
  public void reset(){
    for(int i=0; i< ARRAY_SIZE; i++){
      this.offset[i]=false;
    }
    this.numberOfChangedElements = 0;
  }
  
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    this.longObj = DataSerializer.readLongArray(in);
    this.offset = DataSerializer.readBooleanArray(in);
    this.numberOfChangedElements = DataSerializer.readInteger(in);
  }

  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeLongArray(this.longObj, out);
    DataSerializer.writeBooleanArray(this.offset, out);
    DataSerializer.writeInteger(this.numberOfChangedElements, out);
  }

  public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
    try {
      int iChangeCount = DataSerializer.readPrimitiveInt(in);
      if (iChangeCount < 0 || iChangeCount > DeltaObj.ARRAY_SIZE) {
        throw new InvalidDeltaException("Number of changed elements is outside the expected range.");
      }
      if (iChangeCount > 0) {
        long[] tmpVal = Arrays.copyOf( this.longObj, this.longObj.length );
        int iOffLen = this.offset.length;
        boolean[] newOffs = new boolean[iOffLen];
        Arrays.fill( newOffs, 0, iOffLen, false);
        int off = -1;
        long val = 0; 
        // Read exactly as many (offset, value) pairs as the sender announced
        for (int i = 0; i < iChangeCount; i++) {
          off = DataSerializer.readPrimitiveInt(in);
          if (off < 0 || off >= DeltaObj.ARRAY_SIZE) {
            throw new InvalidDeltaException("Specified offset of changed element is outside the array bounds.");
          }
          val = DataSerializer.readPrimitiveLong(in);
          newOffs[off] = true;
          tmpVal[off] = val;
        }

        // Commit the new state only after the whole delta has been read
        this.numberOfChangedElements = iChangeCount;
        this.offset = newOffs;
        this.longObj = tmpVal;
      }
    }
    catch (IOException e) {
      GemFireCacheImpl.getInstance().getLogger().warning(
          "DeltaObj.fromDelta(): " + e);
      throw e;
    }
  }

  public boolean hasDelta() {
    return ( this.numberOfChangedElements != 0 );
  }

  public void toDelta(DataOutput out) throws IOException {
    try {
      if (this.hasDelta() ) {
        DataSerializer.writePrimitiveInt(this.numberOfChangedElements, out);
        for (int i = 0; i < ARRAY_SIZE; i++) {
          if (this.offset[i]) {
            DataSerializer.writePrimitiveInt(i, out);
            DataSerializer.writePrimitiveLong(this.longObj[i], out);
          }
        }
      }
    }
    catch (IOException ioe) {
      GemFireCacheImpl.getInstance().getLogger().warning(
          "DeltaObj.toDelta(): " + ioe);
      throw ioe;
    }
  }

  public String toString() {
    String arr = "Value -> ";
    for (int u = 0; u < ARRAY_SIZE; u++)
      arr = arr + this.longObj[u] + ", ";
    return arr;
  }
  
  public boolean equals(Object other) {
    if (other == null || !(other instanceof DeltaObj)) {
      return false;
    }
    DeltaObj delta = (DeltaObj)other;
    if (Arrays.equals(this.longObj, delta.longObj)){
      return true;
    }
    return false;
  }
}
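
The fromDelta method in the listing above builds the new state in temporary arrays and assigns it to the object's fields only after the whole delta has been read. Here is a minimal sketch of that validate-then-commit pattern, using plain java.io in place of DataSerializer and IllegalArgumentException as a stand-in for InvalidDeltaException so that it compiles without GemFire. The names DeltaDecodeSketch, applyDelta, and encode are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

final class DeltaDecodeSketch {

  /**
   * Applies a delta laid out as: int count, then count pairs of (int offset, long value).
   * Returns a new array; 'current' is never modified, even when the delta is rejected.
   */
  static long[] applyDelta(long[] current, byte[] delta) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(delta));
      int count = in.readInt();
      if (count < 0 || count > current.length) {
        throw new IllegalArgumentException("change count out of range: " + count);
      }
      long[] updated = current.clone(); // work on a copy
      for (int i = 0; i < count; i++) {
        int offset = in.readInt();
        if (offset < 0 || offset >= current.length) {
          throw new IllegalArgumentException("offset out of range: " + offset);
        }
        updated[offset] = in.readLong();
      }
      return updated;
    } catch (IOException e) {
      // A truncated stream ends up here, because EOFException is an IOException.
      throw new IllegalArgumentException("truncated or unreadable delta", e);
    }
  }

  /** Helper that encodes (offset, value) pairs in the layout described above. */
  static byte[] encode(int[] offsets, long[] values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(offsets.length);
      for (int i = 0; i < offsets.length; i++) {
        out.writeInt(offsets[i]);
        out.writeLong(values[i]);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException(e); // not expected for an in-memory stream
    }
  }

  public static void main(String[] args) {
    long[] current = new long[10];
    long[] updated = applyDelta(current, encode(new int[] {1, 3}, new long[] {7L, 9L}));
    System.out.println(Arrays.toString(updated));
    try {
      applyDelta(current, encode(new int[] {10}, new long[] {1L}));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Because the checks run against a copy, a bad count or offset leaves the cached value exactly as it was before the delta arrived.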

DeltaServer.xml

<?xml version="1.0"?>

<!-- Initializes a cache to serve the exampleRegion region, 
    waiting for client communication on port 40404 -->

<!DOCTYPE cache PUBLIC
  "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
  "http://www.gemstone.com/dtd/cache6_5.dtd">
<cache>
  <cache-server port="40404">
  </cache-server>
  <region name="exampleRegion">
    <region-attributes refid="REPLICATE">
      <cache-listener>
        <class-name>quickstart.DeltaSimpleListener</class-name>
      </cache-listener>
    </region-attributes>   
  </region>
</cache>

DeltaClient1.xml

<?xml version="1.0"?>

<!-- Initializes a client of a cache server that runs on port 40404.
     Loads values and sends updates to the server.  -->

<!DOCTYPE client-cache PUBLIC
  "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
  "http://www.gemstone.com/dtd/cache6_5.dtd">
<client-cache>
  <pool name="client1" subscription-enabled="true" subscription-redundancy="0">
    <server host="localhost" port="40404"/>
  </pool>
  <region name="exampleRegion">
    <region-attributes refid="CACHING_PROXY" statistics-enabled="true">
      <cache-listener>
        <class-name>quickstart.DeltaSimpleListener</class-name>
      </cache-listener>
    </region-attributes>
  </region>
</client-cache>

DeltaPropagationClientFeeder.java

package quickstart;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.Region;

/**
 * <p>
 * DeltaPropagationClientFeeder is the feeder client. It connects to the
 * cacheserver and puts DeltaObj values into the exampleRegion region.
 * </p>
 * 
 * <p>
 * Ensure that the server is up before running this class.
 * </p>
 * 
 * <p>
 * Prerequisites: Build the QuickStart (./build.sh compile-quickstart) and set
 * the CLASSPATH to include $GEMFIRE/quickstart/classes
 * </p>
 * 
 * <p>
 * Follow these steps to test the delta propagation functionality:
 * </p>
 * 
 * <ol>
 * <li>Start the cacheserver.</li>
 * <li>Start DeltaPropagationClientReceiver.</li>
 * <li>Start DeltaPropagationClientFeeder.</li>
 * <li>Press Enter in the feeder window to feed values to the server
 * cache.</li>
 * <li>Watch the receiver window to see the values arrive in the receiver's
 * cache, then press Enter there to stop the receiver.</li>
 * </ol>
 * 
 * <p>
 * To stop the program, press "Ctrl c" in console.
 * </p>
 * 
 * @author GemStone Systems, Inc.
 */
public class DeltaPropagationClientFeeder {
  private static long firstCreate;
  private static final int FEED_CYCLES = 5;
  private static final int PUT_KEY_RANGE = 2;
  public static void main(String[] args) throws Exception {
    writeToStdout("Connecting to the distributed system and creating the cache.");

    // Create the cache which causes the cache-xml-file to be parsed
    ClientCache cache = new ClientCacheFactory()
      .set("name", "DeltaPropagationClient")
      .set("cache-xml-file", "xml/DeltaClient1.xml")
      .create();
    Region reg = cache.getRegion("exampleRegion");
    int valueSize = 10;
    /*int deltaPercent = 2;*/
    
    writeToStdout("Delta is 50%.");
    writeToStdout("Press Enter to start the feeder.");
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
    bufferedReader.readLine();
    
    for (int i = 0; i < FEED_CYCLES; i++) {
      for (int j = 0; j < PUT_KEY_RANGE; j++) {
        DeltaObj value = new DeltaObj();
        value.setObj(i);
        if (firstCreate == 0) {
          firstCreate = System.currentTimeMillis();
        }
        reg.put(j, value);
      }
    }
    
    reg.put("LAST_KEY", firstCreate);
    cache.close();
  }

  private static void writeToStdout(String msg) {
    System.out.println(msg);
  }
}

DeltaClient2.xml

<?xml version="1.0"?>

<!-- Initializes a client of a cache server that runs on port 40404.
     Receives data events forwarded by the server.  -->

<!DOCTYPE client-cache PUBLIC
  "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
  "http://www.gemstone.com/dtd/cache6_5.dtd">
<client-cache>
  <pool name="client2" subscription-enabled="true" subscription-redundancy="0">
    <server host="localhost" port="40404"/>
  </pool>
  <region name="exampleRegion">
    <region-attributes refid="CACHING_PROXY" statistics-enabled="true">
      <cache-listener>
        <class-name>quickstart.DeltaReceiverListener</class-name>
      </cache-listener>
    </region-attributes>
  </region>
</client-cache>

DeltaPropagationClientReceiver.java

package quickstart;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.Region;

public class DeltaPropagationClientReceiver {
  
  public static void main(String[] args) throws Exception {
    writeToStdout("Connecting to the distributed system and creating the cache.");
    // Create the cache which causes the cache-xml-file to be parsed
    ClientCache cache = new ClientCacheFactory()
      .set("name", "DeltaPropagationClient")
      .set("cache-xml-file", "xml/DeltaClient2.xml")
      .create();
    Region reg = cache.getRegion("exampleRegion");
    reg.registerInterest("ALL_KEYS");
    writeToStdout("Press Enter to stop the receiver.");
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
    bufferedReader.readLine();
    cache.close();
  }

  /*
   * private static void writeToStdout() {
   * System.out.println("[DeltaPropagationClientFeeder]"); }
   */

  private static void writeToStdout(String msg) {
    //System.out.print("[DeltaPropagationClientReceiver] ");
    System.out.println(msg);
  }
}

DeltaSimpleListener.java

package quickstart;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
/**
 * Delta Propagation quick start cache listener for simple uses
 * @author aingle
 * @since 6.1
 */
public class DeltaSimpleListener extends CacheListenerAdapter implements Declarable{
  /**
   * Processes an afterCreate event.
   * 
   * @param event
   *          The afterCreate <code>EntryEvent</code> received
   */
  @Override
  public void afterCreate(EntryEvent event) {
    processEvent("afterCreate", event);
  }

  /**
   * Processes an afterUpdate event.
   * 
   * @param event
   *          The afterUpdate <code>EntryEvent</code> received
   */
  @Override
  public void afterUpdate(EntryEvent event) {
    processEvent("afterUpdate", event);
  }

  protected void processEvent(String operation, EntryEvent event) {
    if (!event.getKey().equals("LAST_KEY"))
      System.out.println("ServerListener received " + operation + " Region : "
          + event.getRegion().getName() + ": " + event.getKey() + "->"
          + ((DeltaObj)event.getNewValue()).toString());
  }

  public void init(Properties props) {
    // No initialization properties are needed for this listener.
    
  }
}

DeltaReceiverListener.java

package quickstart;

import java.util.Properties;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
/**
 * Delta Propagation quick start receiver cache listener
 * @author aingle
 * @since 6.1
 */
public class DeltaReceiverListener extends CacheListenerAdapter implements Declarable {
  private static long lastUpdateTime;
  private static final int FEED_CYCLES = 5;
  private static final int PUT_KEY_RANGE = 2;
  /**
   * Processes an afterCreate event.
   * 
   * @param event
   *          The afterCreate <code>EntryEvent</code> received
   */
  @Override
  public void afterCreate(EntryEvent event) {
    if (event.getKey().equals("LAST_KEY")) {
      Long starttime = (Long)event.getNewValue();
      GemFireCacheImpl.getInstance().getLogger().fine(
          "Avg time taken for " + FEED_CYCLES * PUT_KEY_RANGE + " operations: "
              + (lastUpdateTime - starttime.longValue())
              / (FEED_CYCLES * PUT_KEY_RANGE) +"ms");
    }
    processEvent("afterCreate", event);
  }

  /**
   * Processes an afterUpdate event.
   * 
   * @param event
   *          The afterUpdate <code>EntryEvent</code> received
   */
  @Override
  public void afterUpdate(EntryEvent event) {
    lastUpdateTime = System.currentTimeMillis();
    processEvent("afterUpdate", event);
  }

  protected void processEvent(String operation, EntryEvent event) {
    if (!event.getKey().equals("LAST_KEY"))
      System.out.println("ServerListener received " + operation + " Region : "
          + event.getRegion().getName() + ": " + event.getKey() + "->"
          + ((DeltaObj)event.getNewValue()).toString());
  }

  public void init(Properties props) {
    // No initialization properties are needed for this listener.
    
  }
}

Related Javadocs

Delta, ClientCacheFactory
