package journal.reader; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.io.Reader; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.GZIPInputStream; import journal.action.Action; import journal.action.Filter; import journal.reader.Options.Names; import jargs.gnu.CmdLineParser; public class JournalReader { private String fileName; private InputStream inputStream; private JournalParser parser; final private Options options; // This class will read the file // then use a parser plug-in to read each line public JournalReader(String fileName, Options options) { this.fileName = fileName; this.options = options; if (fileName.equals("-")) { this.inputStream = System.in; } else { try { this.inputStream = new FileInputStream(this.fileName); } catch (FileNotFoundException e) { throw new RuntimeException("Could not read file " + fileName); } } if (this.options.isCompressed() || fileName.endsWith(".gz")) { try { inputStream = new GZIPInputStream(inputStream); } catch (IOException e) { System.err.println("Unexpected IO Exception " + e); System.exit(-1); } } Reader reader = new BufferedReader( new InputStreamReader(inputStream) ); this.parser = new JournalParser(reader, options); } public void applyAction(final Action action) throws Exception { final LinkedBlockingQueue<JournalEntry> queue = new LinkedBlockingQueue<JournalEntry>(2048); final Thread producer = new Thread(new Runnable() { public void run() { try { JournalEntry entry = parser.parseNext(); while (entry != null) { try { queue.put(entry); entry = parser.parseNext(); } catch (InterruptedException e) { return; } } } catch(ParseException e) { System.out.println("Parser Error at line " + e.getErrorOffset()); 
e.printStackTrace(); return; } catch (IOException e) { e.printStackTrace(); return; } } }); final Thread consumer = new Thread(new Runnable() { public void run() { int counter = 0; try { try { while (true) { JournalEntry entry = queue.take(); counter++; if (options.isVerbose()) { if (counter % 10000 == 0) { System.err.println("Processed " + counter/10000 + "0k"); } } entry.invokeAction(action); } } catch (InterruptedException e) { // The producer is done, the queue is clear to be drained completely JournalEntry entries[] = new JournalEntry[queue.size()]; entries = queue.toArray(entries); for (JournalEntry entry : entries) { entry.invokeAction(action); } } } catch( Exception e ) { System.err.println("Encountered Exception."); e.printStackTrace(System.err); producer.interrupt(); } } }); producer.start(); consumer.start(); try { producer.join(); } catch (InterruptedException e) { e.printStackTrace(); } consumer.interrupt(); try { consumer.join(); } catch (InterruptedException e) { } } private static Action loadAction(String actionClassName) { Action action = null; try { Class<?> actionClass = Class.forName(actionClassName); action = (Action) actionClass.getConstructor().newInstance(); } catch( Exception e) { System.out.println(e); System.exit(1); } return action; } final static class Usage { char option; String longOption; String parameter = null; String description; Usage(char o, String l, String d) { this.option = o; this.longOption = l; this.description = d; } Usage(char o, String l, String p, String d) { this.option = o; this.longOption = l; this.parameter = p; this.description = d; } @Override public String toString() { StringBuffer buf = new StringBuffer(); if (parameter == null) { buf.append("\t-").append(option).append(", --").append(longOption); buf.append("\n\t\t").append(description); } else { buf.append("\t-").append(option).append(" <").append(parameter).append(">"). 
append(", --").append(longOption).append("=<").append(parameter).append(">"); buf.append("\n\t\t").append(description); } return buf.toString(); } } static List<Usage> usage = new ArrayList<Usage>(); static { usage.add(new Usage('h', "help", "This help")); usage.add(new Usage('i', "stdin", "Reads the journal file from stdin")); usage.add(new Usage('C', "config", "configfile", "Configuration file that contains all options. Used by the Reader and the actions")); usage.add(new Usage('a', "action", "classname", "The action to be applied to each record. Default action is to print each record.")); usage.add(new Usage('f', "filter", "filter", "Filter action used before the actual action.")); usage.add(new Usage('z', "compressed", "Use compressed checkpoint and output.")); usage.add(new Usage('v', "verbose", "Reports every 10,000 journal entries processed.")); usage.add(new Usage('c', "caseinsensitive", "Case-insensitive processing of the key fields in each record.")); usage.add(new Usage('u', "unicode", "Unicode (UTF8) checking of each record.")); usage.add(new Usage('o', "output", "outputfile", "Writes all output like reports to the output file instead of stdout.")); usage.add(new Usage('e', "error", "errorfile", "Writes all errors to the errorfile instead of stderr.")); } private static void printUsage() { System.err.println("Usage : JournalReader [options] { <filename> | -i }"); System.err.println("Options: [-h] [-C <configfile>] [-a <action>] [-f <filter>] [-z] [-v] [-c] [-u] [-o <outputFile>] [-e <errorFile>]"); System.err.println(); for (Usage u : usage) { System.err.println(u); } System.err.println(); System.err.println("Available configuration file parameters are :"); for (Names n : Options.Names.values()) { System.err.println("\t" + n.option); } } private static PrintStream getStream(PrintStream def, String filename) { PrintStream stream = def; if (filename != null) { try { stream = new PrintStream(new FileOutputStream(filename)); } catch (Exception e) { 
System.err.println("Cannot open file : " + filename); e.printStackTrace(System.err); System.exit(1); } } return stream; } public static void main(String[] args) { Options options = new Options(); CmdLineParser parser = new CmdLineParser(); CmdLineParser.Option helpOption = parser.addBooleanOption('h', "help"); CmdLineParser.Option actionOption = parser.addStringOption('a', "action"); CmdLineParser.Option filterOption = parser.addStringOption('f', "filter"); CmdLineParser.Option compressedOption = parser.addBooleanOption('z', "compressed"); CmdLineParser.Option verboseOption = parser.addBooleanOption('v', "verbose"); CmdLineParser.Option outputOption = parser.addStringOption('o', "output"); CmdLineParser.Option errorOption = parser.addStringOption('e', "error"); CmdLineParser.Option caseOption = parser.addBooleanOption('c', "caseinsensitive"); CmdLineParser.Option unicodeOption = parser.addBooleanOption('u', "unicode"); CmdLineParser.Option inputOption = parser.addBooleanOption('i', "stdin"); CmdLineParser.Option configOption = parser.addStringOption('C', "config"); try { parser.parse(args); } catch ( CmdLineParser.OptionException e ) { System.err.println(e.getMessage()); printUsage(); System.exit(2); } boolean help = (Boolean) parser.getOptionValue(helpOption, Boolean.FALSE); String configFileName = (String) parser.getOptionValue(configOption); if (configFileName != null) { try { options.loadProperties(configFileName); } catch (FileNotFoundException e) { System.err.println(e.getMessage()); System.err.println(); printUsage(); System.exit(2); } catch (IOException e) { System.err.println(e.getMessage()); System.err.println(); printUsage(); System.exit(2); } } boolean compressed = (Boolean) parser.getOptionValue(compressedOption, Boolean.FALSE); boolean verbose = (Boolean) parser.getOptionValue(verboseOption, Boolean.FALSE); boolean caseInsensitive = (Boolean) parser.getOptionValue(caseOption, Boolean.FALSE); boolean unicode = (Boolean) 
parser.getOptionValue(unicodeOption, Boolean.FALSE); String actionClassName = (String) parser.getOptionValue(actionOption, options.getProperty(Options.Names.ACTION)); String filterClassName = (String) parser.getOptionValue(filterOption); Action action = loadAction(actionClassName); if (help) { System.out.println(); printUsage(); System.out.println(); action.help(); System.exit(0); } if (filterClassName != null) { Action nextAction = action; action = loadAction(filterClassName); ((Filter) action).setAction(nextAction); } PrintStream outstream = getStream(System.out, (String) parser.getOptionValue(outputOption)); PrintStream errorstream = getStream(System.err, (String) parser.getOptionValue(errorOption)); String[] otherArgs = action.parseArgs(parser.getRemainingArgs()); String fileName = null; boolean inputFromStdin = (Boolean) parser.getOptionValue(inputOption, Boolean.FALSE); if (inputFromStdin) { fileName = "-"; // read from stdin } if (otherArgs.length > 0) { fileName = otherArgs[0]; } if ( fileName == null) { printUsage(); System.exit(1); } options.setCompressed(compressed); options.setVerbose(verbose); options.setOutputStream(outstream); options.setErrorStream(errorstream); options.setCaseInsensitive(caseInsensitive); options.setUnicode(unicode); JournalReader reader = new JournalReader(fileName, options); try { action.start(options); reader.applyAction(action); action.finish(); } catch( Exception e ) { System.err.println("Encountered Exception."); e.printStackTrace(System.err); } } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#9 | 23552 | Sven Erik Knop | Fixed indentation, no functional change | ||
#8 | 8111 | Sven Erik Knop |
Updated JournalReader to use the correct default action. This means that you can invoke the JournalReader without options again to get the help output. |
||
#7 | 8027 | Sven Erik Knop |
Moved exceptions to their own package. Enabled new Action FilepathRenamer (not fully tested yet). |
||
#6 | 8024 | Sven Erik Knop | Refactoring: moved the filters into their own package | ||
#5 | 8023 | Sven Erik Knop |
Complete rewrite of the configuration file, now based on an ini-file format. The ini file has a general [reader] section for settings like verbose, outputFile, case-sensitivity and so on. It also allows setting up a range of Actions and Filters. The section name here is the fully qualified class name, followed by settings for the particular action. An example will make this clearer: ================================================================ [reader] verbose=true [journal.action.UserRenamer] fileName=user.txt patch=True outputFile=user.out [journal.action.ClientRenamer] fileName=client.txt outputFile=client.out patch=true ================================================================ I will provide more example set-ups in the near future. Filters are classes implementing journal.action.Filter (soon to be journal.filter.Filter) which can be chained together and are all executed before the actions. Actions are applied in the order in which they are given in the config file. |
||
#4 | 8020 | Sven Erik Knop | Replace public Option attributes with setters and getters. | ||
#3 | 8019 | Sven Erik Knop | Safety checks for case-sensitivity and Unicode, now provided in the checkpoint. | ||
#2 | 7818 | Sven Erik Knop |
BufferedReader for reading the checkpoint. Probably does not make a difference, but who knows? Also removed the old zip file and added the dist jar file generated for every change. Might have to undo that if it freaks out my Eclipse. |
||
#1 | 7527 | Sven Erik Knop |
JournalReader, now in its proper place. Documentation to follow. |