package journal.action.sql; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashSet; import java.util.Set; import journal.reader.DataJournalEntry; import journal.reader.DataJournalEntry.Argument; import journal.schema.Attribute; import journal.schema.Index; import journal.schema.TableVersion; public class SQLTableVersion { private String tableName; private SQLAdapter sql; private int currentBatch = 0; private PreparedStatement insertStatement; private PreparedStatement updateStatement = null; private PreparedStatement deleteStatement; private PreparedStatement selectStatement = null; private Set<String> indexSet; private String attrPrefix; SQLTableVersion(SQLAdapter sql, TableVersion t) throws SQLException { this.sql = sql; this.tableName = t.getTable().getName().replace("db.", "db_"); attrPrefix = t.getTable().getName().replace("db.", "") + "_"; // create statements for : // - create table // - insert into // - update set where // - delete from where // - select from where // need to check table does already exist, otherwise create table indexSet = new LinkedHashSet<String>(); for (Index index : t.getTable().getIndeces()) { indexSet.add(index.getName()); } StringBuffer create = new StringBuffer("CREATE TABLE "); StringBuffer insert = new StringBuffer("INSERT INTO "); StringBuffer update = new StringBuffer("UPDATE "); StringBuffer delete = new StringBuffer("DELETE FROM "); StringBuffer select = new StringBuffer("SELECT "); create.append(tableName).append(" ("); insert.append(tableName).append(" ("); update.append(tableName).append(" SET "); delete.append(tableName).append(" WHERE "); String comma = ""; String nonIndexComma = ""; boolean nonIndexAttrExists = false; for (Attribute attr : t) { String attrName = attrPrefix + attr.getName(); create.append(comma). append(attrName). append(" ").append(attr.getDomain().getJdbcType()); insert.append(comma).append(attrName); if (! 
indexSet.contains(attr.getName())) { update.append(nonIndexComma).append(attrName).append(" = ?"); select.append(nonIndexComma).append(attrName); nonIndexAttrExists = true; nonIndexComma = ", "; } if (comma.equals("")) { comma = ", "; } } create.append(comma).append(" CONSTRAINT PRIMARY KEY ("); update.append(" WHERE "); select.append(" FROM ").append(tableName); select.append(" WHERE "); comma = ""; String and = ""; for (Index index : t.getTable().getIndeces()) { create.append(comma).append(attrPrefix).append(index.getName()); update.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); delete.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); select.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); if (comma.equals("")) { comma = ", "; and = " AND "; } } create.append("))"); insert.append(") VALUES ( ?"); for (int i = 1; i < t.getAttributes().size(); i++) { insert.append(", ?"); } insert.append(")"); // System.out.println(create.toString().toUpperCase()); // System.out.println(insert.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(update.toString().toUpperCase()); // System.out.println(delete.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(select.toString().toUpperCase()); // create tables if they do not exist yet if (!sql.tableExists(tableName)) { Statement stmt = sql.getConnection().createStatement(); stmt.executeUpdate(create.toString()); } insertStatement = sql.getConnection().prepareStatement(insert.toString()); deleteStatement = sql.getConnection().prepareStatement(delete.toString()); if (nonIndexAttrExists) { updateStatement = sql.getConnection().prepareStatement(update.toString()); selectStatement = sql.getConnection().prepareStatement(select.toString()); } } void putValue(DataJournalEntry entry) throws Exception { int index = 0; // pre-filter the values if case-insensitive. 
// All indexed keys need to be lower case for (Object o : entry.getValues()) { insertStatement.setObject(++index, o); } if (sql.getBatchSize() > 1) { if (currentBatch < sql.getBatchSize()) { insertStatement.addBatch(); currentBatch++; return; // do not execute at this time } else { int total = currentBatch; int n = flushBatchUpdate(); if (n != total) { throw new Exception("PutValue for " + tableName + " did not insert batch " + n + " : " + total); } } } else { int n = insertStatement.executeUpdate(); if (n != 1) { throw new Exception("PutValue for " + tableName + " did not insert 1 value"); } } } int flushBatchUpdate() throws Exception { int n = 0; int total[] = insertStatement.executeBatch(); for (int i : total) { n += i; } currentBatch = 0; return n; } private void setParameters(PreparedStatement statement, DataJournalEntry entry, int start) throws Exception { int i = start; for (String index : indexSet) { statement.setObject(++i, entry.getValue(index)); } } void replaceValue(DataJournalEntry entry) throws Exception { if (updateStatement == null) { throw new Exception("ReplaceValue called on table " + tableName); } int index = 0; for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { updateStatement.setObject(++index, arg.getValue()); } } setParameters(updateStatement, entry, index); int n = updateStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 1 value"); } } void deleteValue(DataJournalEntry entry ) throws Exception { setParameters(deleteStatement, entry, 0); int n = deleteStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 1 value"); } } void verifyValue(DataJournalEntry entry) throws Exception { // OK, so these is way to general for the only usage we have which // is checking the journal counter, but what the hell setParameters(selectStatement, entry, 0); if (selectStatement == null) { throw 
new Exception("VerifyValue called on table " + tableName); } ResultSet rs = selectStatement.executeQuery(); if (rs.next()) { for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { Object result = rs.getObject(attrPrefix + arg.getAttribute().getName()); if (!result.equals(arg.getValue())) { throw new Exception("VerifyValue for " + tableName + ": expected " + arg.getValue() + " got " + result); } } } } else { // throw a wobbly throw new Exception("VerifyValue for " + tableName + " did not find value"); } rs.close(); } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#3 | 7529 | Sven Erik Knop |
Deleted the whole JournalReader. It has actually moved to //guest/sven_erik_knop/java/JournalReader/. |
||
#2 | 7427 | Sven Erik Knop |
Major update of the JournalReader: - New help (usage) system, which explains all options - Config file that allows storing of parameters in a file. These are necessary for the SQLLoader and Updater, because classpath, classname and connection parameters need to be set. (see the example *.cfg files provided) - Now tested with MySQL, SQLite and Oracle 10 XE |
||
#1 | 7375 | Sven Erik Knop |
Major update of the JournalReader. Complete rewrite of the command line parsing. Change in the options parsing within the journal reader. New SQLLoader action. Currently only against MySQL (needs MySQL JDBC driver) with fixed database and user name. This will be replaced by a config file at some stage. |