#!/usr/bin/env python
# -*- coding: latin1 -*-
#----------------------------------------------------------------------------
# p4ftpsync.py: Sync of a pseudo remote Perforce client via FTP.
#
# Usage: See __doc__ string below.
#
# Requires:
# - Perforce (www.perforce.com) client installed and an accessible server
# somewhere. Tested for version 2005.1 (probably works for other versions)
# - Python 2.3 or newer (www.python.org)
# - OS: portable - Mac might cause problems with path separators (':')
#
# TODO:
# - Adapt to other VCSs like CVS or Subversion (useful?)
# - Refactor ftp interface so to hide variations in FTP servers, in
# particular error codes & msgs.
# - Truncate or rotate logs.
# - Client/Server mode so that any client can request the update while
# only the server operates on the mirror client.
# - -f: Distinguish between redo the last sync and resync from scratch
# (Not sure it's feasible)
# - ADD and UPDATE are handled the same way, which makes the program more
# tolerant but less consistent (e.g. UPDATE will create a dir if non-
# existent while it should generate an error). STRICT mode vs LAX mode.
# - prune yes/no, overwrite yes/no, etc.. might be parameters or options
# - Average speed for upload is biased since the time elapsed includes all
# kinds of ops (so the actual speed is >= the measured one).
# - Multithreaded prune ?
# - To be really smart, the determination of the target root dir should
# rely on a *mapping* spec ("view") like for a clientview in P4 !
#
# LICENSE:
#
# Copyright (C) 2004-2005 Richard Gruet (p4ftpsync@rgruet.net)
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee or royalty is hereby
# granted, provided that the above copyright notice appear in all copies
# and that both that copyright notice and this permission notice appear
# in supporting documentation or portions thereof, including modifications,
# that you make.
#
# THE AUTHOR RICHARD GRUET DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE !
#
# $Id: //depot/scripts/p4ftpsync.py#79 $
#----------------------------------------------------------------------------
'''
Usage: [python] p4ftpsync[.py] [options] whatToSync
Synchronizes a (pseudo) remote Perforce client via FTP.
If you are e.g. a Web designer and are used to working on a local copy of
your site under Perforce, then uploading changes via FTP to your online
site, p4ftpsync may save you the hassle of identifying the changes and
uploading them manually (the alternative being to upload the WHOLE site
when in doubt).
All you have to do is define a dedicated P4 client which will act as a
mirror/proxy for the remote site. p4ftpsync performs a sync on this local
image to determine what to do, and then updates the remote site accordingly
via FTP.
<whatToSync> specifies what to sync, and must be a valid P4 file spec such as
the ones used in the P4 sync command (e.g. //depot/MyProject/MyDir/...,
//depot/Proj2/main.c#2, @label, etc...). For reverse sync, the revision range
info is not significant, since the comparison is always done with the HEAD
revision.
Reverse sync:
------------
p4ftpsync may also be run in reverse sync mode (option -r). In this mode,
the files on the remote site are compared with the latest revision files in P4
and any change on the remote site is reported into a new P4 changelist
(optionally submitted at the end of the process).
This mode is handy if some changes are done directly on the remote site and
you want to easily keep your P4 reference repository up to date.
Symbolic links are supported on the remote site but mapped to real files
locally/in P4.
Valid options:
-------------
------------ Perforce options -------------------------
All these values have defaults; see the "Default values" section.
--p4Port HOST[:PORT] P4 host IP address, default port 1666
--p4User P4USER P4 user name to use
--p4Passwd P4PASSWORD P4 user password
--p4Client P4CLIENT P4 client to use as the mirror of the remote site
------------ FTP options (for remote site) ------------
--ftpHost HOST[:PORT] FTP host IP address, default port 21 [mandatory]
--ftpUser USER FTP user name [mandatory]
--ftpPasswd PASSWORD FTP user password [default: will be prompted]
--ftpRoot ROOTRELPATH Path of the root folder of the site [mandatory]
------------ Normal sync (p4->remote) options ---------
-o, --scriptDir DIR
Directory in which to generate the Python update script
in normal sync [default: <thisScriptDir>/p4FtpSyncScripts]
------------ Reverse sync (remote->P4) options --------
-r, --reverse Reverse sync: changes that occurred on the remote site are
detected, and a P4 changelist is created but not submitted,
unless option -s is specified.
-s, --submit Submit the P4 changelist. The default is to not submit,
so the changes can be reviewed before (manual) submit.
-c, --comment "COMMENT"
Optional description for the changelist. A default will
be generated if none is specified.
-m, --mailto ADDR1,ADDR2,...
A list of email addresses to send a "file change report" to
[default: do not send an email]
--smtpServer HOST[:PORT]
SMTP server to use to send the mail [default localhost:25]
--smtpUser USER SMTP user name [default: None]
--smtpPasswd PASSWORD
SMTP password [default: None]
--fromAddr ADDR Email address to put in the 'From' field of the mails
[default: p4ftpsync@p4ftpsync.net -dummy!]
------------------ Common sync options ----------------
-f, --force For normal sync: If specified, the target will be resynced
even if supposedly up to date (same as P4 sync option -f);
For reverse sync: If specified, the file desc. cache will
not be used and the file comparison will always be done.
-t, --test Test/preview mode: For normal sync: do not really P4 sync
the mirror client, generate the script but do not actually
FTP; For reverse sync: do not create P4 changelist or
copy files to client space.
-x, --exclude FILE1,FILE2,... | @FILE
Exclude files from the list of updates to do (handy if
some files must remain different locally from remotely,
e.g. config files, system files).
FILEs are specified as a comma separated list (no spaces)
of regexp patterns, or alternatively listed in a file
(@FILE), one per line.
When the (end of the) path of a file to sync matches one
of the patterns, it will be excluded from the sync.
------------------ Misc options -----------------------
-v, --verbose Verbose: More trace/error messages on stdout.
-V, --version Print p4ftpsync version on stdout and exit.
-h, --help Print this message on stdout and exit.
Default values:
--------------
p4ftpsync stores (in p4ftpsync.opt) the last values used for options
p4Client, ftpHost, ftpUser and ftpRoot and uses them if no value is provided
on the command line.
Options are stored twice: associated with the specific target sync spec,
and as generic "last session defaults" (ftpRoot is not stored in the
latter case).
When trying to reload the values for a new session, p4ftpsync first attempts
to reload the target specific values, then the non specific ones.
File desc. cache (reverse sync only):
------------------------------------
Remote file descriptions are saved on disk (<user>@<ftpHost>.fdc files) after
a successful synchronization and used on subsequent syncs to determine if a
file has changed (different file descs), without having to download the file,
which can save a tremendous amount of time.
On the first run of p4ftpsync for a given site, all files from the remote
site will be downloaded and compared to the files in the depot, a lengthy
and bandwidth-consuming operation! However, subsequent iterations should use the
cache and be considerably faster (the only relatively long operation that
can't be shortened is the determination of the current file structure on the
remote site). When the cache is missing or corrupted, the download of the
entire site (minus the exclusions) occurs again.
The cache can be disabled with option -f, --force (it is then not used for
the current iteration but will still be saved for future use).
Generated FTP Script:
--------------------
In "normal" sync, a Python script is created in the directory indicated by
option -o for the specified target (<whatToSync>). The script is named
<what>_<date>.py (with / and spaces replaced by _). This file represents the
set of FTP actions to synchronize the remote client (site) with a certain
state of the repository, which can prove useful, should the FTP session
fail before completion: executing the script will "ftp" the changes again.
Log:
---
A file p4ftpsync.log is generated in this script's folder (always verbose,
unlike the on-screen trace). Logs are rotated to p4ftpsync.log.<n>.zip
when they exceed a certain size.
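Examples:
--------
(Hypothetical host, user and paths; adjust to your own setup.)
Normal sync of a site to the head revision:
    p4ftpsync --p4Client mirror --ftpHost ftp.example.com --ftpUser webmaster --ftpRoot /www //depot/MySite/...
Reverse sync, creating (but not submitting) a changelist from remote changes:
    p4ftpsync -r --ftpHost ftp.example.com --ftpUser webmaster --ftpRoot /www //depot/MySite/...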
'''
_SHORT_OPTS = 'c:fhm:o:rstvVx:'
_LONG_OPTS = ['comment=', 'exclude=', 'force', 'help', 'mailto=',
'scriptDir=', 'p4Port=', 'p4User=', 'p4Passwd=', 'p4Client=',
'fromAddr=', 'ftpHost=', 'ftpUser=', 'ftpPasswd=', 'ftpRoot=',
'reverse', 'smtpServer=', 'smtpUser=', 'smtpPasswd=',
'submit', 'test', 'verbose', 'version']
__version__ = '0.5.' + '$Revision: #79 $'[12:-2]
__author__ = 'Richard Gruet', 'p4ftpsync@rgruet.net'
__date__ = '$Date: 2005/09/20 $'[7:-2], '$Author: rgruet $'[9:-2]
__since__ = '2004-01-12'
__doc__ += '\n@author: %s (%s)\n@version: %s' % (__author__[0],
__author__[1], __version__)
# For py2exe, which can't guess which db implementation to import for shelve:
import dbhash, weakref
import shelve
import string, re, os, sys, ftplib, time, getpass, threading, socket, copy
import shutil, cPickle
pyVersion = sys.version[:3]
if pyVersion < '2.3':
raise Exception('Sorry, p4ftpsync requires Python 2.3 or newer')
if pyVersion < '2.4':
from sets import Set
else:
Set = set
#<<<<<CHANGEME! Global User settings (in addition to invocation args) <<<<<<<
# Max size of log file in bytes, log is "rotated" beyond :
maxLogSize = 4000000
#>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# Defined Ftp operation types :
ADD, UPDATE, DELETE, MKDIRS = 'ADD', 'UPDATE', 'DELETE', 'MKDIRS'
GET, LISTDIR = 'GET', 'LISTDIR'
P4_SYNC_LEGAL_ACTIONS = { 'added as': ADD,
'updating': UPDATE,
'refreshing': UPDATE,
'deleted as': DELETE
}
_DEFAULT_FILE_PERM = 0664 # default file permissions for a file
_EXEC_MASK_PERM = 0111 # Execute permission bits for a file
def _getThisScriptDir():
try:
return os.path.dirname(__file__)
except NameError: # for py2exe
return '.'
def usage(msg='', exitCode=0):
print 'p4ftpsync: %s\n%s\n' % (msg, __doc__)
sys.exit(exitCode)
def fatal(msg):
log('** Fatal ** %s' % msg)
sys.exit(1)
def error(msg):
log('*Error* %s' % msg)
def warning(msg):
log('*Warning* %s' % msg)
def info(msg):
sys.stdout.write(msg+'\n')
_verbose = False
def vInfo(msg):
if _verbose:
sys.stdout.write(msg+'\n')
_logName = 'p4ftpsync.log'
_logDir = _getThisScriptDir()
_logPath = os.path.join(_logDir, _logName)
# Rotate log if too big: File is archived as p4ftpsync.log.<n>.zip,
# then truncated :
try:
if os.path.getsize(_logPath) >= maxLogSize:
# Determine max indice already used for archived files:
fileNames = os.listdir(_logDir)
l = re.findall(r'%s\.(\d+)\.zip' % re.escape(_logName), '\n'.join(fileNames))
n = max([int(x) for x in l] + [0]) # 0 if no archive exists yet
# Zip current log :
zippedPath = os.path.join(_getThisScriptDir(),
'%s.%d.zip' % (_logName, n+1))
from zipfile import ZipFile, ZIP_DEFLATED
zipFile = ZipFile(zippedPath, 'w', ZIP_DEFLATED)
zipFile.write(_logPath)
zipFile.close()
os.remove(_logPath)
except OSError:
pass
_log = open(_logPath, 'a')
_logLock = threading.RLock()
def genLog(msg, verbose, display=True):
now = time.strftime('%d-%m-%Y %H:%M:%S')
alreadyTimeStamped = False
_logLock.acquire()
try:
for line in msg.split('\n'):
if line and not alreadyTimeStamped:
line = '[%s] %s' % (now, line)
alreadyTimeStamped = True
_log.write('%s\n' % line)
_log.flush()
if display:
if verbose: vInfo(msg)
else: info(msg)
finally:
_logLock.release()
def log(msg, display=True):
genLog(msg, False, display)
def vLog(msg, display=True):
genLog(msg, True, display)
#-------------------------------------------------------------------------
class _LastSessionOptionStore (object):
#-------------------------------------------------------------------------
''' Stores & retrieves options from the last session, per sync spec.
'''
def __init__(self, filePath=None):
if not filePath:
filePath = os.path.join(_getThisScriptDir(), 'p4ftpsync.opt')
self._filePath = filePath
self._shelf = shelve.open(filePath)
def loadOptions(self, syncSpec):
'''Loads last session options for the given P4 sync spec.
@param syncSpec Something like //depot/MyProj/file1#3
@return Dict of options (may be empty if syncSpec not found)
'''
try:
opts = self._shelf[syncSpec]
except KeyError:
opts = {}
return opts
def storeOptions(self, syncSpec, optionDict):
''' Stores options for the given P4 sync spec.
'''
self._shelf[syncSpec] = optionDict
self._shelf.close() # ensure proper flush
self._shelf = shelve.open(self._filePath)
vLog('options stored for %s = %s' % (syncSpec, optionDict))
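# A minimal round-trip sketch (hypothetical sync spec and host), e.g. from an
# interactive session:
#   store = _LastSessionOptionStore()
#   store.storeOptions('//depot/MySite/...', {'ftpHost': 'ftp.example.com'})
#   store.loadOptions('//depot/MySite/...')
#   -> {'ftpHost': 'ftp.example.com'}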
_optionStore = _LastSessionOptionStore() # Unique instance.
#-------------------------------------------------------------------------
def _getArgs(argList=sys.argv[1:]):
#-------------------------------------------------------------------------
''' Processes arguments.
'''
LAST_SESSION = '__lastSession__'
# Parse args :
import getopt
try:
opts, whatToSync = getopt.getopt(argList, _SHORT_OPTS, _LONG_OPTS)
except getopt.GetoptError:
usage('invalid option', 2)
if not whatToSync:
usage('You have not specified something to sync.', 2)
if len(whatToSync) > 1:
log('*Warning* More than one target specified. Only the FIRST one '
'("%s") will be processed.' % whatToSync[0])
whatToSync = whatToSync[0]
# Set defaults:
comment = mailto = None
force = verbose = reverse = submit = test = False
excludes = []
scriptDir = os.path.join(_getThisScriptDir(), 'p4FtpSyncScripts')
smtpServer = 'localhost'
smtpUser = smtpPasswd = None
fromAddr = 'p4ftpsync@p4ftpsync.net' # Dummy!
s = _execP4Cmd('p4 info')
p4User = re.findall('^User name: (.+)$(?m)', s)[0]
p4Port = re.findall('^Server address: (.+:\d+)$(?m)', s)[0]
p4Passwd = ftpPasswd = None
p4Client = ftpHost = ftpUser = ftpRoot = None # mandatory options
# Reload options from previous session if available:
# Try load last options used for this specific sync target:
oldOpts = _optionStore.loadOptions(whatToSync)
if oldOpts:
for optName, value in oldOpts.items():
exec '%s = %s' % (optName, repr(value))
## (vars().update(oldOpts) doesn't work in Python 2.3)
vLog("Reloaded previous options for target %s: %s" %
(whatToSync, oldOpts))
else:
# Try load last options used, without target consideration:
oldOpts = _optionStore.loadOptions(LAST_SESSION)
if oldOpts:
for optName, value in oldOpts.items():
exec '%s = %s' % (optName, repr(value))
## (vars().update(oldOpts) doesn't work in Python 2.3)
vLog('Reloaded non-target specific options from last '
'session: %s' % oldOpts)
# Analyze args :
for opt, value in opts:
if opt == '--p4Port':
p4Port = value
elif opt == '--p4User':
p4User = value
elif opt == '--p4Passwd':
p4Passwd = value
elif opt == '--p4Client':
p4Client = value
elif opt == '--ftpHost':
ftpHost = value
elif opt == '--ftpUser':
ftpUser = value
elif opt == '--ftpPasswd':
ftpPasswd = value
elif opt == '--ftpRoot':
ftpRoot = value
elif opt == '--smtpServer':
smtpServer = value
elif opt == '--smtpUser':
smtpUser = value
elif opt == '--smtpPasswd':
smtpPasswd = value
elif opt == '--fromAddr':
fromAddr = value
elif opt in ('-f', '--force'):
force = True
elif opt in ('-o', '--scriptDir'):
scriptDir = value
if scriptDir[-1] == '/':
scriptDir = scriptDir[:-1]
elif opt in ('-c', '--comment'):
comment = value.strip()
elif opt in ('-m', '--mailto'):
mailto = value
elif opt in ('-r', '--reverse'):
reverse = True
elif opt in ('-s', '--submit'):
submit = True
elif opt in ('-t', '--test'):
test = True
elif opt in ('-x', '--exclude'):
if value[0] == '@':
f = value[1:]
patterns = []
try:
for line in open(f).read().split('\n'):
line = line.strip()
if line and not line.startswith('#'):
patterns.append(line)
except Exception, e:
fatal('option %s %s: %s' % (opt, value, e))
else:
patterns = value.split(',')
for pattern in patterns:
# Pattern will be matched against the END of file paths :
if not pattern.startswith('^'):
if not pattern.startswith('.*'):
pattern = '^.*' + pattern
else:
pattern = '^' + pattern
if not pattern.endswith('$'):
pattern += '$'
# Check validity :
try:
re.compile(pattern)
except Exception, e:
fatal('Invalid exclude pattern: %s: %s' %
(e.__class__.__name__, e))
excludes.append(pattern)
elif opt in ('-h', '--help'):
usage()
elif opt in ('-v', '--verbose'):
verbose = True
elif opt in ('-V', '--version'):
print '%s\n' % __version__
sys.exit(0)
# Ask user for missing required values :
for optName in ('p4Client', 'ftpHost', 'ftpUser', 'ftpRoot'):
while not eval(optName):
value = raw_input('%s (required) ? ' % optName).strip()
exec '%s = %s' % (optName, repr(value))
# Return options as a dictionary :
try: del argList, getopt, opt, opts, optName, oldOpts, s, value
except: pass
# Save (some) options for next session:
opts = { 'p4Client': p4Client, 'ftpHost': ftpHost, 'ftpUser': ftpUser,
'ftpRoot': ftpRoot, 'smtpServer': smtpServer,
'smtpUser': smtpUser, 'fromAddr': fromAddr }
_optionStore.storeOptions(whatToSync, opts)
del opts['ftpRoot']
_optionStore.storeOptions(LAST_SESSION, opts)
return vars()
#==========================
# Synchronized objects
#==========================
#-------------------------------------------------------------------------
class SyncedDict(dict):
#-------------------------------------------------------------------------
''' A dictionary with a SUBSET of its methods synchronized.
'''
def __init__(self):
self._lock = threading.RLock() # mutex
dict.__init__(self)
# Operations that are synchronized :
def __repr__(self):
return self._sync(dict.__repr__, self)
def __len__(self):
return self._sync(dict.__len__, self)
def __getitem__(self, key):
return self._sync(dict.__getitem__, self, key)
def __setitem__(self, key, value):
self._sync(dict.__setitem__, self, key, value)
def __delitem__(self, key):
self._sync(dict.__delitem__, self, key)
def __contains__(self, elt):
return self._sync(dict.__contains__, self, elt)
def __iter__(self):
return self._sync(dict.__iter__, self)
def _sync(self, fct, obj, *args, **kargs):
self._lock.acquire()
try:
return fct(obj, *args, **kargs)
finally:
self._lock.release()
#-------------------------------------------------------------------------
class SyncedList(list):
#-------------------------------------------------------------------------
''' A list with a SUBSET of its methods synchronized.
'''
def __init__(self, initSeq=()):
self._lock = threading.RLock() # mutex
list.__init__(self, initSeq)
# Operations that are synchronized :
def __repr__(self):
return self._sync(list.__repr__, self)
def __len__(self):
return self._sync(list.__len__, self)
def __getitem__(self, key):
return self._sync(list.__getitem__, self, key)
def __setitem__(self, key, value):
self._sync(list.__setitem__, self, key, value)
def __delitem__(self, key):
self._sync(list.__delitem__, self, key)
def __contains__(self, elt):
return self._sync(list.__contains__, self, elt)
def __getslice__(self, i, j):
return self._sync(list.__getslice__, self, i, j)
def __setslice__(self, i, j, seq):
return self._sync(list.__setslice__, self, i, j, seq)
def __iter__(self):
return self._sync(list.__iter__, self)
def __add__(self, other):
return self._sync(list.__add__, self, other)
def append(self, elt):
return self._sync(list.append, self, elt)
def extend(self, other):
return self._sync(list.extend, self, other)
def _sync(self, fct, obj, *args, **kargs):
self._lock.acquire()
try:
return fct(obj, *args, **kargs)
finally:
self._lock.release()
#-------------------------------------------------------------------------
class SafeCounter(object):
#-------------------------------------------------------------------------
''' A thread-safe counter whose access is PARTIALLY synchronized.
'''
def __init__(self, initValue=0):
self._lock = threading.RLock() # mutex
self.setValue(initValue)
# Operations that are synchronized :
def __repr__(self):
return repr(self.getValue())
def setValue(self, value):
self._lock.acquire()
try:
self._value = value
finally:
self._lock.release()
def getValue(self):
return self._syncEval('self._value')
def __int__(self):
return self._syncEval('int(self._value)')
def __float__(self):
return self._syncEval('float(self._value)')
def __cmp__(self, other):
return self._syncEval('cmp(self._value, %s)' % other)
def __neg__(self):
return self._syncEval('-self._value')
def __add__(self, other):
return self._syncEval('self._value + %s' % other)
def __sub__(self, other):
return self._syncEval('self._value - %s' % other)
def __mul__(self, other):
return self._syncEval('self._value * %s' % other)
def __div__(self, other):
return self._syncEval('self._value / %s' % other)
def incrAndGet(self, increment=1):
''' Increments the counter and also returns the new value.'''
self._lock.acquire()
try:
self._value = self._value + increment
return self._value
finally:
self._lock.release()
def __iadd__(self, other):
self._lock.acquire()
try:
self._value = self._value + other
return self # Must return something or self will be deleted !!
finally:
self._lock.release()
def __isub__(self, other):
self._lock.acquire()
try:
self._value -= other
return self # Must return something or self will be deleted !!
finally:
self._lock.release()
def __imul__(self, other):
self._lock.acquire()
try:
self._value *= other
return self # Must return something or self will be deleted !!
finally:
self._lock.release()
def __idiv__(self, other):
self._lock.acquire()
try:
self._value /= other
return self # Must return something or self will be deleted !!
finally:
self._lock.release()
def _syncEval(self, whatToEval):
self._lock.acquire()
try:
return eval(whatToEval)
finally:
self._lock.release()
# end class SafeCounter
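# A minimal usage sketch (illustrative only): a SafeCounter can be shared by
# several threads without an external lock, e.g.:
#   counter = SafeCounter()
#   def worker():
#       for i in range(1000):
#           counter.incrAndGet()    # atomic increment
#   threads = [threading.Thread(target=worker) for i in range(4)]
#   for t in threads: t.start()
#   for t in threads: t.join()
#   assert counter.getValue() == 4000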
#========================
# PERFORCE related code
#========================
def _formatP4GlobalOpts(**options):
'''Format P4 command global options from this script options.
'''
globalOpts = '-p %(p4Port)s -u %(p4User)s -c %(p4Client)s' % options
if options['p4Passwd'] is not None:
globalOpts += ' -P "%(p4Passwd)s"' % options
#(otherwise use default contextual passwd)
return globalOpts
def _getFilePathKey(localPath, p4Spec, **opts):
''' Computes file path used as key in dict fileStat, fileDesc cache...
'''
p4ClientPath = getP4ClientPathFor(p4Spec, **opts)
assert localPath.startswith(p4ClientPath)
return localPath[len(p4ClientPath):].replace('\\', '/')
_P4ClientPaths = {} # cache for getP4ClientPathFor
#------------------------------------------------------------------------------
def getP4ClientPathFor(p4DepotFile, **options):
#------------------------------------------------------------------------------
''' Returns the local (current) client path for the given depot file.
'''
global _P4ClientPaths
globalOpts = _formatP4GlobalOpts(**options)
# p4DepotFile may be a file or a directory, must try separately:
p4DepotFile = p4DepotFile.rstrip('.').rstrip('*').rstrip('/') # strip trailing '...', '*', '/'
if _P4ClientPaths.has_key(p4DepotFile):
return _P4ClientPaths[p4DepotFile] # try cache
ok = False
for p4Spec in (p4DepotFile, p4DepotFile+'/*'):
cmd = 'p4 %s fstat %s' % (globalOpts, p4Spec)
w, r, e = os.popen3(cmd)
stat = r.read()
r.close() # or will block (in Python 2.4 at least)
errMsg = e.read()
if not errMsg:
ok = True
break
if not errMsg.endswith('- no such file(s).\n'):
break
if not ok:
raise Exception("Can't execute '%s': %s" % (cmd, errMsg))
# Extract the first 'clientFile' field from the status :
m = re.search('clientFile (.+)$(?m)', stat)
clientPath = m.groups()[0]
if p4Spec.endswith('/*'):
clientPath = os.path.dirname(clientPath)
#raw_input('clientPath=%s' % clientPath) ####
_P4ClientPaths[p4DepotFile] = clientPath
return clientPath
#------------------------------------------------------------------------------
class _RevisionStampMgr(object):
#------------------------------------------------------------------------------
''' Manages "Head Revision stamps".
These stamps are used in reverse sync to "sign" file descriptions
in the _FileDescCache.
A file stamp must have the following characteristics:
- be a number >= 0
- be different for each change/revision of the file
- be the same for all files in the same changelist.
- represent the current head revision for the file
Ideally, it should also be uniquely allocated, even if current changes
are abandoned/reverted (this is the case of changelist # in P4).
'''
def getNullHeadRevStamp(self):
return 0
def getHeadRevStampForFile(self, filePath):
''' Returns the head revision stamp for the given file.
'''
raise NotImplementedError('Should implement fct in derived class')
def setHeadRevStampForCurrentChanges(self, newStamp):
''' Sets the rev stamp that will become the head rev stamp after
current changes are committed for *all* files involved.
'''
raise NotImplementedError('Should implement fct in derived class')
def getHeadRevStampForCurrentChanges(self):
''' Gets the rev stamp that will become the head rev stamp after
current changes are committed for *all* files involved.
'''
raise NotImplementedError('Should implement fct in derived class')
#------------------------------------------------------------------------------
class _P4RevisionStampMgr(_RevisionStampMgr):
#------------------------------------------------------------------------------
''' In P4 the "head changelist number" is used as stamp.
@see: _RevisionStampMgr
'''
def __init__(self, p4Spec=None, **opts):
if p4Spec:
self.fileStats = getP4FileStatsForSpec(p4Spec, **opts)
else:
self.fileStats = {}
self.p4Spec = p4Spec
self.newStamp = None # undetermined
def getHeadRevStampForFile(self, filePath):
return int(self.fileStats[filePath]['headChange'])
def setHeadRevStampForCurrentChanges(self, newStamp):
self.newStamp = int(newStamp)
def getHeadRevStampForCurrentChanges(self):
return self.newStamp
_P4FileStatsCache = {} # cache for getP4FileStatsForSpec
#-------------------------------------------------------------------------
def getP4FileStatsForSpec(p4Spec, **opts):
#-------------------------------------------------------------------------
''' Returns a dict of info about files in the given file spec.
Returns a dictionary of file info, indexed by file "root relative"
path, the root being the P4 client path root corresponding to p4Spec,
and '\' are converted to '/', eg '/index.php'.
Each file Info is itself a dictionary with keys corresponding to the
output of the p4 cmd fstat:
'depotFile' ... depot path to file
'clientFile' ... local path to file
'headAction' ... action taken at head rev, if in depot
'headType' ... head revision type, if in depot
'headRev' ... head revision number, if in depot...
'headChange' ... head revision changelist number, if in depot
etc...
@param p4Spec: [str]a P4 sync spec, eg '//depot/GG3/www/...'
@param opts: p4 options (p4User, etc...)
'''
global _P4FileStatsCache
if _P4FileStatsCache.has_key(p4Spec): # use cache if possible
return _P4FileStatsCache[p4Spec]
log('Getting file info for %s' % p4Spec)
globalOpts = _formatP4GlobalOpts(**opts)
cmd = 'p4 %s fstat %s' % (globalOpts, p4Spec)
vLog(cmd)
w, r, e = os.popen3(cmd)
stats = r.read()
r.close() # or will block (in Python 2.4 at least)
errMsg = e.read()
if errMsg:
raise Exception("Can't execute '%s': %s" % (cmd, errMsg))
fileStats = {}
for stat in stats.split('\n\n'):
if stat:
fileStat = {}
for line in stat.split('\n'):
if line:
key, value = line.split(' ', 2)[1:]
fileStat[key] = value
clientFile = fileStat['clientFile']
fileStats[_getFilePathKey(clientFile, p4Spec, **opts)] = fileStat
#print 'fileStats=%s' % fileStats ###
_P4FileStatsCache[p4Spec] = fileStats
return fileStats
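# Illustrative sketch of the parsing above (hypothetical paths): 'p4 fstat'
# prefixes each field with '... ' and separates files with blank lines, e.g.:
#   ... depotFile //depot/MySite/index.php
#   ... clientFile /home/me/p4client/MySite/index.php
#   ... headRev 3
#   ... headChange 1234
# line.split(' ', 2)[1:] yields ('depotFile', '//depot/MySite/index.php') etc,
# and the resulting entry is keyed on the root relative path:
#   fileStats['/index.php']['headChange'] == '1234'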
#-------------------------------------------------------------------------
def getP4SyncActions(syncSpec, excluder, quiet, **options):
#-------------------------------------------------------------------------
''' Builds the list of actions to perform the sync.
This involves :
- P4 sync the client (preview) to get the list of changes
- build the list of actions and write it on disk as a Python script
@param syncSpec: What to sync, as a perforce file spec
@param excluder: To exclude some files from the built script.
@param quiet: If True don't log/trace info messages.
@param **options:
@return: (list of update tasks to perform, path of
script generated|None).
'''
if not excluder: excluder = _nullExcluder
cmd, lines = _p4Sync(syncSpec, True, **options)
root = string.join(syncSpec.split('/')[:-1], '/')
RE_P4_INFO_LINE = re.compile(r'^info: %s/(.+)#.+ - ([ \w]+) (.+)$' % root)
RE_P4_ERROR_LINE = re.compile(r'^error: (.*)$')
p4ClientPath = getP4ClientPathFor(syncSpec, **options)
fileStats = getP4FileStatsForSpec(syncSpec, **options)
toDo = []
for line in lines:
#print line ##
m = RE_P4_ERROR_LINE.match(line)
if m:
errMsg = m.group(1)
if errMsg.endswith('file(s) up-to-date.'): # already synced
break
fatal('P4 error while executing "%s": %s' % (cmd, m.group(1)))
m = RE_P4_INFO_LINE.match(line)
if m is None: # empty line
continue
target, action, source = m.groups()
try:
action = P4_SYNC_LEGAL_ACTIONS[action]
except KeyError:
fatal('Illegal P4 action "%s" found in P4 info line "%s"' %
(action, line))
## [Experimental] When ADDING (not updating) a file, try to set the
# x bit on the server if the file has a +x P4 file type modifier:
if action == ADD:
permissions = None
key = _getFilePathKey(source, syncSpec, **options)
fileType = fileStats[key]['headType']
if 'x' in fileType: # executable type
permissions = _DEFAULT_FILE_PERM | _EXEC_MASK_PERM
vLog('Exec Permission set for %s -> perm=%s' %
(source, oct(permissions)))
toDo.append((action, source, target, permissions))
else:
toDo.append((action, source, target))
if toDo:
# Write on disk the list of update actions to perform, as a script:
if not quiet: vLog('Writing update script')
scriptPath = _createFTPSyncScript(toDo, syncSpec, excluder, **options)
else:
if not quiet:
log('--> Nothing to sync, already up to date. If you know that '
'the remote site is not up to date because of a failure during a '
'previous upload attempt, and you have an update Python script, '
'you might want to execute it to sync the remote site.\n'
'You can also force a resync to the head revision with the '
'option --force (or -f).'
)
scriptPath = None
return toDo, scriptPath
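# Illustrative shape of the returned toDo list (hypothetical paths): each
# entry pairs the local client file (source) with the site relative target:
#   [('ADD', '/home/me/p4client/MySite/css/new.css', 'css/new.css', None),
#    ('UPDATE', '/home/me/p4client/MySite/index.php', 'index.php')]
# (ADD entries carry a 4th element: the permissions to set, or None.)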
#-------------------------------------------------------------------------
def _p4Sync(syncSpec, preview, **options):
#-------------------------------------------------------------------------
''' Executes a P4 sync command.
If preview is True, the client is not really synced, the function
is only used to get the list of changes.
@param syncSpec: What to sync, as a P4 file spec
@param preview: If True, don't really sync the client; if False
synchronize really the client.
@param **options:
@return: (P4 cmd, output of cmd)
'''
globalOpts = '-s ' + _formatP4GlobalOpts(**options)
# P4 options :
p4Opts = ''
mode = ''
if options['force']:
p4Opts += '-f'
mode = 'FORCE SYNC / '
if preview:
p4Opts += ' -n'
mode += 'PREVIEW'
else:
mode += 'REAL'
# Format cmd:
cmd = 'p4 %s sync %s %s' % (globalOpts, p4Opts, syncSpec)
vLog('P4 cmd = "%s" (%s)' % (re.sub('-P ".*"', '-P XXX', cmd), mode))
# Execute cmd :
lines = _execP4Cmd(cmd).split('\n')
return (cmd, lines)
#------------------------------------------------------------------------------
def P4SubmitChanges(changes, p4Port, p4Client, p4User, p4Passwd,
comment, submit=True, preview=False):
#------------------------------------------------------------------------------
''' Submits the given changes to Perforce.
Creates a changelist for the changes and submits them if <submit> True
(and not preview). If <preview> is True, all P4 actions are simulated.
@param changes: list returned by Local2FtpDirComparator.cmpDirs
@param comment: changelist description.
@param submit: Whether to submit or not the change list created.
@return: [int]The changelist number.
'''
def _copyFile(f1, f2, preview=False):
if preview:
vLog('PREVIEW: copy %s to %s' % (f1, f2))
else:
# Create dest. directories if needed :
destDir = os.path.dirname(f2)
if not os.path.exists(destDir):
os.makedirs(destDir, mode=0777)
shutil.copy(f1, f2)
globalOpts = '-s ' + _formatP4GlobalOpts(p4Port=p4Port, p4Client=p4Client,
p4User=p4User, p4Passwd=p4Passwd)
## Create a changelist:
changelistSpec = ('Change: new\nClient: %s\nUser: %s\n'
'Status: new\nDescription: %s' % (p4Client,
p4User, comment))
res = _execP4Cmd('p4 %s change -i' % globalOpts, changelistSpec, preview)
m = re.match(r'info: Change (?P<chgListNb>\d+) created.', res)
if m is None:
raise Exception("Can't create a changelist: unable to parse "
'P4 response "%s"' % res)
chgListNb = int(m.group('chgListNb'))
vLog('-> Created changelist #%d' % chgListNb)
## Add changes to changelist :
# Optimizes speed by grouping similar operations (P4 cmds are time-costly)
def _groupFiles(files, groupBy=10):
''' Splits the file list into slices of groupBy x elements,
and formats each slice into one string with the filenames
double quoted.
Returns the list of formatted strings.
'''
groupedFiles = []
for i in range(0, len(files), groupBy):
slice = files[i:i+groupBy]
groupedFiles.append(' '.join(['"%s"' % f for f in slice]))
return groupedFiles
# Group operations by type :
added, deleted, changed, toCopy = [], [], [], []
for op, f1, f2 in changes:
if op == 'ADDED':
_copyFile(f2, f1, preview)
added.append(f1)
elif op =='DELETED':
deleted.append(f1)
elif op == 'CHANGED':
changed.append(f1)
toCopy.append((f2, f1)) # delay copy until open for edit
# (otherwise R/O)
else:
raise Exception('Invalid op type: %s' % op)
# Execute ops by chunks :
groupBy = 10
if deleted:
for files in _groupFiles(deleted, groupBy):
cmd = 'p4 %s delete -c %d %s' % (globalOpts, chgListNb, files)
res = _execP4Cmd(cmd, None, preview)
if res.startswith('error:'):
error('failed to open %s for DELETE in P4: %s' % (files, res))
else:
vLog('Opened %s for DELETE in changelist -> %s' % (files, res))
if added:
for files in _groupFiles(added, groupBy):
cmd = 'p4 %s add -c %d %s' % (globalOpts, chgListNb, files)
res = _execP4Cmd(cmd, None, preview)
if res.startswith('error:'):
error('failed to open %s for ADD in P4: %s' % (files, res))
else:
vLog('Opened %s for ADD in changelist -> %s' % (files, res))
if changed:
for files in _groupFiles(changed, groupBy):
cmd = 'p4 %s edit -c %d %s' % (globalOpts, chgListNb, files)
res = _execP4Cmd(cmd, None, preview)
if res.startswith('error:'):
error('failed to open %s for EDIT in P4: %s' % (files, res))
else:
vLog('Opened %s for EDIT in changelist -> %s' % (files, res))
for src, dest in toCopy:
_copyFile(src, dest, preview)
vLog('-> All changes added to changelist #%d' % chgListNb)
## Submit the changelist if required :
if not preview and submit:
res = _execP4Cmd('p4 %s submit -c %d' % (globalOpts, chgListNb), None,
preview)
if res.startswith('error:'):
error('failed to submit changelist %d in P4: %s' % (chgListNb,res))
else:
log('->Changes submitted to P4.')
return chgListNb
#-------------------------------------------------------------------------
def _execP4Cmd(cmd, input=None, preview=False):
#-------------------------------------------------------------------------
''' Executes a P4 cmd.
@param cmd: The p4 cmd to execute
@param input: data to be provided as input to the cmd
@param preview: If True, just simulate.
@return: The (stdout) output of the cmd, or '0' if preview==True.
'''
if preview:
vLog('PREVIEW: %s' % cmd)
return '0' # dummy
w, r, e = os.popen3(cmd)
if input:
w.write(input)
w.close() # or will block!
output = r.read()
r.close() # or will block (in Python 2.4 at least)
errMsg = e.read()
if errMsg:
raise Exception("Can't execute '%s': %s" % (cmd, errMsg))
return output
#========================
# FTP related code
#========================
#------------------------------------------------------------------------------
class _FileDescCache(object):
#------------------------------------------------------------------------------
''' Cache of (remote site) file descriptions returned by FTP command LIST.
The purpose of the cache is to save the download of all the files from
the site during the reverse sync, since -after the initial comparison-
it is likely that very few changes will have occurred between two
iterations of this script.
The idea is to cache the file descriptions returned by the FTP server
command LIST. The format of these descriptions is server dependent,
which would make parsing hazardous, but a desc is guaranteed to change
if a file is modified, so by comparing 2 descriptions of the same file
(at different times) we can determine if the file has been modified on
the remote site without having to know the exact format of the desc.
Only the head revision state of a file must be cached since it is used
for reverse sync.
When we compare the remote file with the P4 head revision file, we
assume that the cached file desc represents the state of the head
revision file in P4, and if the current changes detected are
submitted, the current file desc will also reflect the new head rev
state once the changes are committed. There is a little problem,
though: we can't be certain that the changes will be committed (it is
optional, and the submit could also fail), so we must find a mechanism
to detect such a case and invalidate cache entries if necessary.
One solution is to associate a "revision stamp" to each description.
For P4, the Head Changelist Number seems a good candidate. It is unique
and even if a submit fails or is never done, the same number will not
be reallocated. It is certain that after changes are completed, a
file described in the cache matches the corresponding head
revision file in the repository. By associating this revision stamp,
we have a way to check later whether the cache description of a file
is up to date or not, and if not, we will have to download the file
and compare it.
'''
# To detect obsolete caches. Change it whenever cache structure changes !!!
CACHE_VERSION = 1
def __init__(self, cacheFilePath, loadCacheFile=True):
self._cachePath = cacheFilePath
# File description (as returned by LIST) cache. The description cached
# is associated with a "stamp" (the head revision changelist number of
# the file (or 0 if the file doesn't exist) as a way to check that the
# cached desc corresponds to the head rev. of the file :
# { fileAbsPath: (fileDesc, headRevStamp), ...}
if loadCacheFile:
self._fileDescs = self._loadFileDescCache()
else:
self._fileDescs = {}
def __repr__(self):
return '<_FileDescCache at %s>' % self._cachePath
def __getitem__(self, key): # cache[key]
return self._fileDescs[key]
def __setitem__(self, key, value): # cache[key]=value
self._fileDescs[key] = value
def __delitem__(self, key): # del cache[key]
del self._fileDescs[key]
def adjustForChanges(self, changes, newStamp, p4ClientPath):
''' Adjusts the file description cache to take changes into account.
The head Rev changelist # are adjusted to reflect their value when
the changelist is submitted.
@param changes: List of changes returned by
Local2FtpDirComparator.cmpDirs()
@param newStamp: [int] new head rev stamp to associate with
changed files.
@param p4ClientPath: root of client workspace.
'''
fileDescs = self._fileDescs
for op, f1, f2 in changes:
assert f1.startswith(p4ClientPath)
key = f1[len(p4ClientPath):].replace('\\', '/')
if op in ('ADDED', 'CHANGED'):
fileDescs[key] = (fileDescs[key][0], newStamp)
elif op =='DELETED':
try:
del fileDescs[key]
except KeyError:
pass
else:
raise Exception('Invalid op type: %s' % op)
def saveFileDescCache(self):
''' Saves file description cache on disk.
Should be done only after a synchronization has been successfully
completed and cache adjusted to take the (possibly future) submit
into account.
'''
path = self._cachePath
f = open(path, 'wb')
try:
cPickle.dump(self.CACHE_VERSION, f, 1) # 1=binary protocol
cPickle.dump(self._fileDescs, f, 1)
finally:
f.close()
vLog('File desc cache saved to %s' % path)
# Private:
def _loadFileDescCache(self):
''' Load file desc cache from disk if it exists.
@return: Dictionary {fileAbsPath: fileDesc,...}
Empty if cache absent or corrupted.
'''
path = self._cachePath
cache = {}
if not os.path.exists(path):
vLog('No file desc cache available at %s' % path)
else:
try:
f = open(path, 'rb')
version = cPickle.load(f)
cache = cPickle.load(f)
vLog('file desc cache loaded from %s' % path)
except Exception, e:
warning("Can't load file desc cache at %s: %s "
"=> don't use cache" % (path, e))
else:
if version != self.CACHE_VERSION:
warning("File desc cache found at %s but obsolete "
" => don't use cache" % path)
cache = {}
return cache
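# Illustrative cache entry (hypothetical LIST output): self._fileDescs maps a
# root relative path to (raw LIST line, head rev stamp), e.g.:
#   {'/index.php':
#       ('-rw-r--r--   1 web   web   1024 Sep 20 12:00 index.php', 1234)}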
#------------------------------------------------------------------------------
class FtpTree(object):
#------------------------------------------------------------------------------
''' FTP file tree handling.
Maintains an internal representation of the file structure of a remote
site in order to facilitate file access.
File "descriptions" (state) as returned by the FTP server are cached
associated with a revision stamp (see class _FileDescCache).
[Used for reverse sync, see class Local2FtpDirComparator]
'''
RE_FILE_DETAIL = re.compile(r'(?P<c>[\-dlcbps])[lrwxst\-]{9}\s+\d+\s+.+?'
r'(?P<size>\d+)\s+'
r'(?P<date>\w+\s+\d+\s+[\d:]+)\s+'
r'(?P<fileName>.*)$'
)
RE_SYMBLINK = re.compile(r'(?P<linkName>.*)\s->\s(?P<targetFile>.*)')
def __init__(self, ftp, revStampMgr, rootDir='/', loadFileDescCache=True):
''' Builds the tree from the file tree at root on the given FTP site.
'''
self._ftp = ftp # [Ftp] The Ftp site manager.
self._revStampMgr = revStampMgr # Head revision stamp manager
self._root = self._normalizePath(rootDir)
self._dirs = {} # { dirAbsPath: list of filenames (. and ..
# excluded),...}
self._files = {} # { fileAbsPath: (unchanged, localPath if
# downloaded|None),..}. Unchanged True if file
# desc unchanged since last comparison.
# Temp space for downloaded files (different each time):
import tempfile
prefix = 'FtpTree_'
try:
self._tmpRoot = tempfile.mkdtemp(prefix=prefix)
except AttributeError: # mkdtemp defined in Python 2.3
self._tmpRoot = os.path.join(tempfile.gettempdir(), prefix)
# Use a cache, but load existing one only if permitted:
self._fileDescs = _FileDescCache(self._getFileDescCachePath(),
loadFileDescCache)
self._threadCnt = SafeCounter(1) ###
self._maxThreadCnt = SafeCounter(1) ###
t0 = time.time()
log('Reading file structure of site %s@%s (it may take several '
'minutes) ...' % (ftp.getUser(), ftp.getHost()))
self._buildTree(self._root)
vLog('==> File structure read in %.1f sec' % (time.time()-t0))
vLog(' Concurrent Ftp connections used: %d' %len(self._ftp._agents))
assert self._threadCnt == 1 ###
def __del__(self):
self.clean()
def __repr__(self):
return '<FtpTree @ %s: %s>' % (self._ftp.getHost(), self._root)
def clean(self):
''' Deletes the temp space used.
'''
if self._tmpRoot:
try:
shutil.rmtree(self._tmpRoot, True)
self._tmpRoot = None
except:
pass
def listDir(self, dir):
''' Gets the content of the given directory.
@param dir: absolute path of the directory to list.
@return: a list of filenames (Filenames only, not full path).
@exception KeyError: if not found
'''
return self._dirs[self._normalizePath(dir)]
def isDir(self, path):
''' Returns True if path exists and is a directory.
'''
return self._dirs.has_key(self._normalizePath(path))
def isFileUnchanged(self, path):
''' Returns True if the given file has not changed since last sync.
This function relies on the comparison (see _buildTree) of
the file state saved in the cache (if any) and the current state.
When isFileUnchanged() returns True, the file is unchanged.
However, when the fct returns False, it only means that it
cannot be guaranteed that the file is unchanged, a full data
comparison is still needed to determine if the file has changed.
@param path: Absolute path of the file (NOT dir!) on the FTP site.
@exception KeyError: if file not found
'''
assert not self.isDir(path), 'works only for files'
return self._files[self._normalizePath(path)][0]
def getFile(self, path, async=False, callback=None, cbArgs=()):
''' Downloads a file and returns its local path.
@param path: Absolute path of the file on the FTP site.
@param async: If True, the download, if needed, will be performed
asynchronously: the function will immediately return
(with a valid local path), the download will be performed
in the background and if <callBack> is not None, a call
to callback(*cbArgs) will be made on successful
completion of the download. If False, the download is
synchronous and NO callback will be done.
@param callback: callback function in asynchronous mode
@param cbArgs: callback arguments. The local path will be appended.
@return: local path of file.
@exception KeyError: if file not found
'''
path = self._normalizePath(path)
unchanged, localPath = self._files[path]
if not localPath: # not yet loaded
localPath = os.path.join(self._tmpRoot, *path.split('/'))
localDir = os.path.dirname(localPath)
if not os.path.isdir(localDir):
os.makedirs(localDir)
if async:
self._ftp.addOperation(GET, localPath, path, callback,
cbArgs+(localPath,))
else:
self._ftp.execOperation(GET, localPath, path)
self._files[path] = (unchanged, localPath)
return localPath
def adjustFileDescCache(self, changes, p4ClientPath):
''' Adjusts the file description cache to take changes into account.
'''
newStamp = self._revStampMgr.getHeadRevStampForCurrentChanges()
assert newStamp != self._revStampMgr.getNullHeadRevStamp()
self._fileDescs.adjustForChanges(changes, newStamp, p4ClientPath)
def saveFileDescCache(self):
''' Saves file description cache on disk.
Should be done only after a synchronization has been successfully
completed and cache adjusted to take the (possibly future) submit
into account.
'''
self._fileDescs.saveFileDescCache()
# private:
def _buildTree(self, root):
''' Builds the tree of files at given root.
'''
self._normalizePath(root)
vLog('\nListing directory %s' % root) ##
fileNames = []
dirPaths = []
ftpRoot = self._root.rstrip('/')
assert root.startswith(ftpRoot)
revStampMgr = self._revStampMgr
nullHeadRevStamp = revStampMgr.getNullHeadRevStamp()
# Handle simple files, and collect subdir paths :
for fileDesc in self._ftp.execOperation(LISTDIR, root):
#print 'fileDesc = "%s"' % fileDesc ##
m = self.RE_FILE_DETAIL.match(fileDesc)
if m is None:
raise Exception('FileDesc "%s" does not match RE_FILE_DETAIL '
'in LISTDIR(%s)' % (fileDesc, root))
if len(m.groups()) != 4:
raise Exception('FileDesc "%s" matches RE_FILE_DETAIL '
'w/ wrong nb of groups (%d instead of 4), in '
'LISTDIR(%s)' % (fileDesc, len(m.groups()),
root))
c, size, date, fileName = m.groups()
#print m.groups() ###
if fileName in ('.', '..'):
continue
# First handle symbolic links: keep link name as filename
# (will translate locally to a filename = link name)
if c == 'l': # Symbolic link:
m2 = self.RE_SYMBLINK.match(fileName)
if m2 is None:
warning("_buildTree: wrong format for filename in "
"symbolic link desc '%s' in dir %s, expected "
"'linkname -> targetFile'" % (fileDesc, root))
continue
linkName, targetFile = m2.groups()
fileName = linkName
if targetFile.endswith('/'):
c = 'd'
else: c = '-'
filePath = '%s/%s' % (root.rstrip('/'), fileName)
#print 'filePath = "%s"' % filePath ##
if c == 'd':
# directory: collect for later
dirPaths.append(filePath)
elif c == '-': # file, try using cache to determine if file changed
key = filePath[len(ftpRoot):]
unchanged, headRevStamp = False, nullHeadRevStamp
# Get Head revision changelist # for the file:
try:
headRevStamp = revStampMgr.getHeadRevStampForFile(key)
except KeyError:
#print 'Rev Stamp not found for file %s' % key ####
pass
#print 'filePath=%s -> key=%s' % (filePath, key) ####
# Get file description in cache:
try:
cachedDesc, cachedHeadRevStamp = self._fileDescs[key]
except KeyError:
#print 'key not found in _fileDescs' ####
pass
else:
#print 'key found in _fileDescs: (%s, %d)' % (cachedDesc, cachedHeadRevStamp) ####
unchanged = (cachedHeadRevStamp == headRevStamp and
fileDesc == cachedDesc)
#print 'unchanged=%d' % unchanged ####
self._fileDescs[key] = (fileDesc, headRevStamp)
self._files[filePath] = (unchanged, None)
else: # named pipe ('p'), sockets ('s'), etc..
warning("_buildTree: file type '%s' not supported, in "
"file desc '%s' from dir. %s => skipped" %
(c, fileDesc, root))
continue
fileNames.append(fileName)
self._dirs[root] = fileNames
# Handle subdirectories.
# For optimization, the handling of half the subdirs is delegated to
# a "child" thread (sort of "fork') :
import math
l1 = int(math.ceil(float(len(dirPaths)) / 2))
dirPaths1, dirPaths2 = dirPaths[:l1], dirPaths[l1:]
childThread = None
if dirPaths2:
# Delegates build of subtree dirPaths2 to another thread:
import threading
childThread = threading.Thread(target=self._buildTreeThread,
args=(dirPaths2,))
childThread.start()
#print '##Created thread to handle subdir %s' % dirPaths2 ##
# handle recursively (and sequentially) the first half of subdirs:
for filePath in dirPaths1:
self._buildTree(filePath)
# Wait for end of child thread if any:
if childThread:
childThread.join()
#print '## --> child thread ended.'
def _buildTreeThread(self, dirs):
''' Builds the tree of files for the given list of directories.
'''
self._threadCnt += 1
if self._threadCnt > self._maxThreadCnt:
self._maxThreadCnt.setValue(self._threadCnt.getValue())
#print '_buildTreeThread started for dirs = %s' % dirs
#print 'Currently %d threads running (max %d)' % (self._threadCnt, self._maxThreadCnt)
try:
for dir in dirs:
self._buildTree(dir)
#print '## _buildTreeThread ended for dirs = %s' % dirs
finally:
self._threadCnt -= 1
def _normalizePath(self, path, trailingSlash=False):
''' Normalizes the given path.
Replace all '\' with '/'.
If '/' -> unchanged
Otherwise: removes any leading '/';
removes any trailing '/' if trailingSlash==False.
@return: modified path.
'''
path = path.replace('\\', '/')
if path != '/':
assert path != ''
if not path.startswith('/'):
path = '/' + path
if not trailingSlash:
path = path.rstrip('/')
return path
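# Illustrative behavior (follows from the rules above):
#   _normalizePath('foo\\bar/') -> '/foo/bar'
#   _normalizePath('/')         -> '/'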
def _getFileDescCachePath(self):
return os.path.join(_getThisScriptDir(), '%s@%s.fdc' %
(self._ftp.getUser(), self._ftp.getHost()))
# End class FtpTree
#------------------------------------------------------------------------------
class Local2FtpDirComparator(object):
#------------------------------------------------------------------------------
''' Compares recursively a local file directory with a remote FTP one.
'''
BUFSIZE = 1024 # buffer size for file comparisons
def __init__(self, ftp, revStampMgr, ftpRoot='/', forceRead=False):
''' Constructor.
@param ftp: [Ftp] A FTP site manager.
@param revStampMgr: [_RevisionStampMgr]
@param ftpRoot: [str] Path of dir considered as root on FTP site
@param forceRead: [bool] If True ensures that remote files to
compare will be read (don't use cache).
'''
self._ftp = ftp
self._ftpRoot = ftpRoot
self._revStampMgr = revStampMgr
self._ftpTree = FtpTree(ftp, revStampMgr, ftpRoot, not forceRead)
def cmpDirs(self, d1, d2='/', excluder=None):
''' Compares the local dir d1 with the remote FTP dir d2.
@param d1: Absolute path of local directory.
@param d2: Path of remote directory, RELATIVE to ftp root.
@param excluder: [_Excluder] To exclude files from the comparison.
@return: a list of changes to do to update d1 to d2.
'''
if not excluder:
excluder = _nullExcluder
changes = SyncedList()
self._cmpDirs(changes, d1, d2, excluder)
self._ftp.waitForOpsDone()
return changes
def adjustCache(self, changes, p4ClientPath):
''' Adjusts the FtpTree cache to take changes into account.
'''
self._ftpTree.adjustFileDescCache(changes, p4ClientPath)
def saveCache(self):
''' Save the FtpTree cache.
'''
self._ftpTree.saveFileDescCache()
def getFtpTree(self):
return self._ftpTree
# Private:
def _cmpDirs(self, changes, d1, d2, excluder):
''' Recursive implementation of cmpDirs().
The list of changes is appended to <changes>.
'''
d1 = d1.rstrip('/')
d2 = d2.rstrip('/')
print 'in %s:' % d1 ###
##changes = []
s1 = Set(os.listdir(d1))
s2 = Set(self._ftpTree.listDir(d2))
# Deleted & added files :
deleted = s1.difference(s2)
for f in deleted:
f1 = os.path.join(d1, f)
if os.path.isdir(f1):
self._dirDeleted(changes, f1)
else:
changes.append(('DELETED', f1, None))
added = s2.difference(s1)
#print 'added=%s' % added ###
for f in added:
f1 = os.path.join(d1, f)
f2 = '%s/%s' % (d2, f) # '/' always used for FTP
if excluder.shouldBeExcluded(f2):
continue
if self._ftpTree.isDir(f2):
self._dirAdded(changes, f1, f2, excluder)
else:
# Download f2 asynchronously, but get target local path NOW:
f2Local = self._ftpTree.getFile(f2, async=True)
changes.append(('ADDED', f1, f2Local))
# Common files/dirs require further investigation to determine changes:
common = s1.intersection(s2)
for f in common:
f1 = os.path.join(d1, f)
f2 = '%s/%s' % (d2, f) # '/' always used for FTP
if excluder.shouldBeExcluded(f2):
continue
if os.path.isdir(f1):
assert self._ftpTree.isDir(f2) # otherwise in trouble !!!
self._cmpDirs(changes, f1, f2, excluder)
else: # files, must compare them
if self._ftpTree.isFileUnchanged(f2):
vLog('== File %s UNCHANGED (from cache)' % f2)
continue
try:
# Download f2 asynchronously, then compares it to f1
# in _onFileLoadedDoCmp:
f2Local = self._ftpTree.getFile(f2, async=True,
callback=self._onFileLoadedDoCmp,
cbArgs=(changes, f1))
except IOError, e:
warning("Can't cmp %s: %s => skip it!" % (f, e))
def _dirAdded(self, changes, d1, d2, excluder):
''' Append to <changes> a list of 'ADDED' changes in local dir d1,
one for every file in remote FTP dir d2 and its subdirs.
'''
for f in self._ftpTree.listDir(d2):
f1 = os.path.join(d1, f)
f2 = '%s/%s' % (d2, f) # '/' always used for FTP
if excluder.shouldBeExcluded(f2):
continue
if self._ftpTree.isDir(f2):
self._dirAdded(changes, f1, f2, excluder)
else:
# Download f2 asynchronously, but get target local path now:
f2Local = self._ftpTree.getFile(f2, async=True)
changes.append(('ADDED', f1, f2Local))
def _dirDeleted(self, changes, d1):
''' Append to <changes> a list of 'DELETED' changes, one for
every file in local directory d1 and its subdirs.
'''
for f in os.listdir(d1):
f1 = os.path.join(d1, f)
if os.path.isdir(f1):
self._dirDeleted(changes, f1)
else:
changes.append(('DELETED', f1, None))
def _onFileLoadedDoCmp(self, changes, f1, f2):
''' Called on completion of the download of f2.
Compares f1 to f2: if different, add change to <changes>
'''
print '_onFileLoadedDoCmp(%s, %s)' % (f1, f2) ###
if not self._do_cmp(f1, f2):
#print 'CHANGED %s %s' % (f1, f2) ###
changes.append(('CHANGED', f1, f2))
def _do_cmp(self, f1, f2):
''' Returns True if the content of the 2 files is the same.
'''
if TextBinaryClassifier().isFileText(f1):
# Normalize EOL in content of files to compare:
b1 = re.sub('\r*\n|\r|\n', '\n', open(f1, 'rb').read())
b2 = file(f2).read() # Assume file already normalized by _GET
if b1 == b2:
return True
b2EndsWithEol = b2.endswith('\n')
if b1.endswith('\n'):
if not b2EndsWithEol:
return b1 == b2 + '\n'
else:
if b2EndsWithEol:
return b1 + '\n' == b2
return False
else: # binary content
fp1 = open(f1, 'rb')
fp2 = open(f2, 'rb')
while True:
b1 = fp1.read(self.BUFSIZE)
b2 = fp2.read(self.BUFSIZE)
if b1 != b2:
return False
if not b1:
return True
# End class Local2FtpDirComparator
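# Illustrative shape of the <changes> list built by cmpDirs (hypothetical
# paths): (op, local client path, local copy of the downloaded remote
# file or None), e.g.:
#   [('ADDED',   '/clientRoot/new.css',   '/tmp/FtpTree_x/new.css'),
#    ('CHANGED', '/clientRoot/index.php', '/tmp/FtpTree_x/index.php'),
#    ('DELETED', '/clientRoot/old.html',  None)]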
#-------------------------------------------------------------------------
class TextBinaryClassifier(object):
#-------------------------------------------------------------------------
''' Utility class for classifying strings or files as Text or Binary.
'''
# Extensions of files always considered as text (case N.S.) :
_textExts = ('afm,as,asp,bat,c,cfg,cfm,cgi,conf,cpp,css,csv,cxx,dhtml,'
'diz,dwt,h,htm,html,hxx,idl,inc,ini,java,js,log,mak,me,'
'nfo,php,php3,php4,pl,py,pyw,sh,shtml,sql,txt,xht,'
'xhtml,xml,xsl,xslt')
TEXT_FILE_EXTS = SyncedDict()
for ext in _textExts.split(','):
TEXT_FILE_EXTS['.'+ext.lower()] = None
# Extensions of files always considered as binary (case N.S.) :
_binaryExts = ('aif,avi,bmp,class,cur,dll,exe,fla,gif,ico,iff,img,jpe,'
'jpeg,jpg,mid,mov,mp3,mpe,mpeg,mpg,o,obj,ogg,pcd,'
'pcx,pdf,pic,pict,png,ppt,psd,pyc,pyd,pyo,ra,rtf,'
'swf,tga,tiff,wav,wma,wmf,wmw,word,xls')
BINARY_FILE_EXTS = SyncedDict()
for ext in _binaryExts.split(','):
BINARY_FILE_EXTS['.'+ext.lower()] = None
# Determination of type from content (see isStringText()):
textCharacters = "".join(map(chr, range(32, 127)) + list("\n\r\t\b"))
_nullTrans = string.maketrans("", "")
def isFileText(self, file, checkContentIfNeeded=True, blockSize=1024):
''' Tries to determine whether file is text or binary.
@return: True if text, False if binary or impossible
to determine (content not checked).
'''
# Check if extension listed as text or binary:
base, ext = os.path.splitext(file)
ext = ext.lower()
if self.TEXT_FILE_EXTS.has_key(ext):
return True
if self.BINARY_FILE_EXTS.has_key(ext):
return False
# Try to guess the type from the content :
# In doubt "binary" is preferred so once a file with an extension has
# been found as binary, we always return "binary" for files with this
# extension.
        # ('rb' so the raw bytes are sniffed, even on Windows):
        isText = checkContentIfNeeded and self.isStringText(
                                open(file, 'rb').read(blockSize))
if not isText and ext:
self.BINARY_FILE_EXTS[ext] = None
return isText
def isStringText(self, s):
        ''' Returns True if s looks like text, False if it looks like binary.
            Recipe borrowed from
            http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/173220
'''
if "\0" in s:
return False
if not s: # Empty files are considered text
return True
# Get the non-text characters (maps a character to itself then
# use the 'remove' option to get rid of the text characters.)
t = s.translate(self._nullTrans, self.textCharacters)
# If more than 30% non-text characters, then
# this is considered a binary file
        if float(len(t)) / len(s) > 0.30:  # (float: avoid integer division)
return False
return True
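    # Usage sketch (illustrative, not executed; file names are hypothetical):
    #   tbc = TextBinaryClassifier()
    #   tbc.isFileText('readme.txt')  # -> True (extension listed as text)
    #   tbc.isFileText('logo.png')    # -> False (extension listed as binary)
    #   tbc.isStringText('hello\n')   # -> True
    #   tbc.isStringText('\x00\x01')  # -> False (NUL byte => binary)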
# End class TextBinaryClassifier
#-------------------------------------------------------------------------
def _createFTPSyncScript(toDo, syncSpec, excluder, **options):
#-------------------------------------------------------------------------
''' Creates the Python script for performing the FTP update.
The file is created in the directory given by the option
'scriptDir'.
@param toDo: The list of update actions.
@param excluder: To exclude some files from the built script.
@return: The path of the script created.
'''
template = '''\
#!/usr/bin/env python
# -*- coding: latin1 -*-
#-------------------------------------------------------------------------
# FTP sync for P4 sync of %(syncSpec)s
#
# usage: [python] "%(scriptName)s"
# Updates the remote site %(ftpHost)s via FTP so it is synchronized
# according to the Perforce filespec: %(syncSpec)s.
#
# ** IMPORTANT **
# - Be sure that p4ftpsync.py is in the Python PATH, since it is imported
# by the current script.
#
# You may re-execute this script several times if it happens to fail
# before completion.
#
# - Generated by p4ftpsync %(version)s on %(ascNow)s.
#-------------------------------------------------------------------------
date = "%(ascNow)s"
syncSpec = "%(syncSpec)s"
p4Port = "%(p4Port)s"
p4User = "%(p4User)s"
p4Client = "%(p4Client)s"
p4Passwd = %(p4pass)s
ftpHost = "%(ftpHost)s"
ftpUser = "%(ftpUser)s"
ftpPasswd = %(ftppass)s
ftpRoot = "%(ftpRoot)s"
verbose = False
excludes = %(reprExcludes)s # FYI: exclusion rules applied in toDo list
# below (excluded ops will be commented out)
toDo = (
%(ops)s )
if __name__ == "__main__":
import sys
from p4ftpsync import execFTPToDo, _p4Sync, log
try:
args = sys.argv[1:]
verbose = len(args) > 0 and args[0] in ('-v', '--verbose')
if toDo:
# Ensures that p4 (local) client is synced before FTPing :
log('Syncing %(syncSpec)s locally (real)')
opts = {'p4Port':p4Port, 'p4User':p4User, 'p4Client':p4Client,
'p4Passwd':p4Passwd, 'force':False}
_p4Sync(syncSpec, False, **opts)
# Executes FTP cmds :
execFTPToDo(toDo, ftpHost, ftpUser, ftpPasswd, ftpRoot,
excludes=[], verbose=verbose)
else:
log('=> Nothing to do, all files concerned were excluded.')
except SystemExit:
pass
except Exception, e:
log('\\n** Error** %%s: %%s' %% (e.__class__.__name__, e))
if verbose: raise
'''
now = time.localtime()
ascNow = time.asctime(now)
version = __version__
if options['p4Passwd'] is None: p4pass = 'None'
else: p4pass = '"%s"' % options['p4Passwd']
if options['ftpPasswd'] is None: ftppass = 'None'
else: ftppass = '"%s"' % options['ftpPasswd']
reprExcludes = repr(excluder.excludes)
ops = ''
for op in toDo:
# Check if op should be excluded :
target = op[2]
if excluder.shouldBeExcluded(target, False):
msg = '-- File %s excluded [matches %s]'% excluder.getLastExclude()
vLog(msg)
ops += '\t\t## %s:\n##\t\t%s\n' % (msg, repr(op))
else:
ops += '\t\t%s,\n' % repr(op) # Keep op
scriptName = '%s %s.py' % (syncSpec,
time.strftime('%d-%m-%Y %Hh%Mm%Ss', now))
scriptName = re.sub('[/ ]', '_', scriptName)
scriptName = scriptName.replace('*', '(star)')
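    # (e.g. syncSpec '//depot/Proj/...' synced on 05-01-2005 at 10h00m00s
    # yields '__depot_Proj_..._05-01-2005_10h00m00s.py': '/' and ' ' -> '_',
    # '*' -> '(star)')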
scriptDir = options['scriptDir']
if not os.path.isdir(scriptDir):
os.makedirs(scriptDir)
scriptPath = os.path.join(scriptDir, scriptName)
j = open(scriptPath, 'w')
try:
        fields = dict(vars())   # snapshot of locals (ascNow, ops, p4pass...)
        fields.update(options)  # plus all the options (p4Port, ftpHost, etc.)
        j.write(template % fields)
finally:
j.close()
log('--> Created FTP script "%s".' % scriptPath)
return scriptPath
#----------------------------------------------------------------------------
def execFTPToDo(toDo, ftpHost, ftpUser, ftpPasswd, ftpRoot,
excludes=[], verbose=False):
#----------------------------------------------------------------------------
''' Executes the FTP actions contained in the given toDo list.
Only normal (ie not reverse) sync actions.
'''
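    # Each op in <toDo> is a tuple (action, src, target[, permissions]),
    # e.g. (illustrative, hypothetical paths):
    #   toDo = (('ADD',    'C:/client/site/new.html',   '/site/new.html'),
    #           ('UPDATE', 'C:/client/site/index.html', '/site/index.html'),
    #           ('DELETE', 'C:/client/site/old.html',   '/site/old.html'))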
global _verbose
_verbose = verbose
excluder = _Excluder(excludes)
t0 = time.time()
log('\n---- Synchronizing FTP site %s@%s (root dir: %s) ----\n' %
(ftpUser, ftpHost,ftpRoot))
ftp = Ftp(ftpHost, ftpUser, ftpPasswd, ftpRoot)
try:
# Get the list of actions to perform :
ftp.clearStats()
reGetBaseDir = re.compile(r'(/.+?/).*')
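        # (e.g. reGetBaseDir.match('/A/B/f.txt').group(1) -> '/A/', so dirs
        # are grouped per top-level branch for concurrent creation)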
dirsToCreate = {}
ops = []
for op in toDo:
action, target = op[0], op[2]
absTarget = '/' + target.lstrip('/') # dest path always rel. to
# root dir
if excluder.shouldBeExcluded(absTarget):
continue
if action in (ADD, UPDATE):
                # (theoretically only ADD, but let's be tolerant!)
dir = os.path.dirname(absTarget)
if dir != '/':
# dir must be created if not existent. This creation can be
# concurrent if the threads work on *different* subfolders
                    # of the base folder, e.g. the creation of ['/A/', '/A/C/',
                    # '/A/D/'] may be performed in parallel with the creation
                    # of ['/B/', '/B/F/']:
##TODO: adopt (better) strategy used for LISTDIR, see
## FtpTree._buildTree
dir += '/'
baseDir = reGetBaseDir.match(dir).group(1)
try:
dirs = dirsToCreate[baseDir]
except KeyError:
dirs = dirsToCreate[baseDir] = {}
dirs[dir] = None
#else action in (UPDATE, DELETE)
ops.append(op)
failed = []
        # Create dirs if required (multi-threaded with the restriction
# that each thread handles a separate branch from the root):
if dirsToCreate:
log('Creating directories (if not already existing)...')
for dirsInSameBranch in dirsToCreate.values():
ftp.addOperation(MKDIRS, dirsInSameBranch.keys())
done, failed = ftp.waitForOpsDone()
if not failed:
log('--> %d directories created.' %
ftp.getStats().dirsCreatedCnt)
# Execute ops (multi-threaded):
if not failed:
for op in ops:
ftp.addOperation(*op)
done, failed = ftp.waitForOpsDone()
vLog('--> Multithreaded operations completed: %d OK, %d '
'error(s).' % (len(done), len(failed)))
#print 'Cached existing dirs = %s' % FtpAgent._existingDirs ###
# Prune dirs if necessary (single-threaded):
if not failed:
ftp.pruneDirs()
# Stats & result :
elapsed = time.time() - t0
stats = ftp.getStats()
uploaded = stats.bytesUploaded
if failed:
s = ('--> Sync completed in %d sec with %d errors (and possibly '
'more). Consult the log (%s)\n' % (elapsed, len(failed),
_logPath))
s += '--> Known failed operations:\n'
for op in failed:
s += ' %s\n' % op
s += '\n'
else:
s = '--> Sync completed in %d sec without errors.\n' % elapsed
if stats.filesUploadedCnt > 0:
s += ('%5d file(s) uploaded (%d bytes, average %d bytes/sec)\n' %
(stats.filesUploadedCnt, stats.bytesUploaded,
stats.bytesUploaded/elapsed))
if stats.filesDownloadedCnt > 0:
s += ('%5d file(s) downloaded (%d bytes, average %d bytes/sec)\n' %
(stats.filesDownloadedCnt, stats.bytesDownloaded,
stats.bytesDownloaded/elapsed))
s += ('%5d file(s) deleted, %d dir(s) created, %d dir(s) pruned.' %
(stats.filesDeletedCnt, stats.dirsCreatedCnt, stats.dirsPrunedCnt))
log(s)
finally:
ftp.stop()
    return ftp  # (returned for test/debug use, see _test())
#-------------------------------------------------------------------------
class Operation(object):
#-------------------------------------------------------------------------
'''One "FTP" operation.
'''
DFT_MAX_ATTEMPTS = 3 # default max nb of exec attempts
opCnt = SafeCounter(0) # for allocation of op IDs
def __init__(self, action, *args, **kwargs):
self.id = Operation.opCnt.incrAndGet(1)
self.action = action # op type
self.args = args # argument list
self.kwargs = kwargs # keyword argument dictionary, including
# optional special keywords '__event__' and
# '__attempts__', see below.
# [threading.Event] Set when op completed (in synchronous mode):
try:
self._event = kwargs['__event__']
del kwargs['__event__']
except KeyError:
self._event = None
# [int] Nb of attempts to execute the operation:
try:
self.maxAttempts = kwargs['__attempts__']
del kwargs['__attempts__']
except KeyError:
self.maxAttempts = self.DFT_MAX_ATTEMPTS
self.startedOn = 0 # [time] When task started running
self.attempt = 0 # [int] exec attempt #
self.agent = None # [FtpAgent] Agent that exec the op if running
self.endedOn = 0 # [time] When task was completed
self.result = None # Result if any and exec OK.
self.error = None # [Exception] error if failed, None if success
def __del__(self):
if self._event:
pass ##TODO: release event in Pool ???
def __repr__(self):
args = ', '.join([str(arg) for arg in self.args[:3]]) # limit size!
if len(self.args) > 3:
args += ',...'
s = ('<Operation #%d [%d/%d]: %s(%s)' % (self.id, self.attempt,
self.maxAttempts, self.action, args))
if self.startedOn:
s += (' started: %s by %s' % (time.strftime('%H:%M:%S',
time.localtime(self.startedOn)), self.agent._name))
        if self.endedOn:
            s += ' ended: %s' % time.strftime('%H:%M:%S',
                                              time.localtime(self.endedOn))
        if self.error:
            s += ' error: %s' % self.error
s += '>'
return s
def started(self):
return self.startedOn != 0
def ended(self):
return self.endedOn != 0
def succeeded(self):
" Returns True if op successfully completed."
assert self.ended()
return self.error is None
def failed(self):
" Returns True if op failed."
assert self.ended()
return self.error is not None
def setStarted(self, agent):
self.agent = agent
self.startedOn = time.time()
self.attempt += 1
def setSucceeded(self, result):
assert self.started()
self.endedOn = time.time()
self.result = result
self.error = None
if self._event: # (must be done LAST)
self._event.set()
def setFailed(self, error):
assert self.started()
self.endedOn = time.time()
self.result = None
self.error = error
if self._event: # (must be done LAST)
self._event.set()
def reset(self):
" Resets op for resubmission. Do NOT reset self.attempt."
self.startedOn = self.endedOn = 0
self.agent = None
self.result = self.error = None
if self._event:
self._event.clear()
def releaseEvent(self):
''' Deletes event after it has been taken into account.
* Use with caution! *
'''
if self._event:
self._event = None ##TODO: release to a pool instead ?
# End class Operation
import Queue
#-------------------------------------------------------------------------
class Ftp(object):
#-------------------------------------------------------------------------
'''Handles FTP operations for a FTP site.
'''
class ETooManyConnections(Exception):
''' FTP server max nb of client connections exceeded.'''
pass
class EInvalidOperation(Exception):
''' No handler for Ftp operation.'''
pass
class ETimeout(Exception):
        ''' A timeout occurred.'''
pass
# Max number of FtpAgents (threads). The actual number will be limited by
# the max nb of simultaneous connections allowed by the FTP server;
FTP_AGENT_MAX = 15
assert FTP_AGENT_MAX >= 1
STOP_POLL_INTERVAL = 0.5 # How long to wait (sec) between 2 polls of
# threads to see if they are stopped
PATROL_INTERVAL = 30 # How long to wait (sec) between 2 patrols
STOP_TIME_LIMIT = 30.0 # How long to wait (sec) for all threads stopped
# after a stop request.
QUEUE_POLL_INTERVAL = 1. # How long to wait (sec) between 2 polls of
                               # a queue to see if it is empty
class Stats:
" Statistics for a Ftp object."
def __init__(self, ftpAgents):
self._agents = ftpAgents
self.clear()
def clear(self):
self.filesUploadedCnt = 0
self.bytesUploaded = 0
self.filesDownloadedCnt = 0
self.bytesDownloaded = 0
self.filesDeletedCnt = 0
self.dirsCreatedCnt = 0
self.dirsPrunedCnt = 0
[a.clearStats() for a in self._agents]
def consolidate(self):
self.filesUploadedCnt = self.consolidateAttr('filesUploadedCnt')
self.bytesUploaded = self.consolidateAttr('bytesUploaded')
self.filesDownloadedCnt =self.consolidateAttr('filesDownloadedCnt')
self.bytesDownloaded = self.consolidateAttr('bytesDownloaded')
self.filesDeletedCnt = self.consolidateAttr('filesDeletedCnt')
self.dirsCreatedCnt = self.consolidateAttr('dirsCreatedCnt')
self.dirsPrunedCnt = self.consolidateAttr('dirsPrunedCnt')
return self
def consolidateAttr(self, attrName):
return sum([getattr(a._stats, attrName) for a in self._agents])
def __init__(self, host, user, passwd=None, rootDir='/'):
self._host = host # host[:port], default port 21
self._user = user
if passwd is None:
passwd = getpass.getpass('FTP password for user %s ? ' %
user).strip()
self._passwd = passwd
if not rootDir.endswith('/'):
rootDir += '/' # required for _MKDIRS()
self._root = rootDir
# Use a queue of operations serving a pool of FtpAgents:
self._qpending = Queue.Queue(0) # Pending Operations
self._running = SyncedDict() # Running Operations {opId:op..}
self._done = SyncedDict() # Completed Operations {opId:op...}
self._failed = SyncedDict() # Failed Operations {opId:op...}
self._addOperationLock = threading.RLock()
        # Create the first Ftp agent. Others will be created later if needed:
self._agents = SyncedList()
self.__createAgentLock = threading.RLock()
agent = self._createAgent()
assert agent is not None, "Need at least ONE agent!"
self._stats = Ftp.Stats(self._agents)
def __repr__(self):
return '<Ftp %s@%s %d agents, ops=(%d,%d,%d,%d)>' % ((self._user,
self._host, len(self._agents))+self.getOpCnts())
def getHost(self):
return self._host
def getUser(self):
return self._user
def addOperation(self, action, *args, **kwargs):
''' Submits an operation to perform asynchronously.
An Operation instance is created and queued. It will be
executed ASAP by the 1st Ftp agent available.
@param args: List of op arguments.
@param kwargs: Dictionary of keyword args.
@return: [Operation] operation created.
'''
op = Operation(action, *args, **kwargs)
self._addOperationLock.acquire() # synchronize this method
try:
self._qpending.put(op)
# If too many ops are pending, create (if possible) a new ftp
# agent to handle the charge :
agentCnt = len(self._agents)
#print 'agents: %s pending ops: %s' % (agentCnt, self._qpending.qsize()) ####
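            # (Scaling heuristic, illustrative: with e.g. 3 live agents, a
            # backlog of 6+ pending ops (qsize > agentCnt+2) triggers the
            # creation of a 4th agent, up to FTP_AGENT_MAX or the server's
            # connection limit.)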
if (agentCnt < Ftp.FTP_AGENT_MAX) and (
self._qpending.qsize() > agentCnt+2):
self._createAgent()
return op
finally:
self._addOperationLock.release()
def execOperation(self, action, *args, **kwargs):
''' Executes an operation semi-synchronously with an optional timeout.
The operation is queued and executed ASAP, but -unlike
addOperation- the function waits for completion or timeout,
and returns the result if OK, or raises an Exception otherwise.
@param args: List of op arguments.
@param kwargs: Dictionary of keyword args, including:
timeout: [float] timeout in sec [default None]
@return: result or None
@exception Ftp.ETimeout: if timeout
@exception other: exception raised by op exec itself.
'''
try:
timeout = kwargs['timeout']
del kwargs['timeout']
except KeyError:
timeout = None
# Use an event to synchronize :
kwargs['__event__'] = event = threading.Event() ##TODO: use a pool ??
op = self.addOperation(action, *args, **kwargs)
event.wait(timeout)
op.releaseEvent()
if not op.ended():
raise Ftp.ETimeout('Timeout while executing op %s' % op)
if op.failed():
raise op.error
return op.result
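    # Usage sketch (illustrative, not executed; host/user are hypothetical,
    # action names map to FtpAgent._<ACTION> methods):
    #   ftp = Ftp('ftp.example.com', 'user')
    #   ftp.addOperation('DELETE', None, '/old.txt')        # asynchronous
    #   r = ftp.execOperation('MULT', 6, 7, timeout=5.0)    # waits -> 42
    #   done, failed = ftp.waitForOpsDone()
    #   ftp.stop()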
def stop(self, clearPendingOps=False):
''' Stops all agents.
Agents are stopped after they complete their current op if any.
Returns the nb of pending ops
'''
if not self._agents:
return
# Request stop ASAP:
vLog('Requesting agents to stop...')
for agent in self._agents:
agent.stop()
# Wait for actual termination
deadline = time.time() + self.STOP_TIME_LIMIT
while [a for a in self._agents if a.isAlive()]:
if time.time() >= deadline:
error('Unable to stop all agents in %.2f seconds => abort.' %
self.STOP_TIME_LIMIT)
break
time.sleep(self.STOP_POLL_INTERVAL)
else:
vLog('All agents (threads) stopped')
if clearPendingOps:
while not self._qpending.empty(): self._qpending.get()
return self._qpending.qsize()
def waitForOpsDone(self):
''' Waits until all operations (pending & running) are completed.
return a tuple (list of ops done, list of failed ops)
'''
vLog('waiting for all current ops to complete...')
t0 = time.time()
# (The order of the following tests is important since an op may move
        # from running to pending in case of retry after an error)
while len(self._running) > 0 or not self._qpending.empty():
time.sleep(self.QUEUE_POLL_INTERVAL)
# Periodically checks if agents running the ops are still alive:
if (not [a for a in self._agents if a.isAlive()] or
(time.time() - t0) >= self.PATROL_INTERVAL):
if not self._patrol(checkDuplicateOps=True): # something NOK
break
t0 = time.time()
vLog('--> No more pending or running operations.')
return (self.getOpsDone(), self.getOpsFailed())
def getOpCnts(self):
''' Returns a tuple (nb of pending ops, nb of running ops,
nb of completed ops, nb of failed operations)
'''
return (self._qpending.qsize(), len(self._running), len(self._done),
len(self._failed))
def getOpsDone(self):
''' Returns a list of ops completed OK so far.'''
return self._done.values()
def getOpsFailed(self):
''' Returns a list of ops that failed'''
return self._failed.values()
def clearStats(self):
self._stats.clear()
def getStats(self):
return self._stats.consolidate()
def pruneDirs(self):
''' Deletes empty directories (among visited ones only).
Done in sequence (ie not concurrently) by the 1st agent.
'''
self.waitForOpsDone()
self._agents[0].pruneDirs()
def _createAgent(self):
'''Creates a new Ftp agent.
@return: the agent, or None if max nb of agents reached.
'''
self.__createAgentLock.acquire() # Synchronize this method
try:
agentCnt = len(self._agents)
name = 'T%02d' % (agentCnt+1)
try:
agent = FtpAgent(name, self._qpending, self._running,
self._done, self._failed, self._host,
self._user, self._passwd, self._root)
except Ftp.ETooManyConnections:
Ftp.FTP_AGENT_MAX = agentCnt
warning("Can't create more than %d agents/connections, "
"FTP server limit reached" % agentCnt)
return None
else:
self._agents.append(agent)
agent.start()
return agent
finally:
self.__createAgentLock.release()
def _patrol(self, checkDuplicateOps=False):
''' Checks that everything is going normally (no agents dead, etc...).
@return True if processing can continue, False if fatal error
detected.
'''
#vLog('Start patrol...') ###
agentsFoundDead = 0
ok = True
# Consistency check :
if checkDuplicateOps:
# Check that no running/done/failed op is still pending:
self._qpending.mutex.acquire()
try: pending = copy.copy(self._qpending.queue) # pending list image
finally: self._qpending.mutex.release()
for op in self._running.values():
if op in pending:
                    fatal('Running op "%s" ALSO found in pending queue' % op)
for op in self._done.values():
if op in pending:
                    fatal('Done op "%s" ALSO found in pending queue' % op)
for op in self._failed.values():
if op in pending:
                    fatal('Failed op "%s" ALSO found in pending queue' % op)
# Check that at least one thread is alive :
for agent in self._agents[:]:
if not agent.isAlive():
error('patrol: agent %s found dead' % agent._name)
agentsFoundDead += 1
del self._agents[self._agents.index(agent)]
if not self._agents:
errMsg = 'patrol: All agents dead => give up'
error(errMsg)
e = Exception(errMsg)
# Move pending & running ops to failed:
failedOps = []
while not self._qpending.empty():
failedOps.append(self._qpending.get())
failedOps.extend(self._running.values())
for op in failedOps:
op.endedOn = time.time()
op.error = e
self._failed[op.id] = op
self._running = {}
ok = False
else:
# Check that all agents currently running ops are in the list of
# live agents:
self._running._lock.acquire()
try:
for op in self._running.values()[:]:
agent = op.agent
if agent not in self._agents:
error('patrol: Agent %s supposed to run op #%d but is '
'not alive' % (agent._name, op.id))
# Move running op to pending state:
del self._running[op.id]
if op.attempt < op.maxAttempts:
# Reenter op in queue for next attempt:
op.startedOn = 0
op.agent = None
self._qpending.put(op)
log(' ==> Retry (%d/%d)' % (op.attempt+1,
op.maxAttempts))
else:
errMsg = ("Can't execute op after %d attempts "
"=> abandoned" % op.maxAttempts)
op.endedOn = time.time()
op.error = Exception(errMsg)
self._failed[op.id] = op
error(errMsg)
finally:
self._running._lock.release()
msg = 'Patrol complete: '
msg += '%d agent(s) alive,' % len(self._agents)
if agentsFoundDead: msg += ' %d agent(s) found dead' % agentsFoundDead
else: msg += ' NO agents found dead'
msg += ', %d pending ops, %d running ops' % (self._qpending.qsize(),
len(self._running))
if len(self._running) <= 3:
msg += ': %s' % self._running.values()
log(msg)
return ok
#end class Ftp
#-------------------------------------------------------------------------
class FtpAgent(threading.Thread):
#-------------------------------------------------------------------------
''' One thread/agent in charge of executing FTP operations.
The operations are taken from a queue.
Manages its own FTP connection.
'''
SLEEP_PERIOD = 0.5 # How long (sec) to wait when the queue is empty
# before rescanning it
CX_MAX_TRIES = 3 # Max number of successive connection attempts.
CX_WAIT_INTERVAL = 10.0 # Interval (sec) between 2 connection attempts
# (will be multiplied by attempt number).
_existingDirs = SyncedDict() # { dirPath: None, ...}
_dirsToCheckEmpty = SyncedDict() # { dirPath: None, ...}
_fileClassifier = TextBinaryClassifier()
class Stats:
" Statistics for a FtpAgent."
def __init__(self):
self.clear()
def clear(self):
self.filesUploadedCnt = 0
self.bytesUploaded = 0
self.filesDownloadedCnt = 0
self.bytesDownloaded = 0
self.filesDeletedCnt = 0
self.dirsCreatedCnt = 0
self.dirsPrunedCnt = 0
def __init__(self, name, qpending, running, done, failed, host, user,
passwd, rootDir='/'):
self._name = name
self._qpending = qpending # Pending Operations queue
self._running = running # Running Operations dict
self._done = done # Completed Operations dict
self._failed = failed # Failed operations dict.
        segs = host.split(':')            # host[:port], default port 21
        if len(segs) == 2:
            host, port = segs
            port = int(port)              # ftplib expects a numeric port
        else:
            port = 21
self._host = host
self._port = port
self._user = user
self._passwd = passwd
self._rootDir = rootDir
self._ftp = None # underlying ftplib.FTP object
self._connected = False
self._stats = FtpAgent.Stats()
self._stopEvent = threading.Event()
self._sleepPeriod = FtpAgent.SLEEP_PERIOD
# Connect now (synchronously) rather than asynchronously in run()
# to stop creating agents if the FTP server max nb of cx is reached:
self._connect()
threading.Thread.__init__(self, name=name)
def __del__(self):
try: self._disconnect()
except: pass
def __repr__(self):
s = '<FtpAgent %s' % self._name
if self.isConnected():
s += ' (connected)'
s += '>'
return s
# start(self) inherited from Thread
def stop(self):
'''Requests the thread to stop.
        The thread will stop after it has completed the current op, if any.
        Doesn't wait for the thread's actual termination.
'''
self._stopEvent.set()
def run(self):
'''Main control loop
'''
vLog("%s: started." % self._name) ##
try:
while not self._stopEvent.isSet():
try:
op = self._qpending.get(block=False)
except Queue.Empty:
self._stopEvent.wait(self._sleepPeriod)
else:
# Execute one operation:
vLog("%s: got %s" % (self._name, op)) ##
op.setStarted(self) ##
self._running[op.id] = op
try:
action = op.action
# A method named _<action> must exist:
                        r = getattr(self, '_%s' % action)(*op.args,
                                                          **op.kwargs)
except AttributeError:
errMsg = '%s: Invalid operation: %s' % (
self._name, action)
error(errMsg)
del self._running[op.id]
self._failed[op.id] = op
op.setFailed(Ftp.EInvalidOperation(errMsg))
continue
except Exception, e:
error('%s: in FTP op "%s": %s: %s' % (self._name,
op, e.__class__.__name__, e))
del self._running[op.id]
if op.attempt >= op.maxAttempts:
self._failed[op.id] = op
op.setFailed(e)
error("%s: op still fails after %d attempt(s) "
"=> abandoned" % (self._name, op.maxAttempts))
else:
if (isinstance(e, ftplib.error_reply) or
isinstance(e, ftplib.error_proto)):
error('%s: Unexpected reply received from '
'Ftp server %s => RE-Connect...' %
(self._name, self._host))
self._disconnect()
self._connect(resetStats=False)
elif (isinstance(e, socket.error) or
isinstance(e, EOFError)):
error('%s: Connection (probably) lost => '
'auto-reconnect...' % self._name)
self._disconnect() # just in case...
self._connect(resetStats=False)
# Reenter op in queue for next attempt:
op.reset()
self._qpending.put(op)
log(' ==> Retry (%d/%d)' % (op.attempt+1,
op.maxAttempts))
time.sleep(0.01) # yield CPU
else:
#[op successful]
#vLog("%s: operation completed: %s." % (self._name, op))
try:
del self._running[op.id]
except KeyError, e:
##should NOT happen, indicates race condition ?
warning("%s: Can't delete op #%d from running "
"queue: %s" % (self._name, op.id, e))
self._done[op.id] = op
op.setSucceeded(r)
finally:
self._disconnect()
vLog("%s: stopped." % self._name) ##
def isConnected(self, realCheck=False):
if realCheck:
self._checkConnected()
return self._connected
def clearStats(self):
self._stats.clear()
def getStats(self):
return self._stats
def pruneDirs(self):
''' Deletes empty directories (only checks those where a file
or dir has been deleted).
*** Must be executed in sequence (single thread) ***
'''
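        # (e.g. after deletions under '/A/B/C': list '/A/B/C', remove it if
        # empty, then walk up and check '/A/B', then '/A', and so on.)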
dirs = FtpAgent._dirsToCheckEmpty.keys()
if not dirs:
return
        vLog('%s: Directory pruning started' % self._name)
dirs.sort(); dirs.reverse() # sort descending -> '/A/B' precedes '/A'
for dir in dirs:
while dir:
try:
files = self._ftp.nlst(dir)
except (ftplib.error_temp, ftplib.error_perm), e:
errMsg = e.args[0]
errCode = int(errMsg.split()[0])
# Dir doesn't exist, different FTP servers responses :
# GoDaddy: 550 No such file or directory
# HostPC: ftplib.error_temp 450 XXX:No such file or directory
# online: 550 XXX: No such file or directory
# GuildFTPd: No error => return []
if errCode not in (450, 550):
raise
if re.search('No such file(?i)', errMsg):
removeDir = False # (but will check parent dir)
else:
# Directory empty, different FTP servers responses:
# GoDaddy: No error => return []
# HostPC: ftplib.error_temp: 450 No files found
# online: ftplib.error_perm: 550 No files found
# GuildFTPd: No error => return []
if not re.search('No files found(?i)', errMsg):
raise
removeDir = True
else:
if files: # dir not empty
break
removeDir = True
if removeDir:
self._ftp.rmd(dir)
self._stats.dirsPrunedCnt += 1
log('Directory "%s" now empty -> deleted.' % dir)
# Check the parent dir if any, might be empty now:
if dir.endswith('/'): dir = dir[:-1]
dir = os.path.dirname(dir)
vLog('%s: --> Directory pruning completed' % self._name) ###
# Implementation of "Ftp" operations :
def _ADD(self, src, dest, permissions=None):
''' Uploads file <src> to location <dest>.
@param permissions: perms to assign (chmod) to the file if not None
@Return: size of src file.
'''
if dest.startswith('/'): dest = dest[1:]
#vLog('%s: _ADD(%s, %s)' % (self._name, src, dest))
# Determine source file type {text, binary} and open it accordingly:
isText = self._isText(src, checkContentIfNeeded=True)
mode = 'r'
if not isText: mode += 'b'
fSrc = open(src, mode)
try:
# (Here we assume that the directories, if any, have already been
            # created via _MKDIRS, since it'd be hazardous to create
# these folders on the fly in a multi-threaded environment)
# Create or override the target file :
if isText:
cmd = self._ftp.storlines
type = 'text'
else:
cmd = self._ftp.storbinary
type = 'binary'
cmd('STOR %s' % dest, fSrc)
srcSize = self._getLocalFileSize(src)
self._stats.bytesUploaded += srcSize
self._stats.filesUploadedCnt += 1
log('%s: -> %s uploaded (%s) - %d bytes.' % (self._name, dest,
type, srcSize))
finally:
fSrc.close()
# Try to change target file permissions if requested :
if permissions is not None:
r = self._ftp.sendcmd('SITE CHMOD %s %s' % (oct(permissions),dest))
if r.startswith('200'):
log('%s: -> CHMOD %s %s' % (self._name, oct(permissions),dest))
            else:
                warning('Could not CHMOD %s to %s: %s' %
                        (dest, oct(permissions), r))
return srcSize
def _UPDATE(self, src, dest):
return self._ADD(src, dest)
def _DELETE(self, src, file):
''' Deletes the given remote <file> if exists. <src> is N.S. here.
'''
if file.startswith('/'): file = file[1:]
#vLog('%s: _DELETE(%s)' % (self._name, file))
try:
self._ftp.delete(file)
except ftplib.error_perm, e:
# File doesn't exist, different FTP servers responses :
# GoDaddy: ftplib.error_perm: 550 No such file.
# HostPC.com: ftplib.error_perm: 550 XXX: No such file or directory
# Online.net: ftplib.error_perm: 550 DELE: Couldn't get file status
# for XXX
# GuildFTPd: NO error !!
errMsg = e.args[0]
errCode = int(errMsg.split()[0])
if errCode != 550:
raise
vLog("%s: -> %s was already deleted" % (self._name, file))
else:
dir = os.path.dirname(file)
if dir:
FtpAgent._dirsToCheckEmpty[dir] = None
self._stats.filesDeletedCnt += 1
log('%s: -> %s deleted.' % (self._name, file))
def _MKDIRS(self, dirsToCreate):
'''Creates (remote) dirs if they don't already exist.
        @param dirsToCreate: a list of paths; each must be the path of a
            FILE or end with a '/', and must be relative to self._rootDir
            (but may start with a '/'). All intermediate dirs are created.
'''
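        # (e.g. _MKDIRS(['/A/B/f.txt', 'C/']) creates dirs 'A/', 'A/B/' and
        # 'C/', skipping any already in the _existingDirs cache)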
        #vLog('%s: _MKDIRS(%s)' % (self._name, dirsToCreate))
for path in dirsToCreate:
dirNames = path.split('/')
if dirNames and dirNames[0]=='': del dirNames[0] # strip leading /
dirPath = ''
for dirName in dirNames[:-1]:
dirPath += dirName + '/'
#vInfo(dirPath) ##
if FtpAgent._existingDirs.has_key(dirPath): # in cache
#vLog('-> %s already existed' % dirPath)
continue
# Create dir if necessary:
try:
self._ftp.mkd(dirPath)
except ftplib.error_perm, e:
errMsg = e.args[0]
errCode = int(errMsg.split()[0])
# Directory already exists, different FTP servers responses:
# GoDaddy: 521 Directory already exists
# HostPC: 550 File exists
# online: 550 XXX: File exists
# GuildFTPd: ftplib.error_perm: 550 XXX: Permission denied.
if errCode not in (521, 550):
raise
#vLog("-> %s already existed" % dirPath)
else:
self._stats.dirsCreatedCnt += 1
log("%s: -> %s created." % (self._name, dirPath))
FtpAgent._existingDirs[dirPath] = None
def _GET(self, localPath, path, callback=None, cbArgs=()):
''' Downloads the remote file <path> to the given temp local location.
Preserves the relative position of the file to the root.
If callback is not None, calls it on successful completion:
callback(*cbArgs)
@return: the size of the local file created.
'''
#vLog('%s: _GET(%s, %s)' % (self._name, localPath, path))
if self._isText(path, checkContentIfNeeded=False):
type = 'text'
# If we use retrlines, there is no way to determine if the LAST
# line ended with an EOL, so we use a BINARY transfer and handle
# the EOL ourselves; also normalize eol :
from cStringIO import StringIO
s = StringIO()
self._ftp.retrbinary('RETR %s' % path, s.write)
content = re.sub('\r*\n|\r|\n', '\n', s.getvalue())
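            # (e.g. 'a\r\nb\rc\n' -> 'a\nb\nc\n': CRLF, CR and LF all map
            # to LF)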
f = file(localPath, 'w')
f.write(content)
else:
type = 'binary'
f = file(localPath, 'wb')
self._ftp.retrbinary('RETR %s' % path, f.write)
f.close()
size = self._getLocalFileSize(localPath) ## For future stats
self._stats.bytesDownloaded += size
self._stats.filesDownloadedCnt += 1
log('%s: -> %s downloaded to %s (%s) - %d bytes.' % (self._name,
path, localPath, type, size))
if callback:
callback(*cbArgs)
return size
def _LISTDIR(self, dirPath):
''' Lists the content of the given remote directory.
@param dirPath: Path of remote directory to list, relative to root.
@return: dir content as a list of file descs, in "ls -l" style
(exact format depends on the server)
'''
#vLog('%s: _LISTDIR(%s)' % (self._name, dirPath))
fileDescs = []
        self._ftp.retrlines('LIST %s' % dirPath, fileDescs.append)
return fileDescs
# For test only :
def _MULT(self, x, y):
''' Test operation
'''
return x * y
def _TIMEOUT(self, delay=1):
''' Lasts longer than the given delay to trigger a timeout '''
print '_TIMEOUT(%d) called' % delay
time.sleep(delay+1)
def _FAILS(self):
raise Exception('This is a TEST exception!!')
# private :
def _connect(self, resetStats=True):
if self._checkConnected(): # already connected
return
# Make several attempts ONLY in case of a communication error:
self._ftp = None
for i in range(self.CX_MAX_TRIES):
vLog('%s: Connecting [%d/%d] to ftp://%s as %s...' %
(self._name, i+1, self.CX_MAX_TRIES, self._host, self._user))
try:
self._ftp = ftplib.FTP()
self._ftp.connect(self._host, self._port)
self._ftp.login(self._user, self._passwd)
except socket.error, e:
vLog('%s: socket error: %s' % (self._name, e))
if i+1 < self.CX_MAX_TRIES:
                    # wait longer before each new attempt:
                    time.sleep(self.CX_WAIT_INTERVAL * (i+1))
continue # try several times
except (ftplib.error_temp, ftplib.error_perm), e:
errMsg = e.args[0]
errCode = int(errMsg.split()[0])
# Empirical detection of max nb of connections exceeded:
# free.fr, GuildFTPd: ftplib.error_temp: 421 Too many cx....
# HostPC: ftplib.error_perm: 530 Sorry, the maximum number...
# unfortunately also used for a wrong login
# Web2.mcn.org: errCode 530 used for authentication failed
# "530 Authentication failed, sorry"
if errCode == 421 or (errCode==530 and
not errMsg.startswith('Login incorrect') and
not errMsg.startswith('530 Authentication failed')):
raise Ftp.ETooManyConnections('%s: Too many connections'
': %s' % (self._name, e))
elif errCode == 530 or isinstance(e, ftplib.error_perm):
raise Exception('%s: Ftp authentication for "%s" '
'failed: %s' % (self._name, self._user, e))
else:
raise Exception('%s: Ftp connection failed: %s: %s' %
(self._name, e.__class__.__name__, e))
except Exception, e:
raise Exception('%s: Ftp connection failed: %s: %s' %
(self._name, e.__class__.__name__, e))
else:
break # OK
else:
raise Exception("%s: Can't (re)connect to Ftp %s after "
"%d attempts." % (self._name, self._host,
self.CX_MAX_TRIES))
#print self._ftp.getwelcome() ##
# Always have current dir at logical root:
        self._MKDIRS([self._rootDir])  # (_MKDIRS expects a list of paths)
self._ftp.cwd(self._rootDir)
vLog('%s: Connected, root dir is %s' % (self._name, self._ftp.pwd()))
if resetStats:
self.clearStats()
self._connected = True
def _disconnect(self):
if self._checkConnected():
try: self._ftp.close()
except: pass
self._ftp = None
self._connected = False
vLog("%s: disconnected." % self._name)
def _checkConnected(self):
''' Checks the connection and updates (and returns) _connected.
'''
self._connected = False
if self._ftp is not None:
try:
self._ftp.voidcmd('NOOP')
except (EOFError, socket.error), e:
vLog('%s: Connection broken: %s' % (self._name, e))
except ftplib.all_errors, e:
vLog('%s: communication error (%s) while checking connection '
'=> assume not connected' % (self._name, e))
else:
self._connected = True
return self._connected
def _isText(self, file, checkContentIfNeeded=True):
''' Tries to determine whether file is text or binary.
@return: True if text, False if binary or impossible
to determine.
'''
return self._fileClassifier.isFileText(file, checkContentIfNeeded)
    def _getLocalFileSize(self, localFile):
        ''' Returns the size of the given local file, or 0 if undetermined.
        '''
        try:
            return os.stat(localFile).st_size
        except OSError, e:
            error("Can't stat %s: %s" % (localFile, e))
            return 0
# End class FtpAgent
#-------------------------------------------------------------------------
class _Excluder(object):
#-------------------------------------------------------------------------
''' Excludes files based on their match against a list of patterns.
'''
def __init__(self, excludes):
self.excludes = excludes # list of patterns to exclude
self._reExcludes = [re.compile(p) for p in excludes]
self._path, self._pattern = None, None
def shouldBeExcluded(self, path, logIt=True):
        if not path.startswith('/'):  # dest path always rel. to root dir
absPath = '/' + path
else:
absPath = path
for i, reExclude in enumerate(self._reExcludes):
if reExclude.match(absPath):
if logIt:
vLog('-- File %s excluded [matches %s]' %
(absPath, self.excludes[i]))
self._path, self._pattern = absPath, self.excludes[i]
#print '_Excluder(%s): MATCH "%s"' % (path, self.excludes[i])##
return True
#print '_Excluder(%s): no match' % path ###
return False
def getLastExclude(self):
''' Returns info about last file excluded.
To be called just after shouldBeExcluded() returning True.
'''
return (self._path, self._pattern)
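    # Usage sketch (illustrative, hypothetical patterns and paths):
    #   ex = _Excluder([r'/tmp/.*', r'.*\.bak$'])
    #   ex.shouldBeExcluded('/tmp/x.txt')   # -> True (matches '/tmp/.*')
    #   ex.shouldBeExcluded('site/a.bak')   # -> True ('/site/a.bak' matches)
    #   ex.shouldBeExcluded('/site/a.txt')  # -> False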
# End class _Excluder
_nullExcluder = _Excluder([]) # An excluder excluding nothing !
#-------------------------------------------------------------------------
def _sendEmailReport(changes, rootPath, exclusions, siteDesc, toAddrs,
fromAddr, smtpServer='localhost', smtpUser=None,
smtpPwd=None):
#-------------------------------------------------------------------------
''' Sends an email to report on changes detected during reverse sync.
'''
import smtplib
subject = 'File Changes Scan for site %s' % siteDesc
# Format the message in both html and plain text:
txt = ('This mail was automatically sent to you by the p4ftpsync program '
'to inform you of file changes on the %s site.\n' % siteDesc)
html = ('<html><body>\n<div style="font-size:smaller">%s</div>\n' % txt)
txt += '-'*60 + '\n\n'
html += '<hr />\n'
if not changes:
txt += '-> No changes.\n'
html += '-> <b>No changes</b><br />\n'
else:
txt += 'The following file changes have been detected on the site:\n\n'
html += ('The following file changes have been detected on the site:'
'<br />\n')
html += '<ul>\n'
changes = sorted([(f1, op) for op, f1, f2 in changes])
for f, op in changes:
if f.startswith(rootPath):
f = f[len(rootPath):].replace('\\', '/')
txt += ' %s (%s)\n' % (f, op)
html += ' <li>%s (<b>%s</b>)</li>\n' % (f, op.lower())
html += '</ul>\n'
txt += '-'*60 + '\n\n'
html += '<hr />\n'
txt += ('Note: The files matching the following patterns have been '
'ignored:\n %s\n' % exclusions)
html += ('<div style="font-size:smaller">Note: The files matching the '
'following patterns have been ignored :<br />\n'
' %s</div>\n' % ', '.join(exclusions))
html += '</body></html>\n'
msg = _createHtmlMail(html, txt, subject, toAddrs)
#print 'Mail message = %s\n' % msg ###
# Send the email :
segs = smtpServer.split(':') # host[:port], default port 25
    if len(segs) == 2:
        host, port = segs
        port = int(port)
    else:
        host, port = smtpServer, 25
smtp = smtplib.SMTP(host, port)
if smtpUser:
if smtpPwd is None: smtpPwd = ''
smtp.login(smtpUser, smtpPwd)
try:
smtp.sendmail(fromAddr, toAddrs.split(','), msg)
finally:
smtp.quit()
log('Report email sent to %s' % toAddrs)
#-------------------------------------------------------------------------
def _createHtmlMail(html, text, subject, toAddrs):
#-------------------------------------------------------------------------
'''Create a mime-message that will render HTML in popular
MUAs, text in better ones.
[Barely adapted from Python Recipes:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/67083]
'''
import MimeWriter
import mimetools
import cStringIO
out = cStringIO.StringIO() # output buffer for our message
htmlin = cStringIO.StringIO(html)
txtin = cStringIO.StringIO(text)
writer = MimeWriter.MimeWriter(out)
#
# set up some basic headers... we put subject here
# because smtplib.sendmail expects it to be in the
# message body
#
writer.addheader("Subject", subject)
writer.addheader("MIME-Version", "1.0")
writer.addheader('To', toAddrs)
#
# start the multipart section of the message
# multipart/alternative seems to work better
# on some MUAs than multipart/mixed
#
writer.startmultipartbody("alternative")
writer.flushheaders()
#
# the plain text section
#
subpart = writer.nextpart()
subpart.addheader("Content-Transfer-Encoding", "quoted-printable")
pout = subpart.startbody("text/plain", [("charset", 'us-ascii')])
mimetools.encode(txtin, pout, 'quoted-printable')
txtin.close()
#
# start the html subpart of the message
#
subpart = writer.nextpart()
subpart.addheader("Content-Transfer-Encoding", "quoted-printable")
#
# returns us a file-ish object we can write to
#
pout = subpart.startbody("text/html", [("charset", 'us-ascii')])
mimetools.encode(htmlin, pout, 'quoted-printable')
htmlin.close()
#
# Now that we're done, close our writer and
# return the message body
#
writer.lastpart()
msg = out.getvalue()
out.close()
return msg
def _toDoAsString(toDo, excluder=None, preview=False):
''' Returns a formatted string representing the given toDo list.
NB: Only ADD, UPDATE, DELETE ops are processed; "Exclusions"
are disregarded.
'''
l = []
for op in toDo:
        vLog('op=%s' % str(op))
if len(op) > 3:
action, src, target, permissions = op
else:
action, src, target = op
permissions = None
if excluder and excluder.shouldBeExcluded(target, False):
continue
if action == ADD:
opDesc = 'added as'
elif action == UPDATE:
opDesc = 'updating'
elif action == DELETE:
opDesc = 'deleted as'
else:
continue
if preview: l.append('Sync preview: ')
if permissions is not None:
perms = ' with perm. %s' % oct(permissions)
else:
perms = ''
l.append('%s - %s %s%s\n' % (src, opDesc, target, perms))
return ''.join(l)
def _test(args):
### Copy the TMP directory:
if len(args) < 2:
action = 'ADD'
else:
action = args[1].upper()
if action not in (ADD, UPDATE, DELETE):
fatal('Bad action "%s" for test mode' % action)
sys.exit(1)
srcDir = os.environ['TMP']
ftpHost = "ftp.rgruet.com"
ftpUser = "rgruet"
ftpPasswd = raw_input('Password for %s@%s ? ' %
(ftpUser, ftpHost)).strip()
ftpRoot = "/domains/rgruet.com/public_html/tmp/"
def visit(args, dirName, names):
root, toDo = args
l = len(root)
if root[-1] not in ('/', '\\'):
l += 1
for name in names:
src = os.path.join(dirName, name)
if not os.path.isdir(src):
target = src[l:].replace('\\', '/')
#print "%s\t%s" % (src, target)
toDo.append((action, src, target))
toDo = []
os.path.walk(srcDir, visit, (srcDir, toDo))
ftp = execFTPToDo(toDo, ftpHost, ftpUser, ftpPasswd, ftpRoot,
verbose=True)
def _askForLocalSync(syncSpec, **opts):
    ''' Checks the local sync state and asks the user what to do.
@return: 0:No need to sync 1:do not sync (choice), 2:do sync, 3:cancel
'''
# Disable temporarily option -f (otherwise getP4SyncActions would always
# return a non empty toDo list) :
force = opts['force']
opts['force'] = False
try:
toDo, scriptPath = getP4SyncActions(syncSpec,_nullExcluder,True,**opts)
finally:
opts['force'] = force
if not toDo:
return 0 # don't sync, already up to date
p4Client = opts['p4Client']
prompt1 ='''\
The local P4 client (%s) appears not to be synced to the latest revision
of %s; the comparison with the remote site might be incorrect, since the
remote site may have been modified while the local client was not up to date.
You can take one of the following actions:
1) [default] Do not P4 sync %s, but still proceed to reverse sync. If the
local site has been changed since the last sync to the remote site, there
will be conflicts in the final P4 changelist that will need to be
resolved.
2) Manually P4 sync %s to the head revision, then proceed to reverse
sync. The remote site was probably not up to date before it was modified,
so continuing will *overwrite* any change performed locally since the last
sync to the remote site.
3) Cancel (Exit)
Your choice ? ''' % (p4Client, syncSpec, p4Client, p4Client)
prompt2 = 'Invalid option, try again [1-3]: '
prompt = prompt1
while True:
r = raw_input(prompt).strip()
if not r: return 1 # default
try:
r = int(r)
except:
r = -1
if r in (1, 2, 3):
return r
        prompt = prompt2
#///////////////////////////////////////////
#// M A I N //
#///////////////////////////////////////////
if __name__ == "__main__":
args = sys.argv[1:]
if args and args[0] == 'test':
_test(args) # Secret Option !!
else: # Parse arguments into a dict :
vLog('\n') # separator, in log only
opts = _getArgs()
toDo = None
_verbose = opts['verbose'] # control global verbosity for vInfo,
# and vLog fcts
log('#'*70, display=False) # separator, in log only
log('p4ftpsync %s' % __version__)
log('#'*70, display=False) # separator, in log only
vLog('Options: %s' % opts)
ftp = None
try:
syncSpec = opts['whatToSync']
ftpHost, ftpUser, ftpPasswd, ftpRoot, excludes = (
opts['ftpHost'], opts['ftpUser'],
opts['ftpPasswd'], opts['ftpRoot'],
opts['excludes'])
p4Client, p4User = opts['p4Client'], opts['p4User']
p4Port, p4Passwd = opts['p4Port'], opts['p4Passwd']
excluder = _Excluder(opts['excludes'])
preview = opts['test']
if preview: mode = ' (TEST/PREVIEW mode)'
else: mode = ''
if opts['reverse']:
log('\n============= Reverse Syncing %s%s '
'(remote->local) ==============\n' % (syncSpec, mode))
log('(Detect changes on FTP site %s@%s and create a P4 '
'changelist for client %s)' % (ftpUser,ftpHost, p4Client))
p4ClientPath = getP4ClientPathFor(syncSpec, **opts)
# Is p4client synced to last revision ?
r = _askForLocalSync(syncSpec, **opts)
if r == 3:
log('Cancel & exit.'); sys.exit(3)
if r in (0, 2):
# Even if the client was supposedly up to date (r==0),
# there is a chance that the space contains extra files not
# in P4 (e.g. files to be added but add was canceled),
# which will affect the file comparison => to be sure, we
# clear the space and re-sync to head revision :
log('Clear local client space before syncing to head '
'revision')
shutil.rmtree(p4ClientPath, True)
log('Sync local client to head revision before comparing')
force = opts['force']
opts['force'] = True
_p4Sync(syncSpec, False, **opts)
opts['force'] = force
elif r == 1:
                    warning('local client not synced to head revision. The '
                            'final P4 changelist may include conflicts that '
                            'will need to be manually resolved.')
else:
raise Exception('invalid return value (%d) from '
'_askForLocalSync()' % r)
# Get info about files:
revStampMgr = _P4RevisionStampMgr(syncSpec, **opts)
# Compare local and remote (ftp) site:
ftp = Ftp(ftpHost, ftpUser, ftpPasswd)
comparator = Local2FtpDirComparator(ftp, revStampMgr, ftpRoot,
opts['force'])
try:
d1 = p4ClientPath
d2 = ftpRoot
log('Comparing (local) dir %s and (remote) dir. %s' %
(d1, d2))
changes = comparator.cmpDirs(d1, d2, excluder)
vLog('-> Comparison complete.\nChanges = %s' % changes) ###
if changes:
if not preview:
# Create (and submit if required) a P4 changelist :
comment = opts['comment']
if not comment:
comment = ('%s: integrated changes made on '
'FTP site %s@%s.' % (syncSpec,
ftpUser, ftpHost))
chgListNb = P4SubmitChanges(changes, p4Port,
p4Client, p4User, p4Passwd, comment, opts['submit'], preview)
# Adjust the file head rev stamp in the cache
# before we save it, so it reflects the changes
revStampMgr.setHeadRevStampForCurrentChanges(
chgListNb)
comparator.adjustCache(changes, p4ClientPath)
comparator.saveCache() # AFTER comparison complete and
# cache possibly adjusted!
finally:
comparator.getFtpTree().clean() # delete tmp files.
# Send an email to report on changes, if requested :
if opts['mailto'] and not preview:
toAddrs = opts['mailto']
siteDesc = '[%s]%s' % (ftpHost, ftpRoot)
_sendEmailReport(changes, p4ClientPath, excludes, siteDesc,
toAddrs, opts['fromAddr'],
opts['smtpServer'], opts['smtpUser'],
opts['smtpPasswd'])
else:
log('\n============== Syncing %s%s '
'(local->remote) ================\n' % (syncSpec, mode))
log('(Synchronize local P4 client %s and transfer the '
'changes to FTP site %s@%s)' % (p4Client, ftpUser,
ftpHost))
# P4 sync the client and build the list of update actions:
toDo, scriptPath = getP4SyncActions(syncSpec, excluder, False,
**opts)
if toDo:
if preview:
log('Sync would do the following:\n%s' %
_toDoAsString(toDo, excluder, True))
else:
# Sync the remote site via FTP:
vLog('Execute update script %s ...' % scriptPath)
cmd = 'python %s' % scriptPath
if _verbose:
cmd += ' -v'
# Wish I could use popen3 here and catch stderr, but
# doing so will result in output not visible until
                        # the cmd is completed!
if os.system(cmd) != 0:
raise Exception("Error while executing script %s" %
scriptPath)
except SystemExit:
pass
except Exception, e:
log('\n**Error** %s: %s' % (e.__class__.__name__, str(e)))
if ftp:
try: ftp.stop(True)
except: pass
if _verbose: raise
else:
if ftp:
try: ftp.stop(True)
except: pass
log('p4ftpsync - command completed.')