package journal.reader; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintStream; import java.io.Reader; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.GZIPInputStream; import org.ini4j.Ini; import org.ini4j.InvalidFileFormatException; import org.ini4j.Profile.Section; import journal.action.Action; import journal.action.CompoundAction; import journal.exception.UnknownActionException; import journal.exception.UnknownOptionException; import journal.filter.Filter; import jargs.gnu.CmdLineParser; public class JournalReader { private String fileName; private InputStream inputStream; private JournalParser parser; final private Options options; // This class will read the file // then use a parser plug-in to read each line public JournalReader(String fileName, Options options) { this.fileName = fileName; this.options = options; if (fileName.equals("-")) { this.inputStream = System.in; } else { try { this.inputStream = new FileInputStream(this.fileName); } catch (FileNotFoundException e) { throw new RuntimeException("Could not read file " + fileName); } } if (this.options.isCompressed() || fileName.endsWith(".gz")) { try { inputStream = new GZIPInputStream(inputStream); } catch (IOException e) { System.err.println("Unexpected IO Exception " + e); System.exit(-1); } } Reader reader = new BufferedReader( new InputStreamReader(inputStream) ); this.parser = new JournalParser(reader, options); } public void applyAction(final Action action) throws Exception { final LinkedBlockingQueue<JournalEntry> queue = new LinkedBlockingQueue<JournalEntry>(2048); final Thread producer = new Thread(new Runnable() { public void run() { try { JournalEntry entry = parser.parseNext(); while (entry != null) { try { queue.put(entry); entry = parser.parseNext(); } catch (InterruptedException e) { return; } } } catch(ParseException e) { System.out.println("Parser Error at line " + e.getErrorOffset()); e.printStackTrace(); return; } catch (IOException e) { e.printStackTrace(); return; } } }); final Thread consumer = new Thread(new Runnable() { public void run() { int counter = 0; try { try { while (true) { JournalEntry entry = queue.take(); counter++; if (options.isVerbose()) { if (counter % 10000 == 0) { System.err.println("Processed " + counter/10000 + "0k"); } } entry.invokeAction(action); } } catch (InterruptedException e) { // The producer is done, the queue is clear to be drained completely JournalEntry entries[] = new JournalEntry[queue.size()]; entries = queue.toArray(entries); for (JournalEntry entry : entries) { entry.invokeAction(action); } } } catch( Exception e ) { System.err.println("Encountered Exception."); e.printStackTrace(System.err); producer.interrupt(); } } }); producer.start(); consumer.start(); try { producer.join(); } catch (InterruptedException e) { e.printStackTrace(); } consumer.interrupt(); try { consumer.join(); } catch (InterruptedException e) { } } private static Action loadAction(String actionClassName) throws UnknownActionException { Action action = null; try { Class<?> actionClass = Class.forName(actionClassName); action = (Action) actionClass.getConstructor().newInstance(); } catch(Exception e) { throw new UnknownActionException(actionClassName, e); } return action; } final static class Usage { char option; String longOption; String parameter = null; String description; Usage(char o, String l, String d) { this.option = o; this.longOption = l; this.description = d; } Usage(char o, String l, String p, String d) { this.option = o; this.longOption = l; this.parameter = p; this.description = d; } @Override public String toString() { StringBuffer buf = new StringBuffer(); if (parameter == null) { buf.append("\t-").append(option).append(", --").append(longOption); buf.append("\n\t\t").append(description); } else { buf.append("\t-").append(option).append(" <").append(parameter).append(">"). append(", --").append(longOption).append("=<").append(parameter).append(">"); buf.append("\n\t\t").append(description); } return buf.toString(); } } static List<Usage> usage = new ArrayList<Usage>(); static { usage.add(new Usage('h', "help", "This help")); usage.add(new Usage('i', "stdin", "Reads the journal file from stdin")); usage.add(new Usage('C', "config", "configfile", "Configuration file that contains all options. Used by the Reader and the actions")); usage.add(new Usage('a', "action", "classname", "The action to be applied to each record. Default action is to print each record.")); usage.add(new Usage('f', "filter", "filter", "Filter action used before the actual action.")); usage.add(new Usage('z', "compressed", "Use compressed checkpoint and output.")); usage.add(new Usage('v', "verbose", "Reports every 10,000 journal entries processed.")); usage.add(new Usage('c', "caseinsensitive", "Case-insensitive processing of the key fields in each record.")); usage.add(new Usage('u', "unicode", "Unicode (UTF8) checking of each record.")); usage.add(new Usage('o', "output", "outputfile", "Writes all output like reports to the output file instead of stdout.")); usage.add(new Usage('e', "error", "errorfile", "Writes all errors to the errorfile instead of stderr.")); } private static void printUsage() { System.err.println("Usage : JournalReader [options] { <filename> | -i }"); System.err.println("Options: [-h] [-C <configfile>] [-a <action>] [-f <filter>] [-z] [-v] [-c] [-u] [-o <outputFile>] [-e <errorFile>]"); System.err.println(); for (Usage u : usage) { System.err.println(u); } } private static PrintStream getStream(PrintStream def, String filename) { PrintStream stream = def; if (filename != null) { try { stream = new PrintStream(new FileOutputStream(filename)); } catch (Exception e) { System.err.println("Cannot open file : " + filename); e.printStackTrace(System.err); System.exit(1); } } return stream; } private static Action processConfigFile(Options options, String configFileName) { List<Action> actions = new ArrayList<Action>(); List<Filter> filters = new ArrayList<Filter>(); try { Ini ini = new Ini(new FileReader(configFileName)); for( String sectionName : ini.keySet()) { Section section = ini.get(sectionName); if( sectionName.equals("reader")) { for (String optionKey: section.keySet()) { options.setOption("reader", optionKey, section.get(optionKey)); } } else { Action action = loadAction(sectionName); // will report an error and exit if not there action.setDefaultOptions(options); if( action instanceof Filter) { filters.add( (Filter) action ); } else actions.add( action ); for( String optionKey: section.keySet() ) { options.setOption(sectionName, optionKey, section.get(optionKey)); } } } } catch( UnknownActionException e1) { System.err.println(e1); System.exit(1); } catch (UnknownOptionException e1) { System.err.println(e1); System.exit(1); } catch (InvalidFileFormatException e1) { System.err.println(e1); System.exit(1); } catch (FileNotFoundException e1) { System.err.println(e1); System.exit(1); } catch (IOException e1) { System.err.println(e1); System.exit(1); } // post-processing // lets check the assertions first if( actions.size() == 0 ) { System.err.println("Need at least one action specified in the config file"); System.exit(1); } Action root = null; Filter filter = null; // filter size >= 0 if( filters.size() > 0 ) { root = filters.get(0); filter = (Filter) root; for( int i = 1; i < filters.size(); i++) { Filter next = filters.get(i); filter.setAction(next); filter = next; } } Action action = null; if( actions.size() > 1 ) { // use compound CompoundAction c = new CompoundAction(); for( Action a : actions ) c.addAction(a); action = c; } else { action = actions.get(0); } if( filter != null) filter.setAction(action); else root = action; return root; } public static void main(String[] args) { Options options = new Options(); CmdLineParser parser = new CmdLineParser(); CmdLineParser.Option helpOption = parser.addBooleanOption('h', "help"); CmdLineParser.Option actionOption = parser.addStringOption('a', "action"); CmdLineParser.Option filterOption = parser.addStringOption('f', "filter"); CmdLineParser.Option compressedOption = parser.addBooleanOption('z', "compressed"); CmdLineParser.Option verboseOption = parser.addBooleanOption('v', "verbose"); CmdLineParser.Option outputOption = parser.addStringOption('o', "output"); CmdLineParser.Option errorOption = parser.addStringOption('e', "error"); CmdLineParser.Option caseOption = parser.addBooleanOption('c', "caseinsensitive"); CmdLineParser.Option unicodeOption = parser.addBooleanOption('u', "unicode"); CmdLineParser.Option inputOption = parser.addBooleanOption('i', "stdin"); CmdLineParser.Option configOption = parser.addStringOption('C', "config"); try { parser.parse(args); } catch ( CmdLineParser.OptionException e ) { System.err.println(e.getMessage()); printUsage(); System.exit(2); } boolean help = (Boolean) parser.getOptionValue(helpOption, Boolean.FALSE); boolean compressed = (Boolean) parser.getOptionValue(compressedOption, Boolean.FALSE); boolean caseInsensitive = (Boolean) parser.getOptionValue(caseOption, Boolean.FALSE); boolean unicode = (Boolean) parser.getOptionValue(unicodeOption, Boolean.FALSE); String actionClassName = (String) parser.getOptionValue(actionOption,"journal.action.JournalAction"); String filterClassName = (String) parser.getOptionValue(filterOption); String configFileName = (String) parser.getOptionValue(configOption); PrintStream outstream = getStream(System.out, (String) parser.getOptionValue(outputOption)); PrintStream errorstream = getStream(System.err, (String) parser.getOptionValue(errorOption)); options.setOutputStream(outstream); options.setErrorStream(errorstream); Action action = null; if (configFileName != null) { action = processConfigFile(options, configFileName); } else { try { action = loadAction(actionClassName); action.setDefaultOptions(options); if (filterClassName != null) { Action nextAction = action; action = loadAction(filterClassName); ((Filter) action).setAction(nextAction); } } catch(UnknownActionException e) { System.err.println("Action unknown: " + e); System.exit(1); } } if (help) { System.out.println(); printUsage(); System.out.println(); action.help(); System.exit(0); } // processed after reading the confif file. Config file takes precedence here boolean verbose = (Boolean) parser.getOptionValue(verboseOption, Boolean.valueOf(options.isVerbose())); String[] otherArgs = action.parseArgs(parser.getRemainingArgs()); String fileName = null; boolean inputFromStdin = (Boolean) parser.getOptionValue(inputOption, Boolean.FALSE); if (inputFromStdin) { fileName = "-"; // read from stdin } if (otherArgs.length > 0) { fileName = otherArgs[0]; } if ( fileName == null) { printUsage(); System.exit(1); } options.setCompressed(compressed); options.setVerbose(verbose); options.setCaseInsensitive(caseInsensitive); options.setUnicode(unicode); JournalReader reader = new JournalReader(fileName, options); try { action.start(); reader.applyAction(action); action.finish(); } catch( Exception e ) { System.err.println("Encountered Exception."); e.printStackTrace(System.err); } } }
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#9 | 23552 | Sven Erik Knop | Fixed indentation, no functional change | ||
#8 | 8111 | Sven Erik Knop |
Updated JournalReader to use the correct default action. This means that you can invoke the JournalReader without options again to get the help output. |
||
#7 | 8027 | Sven Erik Knop |
Moved exceptions to their own package. Enabled new Action FilepathRenamer (not fully tested yet). |
||
#6 | 8024 | Sven Erik Knop | Refactoring: moved the filter into their own package | ||
#5 | 8023 | Sven Erik Knop |
Complete rewrite of the configuration file, now based on an ini-file format. The ini file has a general [reader] section for settings like verbose, outputFile, case-sensitivity and so on. It also allows to set up a range of Actions and Filters. The section name here is the fully classified class name, followed by settings for the particular actions. An example will make this clearer: ================================================================ [reader] verbose=true [journal.action.UserRenamer] fileName=user.txt patch=True outputFile=user.out [journal.action.ClientRenamer] fileName=client.txt outputFile=client.out patch=true ================================================================ I will provide more example set-ups in the near future. Filters are classes implementing journal.action.Filter (soon to be journal.filter.Filter) which can be chained together and are all executed before the actions. Actions are applied in order that they are given in the config file. |
||
#4 | 8020 | Sven Erik Knop | Replace public Option attributes with setters and getters. | ||
#3 | 8019 | Sven Erik Knop | Safety checks for case-sensitivity and Unicode, now provided in the checkpoint. | ||
#2 | 7818 | Sven Erik Knop |
BufferedReader for reading the checkpoint. Probably does not make a difference, but who knows? Also removed the old zip - file and added the dist jar file generated for every change. Might have to undo that if it freaks out my Eclipse. |
||
#1 | 7527 | Sven Erik Knop |
JournalReader, now in its proper place. Documentation to follow. |