package journal.reader;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;

import journal.action.Action;
import journal.action.Filter;

import jargs.gnu.CmdLineParser;

/**
 * Reads a journal file (or stdin) and hands every parsed entry to an
 * {@link Action}.
 *
 * <p>This class only opens the input and drives the processing; the actual
 * line parsing is delegated to a {@link JournalParser}.
 */
public class JournalReader {

    /** Maximum number of parsed entries buffered between parser and action. */
    private static final int QUEUE_CAPACITY = 2048;

    /** How long the consumer waits for an entry before re-checking for end of input. */
    private static final long POLL_MILLIS = 100L;

    private final String fileName;
    private final InputStream inputStream;
    private final JournalParser parser;
    private final Options options;

    /**
     * Opens the journal input and attaches a parser to it.
     *
     * @param fileName path of the journal file, or {@code "-"} to read from stdin
     * @param options  reader options; the input is gunzipped when
     *                 {@code options.compressed} is set or the name ends in {@code .gz}
     * @throws RuntimeException if the file cannot be opened
     */
    public JournalReader(String fileName, Options options) {
        this.fileName = fileName;
        this.options = options;

        InputStream in;
        if (fileName.equals("-")) {
            in = System.in;
        } else {
            try {
                in = new FileInputStream(fileName);
            } catch (FileNotFoundException e) {
                // Keep the original message but preserve the cause for diagnosis.
                throw new RuntimeException("Could not read file " + fileName, e);
            }
        }

        if (this.options.compressed || fileName.endsWith(".gz")) {
            try {
                in = new GZIPInputStream(in);
            } catch (IOException e) {
                System.err.println("Unexpected IO Exception " + e);
                System.exit(-1);
            }
        }

        this.inputStream = in;
        // NOTE(review): the reader uses the platform default charset, as before.
        Reader reader = new InputStreamReader(this.inputStream);
        this.parser = new JournalParser(reader);
    }

    /**
     * Parses the whole input and invokes {@code action} on every entry.
     *
     * <p>Parsing and action execution run in two threads connected by a
     * bounded queue. The producer sets a flag when it has finished (end of
     * input, parse error, I/O error or interruption); the consumer keeps
     * processing until that flag is set and the queue is empty. The consumer
     * is never interrupted for normal shutdown, so an interrupt cannot hit
     * the action in the middle of its work.
     *
     * <p>Exceptions raised by the action are reported on stderr, stop the
     * producer and end processing; they are not rethrown.
     *
     * @param action the action applied to each journal entry
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void applyAction(final Action action) throws Exception {
        final LinkedBlockingQueue<JournalEntry> queue =
                new LinkedBlockingQueue<JournalEntry>(QUEUE_CAPACITY);
        final AtomicBoolean producerDone = new AtomicBoolean(false);

        final Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    JournalEntry entry = parser.parseNext();
                    while (entry != null) {
                        queue.put(entry);
                        entry = parser.parseNext();
                    }
                } catch (InterruptedException e) {
                    // Asked to stop (consumer failed or caller was interrupted).
                    Thread.currentThread().interrupt();
                } catch (ParseException e) {
                    System.out.println("Parser Error at line " + e.getErrorOffset());
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    // Always tell the consumer that no further entries will arrive.
                    producerDone.set(true);
                }
            }
        });

        final Thread consumer = new Thread(new Runnable() {
            public void run() {
                int counter = 0;
                try {
                    while (true) {
                        JournalEntry entry = queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
                        if (entry == null) {
                            if (!producerDone.get()) {
                                continue; // the parser is merely slower than the action
                            }
                            // The flag is set after the last put, so a non-blocking
                            // poll now sees every remaining entry.
                            entry = queue.poll();
                            if (entry == null) {
                                break;
                            }
                        }
                        counter++;
                        if (options.verbose && counter % 10000 == 0) {
                            System.err.println("Processed " + counter / 10000 + "0k");
                        }
                        entry.invokeAction(action);
                    }
                } catch (InterruptedException e) {
                    // Only happens when the caller aborts; stop reading as well.
                    producer.interrupt();
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    System.err.println("Encountered Exception.");
                    e.printStackTrace(System.err);
                    producer.interrupt();
                }
            }
        });

        producer.start();
        consumer.start();

        boolean interrupted = false;
        try {
            producer.join();
        } catch (InterruptedException e) {
            interrupted = true;
            // Stop reading; the consumer still processes what is already queued.
            producer.interrupt();
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            interrupted = true;
            producer.interrupt();
            consumer.interrupt();
        }
        if (interrupted) {
            // Restore the flag so the caller can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Instantiates an action through its public no-argument constructor.
     * Prints a message to stderr and exits with status 1 when the class
     * cannot be loaded or instantiated.
     *
     * @param actionClassName fully qualified name of a class implementing {@link Action}
     * @return the new action instance
     */
    private static Action loadAction(String actionClassName) {
        Action action = null;
        try {
            Class<?> actionClass = Class.forName(actionClassName);
            action = (Action) actionClass.getConstructor().newInstance();
        } catch (Exception e) {
            System.err.println("Cannot load action class " + actionClassName + " : " + e);
            System.exit(1);
        }
        return action;
    }

