Access Keys:
Skip to content (Access Key - 0)

Release Notes

Getting Started

Concepts

Developer's Guide

Tools and Utilities

Reference

Toggle Sidebar
You are viewing information for a product in a pre-release state. The released product and the documentation may differ significantly from the version described here

JDBCRowLoader

The purpose of the JDBCRowLoader class is to provide sample code that loads data from archive relational data stores.

The JDBCRowLoader has the following features:

  • It can be used for any JDBC data source (provided the driver is available in the classpath of the server)
  • It can be used for any table, although a separate instance of the RowLoader is created for each table
  • It will pool JDBC Connections and PreparedStatements, with a configurable minimum and maximum number of connections
  • It uses the Connection.isReadOnly(true) setting to request the driver to optimize the transaction settings for reads

The query-string parameter is passed to the JDBCRowLoader to tell it what SQL statement to use against the archive database. This is a required parameter.
If the column layout of the archive table matches the column layout of the SQLFabric table, you may use SELECT * in the query string.
If the column layout of the archive table does not match the column layout of the SQLFabric table, you must explicitly provide and order the column names in the SELECT statement so that the result set will match the layout of the SQLFabric table.
There is no requirement that the schema or table name in SQLFabric match the schema and/or table name in the archive database.

The elements of the primary key will be passed into the JDBCRowLoader when it is invoked, in the order that the columns are defined in the SQLFabric table. They will be passed as parameters into the PreparedStatement in that order, so you must structure the WHERE clause of the query-string so that the elements are passed in the correct order.

The JDBCRowLoader is designed to work on a SQL table that is defined as follows:

CREATE TABLE table-name
 {({ column-definition | table-constraint }
  [ , { column-definition | table-constraint } ] * ) }
 LOADER (example.gemstone.sqlfabric.loader.JDBCRowLoader.create '<parameter name>=<value>' [, '<parameter name>=<value>'] * )

Accepted parameters with examples:

  • url=jdbc:oracle:thin:@localhost:1521:XE (required)
  • query-string=SELECT * FROM LoaderArchive WHERE id=? AND name=? (required)
  • user=app
  • password=app
  • min-connections=1
  • max-connections=5
  • connection-timeout=3000 (milliseconds)

All properties are also passed to the JDBC connection when it is created. See http://publib.boulder.ibm.com/infocenter/iseries/v5r4/index.jsp?topic=/rzaha/conprop.htm for a list of typical properties.
If the user and password are included in the connection string, omit user and password properties.


The following is the Java code for JDBCRowLoader. You will need the sqlfabric.jar and gemfire.jar in your classpath to compile and use this loader. You may also want to modify the logging and exception handling routines to match your environment.


JDBCRowLoader.java
package example.gemstone.sqlfabric.callbacks;

import java.sql.*;
import java.util.*;
import java.util.concurrent.*;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.sqlfabric.callbacks.RowLoader;

public class JDBCRowLoader implements RowLoader {

    public static final int VENDOR_CODE_ARCHIVE_ERROR = 0;
    public static final int VENDOR_CODE_TIMEOUT = 1;

    private final Queue<PreparedStatement[]> waitingQueries = new LinkedList<PreparedStatement[]>();
    private final Queue<PreparedStatement> availableStatements = new LinkedList<PreparedStatement>();
    
    private String url = "";
    private String user = "";
    private String password = "";
    private String queryString = "";
	
    private final Properties props = new Properties();
    private int minConnections = 0;
    private int maxConnections = 1;
    private Integer connectionCount = 0;
    private long connectionTimeout = 3000;
    private final ExecutorService backgroundExecutor = Executors.newCachedThreadPool();
	
    private final LogWriter log = CacheFactory.getAnyInstance().getLogger();
	
    public static JDBCRowLoader create() {
        return new JDBCRowLoader();
    }

    public void init(Object[] params) {
		
        log.entering("example.gemstone.sqlfabric.loader.JDBCRowLoader", "init()");

        for (int i=0; i < params.length; i++) {
            String parameter = (String) params[i];
            int equalsIndex = parameter.indexOf('=');
            if ((equalsIndex > 0) & (parameter.length() > equalsIndex + 1)){
                String key = parameter.substring(0, equalsIndex).trim();
                String value = parameter.substring(equalsIndex + 1).trim();
                props.put(key, value);
            }
        }
	
        this.url = props.getProperty("url", "");
        this.user = props.getProperty("user" , "");
        this.password = props.getProperty("password" , "");
        this.queryString = props.getProperty("query-string" , "");
        this.minConnections = Integer.parseInt(props.getProperty("min-connections" , "1"));
        this.maxConnections = Integer.parseInt(props.getProperty("max-connections" , "1"));
        this.connectionTimeout = Long.parseLong(props.getProperty("connection-timeout" , "3000"));

        char[] pwdXChars = new char[password.length()];
        for (int i=0; i< password.length(); i++) {
            pwdXChars[i] = 'x';
        }

        if (log.infoEnabled()) {
            log.info("JDBCRowLoader initialized.");
            log.info("   connection url: " + url);
            log.info("   user          : " + user);
            log.info("   password      : " + String.copyValueOf(pwdXChars));
            log.info("   query string  : " + queryString);
        }

        for(int i = 0; i < minConnections; i++) {
            synchronized (connectionCount) {
                connectionCount = connectionCount +1;
            }
            StatementCreator creator = new StatementCreator();
            this.backgroundExecutor.execute(creator);
        }
    }

