#
# Python module for running a parser, and then checking the result against
# a set of registered listeners.
#

import cutil, sys, Serializer, Setup


class IPoller:
    '''
    Polls something, returning an object representing a single record
    extracted from what is being polled.  If the returned object is None,
    then assume that either nothing is available, or the record has not
    completed being extracted.
    '''

    def __init__(self):
        '''
        Just for form: I like to have a constructor for all my classes so
        I don't have to guess when I jump into a subclass.
        '''
        pass

    def next(self):
        '''
        Poll for the next object.  If nothing is available at the moment,
        then return None.
        '''
        return None


class Listener:
    '''
    Listens to polled objects.  It's a two stage approach:
     1. Check if the polled object is handled by this listener by running
        isHandled(object).  If it returns True, then:
     2. Call listen(object).

    TODO: add a queueing technique.  This will provide:
     * storing listened-to objects as they come in
     * flushing the queue when it is handled
     * storing any number of other objects per listened-to object.
    On listener start-up, the queue will be run-through.  This should be
    provided though a Listener sub-class specialized for handling the queue.
    '''

    def __init__(self):
        pass

    def onStartup(self):
        '''
        Called after the instance is constructed, and before the loop
        processing begins.
        '''
        pass

    def isHandled(self, obj):
        '''
        Return True if this listener wants listen(obj) called for the
        polled object.  Base implementation handles nothing.
        '''
        return False

    def listen(self, obj):
        '''Handle a polled object that isHandled() accepted.'''
        pass

    def noData(self, hadData):
        '''
        Called before a pause in polling for data.  hadData will be True
        if the previous poll attempt returned with data.
        '''
        pass

    def stoppedLoop(self):
        '''
        Called after the poll loop completes.  Gives the listeners a chance
        to clean themselves up.
        '''
        pass


class BatchListener(Listener):
    '''
    A kind of listener that queues up pending tasks, so that they can be
    handled in batch.

    Users should implement:
        isHandled(self, obj)
        process(self, journalObj)
        on_flush_complete(self)   # optional

    TODO: should "filename" be replaced with a single SerializerMaster
    reference?
    '''

    def __init__(self, filename, nodata_threshold, maximum_pending_threshold):
        '''
        filename: file to save/restore the queued serialized form.
        nodata_threshold: maximum number of repeated noData calls without
            a listen call before the process() method is called.
        maximum_pending_threshold: maximum number of repeated listen calls
            to queue up before calling the process() method.
        '''
        Listener.__init__(self)
        self.nodata_threshold = nodata_threshold
        self.maximum_pending_threshold = maximum_pending_threshold
        self.serializer = Serializer.SerializerMaster(filename)
        # Consecutive noData() calls since the last listen()/flush().
        self.__no_data_waits = 0
        # listen() calls queued since the last flush().
        self.__journal_adds = 0

    def add_serializable(self, name, obj):
        '''
        Add a custom serializable object, if it isn't already known.
        Returns the added (or already added) item.
        '''
        self.serializer.add_default(name, obj)
        return self.serializer.value(name)

    def get_serializable(self, name):
        '''
        Retrieve a custom serializable object.
        '''
        return self.serializer.value(name)

    def enqueue(self, journalObj):
        '''
        Add the journal object to the internal serialized data structure(s).
        '''
        pass

    def flush(self):
        '''
        Flush the pending items that were added on enqueue() calls.
        '''
        pass

    def onStartup(self):
        '''
        On initial startup, always flush the queue.
        '''
        self.flush()

    def stoppedLoop(self):
        '''
        After stopping the loop, always flush the queue.
        '''
        self.flush()

    def listen(self, obj):
        '''
        Push objects into the queue, and check for max size.
        '''
        self.enqueue(obj)
        self.__journal_adds += 1
        if self.__journal_adds > self.maximum_pending_threshold:
            self.flush()
            self.__journal_adds = 0

    def noData(self, hadData):
        '''
        Track idle polls; after nodata_threshold consecutive misses, flush
        whatever is queued rather than letting it sit indefinitely.
        '''
        if hadData:
            self.__no_data_waits = 0
        else:
            self.__no_data_waits += 1
            if self.__no_data_waits > self.nodata_threshold:
                self.flush()
                self.__journal_adds = 0
                self.__no_data_waits = 0


class QueuedListener(BatchListener):
    '''
    A BatchListener backed by a serialized PendingQueue, so queued work
    survives a restart.  Users implement parse() and process().
    '''

    def __init__(self, filename, nodata_threshold, maximum_pending_threshold):
        '''
        filename: file to save/restore the queued serialized form.
        nodata_threshold: maximum number of repeated noData calls without
            a listen call before the process() method is called.
        maximum_pending_threshold: maximum number of repeated listen calls
            to queue up before calling the process() method.
        '''
        # BUG FIX: original omitted "filename", shifting every argument
        # one position to the left in the BatchListener constructor.
        BatchListener.__init__(self, filename, nodata_threshold,
                               maximum_pending_threshold)
        self.__journal = self.add_serializable('journal',
                                               Serializer.PendingQueue())

    def process(self, journalObj):
        '''
        Process a pending journal object as though listen was called.
        The actual 'journalObj' object is the one returned from parse().
        '''
        pass

    def parse(self, journalObj):
        '''
        Parse the journal object into an object usable by the tool.
        '''
        return journalObj

    def flush(self):
        '''
        Flush the pending journal queue until it's empty.  This is performed
        such that the item is removed from the queue only after it has been
        successfully processed.
        '''
        while self.__journal.has_contents():
            obj = self.__journal.peek()
            self.process(obj)
            self.__journal.next()
        self.on_flush_complete()

    def enqueue(self, journalObj):
        '''Parse the incoming object and push it onto the durable queue.'''
        # BUG FIX: original called the module-level name "parse", which does
        # not exist; the overridable method lives on the instance.
        self.__journal.add(self.parse(journalObj))

    def on_flush_complete(self):
        '''
        Called after the flush operation completes.
        '''
        pass


