Function Execution Example

Error formatting macro: composition-setup: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil

Function execution allows you to move function behavior to the application member hosting the the data. For large data sets, this can provide much better performance than moving the data to the application that needs to run the function. Read more about function execution.

Running the Example

In this example of peer-to-peer function execution, one member sends a request for function execution to a peer member. FunctionExecutionPeer2 creates a region, populates the region and sends a function execution request to FunctionExecutionPeer1 while simultaneously executing the function on its own region. It collects the result from its own execution as well as from FunctionExecutionPeer1. The function executed is programmed in MultiGetFunction, as an implementation of GemFire's FunctionAdapter. The results from the function are handled by MyArrayListResultCollector, which is an implementation of GemFire's ResultCollector.

To run this example, you must have terminal sessions configured for the QuickStart examples, as described in Setting Up the Environment.
  1. In one session, start the first member:
    $ java quickstart.FunctionExecutionPeer1
    
  2. When the first VM tells you to do so, start the second member in another session:
    $ java quickstart.FunctionExecutionPeer2
    

Example Listings

Program and cache configuration file listings for the function execution on VMs 1 and 2.  Both VMs use the same cache XML configuration file.

Error formatting macro: deck: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
Error formatting macro: card: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
<?xml version="1.0"?>
<!DOCTYPE cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.5//EN"
    "http://www.gemstone.com/dtd/cache6_5.dtd">

<!--
  | FunctionExecutionpeer.xml
  |
 -->
<cache>
    <region name="exampleRegion">
        <region-attributes refid="PARTITION">
            <partition-attributes local-max-memory="50" total-num-buckets="13" />
        </region-attributes>
    </region>
</cache>
Error formatting macro: card: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
package quickstart;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionService;

/**
 * This is the peer to which FunctionExecutionPeer2 connects for function execution.
 * This peer executes the function execution request and returns the results to
 * the requesting peer
 * 
 * @author GemStone Systems, Inc.
 * @since 6.0
 */

public class FunctionExecutionPeer1 {

  public static final String EXAMPLE_REGION_NAME = "exampleRegion";

  private final BufferedReader stdinReader;

  public FunctionExecutionPeer1() {
    this.stdinReader = new BufferedReader(new InputStreamReader(System.in));
  }

  public static void main(String[] args) throws Exception {
    new FunctionExecutionPeer1().run();
  }

  public void run() throws Exception {

    writeToStdout("Peer to which other peer sends request for function Execution");
    writeToStdout("Connecting to the distributed system and creating the cache... ");

    // Create the cache which causes the cache-xml-file to be parsed
    Cache cache = new CacheFactory()
      .set("name", "FunctionExecutionPeer1")
      .set("cache-xml-file", "xml/FunctionExecutionPeer.xml")
      .create();

    // Get the exampleRegion
    Region<String, String> exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
    writeToStdout("Example region \"" + exampleRegion.getFullPath()
        + "\" created in cache.");

    writeToStdout("Registering the function MultiGetFunction on Peer");
    MultiGetFunction function = new MultiGetFunction();
    FunctionService.registerFunction(function);

    writeToStdout("Please start Other Peer And Then Press Enter to continue.");
    stdinReader.readLine();
    
    System.out.println("Closing the cache and disconnecting.");
    cache.close();
  }

  private static void writeToStdout(String msg) {
   // System.out.print("[FunctionExecutionPeer1] ");
    System.out.println(msg);
  }
}
Error formatting macro: card: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
package quickstart;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Execution;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;

/**
 * In this example of peer-to-peer function execution, one peer sends a request
 * for function execution to another peer. FunctionExecutionPeer2 creates a region,
 * populates the region and sends a function execution request to FunctionExecutionPeer1
 * while simultaneously executing the function on its own region.
 * It collects the result from its own execution as well as from FunctionExecutionPeer1.
 * Please refer to the quickstart guide for instructions on how to run this
 * example.
 * 
 * @author GemStone Systems, Inc.
 * @since 6.0
 */

public class FunctionExecutionPeer2 {

  public static final String EXAMPLE_REGION_NAME = "exampleRegion";

  private final BufferedReader stdinReader;

  public FunctionExecutionPeer2() {
    this.stdinReader = new BufferedReader(new InputStreamReader(System.in));
  }

  public static void main(String[] args) throws Exception {
    new FunctionExecutionPeer2().run();
  }

