package journal.action.sql; import java.sql.Clob; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashSet; import java.util.Set; import journal.reader.DataJournalEntry; import journal.reader.DataJournalEntry.Argument; import journal.schema.Attribute; import journal.schema.Domain; import journal.schema.Index; import journal.schema.TableVersion; public class SQLTableVersion { private String tableName; private SQLAdapter sql; private SupportedDatabase database; private int currentBatch = 0; private PreparedStatement insertStatement; private PreparedStatement updateStatement = null; private PreparedStatement deleteStatement; private PreparedStatement selectStatement = null; private Set<String> indexSet; private String attrPrefix; SQLTableVersion(SQLAdapter sql, TableVersion t) throws SQLException { this.sql = sql; this.database = sql.getDatabase(); this.tableName = t.getTable().getName().replace("db.", "db_"); attrPrefix = t.getTable().getName().replace("db.", "") + "_"; // create statements for : // - create table // - insert into // - update set where // - delete from where // - select from where // need to check table does already exist, otherwise create table indexSet = new LinkedHashSet<String>(); for (Index index : t.getTable().getIndeces()) { indexSet.add(index.getName()); } StringBuffer create = new StringBuffer("CREATE TABLE "); StringBuffer insert = new StringBuffer("INSERT INTO "); StringBuffer update = new StringBuffer("UPDATE "); StringBuffer delete = new StringBuffer("DELETE FROM "); StringBuffer select = new StringBuffer("SELECT "); create.append(tableName).append(" ("); insert.append(tableName).append(" ("); update.append(tableName).append(" SET "); delete.append(tableName).append(" WHERE "); String comma = ""; String nonIndexComma = ""; boolean nonIndexAttrExists = false; for (Attribute attr : t) { String attrName = attrPrefix + attr.getName(); 
create.append(comma). append(attrName). append(" ").append(getType(attr.getDomain())); insert.append(comma).append(attrName); if (! indexSet.contains(attr.getName())) { update.append(nonIndexComma).append(attrName).append(" = ?"); select.append(nonIndexComma).append(attrName); nonIndexAttrExists = true; nonIndexComma = ", "; } if (comma.equals("")) { comma = ", "; } } create.append(comma).append(" PRIMARY KEY ("); update.append(" WHERE "); select.append(" FROM ").append(tableName); select.append(" WHERE "); comma = ""; String and = ""; for (Index index : t.getTable().getIndeces()) { create.append(comma).append(attrPrefix).append(index.getName()); update.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); delete.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); select.append(and).append(attrPrefix).append(index.getName()).append(" = ?"); if (comma.equals("")) { comma = ", "; and = " AND "; } } create.append("))"); insert.append(") VALUES ( ?"); for (int i = 1; i < t.getAttributes().size(); i++) { insert.append(", ?"); } insert.append(")"); System.out.println(create.toString().toUpperCase()); // System.out.println(insert.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(update.toString().toUpperCase()); // System.out.println(delete.toString().toUpperCase()); // if (nonIndexAttrExists) System.out.println(select.toString().toUpperCase()); // create tables if they do not exist yet if (!sql.tableExists(tableName)) { Statement stmt = sql.getConnection().createStatement(); stmt.executeUpdate(create.toString()); } insertStatement = sql.getConnection().prepareStatement(insert.toString()); deleteStatement = sql.getConnection().prepareStatement(delete.toString()); if (nonIndexAttrExists) { updateStatement = sql.getConnection().prepareStatement(update.toString()); selectStatement = sql.getConnection().prepareStatement(select.toString()); } } private String getType(Domain domain) { return database.getMapping(domain); 
} void putValue(DataJournalEntry entry) throws Exception { int index = 0; // pre-filter the values if case-insensitive. // All indexed keys need to be lower case for (Object o : entry.getValues()) { insertStatement.setObject(++index, o); } if (sql.getBatchSize() > 1) { if (currentBatch < sql.getBatchSize()) { insertStatement.addBatch(); currentBatch++; return; // do not execute at this time } else { int total = currentBatch; int n = flushBatchUpdate(); if (n > 0 && n != total) { throw new Exception("PutValue for " + tableName + " did not insert batch " + n + " : " + total); } sql.getConnection().commit(); } } else { int n = insertStatement.executeUpdate(); if (n != 1) { throw new Exception("PutValue for " + tableName + " did not insert 1 value"); } } } int flushBatchUpdate() throws Exception { int n = 0; int total[] = insertStatement.executeBatch(); for (int i : total) { if (i > 0) n += i; else if (i != Statement.SUCCESS_NO_INFO) { throw new Exception("flashBatchUpdate reported error " + i); } } currentBatch = 0; return n; } private void setParameters(PreparedStatement statement, DataJournalEntry entry, int start) throws Exception { int i = start; for (String index : indexSet) { statement.setObject(++i, entry.getValue(index)); } } void replaceValue(DataJournalEntry entry) throws Exception { if (updateStatement == null) { throw new Exception("ReplaceValue called on table " + tableName); } int index = 0; for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { updateStatement.setObject(++index, arg.getValue()); } } setParameters(updateStatement, entry, index); int n = updateStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 1 value"); } } void deleteValue(DataJournalEntry entry ) throws Exception { setParameters(deleteStatement, entry, 0); int n = deleteStatement.executeUpdate(); if (n != 1) { throw new Exception("ReplaceValue for " + tableName + " did not update 
1 value"); } } void verifyValue(DataJournalEntry entry) throws Exception { // OK, so this is way to general for the only usage we have, which // is checking the journal counter, but what the hell setParameters(selectStatement, entry, 0); if (selectStatement == null) { throw new Exception("VerifyValue called on table " + tableName); } ResultSet rs = selectStatement.executeQuery(); if (rs.next()) { for (Argument arg : entry.getArguments() ) { if (!indexSet.contains(arg.getAttribute().getName())) { Object result = rs.getObject(attrPrefix + arg.getAttribute().getName()); if (result instanceof Clob) { // Oracle stores TEXT as a CLOB, so I get a Clob object back // I "know" that the string will be short, so I load everything Clob clob = (Clob) result; String substring = clob.getSubString(1, (int)clob.length()); result = substring; clob.free(); } if (!result.equals(arg.getValue())) { throw new Exception("VerifyValue for " + tableName + ": expected " + arg.getValue() + " got " + result); } } } } else { // throw a wobbly throw new Exception("VerifyValue for " + tableName + " did not find value"); } rs.close(); } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#1 | 7527 | Sven Erik Knop | JournalReader, now in its proper place. Documentation to follow. |