#********************************************************************
#
# Copyright (C) 2005-2006 Hari Krishna Dara
#
# This file is part of p4admin.
#
# p4admin is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# p4admin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
#*******************************************************************
import sched
import time
import threading
import sys
import logging
import datetime
import utils
log = logging.getLogger(__name__)
class Scheduler(threading.Thread):
def __init__(self):
super(Scheduler, self).__init__()
self.schd = sched.scheduler(time.time, time.sleep)
self.cv = threading.Condition()
self.actionToEventMap = {}
self.currException = None
self.setDaemon(True)
self.jobMonThread = utils.JobMonitorThread()
# FIXME: Implement.
def dispose(self):
pass
def wait(self):
"""Wait for the scheduler to finish tasks"""
# This construct is such that main thread will be able to detect
# KeyboardInterrupts.
while self.isAlive() and not self.currException:
self.join(1)
if self.currException:
ex = self.currException
self.currException = None
raise ex
def run(self):
self.jobMonThread.start()
self.schd.run()
log.info("Scheduler exiting")
def registerAction(self, actionHandler):
self.enterAction(actionHandler)
def getAction(self, actionHandler):
return self.actionToEventMap[actionHandler]
def enterAction(self, actionHandler):
self.cv.acquire()
try:
event = self.schd.enterabs(actionHandler.getNextTime(),
actionHandler.getPriority(), self.executeAction,
(actionHandler,))
self.actionToEventMap[actionHandler] = event
finally:
self.cv.release()
def executeAction(self, actionHandler):
self.cv.acquire()
try:
try:
actionHandler.lastRunStTime = datetime.datetime.now()
self.jobMonThread.startMonitoring(actionHandler.getActionName(),
actionHandler.expectedRunDuration)
actionHandler.execute()
# If repeatable, re-enter.
if actionHandler.isRepeatable():
self.enterAction(actionHandler)
except:
# Save it so that wait() can propagate it to the waiting thread.
self.currException = sys.exc_info()[1]
log.exception('%s: Execution of action aborted due to '
'exception.', actionHandler.getActionName())
# Cancel all events so that scheduler will end.
for action in self.actionToEventMap.keys():
try:
self.schd.cancel(self.actionToEventMap[action])
except ValueError:
pass
finally:
self.jobMonThread.endMonitoring()
if actionHandler.isRepeatable() and not self.currException:
log.info("%s: Next execution scheduled at: %s\n%s",
actionHandler.getActionName(),
time.ctime(actionHandler.getNextTime()),
"-"*50)
self.cv.release()
class ActionHandlerBase(object):
def __init__(self, name, firstRunTime, interval, repeatable, heartbeat,
priority):
self.name = name
self.firstRunTime = firstRunTime
self.interval = interval
self.repeatable = repeatable
self.heartbeat = heartbeat
self.priority = priority
self.nextTime = firstRunTime
self.runCount = 0
self.lastStatus = 0
self.notifyCompletion = False
self.lastRunStTime = None
self.expectedRunDuration = None
def setNotifyCompletion(self, b):
self.notifyCompletion = b
def setExpectedRunDuration(self, runDur):
"""Set a timedelta object. When the run duration exceeds the amount, a
notificaton will be sent out."""
self.expectedRunDuration = runDur
def getActionName(self):
return self.name
def isRepeatable(self):
return self.repeatable
def getPriority(self):
return self.priority
def getNextTime(self):
return self.nextTime
def execute(self):
self.runCount+=1
# If repeatable and heartbeat, first calculate the nextTime.
# TODO: Add a flag to indicate if runs that are missed should be
# skipped (self.nextTime < time.time()).
if self.heartbeat and self.repeatable:
#self.nextTime = time.time() + self.interval
# This makes sure that the nextTime is more accurate, even when the
# current execution is delayed.
self.nextTime = self.nextTime + self.interval
log.info("%s: Execution of action started. (count=%d)",
self.getActionName(), self.runCount)
# TODO: start collecting statistics.
self.lastStatus = self.executeImpl()
# TODO: compute statistics and print.
log.info("%s: Execution of action ended. (count=%d)",
self.getActionName(), self.runCount)
# If repeatable and not heartbeat, calculate nextTime at the end.
if not self.heartbeat and self.repeatable:
self.nextTime = time.time() + self.interval
if self.notifyCompletion:
import notify
notify.sendInfo('%s completed with status: %d' %
(self.getActionName(), self.lastStatus),
'Run count: %d\n' % self.runCount)
def executeImpl(self):
"""This is to be implemented by the derived classes. Return 0 for
success"""
pass
class TestActionHandler(ActionHandlerBase):
def __init__(self, action, argTpl, interval, heartbeat, firstRunTime=None,
initialDelay=None):
if firstRunTime == None:
if initialDelay == None:
firstRunTime = time.time()
else:
firstRunTime = time.time() + initialDelay
super(TestActionHandler, self).__init__("TestAction", firstRunTime,
interval, True, heartbeat, 1)
self.action = action
self.argTpl = argTpl
def executeImpl(self):
self.action(*self.argTpl)
def testSleep(tm):
print 'testSleep('+time.ctime(time.time())+') for: '+str(tm)
time.sleep(tm)
def testExcept(dummy):
print 'throwing exception'
raise Exception('This is a test exception')
if __name__ == '__main__':
# Basic console logger.
rootLogger = logging.getLogger()
rootLogger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
rootLogger.addHandler(handler)
r = Scheduler()
#def enter(initialDelay, interval, repeatable, heartbeat, priority,
# action, argument):
#r.registerAction(TestActionHandler(sys.stdout.write, ("two\n",), 3, False, initialDelay=2))
#r.registerAction(TestActionHandler(sys.stdout.write, ("four\n",), 4, True, firstRunTime=time.time() + 4))
# This results in 2 second gap (interval > sleep).
#r.registerAction(TestActionHandler(testSleep, (1,), 2, True, initialDelay=2))
# This results in 4 second gap (sleep > interval).
#r.registerAction(TestActionHandler(testSleep, (4,), 2, True, initialDelay=2))
# This results in 3 second gap (interval + sleep).
r.registerAction(TestActionHandler(testSleep, (1,), 2, False, initialDelay=2))
#r.registerAction(TestActionHandler(testExcept, (None,), 2, False, initialDelay=2))
r.start()
try:
r.wait()
except:
log.exception('cought from scheduler')