#********************************************************************
#
# Copyright (C) 2005-2006 Hari Krishna Dara
#
# This file is part of p4admin.
#
# p4admin is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# p4admin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
#*******************************************************************
import os
import os.path
import stat
import sys
import re
import logging
import time
import datetime
import subprocess
import atexit
import threading
log = logging.getLogger(__name__)
shell_error = 0
# So that we can abort it while exiting.
curProcess = None
procMonThread = None
def sendWarning(jobName, expectedRunDuration, lastRunStTime):
log.debug('JobMonitorThread.run done waiting, no '+
'notification, sending email')
import notify
notify.sendWarning(('%s job running for too long, '+
'started at: %s') % (jobName, lastRunStTime))
# From commands.getstatusoutput()
def execute(cmd, verbosity=3, expectedRunDuration=None, callback=sendWarning):
"""Return output of executing cmd in a shell as a string."""
#if verbosity >= 2:
# log.info("Executing: %s", cmd)
#text = pipe.read()
#sts = pipe.close()
#if sts is None: sts = 0
#global shell_error
#shell_error = sts
#if text[-1:] == '\n': text = text[:-1]
#log.info(text)
#return text
output = execute2(cmd, verbosity, expectedRunDuration, callback)
return ''.join(output)
def execute2(cmd, verbosity=True, expectedRunDuration=None, callback=sendWarning):
"""Return output of executing cmd in a shell as a tuple of lines.
If verbosity is:
0 - don't log any information
1 - log only warnings
2 - log only the command being executed
3 - log both command and output of the command"""
global shell_error
global curProcess
global procMonThread
if not procMonThread:
procMonThread = JobMonitorThread()
procMonThread.start()
if verbosity >= 2:
log.info(">>> %s", cmd)
else:
log.debug(">>> %s", cmd)
#pipe = os.popen(cmd + ' 2>&1', 'r')
if sys.modules.has_key('win32service'): # If service module is loaded.
# This avoids the TypeError for missing stdin while running as service.
# Based on the workaround suggested in the bug report at:
# http://python.org/sf/1238747
log.debug('using nul stdin for child')
stdin = file('nul', 'r')
else:
log.debug('using default stdin for child')
stdin = None
try:
curProcess = subprocess.Popen(cmd, shell=True, stdin=stdin,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
finally:
if stdin != None:
stdin.close()
log.debug('execute: created proces pid: %d', curProcess.pid)
pipe = curProcess.stdout
output = []
procMonThread.startMonitoring(cmd.split()[0]+"(pid:"+
str(curProcess.pid)+")", expectedRunDuration, callback)
try:
for line in pipe:
#output[len(output):0] = line
output.append(line)
if verbosity >= 3:
log.info("<<< %s", line.rstrip("\n"))
finally:
procMonThread.endMonitoring()
log.debug('execute: closing pipe')
#sts = pipe.close()
pipe.close()
curProcess.wait()
sts = curProcess.returncode
log.debug('execute: process exited, returncode: ' + str(sts))
curProcess = None
if sts is None: sts = 0
if sts != 0 and verbosity > 0:
log.warning('Command returned failure, return code: %d', sts)
shell_error = sts
# TODO: I remember to have seen a way to determine the LHS type and decide
# whether to return the sequence or convert it to a String (is it called
# covariant return type?). I will not then need two variants of execute.
return output
def killProcess(popen):
"""Kill the process object created by subprocess.Popen"""
if popen == None:
return
log.debug('killProcess: %d', popen.pid)
if os.name == 'nt':
# From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
import win32api
PROCESS_TERMINATE = 1
try:
handle = win32api.OpenProcess(PROCESS_TERMINATE, False, popen.pid)
win32api.TerminateProcess(handle, -1)
win32api.CloseHandle(handle)
except:
# Most probably, the process already exited.
pass
else:
import signal
try:
os.kill(popen.pid, signal.SIGTERM)
time.sleep(1)
os.kill(popen.pid, signal.SIGHUP)
time.sleep(1)
os.kill(popen.pid, signal.SIGKILL)
except OSError, ex:
# The process already exited.
pass
def killCurrentProcess():
if curProcess:
log.info("At exit, killing process: %d", curProcess.pid)
killProcess(curProcess)
atexit.register(killCurrentProcess)
def getScriptsRoot():
# Based on http://starship.python.net/crew/theller/moin.cgi/HowToDetermineIfRunningFromExe
if hasattr(sys, "frozen"): # py2exe
return os.path.dirname(os.path.dirname(sys.executable))
else:
# This will have back-slashes if run using ActiveState python.
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def extractFieldValue(field, sep, output):
m = re.compile(field + sep + r'(.*)\r?').search(output)
if m != None:
return m.group(1)
else:
return ''
def escape(str, chars):
return re.compile('(['+chars+'])').sub(r'\\\1', str)
class RestartProgram(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
def updateScriptFiles():
import config
import notify
if not config.packageSyncEnabled:
return
# If scripts are not in perforce, do nothing.
if not config.packageInPerforce:
return
result = execute('p4 '+config.p4OptionsRemote+ ' sync '+config.scriptsRootNtv
+'/scripts/... '+config.scriptsRootNtv+'/cfg/...')
if shell_error != 0:
import notify
notify.sendError('Error updating the script files: '+result)
if re.search(' - updating ', result) or re.search(' - added ', result) or\
re.search(' - deleted ', result):
log.info('Script files have been updated, requesting a restart: %s',
result)
# If the scripts are loaded from a zip file, we need to refresh the zip
# file itself.
if re.search(r'.zip', __file__):
library = os.path.dirname(__file__)
log.info('Updating files in: %s', library)
try:
updateZip(library)
except IOError, ex:
log.exception('during updateZip')
import notify
notify.sendError('updation of '+library+' failed.\n'+str(ex))
except OSError, ex:
log.exception('during updateZip')
import notify
notify.sendError('updation of '+library+' failed.\n'+str(ex))
raise RestartProgram(
'Restart required, one or more files have been updated')
if config.p4RemoteClient:
result = execute(config.remoteShell+' '+config.p4HostRemote+' p4 '+
config.p4RemoteOptions+' sync '+config.rsyncRemoteConfig)
if shell_error != 0:
import notify
notify.sendError('Error updating the remote configuratil file: '+
config.rsyncRemoteConfig)
def listCheckPts(checkptsDir):
"""Returnes a list of all the .ckp files sorted in reverse order"""
checkpts = [f for f in os.listdir(checkptsDir) if (f.find('.ckp') > 0)]
checkpts.sort(key=checkpointSeqNum, reverse=True)
return checkpts
def listJournals(journalsDir):
"""Returnes a list of all the .jnl files sorted"""
journals = [f for f in os.listdir(journalsDir) if (f.find('.jnl') > 0)]
journals.sort(key=journalSeqNum, reverse=False)
return journals
def removeOldCheckpoints(checkptsDir, journalsDir):
"""Remove old checkpoint and journal files"""
import config
checkpts = listCheckPts(checkptsDir)
if len(checkpts) > config.numOldCheckPoints:
# Find the sequence number of the first checkpoint that is too old. We
# will remove all checkpoints and journals that has a sequence number
# that is smaller than that.
cutOffSeqNum = checkpointSeqNum(checkpts[config.numOldCheckPoints])
log.debug('Checkpoint cutOffSeqNum: %d', cutOffSeqNum)
fileErrors = []
for file in checkpts[config.numOldCheckPoints:]:
file = os.path.normpath(os.path.join(checkptsDir, file))
log.debug("Removing old checkpoint: %s", file)
try:
os.chmod(file, stat.S_IWRITE)
os.remove(file)
except OSError, ex:
fileErrors.append(file+": "+str(ex))
#file = checkptsDirPsx+'/'+file
#log.debug("Removing old checkpoint: %s", file)
#result = execute('rm -f '+file)
#if shell_error != 0:
# fileErrors.append(file+": "+result)
for file in listJournals(journalsDir):
if journalSeqNum(file) > cutOffSeqNum:
continue
file = os.path.normpath(os.path.join(journalsDir, file))
log.debug("Removing old journal: %s", file)
try:
os.chmod(file, stat.S_IWRITE)
os.remove(file)
except OSError, ex:
fileErrors.append(file+": "+str(ex))
#file = journalsDirPsx+'/'+file
#log.debug("Removing old journal: %s", file)
#result = execute('rm -f '+file)
#if shell_error != 0:
# fileErrors.append(file+": "+result)
if fileErrors:
import notify
notify.sendError("There were errors clearing some old "
"checkpoint/journal files:\n"+"\t\n".join(fileErrors))
def journalSeqNum(j):
return int(re.search(r'jnl\.(\d+)(\.gz|$)', j).group(1))
def checkpointSeqNum(j):
return int(re.search(r'ckp\.(\d+)(\.gz|$)', j).group(1))
class RestartProgram(Exception):
"""Just an exception designed to indicate that a program restart is
requird."""
def __init__(self, msg):
# Exception is an old style class, so we can't use super() here.
Exception.__init__(self, msg)
def getDateTime24hr(timeStr, refDate = datetime.datetime.now()):
"""Given a time string in 24hr format (%H:%M:%S), compute the datetime for
today at the given time."""
timeOfDay = time.strptime(timeStr, "%H:%M:%S")
return time.mktime(refDate.replace(
hour=timeOfDay.tm_hour,
minute=timeOfDay.tm_min,
second=timeOfDay.tm_sec,
microsecond=0).timetuple())
def getDateTime7day(timeStr, weekDay, refDate = datetime.datetime.now()):
"""Given a time string in 24hr format (%H:%M:%S), compute the datetime for the
next given week-day at the given time. If today is already the given week-day,
computes it for today (works like getDateTime24hr).
weekDay is 0-6 for Mon-Sun."""
timeOfDay = time.strptime(timeStr, "%H:%M:%S")
# Rollforward to the next Saturday. It is ok to do verify today at any time, as
# long as today is Saturday.
daysToRoll = (weekDay - refDate.weekday())
if refDate.weekday() > weekDay:
daysToRoll += 7
return time.mktime((refDate + datetime.timedelta(days=daysToRoll)).replace(
hour=timeOfDay.tm_hour,
minute=timeOfDay.tm_min,
second=timeOfDay.tm_sec,
microsecond=0).timetuple())
def computeFirstRunTime(pattern):
"""Pattern is of the format: [weekday,]%H:%M:%S. For time format, see
time.strptime()"""
if pattern == None:
# Run immediately.
return time.time()
#return 0
m = re.match(r'((?P<weekday>\d),)?(?P<time>\d{1,2}:\d{1,2}:\d{1,2})', pattern)
if m == None:
raise Exception('Invalid pattern: '+pattern)
weekday = m.group('weekday')
timestr = m.group('time')
if weekday != None:
return getDateTime7day(timestr, int(weekday))
else:
return getDateTime24hr(timestr)
def timedeltaToSeconds(td):
"""Returns seconds in floating point (microseconds as a fraction). Suitable
for wait()"""
seconds = td.seconds
seconds += td.days*(60*60*24)
seconds += (float(td.microseconds)/10**6)
return seconds
def setDefault(module, paramName, paramDef):
"""Given a parameter name and it's default value, sets the parameter on the
module only if it not already set. This is really meant for config module"""
if not hasattr(module, paramName):
setattr(module, paramName, paramDef)
def copyUpdateZip(source, dest):
"""Create a copy of the zipfile with refreshed contents (if files are
still available)."""
import zipfile
dirname = os.path.dirname(os.path.abspath(source))
zr = zipfile.ZipFile(source, 'r')
zw = zipfile.ZipFile(dest, 'w')
for name in zr.namelist():
localname = os.path.join(dirname, name)
if os.path.exists(localname):
log.debug('updating: %s', name)
zw.write(localname, name)
else:
log.debug('copying: %s', name)
zw.writestr(zr.getinfo(name), zr.read(name))
zw.close()
def updateZip(zfile):
"""Updates the zipfile by refreshing contents from filesystem (if files are
still available). It first creates a copy and overwrites the original zip
file with the copy when successful."""
(dirname, filename) = os.path.split(os.path.abspath(zfile))
tmpCopy = os.path.join(dirname, os.path.splitext(filename)[0]+'copy.zip')
if os.path.exists(tmpCopy):
os.chmod(tmpCopy, stat.S_IWRITE)
os.remove(tmpCopy)
copyUpdateZip(zfile, tmpCopy)
os.chmod(zfile, stat.S_IWRITE)
import shutil
shutil.move(tmpCopy, zfile)
def makeDir(dir):
if not os.path.exists(dir):
try:
log.info("Creating dir: %s", dir)
os.mkdir(dir)
except OSError, ex:
import notify
notify.sendError("Couldn't create: " + dir + ": " +
str(ex))
return 1
elif not os.path.isdir(dir):
import notify
notify.sendError(dir+' already exists but is not a directory')
return 1
return 0
def p4SetServiceLocal(var, value):
import config
result = execute('p4 set -S '+config.p4ServiceNameLocal+
' set '+var+'='+value, verbosity=0)
if shell_error != 0:
import notify
notify.sendError('p4 set -S failed: '+result)
return 1
return 0
def startPerforceLocal():
import config
result = execute('p4 '+config.p4OptionsLocal+' info', verbosity=0)
log.debug('p4 info result: %s', result)
if shell_error == 0:
log.debug('perforce is already running, returning')
# Already started.
return 0
if sys.platform in ('win32', 'cygwin'):
result = execute('net stop '+config.p4ServiceNameLocal, verbosity=0)
if shell_error != 0:
if not re.search(config.p4ServiceNameLocal, result):
log.info("The service: %s doesn't exist, creating it.",
config.p4ServiceNameLocal)
result = execute('svcinst create -n '+config.p4ServiceNameLocal+
' -e p4s', verbosity=2)
if shell_error != 0:
import notify
notify.sendError('Error creating perforce service: '+
config.p4ServiceNameLocal)
return 1
if p4SetServiceLocal('P4JOURNAL', config.p4JournalLocal) != 0:
return 1
if p4SetServiceLocal('P4ROOT', config.p4RootLocal) != 0:
return 1
if p4SetServiceLocal('P4PORT', config.p4PortLocal) != 0:
return 1
result = execute("net start " + config.p4ServiceNameLocal, verbosity=0)
if shell_error != 0:
if not re.search(r'service has already been started', result):
import notify
notify.sendError("Couldn't start local perforce server: " + result)
return 1
else:
result = execute('p4d -r '+config.p4RootLocal+' -p '+
config.p4PortLocal+' -J '+config.p4JournalLocal,
verbosity=0)
if shell_error != 0:
import notify
notify.sendError("Couldn't start local perforce server: " + result)
return 1
return 0
def getNativePath(path):
if sys.platform == 'cygwin':
path = re.sub(r'\\', '/', path)
path = execute('cygpath -m '+path, verbosity=3).strip()
if shell_error != 0:
import notify
notify.sendError('Error while converting path: '+path+
' to local path')
return path
def getPosixPath(path, host=None):
import config
if sys.platform in ('win32', 'cygwin'):
path = re.sub(r'\\', '/', path)
if host != None:
path = execute(config.remoteShell+' '+host+ ' cygpath -u '+path,
verbosity=3,
expectedRunDuration=datetime.timedelta(minutes=5)).strip()
else:
path = execute('cygpath -u '+path, verbosity=3).strip()
if shell_error != 0:
import notify
notify.sendError('Error while converting path: '+path+
' to posix path')
return path
def getMixedPath(path):
"""For windows, this just converts the back-slashes to forward-slashes."""
if sys.platform in ('win32', 'cygwin'):
return re.sub(r'\\', '/', path)
else:
return path
def getLogLevelForStr(levelStr):
"""Given a level in string, lookup the value of real logging attribute"""
try:
return int(getattr(logging, levelStr))
except AttributeError, ex:
raise Exception('No such level defined in logging: '+levelStr)
except TypeError, ex:
raise Exception('The given level is invalid: '+levelStr)
def typeIn(objType, objList):
for obj in objList:
if isinstance(obj, objType):
return True
return False
class JobMonitorThread(threading.Thread):
def __init__(self):
super(JobMonitorThread, self).__init__()
self.setDaemon(True)
self.jobAvailable = False
self.jobName = None
self.expectedRunDuration = None
self.callback = None
self.lastRunStTime = None
self.cv = threading.Condition()
self.running = False
def run(self):
self.running = True
while True:
self.cv.acquire()
try:
# If nothing to monitor, wait indefinitely.
if not self.jobAvailable:
log.debug('JobMonitorThread.run waiting for request')
self.cv.wait()
log.debug('JobMonitorThread.run got notification')
else:
if self.expectedRunDuration:
waitTime = timedeltaToSeconds(
self.expectedRunDuration)
log.debug('JobMonitorThread.run waiting for: %f seconds'
% waitTime)
self.cv.wait(waitTime)
# Indicates that endMonitoring() was not called.
if self.jobAvailable:
log.debug(('JobMonitorThread.run %s did not finish'+
' in time') % self.jobName)
self.callback(self.jobName, self.expectedRunDuration,
self.lastRunStTime)
else:
log.debug(('JobMonitorThread.run %s finished in '+
'time') % self.jobName)
else:
log.debug('JobMonitorThread.run expectedRunDuration '+
'not defined')
finally:
self.cv.release()
self.running = False;
def startMonitoring(self, jobName, expectedRunDuration, callback=sendWarning):
"""Starts monitoring the specified action, provided a
expectedRunDuration is set."""
if not self.running:
return
if expectedRunDuration:
log.debug(('JobMonitorThread.startMonitoring: %s '+
'expectedRunDuration: %s') % (jobName, expectedRunDuration))
self.cv.acquire()
self.jobAvailable = True
self.jobName = jobName
self.expectedRunDuration = expectedRunDuration
self.callback = callback
self.lastRunStTime = datetime.datetime.now()
try:
log.debug('JobMonitorThread.startMonitoring notifying')
self.cv.notify()
finally:
self.cv.release()
else:
# Just for logging purposes.
self.jobName = jobName
self.expectedRunDuration = None
def endMonitoring(self):
if not self.running:
return
if self.expectedRunDuration:
log.debug(('JobMonitorThread.endMonitoring: %s'+
' expectedRunDuration: %s') % (self.jobName,
self.expectedRunDuration))
self.cv.acquire()
self.jobAvailable = False
try:
log.debug('JobMonitorThread.endMonitoring notifying')
self.cv.notify()
# A quick HACK to make sure the thread receives the
# notification. A more sophisticated approach would require
# another protocol, or may be i should just throw away the
# thread? Makes it much simpler.
time.sleep(2)
finally:
self.cv.release()
else:
self.jobName = None
gotCB = False
def testCB(name, erd, st):
log.debug('testCB got callback')
global gotCB
gotCB = True
def test():
time1 = "22:00:00"
time2 = "18:00:00"
t1 = getDateTime24hr(time1)
print time.ctime(t1)
print time.ctime(getDateTime7day(time2, 0))
print time.ctime(getDateTime7day(time2, 1))
print time.ctime(getDateTime7day(time2, 2))
print time.ctime(getDateTime7day(time2, 3))
print time.ctime(getDateTime7day(time2, 4))
print time.ctime(getDateTime7day(time2, 5))
at1 = computeFirstRunTime(time1)
if at1 != t1:
print "FAILED: computeFirstRunTime didn't match getDateTime24hr"
t26 = getDateTime7day(time2, 6)
print time.ctime(t26)
at26 = computeFirstRunTime('6,'+time2)
if at26 != t26:
print "FAILED: computeFirstRunTime didn't match getDateTime7day"
# Requires config.
#if getPosixPath('/usr') != '/usr':
# print 'FAILED: getPosixPath'
#if getLogLevelForStr('DEBUG') != logging.DEBUG:
# print 'FAILED: getLogLevelForStr'
d1 = datetime.datetime.now()
time.sleep(3)
d2 = datetime.datetime.now()
td = d2 - d1
secs = timedeltaToSeconds(td)
if secs < 3 or secs > 4:
print 'FAILED: timedeltaToSeconds'
mon = JobMonitorThread()
mon.start()
#rootLogger = logging.getLogger()
#rootLogger.setLevel(logging.DEBUG)
#handler = logging.StreamHandler(sys.stdout)
#handler.setFormatter(logging.Formatter("%(message)s"))
#rootLogger.addHandler(handler)
print 'Testing monitoring'
global gotCB
gotCB = False
mon.startMonitoring('test1', datetime.timedelta(seconds=10), testCB)
time.sleep(2)
mon.endMonitoring()
if gotCB:
print 'FAILED: test1'
time.sleep(2)
gotCB = False
mon.startMonitoring('test2', datetime.timedelta(seconds=2), testCB)
time.sleep(5)
mon.endMonitoring()
if not gotCB:
print 'FAILED: test2'
time.sleep(2)
print 'Testing execute'
gotCB = False
execute('sleep 2', 3, datetime.timedelta(seconds=5), testCB)
if gotCB:
print 'FAILED: execute1'
gotCB = False
execute('sleep 5', 3, datetime.timedelta(seconds=2), testCB)
if not gotCB:
print 'FAILED: execute2'
if __name__ == '__main__':
test()