class ListenerRunner:
    '''
    Executes a listener/poller combination.
    '''

    def __init__(self, poller, listenerList):
        '''
        Sets the poller to use, and a list of listeners to respond to polled
        objects.  If a listener in the list is a String, then it will attempt
        to create the object referenced by the string.
        '''
        assert poller is not None
        self.run = True
        self._signals = None
        self.poller = poller
        self.listeners = []
        for x in listenerList:
            if isinstance(x, str):
                module = None
                clazz = x
                if x.count('.') > 0:
                    module = x[:x.rindex('.')]
                    clazz = x[x.rindex('.') + 1:]
                if module is not None:
                    # fromlist makes __import__ return the leaf module for
                    # dotted paths (a bare __import__('a.b') returns 'a').
                    m = __import__(module, globals(), locals(), [clazz])
                    # getattr instead of eval(): no code execution from the
                    # listener-name string.
                    x = getattr(m, clazz)()
                else:
                    # BUG FIX: a dotless name previously left x as a string
                    # and always tripped the assert below.  Resolve it in
                    # this module's namespace instead.
                    x = globals()[clazz]()
            assert isinstance(x, Listener)
            self.listeners.append(x)

    def __del__(self):
        self.run = False
        self.restoreSignals()

    def setupSignals(self):
        '''
        Sets up system signal handlers to correctly manage the loop process.
        This allows for better daemon-like behavior.
        '''
        import signal
        if self._signals is not None:
            raise Exception(
                "Signals already registered. Call restoreSignals first.")
        cutil.log(cutil.INFO,
                  "ListenerRunner: Registering system signal handlers")

        def interruptHandler(sigNumber, stackFrame):
            self.run = False

        # BUG FIX: SIGSTOP cannot be caught or handled; attempting to
        # register it makes signal.signal() raise, so the original version
        # always failed here.  SIGTSTP (terminal stop) remains handled.
        self._signals = {
            signal.SIGINT: signal.signal(signal.SIGINT, interruptHandler),
            signal.SIGTSTP: signal.signal(signal.SIGTSTP, interruptHandler),
            signal.SIGHUP: signal.signal(signal.SIGHUP, interruptHandler),
            signal.SIGTERM: signal.signal(signal.SIGTERM, interruptHandler),
        }

    def restoreSignals(self):
        '''
        Restore the handlers saved by setupSignals().  Safe to call when
        nothing is registered.  Restoration is best-effort: a handler that
        cannot be reinstated is logged and skipped.
        '''
        if self._signals is not None:
            cutil.log(cutil.INFO,
                      "ListenerRunner: Restoring original system signal handlers")
            import signal
            for (sigNumber, origHandler) in self._signals.items():
                # getsignal()-style values can be None for handlers not
                # installed from Python; those cannot be passed back in.
                if origHandler is None:
                    continue
                try:
                    signal.signal(sigNumber, origHandler)
                except (ValueError, RuntimeError, TypeError):
                    cutil.log_error(cutil.ERR)
            # BUG FIX: reset, so setupSignals() can be used again and
            # __del__'s call here is idempotent.
            self._signals = None

    def _logListenerFailure(self, listener):
        '''
        Log the exception currently being handled, attributing it to the
        given listener.  Shared by poll() and loop().
        '''
        (errType, errValue, errTb) = sys.exc_info()
        cutil.log(cutil.ERR, ("Handler ", listener, " raised exception ",
                              errType, ": ", errValue))
        cutil.log_error(cutil.ERR)

    def poll(self):
        '''
        Poll for an object, and call each listener.  Returns True if the
        polled object was not None.
        '''
        obj = self.poller.next()
        if obj is not None:
            for x in self.listeners:
                try:
                    if x.isHandled(obj):
                        x.listen(obj)
                except Exception:
                    # One listener's failure must not stop the others.
                    self._logListenerFailure(x)
        return obj is not None

    def loop(self, waitTime):
        '''
        Goes into a loop to poll for events.  If the poll returns None,
        then a wait period will be enabled.  It will run until self.run
        is false.

        waitTime: time, in seconds, to wait between each poll.  It can be
        a floating point number to indicate fractions of a second.
        '''
        import time
        cutil.log(cutil.INFO,
                  "ListenerRunner: calling onStartup for listeners")
        for x in self.listeners:
            try:
                x.onStartup()
            except Exception:
                self._logListenerFailure(x)
        cutil.log(cutil.INFO, "ListenerRunner: starting poll loop")
        hadData = False
        self.checkFatalError()
        while self.run:
            try:
                if not self.poll():
                    for x in self.listeners:
                        try:
                            x.noData(hadData)
                        except Exception:
                            self._logListenerFailure(x)
                    hadData = False
                    time.sleep(waitTime)
                else:
                    hadData = True
            except Exception:
                cutil.log_error(cutil.FATAL)
            self.checkFatalError()
        cutil.log(cutil.INFO, "ListenerRunner: finished poll loop")
        for x in self.listeners:
            # BUG FIX: cleanup was unprotected; one failing listener would
            # skip stoppedLoop() on all the remaining listeners.
            try:
                x.stoppedLoop()
            except Exception:
                self._logListenerFailure(x)

    def checkFatalError(self):
        '''
        Stop the loop if the Setup policy says a logged fatal error should
        terminate the run.
        '''
        if Setup.DIE_ON_FATAL_ERROR and cutil.hasFatal():
            cutil.log(cutil.ERR,
                      "Loop encountered a fatal error. Terminating.")
            self.run = False