package journal.action.sql; import java.sql.Clob; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashSet; import java.util.Set; import journal.reader.DataJournalEntry; import journal.reader.DataJournalEntry.Argument; import journal.schema.Attribute; import journal.schema.Domain; import journal.schema.Index; import journal.schema.TableVersion; public class SQLTableVersion { private String tableName; private SQLAdapter sql; private SupportedDatabase database; private int currentBatch = 0; private PreparedStatement insertStatement; private PreparedStatement updateStatement = null; private PreparedStatement deleteStatement; private PreparedStatement selectStatement = null; private Set<String> indexSet; private String attrPrefix; SQLTableVersion(SQLAdapter sql, TableVersion t) throws SQLException { this.sql = sql; this.database = sql.getDatabase(); this.tableName = t.getTable().getName().replace("db.", "db_"); attrPrefix = t.getTable().getName().replace("db.", "") + "_"; // create statements for : // - create table // - insert into // - update set where // - delete from where // - select from where // need to check table does already exist, otherwise create table indexSet = new LinkedHashSet<String>(); for (Index index : t.getTable().getIndeces()) { indexSet.add(index.getName()); } StringBuffer create = new StringBuffer("CREATE TABLE "); StringBuffer insert = new StringBuffer("INSERT INTO "); StringBuffer update = new StringBuffer("UPDATE "); StringBuffer delete = new StringBuffer("DELETE FROM "); StringBuffer select = new StringBuffer("SELECT "); create.append(tableName).append(" ("); insert.append(tableName).append(" ("); update.append(tableName).append(" SET "); delete.append(tableName).append(" WHERE "); String comma = ""; String nonIndexComma = ""; boolean nonIndexAttrExists = false; for (Attribute attr : t) { String attrName = attrPrefix + attr.getName(); 
create.append(comma). append(attrName). append(" ").append(getType(attr.getDomain())); insert.append(comma).append(attrName); if (! indexSet.contains(attr.getName())) { update.append(nonIndexComma).append(attrName).append(" = ?"); select.append(nonIndexComma).append(attrName); nonIndexAttrExists = true; nonIndexComma = ", "; } if (comma.equals("")) { comma = ", "; } } create.append(comma).append(" PRIMARY KEY ("); update.append(" WHERE "); select.append(" FROM ").append(tableName); select.append(" WHERE "); comma = ""; String and = ""; for (Index index : t.getTable().getIndeces()) { create.append(comma).append(attrPrefix).append(index.getName()); update.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); delete.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); select.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); if (comma.equals("")) { comma = ", "; and = " AND "; } } create.append("))"); insert.append(") VALUES ( ?"); for (int i = 1; i < t.getAttributes().size(); i++) { insert.append(", ?"); } insert.append(")"); // System.out.println(create.toString().toUpperCase()); // System.out.println(insert.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(update.toString().toUpperCase()); // System.out.println(delete.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(select.toString().toUpperCase()); // create tables if they do not exist yet if (!sql.tableExists(tableName)) { Statement stmt = sql.getConnection().createStatement(); stmt.executeUpdate(create.toString()); } insertStatement = sql.getConnection().prepareStatement(insert.toString()); deleteStatement = sql.getConnection().prepareStatement(delete.toString()); if (nonIndexAttrExists) { updateStatement = sql.getConnection().prepareStatement(update.toString()); selectStatement = sql.getConnection().prepareStatement(select.toString()); } } private String getType(Domain domain) { return 
database.getMapping(domain); } void putValue(DataJournalEntry entry) throws Exception { int index = 0; // pre-filter the values if case-insensitive. // All indexed keys need to be lower case for (Object o : entry.getValues()) { insertStatement.setObject(++index, o); } if (sql.getBatchSize() > 1) { if (currentBatch < sql.getBatchSize()) { insertStatement.addBatch(); currentBatch++; return; // do not execute at this time } else { int total = currentBatch; int n = flushBatchUpdate(); if (n > 0 && n != total) { throw new Exception("PutValue for " + tableName + " did not insert batch " + n + " : " + total); } } } else { int n = insertStatement.executeUpdate(); if (n != 1) { throw new Exception("PutValue for " + tableName + " did not insert 1 value"); } } } int flushBatchUpdate() throws Exception { int n = 0; int total[] = insertStatement.executeBatch(); for (int i : total) { if (i > 0) n += i; else if (i != Statement.SUCCESS_NO_INFO) { throw new Exception("flashBatchUpdate reported error " + i); } } currentBatch = 0; return n; } private void setParameters(PreparedStatement statement, DataJournalEntry entry, int start) throws Exception { int i = start; for (String index : indexSet) { statement.setObject(++i, entry.getValue(index)); } } void replaceValue(DataJournalEntry entry) throws Exception { if (updateStatement == null) { throw new Exception("ReplaceValue called on table " + tableName); } int index = 0; for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { updateStatement.setObject(++index, arg.getValue()); } } setParameters(updateStatement, entry, index); int n = updateStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 1 value"); } } void deleteValue(DataJournalEntry entry ) throws Exception { setParameters(deleteStatement, entry, 0); int n = deleteStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 1 
value"); } } void verifyValue(DataJournalEntry entry) throws Exception { // OK, so this is way to general for the only usage we have, which // is checking the journal counter, but what the hell setParameters(selectStatement, entry, 0); if (selectStatement == null) { throw new Exception("VerifyValue called on table " + tableName); } ResultSet rs = selectStatement.executeQuery(); if (rs.next()) { for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { Object result = rs.getObject(attrPrefix + arg.getAttribute().getName()); if (result instanceof Clob) { // Oracle stores TEXT as a CLOB, so I get a Clob object back // I "know" that the string will be short, so I load everything Clob clob = (Clob) result; String substring = clob.getSubString(1, (int)clob.length()); result = substring; clob.free(); } if (!result.equals(arg.getValue())) { throw new Exception("VerifyValue for " + tableName + ": expected " + arg.getValue() + " got " + result); } } } } else { // throw a wobbly throw new Exception("VerifyValue for " + tableName + " did not find value"); } rs.close(); } }
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #3 | 7529 | Sven Erik Knop | Deleted the whole JournalReader. It has actually moved to //guest/sven_erik_knop/java/JournalReader/. | | |
| #2 | 7427 | Sven Erik Knop | Major update of the JournalReader: (1) New help (usage) system, which explains all options. (2) Config file that allows storing of parameters in a file; these are necessary for the SQLLoader and Updater, because classpath, classname and connection parameters need to be set (see the example `*.cfg` files provided). (3) Now tested with MySQL, SQLite and Oracle 10 XE. | | |
| #1 | 7375 | Sven Erik Knop | Major update of the JournalReader. Complete rewrite of the command line parsing. Change in the options parsing within the journal reader. New SQLLoader action, currently only against MySQL (needs the MySQL JDBC driver) with a fixed database and user name. This will be replaced by a config file at some stage. | | |