    /** One command line option as shown in the usage text. */
    final static class Usage {
        char option;
        String longOption;
        String parameter = null;
        String description;

        /** Describes an option that takes no parameter. */
        Usage(char o, String l, String d) {
            this(o, l, null, d);
        }

        /** Describes an option that takes the parameter named {@code p}. */
        Usage(char o, String l, String p, String d) {
            this.option = o;
            this.longOption = l;
            this.parameter = p;
            this.description = d;
        }

        /**
         * Formats the option as a tab-indented line with the short and long
         * form, followed by the description on a second, deeper indented line.
         */
        @Override
        public String toString() {
            StringBuilder buf = new StringBuilder();
            buf.append("\t-").append(option);
            if (parameter != null) {
                buf.append(" <").append(parameter).append(">");
            }
            buf.append(", --").append(longOption);
            if (parameter != null) {
                buf.append("=<").append(parameter).append(">");
            }
            buf.append("\n\t\t").append(description);
            return buf.toString();
        }
    }

    /** All supported options, in the order they are printed by the usage text. */
    static List<Usage> usage = new ArrayList<Usage>();

    static {
        usage.add(new Usage('h', "help", "This help"));
        usage.add(new Usage('i', "stdin", "Reads the journal file from stdin"));
        usage.add(new Usage('C', "config", "configfile",
                "Configuration file that contains all options. Used by the Reader and the actions"));
        usage.add(new Usage('a', "action", "classname",
                "The action to be applied to each record. Default action is to print each record."));
        usage.add(new Usage('f', "filter", "filter",
                "Filter action used before the actual action."));
        usage.add(new Usage('z', "compressed", "Use compressed checkpoint and output."));
        usage.add(new Usage('v', "verbose", "Reports every 10,000 journal entries processed."));
        usage.add(new Usage('c', "caseinsensitive",
                "Case-insensitive processing of the key fields in each record."));
        usage.add(new Usage('o', "output", "outputfile",
                "Writes all output like reports to the output file instead of stdout."));
        // Previously repeated the description of -c by mistake.
        usage.add(new Usage('e', "error", "errorfile",
                "Writes all error output to the error file instead of stderr."));
    }

    /** Prints the synopsis and the description of every option to stderr. */
    private static void printUsage() {
        System.err.println("Usage : JournalReader [options] { <filename> | -i }");
        System.err.println("Options: [-h] [-C <configfile>] [-a <action>] [-f <filter>] [-z] [-v] [-c] [-o <outputFile>] [-e <errorFile>]");
        System.err.println();
        for (Usage u : usage) {
            System.err.println(u);
        }
    }

    /**
     * Returns a stream writing to {@code filename}, or {@code def} when no
     * file name was given. Exits with status 1 when the file cannot be opened.
     *
     * @param def      stream to use when {@code filename} is {@code null}
     * @param filename target file, may be {@code null}
     * @return the stream to write to
     */
    private static PrintStream getStream(PrintStream def, String filename) {
        PrintStream stream = def;
        if (filename != null) {
            try {
                stream = new PrintStream(new FileOutputStream(filename));
            } catch (Exception e) {
                System.err.println("Cannot open file : " + filename);
                e.printStackTrace(System.err);
                System.exit(1);
            }
        }
        return stream;
    }

    /**
     * Flushes {@code stream} and closes it unless it is the given standard
     * stream, so files opened for -o / -e are released while stdout and
     * stderr stay usable.
     */
    private static void closeIfRedirected(PrintStream stream, PrintStream standard) {
        stream.flush();
        if (stream != standard) {
            stream.close();
        }
    }