  public void run() throws Exception {

    writeToStdout("Peer sending function Execution request to other peer as well as executing function on its own region");

    writeToStdout("Connecting to the distributed system and creating the cache... ");

    // Create the cache which causes the cache-xml-file to be parsed
    Cache cache = new CacheFactory()
      .set("name", "FunctionExecutionPeer2")
      .set("cache-xml-file", "xml/FunctionExecutionPeer.xml")
      .create();

    // Get the exampleRegion
    Region<String, String> exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
    writeToStdout("Example region \"" + exampleRegion.getFullPath()
        + "\" created in cache.");

    // Populate the region
    for (int i = 0; i < 20; i++) {
      exampleRegion.put("KEY_" + i, "VALUE_" + i);
    }
    writeToStdout("Example region \"" + exampleRegion.getFullPath()
        + "\" is populated.");
    
    writeToStdout("Press Enter to continue.");
    stdinReader.readLine();

    writeToStdout("Executing Function : MultiGetFunction on region \""
        + exampleRegion.getFullPath()
        + "\" with filter size " + 3 + " and with MyArrayListResultCollector.");
    MultiGetFunction function = new MultiGetFunction();
    FunctionService.registerFunction(function);
    
    writeToStdout("Press Enter to continue.");
    stdinReader.readLine();
    
    Set<String> keysForGet = new HashSet<String>();
    keysForGet.add("KEY_4");
    keysForGet.add("KEY_9");
    keysForGet.add("KEY_7");
    Execution execution = FunctionService.onRegion(exampleRegion).withFilter(
        keysForGet).withArgs(Boolean.TRUE).withCollector(
        new MyArrayListResultCollector());
    ResultCollector rc = execution.execute(function);

    writeToStdout("Function executed successfully. Now getting the result");
    
    List result = (List)rc.getResult();
    writeToStdout("Got result with size " + result.size() + ".");
    writeToStdout("Press Enter to continue.");
    stdinReader.readLine();
    
    System.out.println("Closing the cache and disconnecting.");
    cache.close();
  }

  private static void writeToStdout(String msg) {
    System.out.println(msg);
  }
}
Error formatting macro: card: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
package quickstart;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;

/**
 * Application Function to retrieve values for multiple keys in a region
 * 
 * @author Gemstone Systems Inc
 * @since 6.0
 * 
 */
public class MultiGetFunction extends FunctionAdapter {

  public void execute(FunctionContext fc) {
    if(fc instanceof RegionFunctionContext){
    RegionFunctionContext context = (RegionFunctionContext)fc;
    Set keys = context.getFilter();
    Set keysTillSecondLast = new HashSet();
    int setSize = keys.size();
    Iterator keysIterator = keys.iterator();
    for (int i = 0; i < (setSize - 1); i++) {
      keysTillSecondLast.add(keysIterator.next());
    }
    for (Object k : keysTillSecondLast) {
      context.getResultSender().sendResult(
          (Serializable)PartitionRegionHelper.getLocalDataForContext(context)
              .get(k));
    }
    Object lastResult = keysIterator.next();
    context.getResultSender().lastResult(
        (Serializable)PartitionRegionHelper.getLocalDataForContext(context)
            .get(lastResult));
    }else {
      fc.getResultSender().lastResult(Runtime.getRuntime().freeMemory()/(1024*1024)); 
    }
  }

  public String getId() {
    return getClass().getName();
  }
}
Error formatting macro: card: java.lang.NoClassDefFoundError: net/customware/confluence/plugin/composition/CompositionUtil
package quickstart;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;

/**
 * 
 * MyArrayListResultCollector gathers result from all the function execution
 * nodes.<br>
 * Using a custom ResultCollector a user can sort/aggregate the result. This
 * implementation stores the result in a List. The size of the list will be same
 * as the no of nodes on which a function got executed
 * 
 * @author Gemstone Systems Inc
 * @since 6.0
 * 
 */
public class MyArrayListResultCollector implements
    ResultCollector<Serializable,Serializable> {

  final ArrayList<Serializable> result = new ArrayList<Serializable>();

  /**
   * Adds a single function execution result from a remote node to the
   * ResultCollector
   * 
   * @param resultOfSingleExecution
   * @param memberID
   */
  public void addResult(DistributedMember memberID,
      Serializable resultOfSingleExecution) {
    this.result.add(resultOfSingleExecution);
  }

  /**
   * Waits if necessary for the computation to complete, and then retrieves its
   * result.<br>
   * If {@link Function#hasResult()} is false, upon calling
   * {@link ResultCollector#getResult()} throws {@link FunctionException}.
   * 
   * @return the Serializable computed result
   * @throws FunctionException
   *           if something goes wrong while retrieving the result
   */
  public Serializable getResult() throws FunctionException {
    return this.result;
  }

  /**
   * Waits if necessary for at most the given time for the computation to
   * complete, and then retrieves its result, if available. <br>
   * If {@link Function#hasResult()} is false, upon calling
   * {@link ResultCollector#getResult()} throws {@link FunctionException}.
   * 
   * @param timeout
   *          the maximum time to wait
   * @param unit
   *          the time unit of the timeout argument
   * @return Serializable computed result
   * @throws FunctionException
   *           if something goes wrong while retrieving the result
   */
  public Serializable getResult(long timeout, TimeUnit unit)
      throws FunctionException, InterruptedException {
    return this.result;
  }

  /**
   * GemFire will invoke this method before re-executing function (in case of
   * Function Execution HA) This is to clear the previous execution results from
   * the result collector
   * 
   * @since 6.3
   */
  public void clearResults() {
    result.clear();
  }
  
  /**
   * Call back provided to caller, which is called after function execution is
   * complete and caller can retrieve results using
   * {@link ResultCollector#getResult()}
   * 
   */
  public void endResults() {}

}

Related Javadocs

execute package

Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.