The purpose of the JDBCRowLoader class is to provide sample code that loads data from archive relational data stores.
The JDBCRowLoader has the following features:
- It can be used for any JDBC data source (provided the driver is available in the classpath of the server)
- It can be used for any table, although a separate instance of the RowLoader is created for each table
- It will pool JDBC Connections and PreparedStatements, with a configurable minimum and maximum number of connections
- It uses the Connection.isReadOnly(true) setting to request the driver to optimize the transaction settings for reads
The query-string parameter is passed to the JDBCRowLoader to tell it what SQL statement to use against the archive database. This is a required parameter.
If the column layout of the archive table matches the column layout of the SQLFabric table, you may use SELECT * in the query string.
If the column layout of the archive table does not match the column layout of the SQLFabric table, you must explicitly provide and order the column names in the SELECT statement so that the result set will match the layout of the SQLFabric table.
There is no requirement that the schema or table name in SQLFabric match the schema and/or table name in the archive database.
The elements of the primary key will be passed into the JDBCRowLoader when it is invoked, in the order that the columns are defined in the SQLFabric table. They will be passed as parameters into the PreparedStatement in that order, so you must structure the WHERE clause of the query-string so that the elements are passed in the correct order.
The JDBCRowLoader is designed to work on a SQL table that is defined as follows:
Accepted parameters with examples:
- url=jdbc:oracle:thin:@localhost:1521:XE (required)
- query-string=SELECT * FROM LoaderArchive WHERE id=? AND name=? (required)
- user=app
- password=app
- min-connections=1
- max-connections=5
- connection-timeout=3000 (milliseconds)
All properties are also passed to the JDBC connection when it is created. See http://publib.boulder.ibm.com/infocenter/iseries/v5r4/index.jsp?topic=/rzaha/conprop.htm for a list of typical properties.
If the user and password are included in the connection string, omit user and password properties.
The following is the Java code for JDBCRowLoader. You will need the sqlfabric.jar and gemfire.jar in your classpath to compile and use this loader. You may also want to modify the logging and exception handling routines to match your environment.
package example.gemstone.sqlfabric.callbacks;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.sqlfabric.callbacks.RowLoader;
public class JDBCRowLoader implements RowLoader {
public static final int VENDOR_CODE_ARCHIVE_ERROR = 0;
public static final int VENDOR_CODE_TIMEOUT = 1;
private final Queue<PreparedStatement[]> waitingQueries = new LinkedList<PreparedStatement[]>();
private final Queue<PreparedStatement> availableStatements = new LinkedList<PreparedStatement>();
private String url = "";
private String user = "";
private String password = "";
private String queryString = "";
private final Properties props = new Properties();
private int minConnections = 0;
private int maxConnections = 1;
private Integer connectionCount = 0;
private long connectionTimeout = 3000;
private final ExecutorService backgroundExecutor = Executors.newCachedThreadPool();
private final LogWriter log = CacheFactory.getAnyInstance().getLogger();
public static JDBCRowLoader create() {
return new JDBCRowLoader();
}
public void init(Object[] params) {
log.entering("example.gemstone.sqlfabric.loader.JDBCRowLoader", "init()");
for (int i=0; i < params.length; i++) {
String parameter = (String) params[i];
int equalsIndex = parameter.indexOf('=');
if ((equalsIndex > 0) & (parameter.length() > equalsIndex + 1)){
String key = parameter.substring(0, equalsIndex).trim();
String value = parameter.substring(equalsIndex + 1).trim();
props.put(key, value);
}
}
this.url = props.getProperty("url", "");
this.user = props.getProperty("user" , "");
this.password = props.getProperty("password" , "");
this.queryString = props.getProperty("query-string" , "");
this.minConnections = Integer.parseInt(props.getProperty("min-connections" , "1"));
this.maxConnections = Integer.parseInt(props.getProperty("max-connections" , "1"));
this.connectionTimeout = Long.parseLong(props.getProperty("connection-timeout" , "3000"));
char[] pwdXChars = new char[password.length()];
for (int i=0; i< password.length(); i++) {
pwdXChars[i] = 'x';
}
if (log.infoEnabled()) {
log.info("JDBCRowLoader initialized.");
log.info(" connection url: " + url);
log.info(" user : " + user);
log.info(" password : " + String.copyValueOf(pwdXChars));
log.info(" query string : " + queryString);
}
for(int i = 0; i < minConnections; i++) {
synchronized (connectionCount) {
connectionCount = connectionCount +1;
}
StatementCreator creator = new StatementCreator();
this.backgroundExecutor.execute(creator);
}
}
public Object getRow(String schema, String table, Object[] params) throws SQLException {
log.entering("example.gemstone.sqlfabric.loader.JDBCRowLoader", "getRow(String schema, String table, Object[] params)");
if (log.infoEnabled()) {
log.info("JDBCRowLoader invoked to fetch from schema <" + schema + "> on table <" + table + ">." );
for(int i = 0; i < params.length; i++) {
log.info(" primary key element " + i + ": " + params[i]);
}
}
PreparedStatement[] holder = new PreparedStatement[1];
synchronized (holder) {
this.getPooledStatement(holder);
if (holder[0] == null) {
try {
holder.wait(connectionTimeout);
} catch (InterruptedException e) {
log.warning("JDBCRowLoader interrupted while waiting for an available pooled statement.");
log.warning(e);
Thread.currentThread().interrupt();
}
}
}
PreparedStatement pstmt = holder[0];
if (pstmt == null) {
throw new SQLException("Timeout waiting for pooled connection to archive database", "08001",
VENDOR_CODE_TIMEOUT);
}
try {
for (int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
ResultSet result = pstmt.executeQuery();
log.info("Query succeeded");
recyclePooledStatement(pstmt);
return result;
} catch (SQLException e) {
releasePooledStatement(pstmt);
log.error("Error executing prepared statement in JDBCRowLoader");
log.error(e);
throw new SQLException("Error executing query from archive database", e.getSQLState(),
VENDOR_CODE_ARCHIVE_ERROR, e);
}
}
private synchronized void getPooledStatement(PreparedStatement[] holder) {
holder[0] = this.availableStatements.poll();
if (holder[0] == null) {
this.waitingQueries.add(holder);
synchronized (connectionCount) {
if ((connectionCount) < maxConnections) {
connectionCount = connectionCount +1;
StatementCreator creator = new StatementCreator();
this.backgroundExecutor.execute(creator);
}
}
}
}
private synchronized void returnPooledStatement(PreparedStatement pstmt) {
PreparedStatement[] holder = this.waitingQueries.poll();
if (holder == null) {
this.availableStatements.offer(pstmt);
} else {
synchronized (holder) {
holder[0] = pstmt;
holder.notify();
}
}
}
private void recyclePooledStatement(PreparedStatement pstmt) {
StatementRecycler recycler = new StatementRecycler(pstmt);
this.backgroundExecutor.execute(recycler);
}
private void releasePooledStatement(PreparedStatement pstmt) {
StatementReleaser releaser = new StatementReleaser(pstmt);
this.backgroundExecutor.execute(releaser);
}
private class StatementCreator implements Runnable {
public void run() {
if (url.isEmpty()) {
log.error("Connection url not provided for JDBCRowLoader");
return;
}
try {
Connection con = DriverManager.getConnection(url, props);
log.info(" Successful connection to target database: " + url);
con.setReadOnly(true);
PreparedStatement pstmt = con.prepareStatement(queryString);
recyclePooledStatement(pstmt);
} catch (SQLException e) {
synchronized (connectionCount) {
connectionCount = connectionCount - 1;
}
log.error("Error connecting to target database");
log.error(e);
}
}
}
private class StatementRecycler implements Runnable {
private PreparedStatement pstmt;
StatementRecycler (PreparedStatement target) {
this.pstmt = target;
}
public void run() {
try {
pstmt.clearParameters();
returnPooledStatement(pstmt);
} catch (SQLException e) {
releasePooledStatement(pstmt);
log.warning(e);
}
}
}
private class StatementReleaser implements Runnable {
private PreparedStatement pstmt;
StatementReleaser (PreparedStatement target) {
this.pstmt = target;
}
public void run() {
try {
synchronized (connectionCount) {
connectionCount = connectionCount - 1;
}
pstmt.getConnection().close();
} catch (SQLException e) {
log.warning(e);
}
}
}
}