#******************************************************************** # # Copyright (C) 2005-2006 Hari Krishna Dara # # This file is part of p4admin. # # p4admin is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # p4admin is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # #******************************************************************* import sched import time import threading import sys import logging import datetime import utils log = logging.getLogger(__name__) class Scheduler(threading.Thread): def __init__(self): super(Scheduler, self).__init__() self.schd = sched.scheduler(time.time, time.sleep) self.cv = threading.Condition() self.actionToEventMap = {} self.currException = None self.setDaemon(True) self.jobMonThread = utils.JobMonitorThread() # FIXME: Implement. def dispose(self): pass def wait(self): """Wait for the scheduler to finish tasks""" # This construct is such that main thread will be able to detect # KeyboardInterrupts. while self.isAlive() and not self.currException: self.join(1) if self.currException: ex = self.currException self.currException = None raise ex def run(self): self.jobMonThread.start() self.schd.run() log.info("Scheduler exiting") def registerAction(self, actionHandler): self.enterAction(actionHandler) def getAction(self, actionHandler): return self.actionToEventMap[actionHandler] def enterAction(self, actionHandler): self.cv.acquire() try: event = self.schd.enterabs(actionHandler.getNextTime(), actionHandler.getPriority(), self.executeAction, (actionHandler,)) self.actionToEventMap[actionHandler] = event finally: self.cv.release() def executeAction(self, actionHandler): self.cv.acquire() try: try: actionHandler.lastRunStTime = datetime.datetime.now() self.jobMonThread.startMonitoring(actionHandler.getActionName(), actionHandler.expectedRunDuration) actionHandler.execute() # If repeatable, re-enter. if actionHandler.isRepeatable(): self.enterAction(actionHandler) except: # Save it so that wait() can propagate it to the waiting thread. self.currException = sys.exc_info()[1] log.exception('%s: Execution of action aborted due to ' 'exception.', actionHandler.getActionName()) # Cancel all events so that scheduler will end. for action in self.actionToEventMap.keys(): try: self.schd.cancel(self.actionToEventMap[action]) except ValueError: pass finally: self.jobMonThread.endMonitoring() if actionHandler.isRepeatable() and not self.currException: log.info("%s: Next execution scheduled at: %s\n%s", actionHandler.getActionName(), time.ctime(actionHandler.getNextTime()), "-"*50) self.cv.release() class ActionHandlerBase(object): def __init__(self, name, firstRunTime, interval, repeatable, heartbeat, priority): self.name = name self.firstRunTime = firstRunTime self.interval = interval self.repeatable = repeatable self.heartbeat = heartbeat self.priority = priority self.nextTime = firstRunTime self.runCount = 0 self.lastStatus = 0 self.notifyCompletion = False self.lastRunStTime = None self.expectedRunDuration = None def setNotifyCompletion(self, b): self.notifyCompletion = b def setExpectedRunDuration(self, runDur): """Set a timedelta object. When the run duration exceeds the amount, a notificaton will be sent out.""" self.expectedRunDuration = runDur def getActionName(self): return self.name def isRepeatable(self): return self.repeatable def getPriority(self): return self.priority def getNextTime(self): return self.nextTime def execute(self): self.runCount+=1 # If repeatable and heartbeat, first calculate the nextTime. # TODO: Add a flag to indicate if runs that are missed should be # skipped (self.nextTime < time.time()). if self.heartbeat and self.repeatable: #self.nextTime = time.time() + self.interval # This makes sure that the nextTime is more accurate, even when the # current execution is delayed. self.nextTime = self.nextTime + self.interval log.info("%s: Execution of action started. (count=%d)", self.getActionName(), self.runCount) # TODO: start collecting statistics. self.lastStatus = self.executeImpl() # TODO: compute statistics and print. log.info("%s: Execution of action ended. (count=%d)", self.getActionName(), self.runCount) # If repeatable and not heartbeat, calculate nextTime at the end. if not self.heartbeat and self.repeatable: self.nextTime = time.time() + self.interval if self.notifyCompletion: import notify notify.sendInfo('%s completed with status: %d' % (self.getActionName(), self.lastStatus), 'Run count: %d\n' % self.runCount) def executeImpl(self): """This is to be implemented by the derived classes. Return 0 for success""" pass class TestActionHandler(ActionHandlerBase): def __init__(self, action, argTpl, interval, heartbeat, firstRunTime=None, initialDelay=None): if firstRunTime == None: if initialDelay == None: firstRunTime = time.time() else: firstRunTime = time.time() + initialDelay super(TestActionHandler, self).__init__("TestAction", firstRunTime, interval, True, heartbeat, 1) self.action = action self.argTpl = argTpl def executeImpl(self): self.action(*self.argTpl) def testSleep(tm): print 'testSleep('+time.ctime(time.time())+') for: '+str(tm) time.sleep(tm) def testExcept(dummy): print 'throwing exception' raise Exception('This is a test exception') if __name__ == '__main__': # Basic console logger. rootLogger = logging.getLogger() rootLogger.setLevel(logging.INFO) handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter("%(message)s")) rootLogger.addHandler(handler) r = Scheduler() #def enter(initialDelay, interval, repeatable, heartbeat, priority, # action, argument): #r.registerAction(TestActionHandler(sys.stdout.write, ("two\n",), 3, False, initialDelay=2)) #r.registerAction(TestActionHandler(sys.stdout.write, ("four\n",), 4, True, firstRunTime=time.time() + 4)) # This results in 2 second gap (interval > sleep). #r.registerAction(TestActionHandler(testSleep, (1,), 2, True, initialDelay=2)) # This results in 4 second gap (sleep > interval). #r.registerAction(TestActionHandler(testSleep, (4,), 2, True, initialDelay=2)) # This results in 3 second gap (interval + sleep). r.registerAction(TestActionHandler(testSleep, (1,), 2, False, initialDelay=2)) #r.registerAction(TestActionHandler(testExcept, (None,), 2, False, initialDelay=2)) r.start() try: r.wait() except: log.exception('cought from scheduler')