    public Object getRow(String schema, String table, Object[] params) throws SQLException {
        log.entering("example.gemstone.sqlfabric.loader.JDBCRowLoader", "getRow(String schema, String table, Object[] params)");
        if (log.infoEnabled()) {
            log.info("JDBCRowLoader invoked to fetch from schema <" + schema + "> on table <" + table + ">." );
            for(int i = 0; i < params.length; i++) {
                log.info("  primary key element " + i + ": " + params[i]);
            }
        }

        PreparedStatement[] holder = new PreparedStatement[1];
        synchronized (holder) {	
            this.getPooledStatement(holder);
            if (holder[0] == null) {
                try {
                    holder.wait(connectionTimeout);
                } catch (InterruptedException e) {
                    log.warning("JDBCRowLoader interrupted while waiting for an available pooled statement.");
                    log.warning(e);
                    Thread.currentThread().interrupt();
                }
            }
        }
        PreparedStatement pstmt = holder[0];
        if (pstmt == null) {
            throw new SQLException("Timeout waiting for pooled connection to archive database", "08001",
                                    VENDOR_CODE_TIMEOUT);
        }

        try {
            for (int i = 0; i < params.length; i++) {
                pstmt.setObject(i + 1, params[i]);
            }
            ResultSet result = pstmt.executeQuery();
            // even if this result set is empty (i.e. no row found), just return the empty result set
            log.info("Query succeeded");
            recyclePooledStatement(pstmt);
            return result;
        } catch (SQLException e) {
            // throw away the pooled statement, just in case it was the problem
            releasePooledStatement(pstmt);
            log.error("Error executing prepared statement in JDBCRowLoader");
            log.error(e);
            throw new SQLException("Error executing query from archive database", e.getSQLState(),
                                    VENDOR_CODE_ARCHIVE_ERROR, e);
        }
    }
	
    private synchronized void getPooledStatement(PreparedStatement[] holder) {
        // Take the next available prepared statement. If there isn't one,
        // add the holder to the list of waiting queries.
        // The calling thread must call holder.wait() if it finds the holder empty.
        // holder.notify() will be called when a prepared statement is available
        // and put into the holder.
		
        holder[0] = this.availableStatements.poll();
        if (holder[0] == null) {
            this.waitingQueries.add(holder);
            synchronized (connectionCount) {
                if ((connectionCount) < maxConnections) {
                    connectionCount = connectionCount +1;
                    StatementCreator creator = new StatementCreator();
                    this.backgroundExecutor.execute(creator);
                }
            }
        }
    }
	
    private synchronized void returnPooledStatement(PreparedStatement pstmt) {
        // Check to see if there are queries waiting on a statement.
        // If not, add the statement back into the pool.
        PreparedStatement[] holder = this.waitingQueries.poll();
        if (holder == null) {
            this.availableStatements.offer(pstmt);
        } else {
            synchronized (holder) {
                holder[0] = pstmt;
                holder.notify();
            }
        }
    }

    private void recyclePooledStatement(PreparedStatement pstmt) {
        StatementRecycler recycler = new StatementRecycler(pstmt);
        this.backgroundExecutor.execute(recycler);
    }

    private void releasePooledStatement(PreparedStatement pstmt) {
        StatementReleaser releaser = new StatementReleaser(pstmt);
        this.backgroundExecutor.execute(releaser);
    }

    private class StatementCreator implements Runnable {
        public void run() {
            if (url.isEmpty()) {
                log.error("Connection url not provided for JDBCRowLoader");
                return;
            }		
            try {
                Connection con = DriverManager.getConnection(url, props);
                log.info(" Successful connection to target database: " + url);
                con.setReadOnly(true);
                PreparedStatement pstmt = con.prepareStatement(queryString);
                recyclePooledStatement(pstmt);				
            } catch (SQLException e) {
                // Connection count is incremented when the job is scheduled.
                // Since it has failed, decrement the counter
                synchronized (connectionCount) {
                    connectionCount = connectionCount - 1;
                }
                log.error("Error connecting to target database");
                log.error(e);
            }		
        }
    }

    private class StatementRecycler implements Runnable {
        private PreparedStatement pstmt;
        StatementRecycler (PreparedStatement target) {
            this.pstmt = target;
        }
        public void run() {
            try {
                pstmt.clearParameters();
                returnPooledStatement(pstmt);
            } catch (SQLException e) {
                releasePooledStatement(pstmt);
                log.warning(e);
            }		
        }
    }

    private class StatementReleaser implements Runnable {
        private PreparedStatement pstmt;
        StatementReleaser (PreparedStatement target) {
            this.pstmt = target;
        }
        public void run() {
            try {
                synchronized (connectionCount) {
                    connectionCount = connectionCount - 1;
                }
                pstmt.getConnection().close();
            } catch (SQLException e) {
                log.warning(e);
            }	
        }
    }
}

Child Pages

Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.
Adaptavist Theme Builder (3.4.0-M2-conf210) Powered by Atlassian Confluence 2.10.3, the Enterprise Wiki.