    /**
     * Command line entry point: parses the options, loads the action (and an
     * optional filter in front of it) and applies it to the journal.
     *
     * @param args command line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        Options options = new Options();

        CmdLineParser parser = new CmdLineParser();
        CmdLineParser.Option helpOption = parser.addBooleanOption('h', "help");
        CmdLineParser.Option actionOption = parser.addStringOption('a', "action");
        CmdLineParser.Option filterOption = parser.addStringOption('f', "filter");
        CmdLineParser.Option compressedOption = parser.addBooleanOption('z', "compressed");
        CmdLineParser.Option verboseOption = parser.addBooleanOption('v', "verbose");
        CmdLineParser.Option outputOption = parser.addStringOption('o', "output");
        CmdLineParser.Option errorOption = parser.addStringOption('e', "error");
        CmdLineParser.Option caseOption = parser.addBooleanOption('c', "caseinsensitive");
        CmdLineParser.Option inputOption = parser.addBooleanOption('i', "stdin");
        CmdLineParser.Option configOption = parser.addStringOption('C', "config");

        try {
            parser.parse(args);
        } catch (CmdLineParser.OptionException e) {
            System.err.println(e.getMessage());
            printUsage();
            System.exit(2);
        }

        boolean help = (Boolean) parser.getOptionValue(helpOption, Boolean.FALSE);

        // The config file is loaded first so that its values can serve as
        // defaults for options not given on the command line.
        String configFileName = (String) parser.getOptionValue(configOption);
        if (configFileName != null) {
            try {
                options.loadProperties(configFileName);
            } catch (IOException e) {
                // Also covers FileNotFoundException, which was handled identically.
                System.err.println(e.getMessage());
                System.err.println();
                printUsage();
                System.exit(2);
            }
        }

        boolean compressed = (Boolean) parser.getOptionValue(compressedOption, Boolean.FALSE);
        boolean verbose = (Boolean) parser.getOptionValue(verboseOption, Boolean.FALSE);
        boolean caseInsensitive = (Boolean) parser.getOptionValue(caseOption, Boolean.FALSE);

        String actionClassName = (String) parser.getOptionValue(
                actionOption, options.getProperty(Options.Names.ACTION));
        String filterClassName = (String) parser.getOptionValue(filterOption);

        if (help && actionClassName == null) {
            // Without an action there is no action-specific help to show, but
            // the general usage must still be available.
            System.out.println();
            printUsage();
            System.out.println();
            System.exit(0);
        }

        Action action = loadAction(actionClassName);

        if (help) {
            System.out.println();
            printUsage();
            System.out.println();
            action.help();
            System.exit(0);
        }

        if (filterClassName != null) {
            // The filter is put in front of the action and forwards to it.
            Action nextAction = action;
            Action filter = loadAction(filterClassName);
            if (!(filter instanceof Filter)) {
                System.err.println("Class " + filterClassName + " is not a filter");
                System.exit(1);
            }
            ((Filter) filter).setAction(nextAction);
            action = filter;
        }

        PrintStream outstream = getStream(System.out, (String) parser.getOptionValue(outputOption));
        PrintStream errorstream = getStream(System.err, (String) parser.getOptionValue(errorOption));

        // The action consumes the arguments it understands and returns the rest.
        String[] otherArgs = action.parseArgs(parser.getRemainingArgs());

        String fileName = null;
        boolean inputFromStdin = (Boolean) parser.getOptionValue(inputOption, Boolean.FALSE);
        if (inputFromStdin) {
            fileName = "-"; // read from stdin
        }
        if (otherArgs.length > 0) {
            // An explicit file name takes precedence over -i.
            fileName = otherArgs[0];
        }
        if (fileName == null) {
            printUsage();
            System.exit(1);
        }

        options.compressed = compressed;
        options.verbose = verbose;
        options.outputStream = outstream;
        options.errorStream = errorstream;
        options.caseInsensitive = caseInsensitive;

        JournalReader reader = new JournalReader(fileName, options);

        try {
            action.start(options);
            reader.applyAction(action);
            action.finish();
        } catch (Exception e) {
            System.err.println("Encountered Exception.");
            e.printStackTrace(System.err);
        } finally {
            closeIfRedirected(outstream, System.out);
            closeIfRedirected(errorstream, System.err);
        }
    }
}
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#4 | 7529 | Sven Erik Knop |
Deleted the whole JournalReader. It has actually moved to //guest/sven_erik_knop/java/JournalReader/. |
||
#3 | 7427 | Sven Erik Knop |
Major update of the JournalReader: - New help (usage) system, which explains all options - Config file that allows parameters to be stored in a file. This is necessary for the SQLLoader and Updater, because classpath, class name and connection parameters need to be set (see the example *.cfg files provided) - Now tested with MySQL, SQLite and Oracle 10 XE |
||
#2 | 7375 | Sven Erik Knop |
Major update of the JournalReader. Complete rewrite of the command line parsing. Change in the options parsing within the journal reader. New SQLLoader action, currently only against MySQL (needs the MySQL JDBC driver) with a fixed database and user name. This will be replaced by a config file at some stage. |
||
#1 | 7374 | Sven Erik Knop | Rename/move file(s) - correct location for Eclipse project | ||
//guest/sven_erik_knop/JournalReader/journal/reader/JournalReader.java | |||||
#1 | 6467 | Sven Erik Knop |
Added JournalReader, a Java library of useful tools to read and process checkpoints and journals. Also added are a readme.txt that explains some details and a jar file that contains the compiled class files. The programs need Java 1.6 to run. |