#!/usr/bin/env python
'''Pure Python Perforce client.
This module tries to be API compatible with the excellent P4Python by Robert
Cowham (see
http://public.perforce.com/guest/robert_cowham/perforce/API/python/index.html)
'''
__author__ = "Miki Tebeka <mtebeka@qualcomm.com>"
__version__ = "0.1.0"
from sys import platform
from os import environ, popen, chdir
from os.path import join
from marshal import load, dump, loads
from subprocess import Popen, PIPE
from time import strftime
# Find p4 executable
_REGKEY = r"SOFTWARE\perforce\environment"
_REGVAL = "P4INSTROOT"
if platform == "win32":
from _winreg import OpenKey, CloseKey, QueryValueEx, HKEY_LOCAL_MACHINE
key = OpenKey(HKEY_LOCAL_MACHINE, _REGKEY)
instdir, key_type = QueryValueEx(key, _REGVAL)
CloseKey(key)
_P4EXE = join(instdir, "p4.exe")
elif platform == "cygwin":
regkey = "/HKLM/%s/%s" % (_REGKEY.replace("\\", "/"), _REGVAL)
instdir = popen("regtool get %s" % regkey).read().strip()
P4 = r"%s\%s" % (instdir, "p4.exe")
_P4EXE = popen("cygpath -au \"%s\"" % P4).read().strip()
else: # *Nix
_P4EXE = popen("which p4").read().strip()
_RUN = "run_"
_FETCH = "fetch_"
_DELETE = "delete_"
_SAVE = "save_"
class P4Error(Exception):
    '''Perforce Error.

    "value" is whatever describes the failure (a message string or a list of
    messages).  It is kept in the "value" attribute and, when given, is also
    passed to Exception so that "args" and repr() carry it too.
    '''

    def __init__(self, value=None):
        if value is None:
            Exception.__init__(self)
        else:
            Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        '''Return str(value), or a generic message when value is empty.'''
        if self.value:
            return str(self.value)
        return "Perforce Error"
class DummyP4Python:
    '''Placeholder methods that exist only to mirror the original p4python API.

    Kept in a separate base class, away from the "real" code.  Methods either
    return a fixed value or raise NotImplementedError.
    '''

    def connect(self):
        '''Do nothing; always returns 1.'''
        return 1

    def disconnect(self):
        '''Do nothing; always returns 1.'''
        return 1

    def dropped(self):
        '''Always returns 0.'''
        return 0

    def parse_forms(self):
        '''Do nothing; always returns 1.'''
        return 1

    def tagged(self):
        '''Not implemented.'''
        raise NotImplementedError()

    def translate(self, output, content=-2, fnames=-2, dialog=-2):
        '''Not implemented.'''
        raise NotImplementedError()

    def login(self, password):
        '''Not implemented.'''
        raise NotImplementedError()

    def passwd(self, password):
        '''Not implemented.'''
        raise NotImplementedError()
class P4(DummyP4Python):
    '''Perforce API client driving the p4 command line tool.

    Each command runs "<p4 executable> -G ..." in a subprocess and decodes the
    marshalled objects it writes to standard output.  Shortcut methods are
    created on demand by __getattr__:

        run_<cmd>(*args)     -> run(<cmd>, *args)
        fetch_<cmd>(*args)   -> first result of run(<cmd>, "-o", *args)
        save_<cmd>(*args)    -> run(<cmd>, "-i", *args)
        delete_<cmd>(*args)  -> run(<cmd>, "-d", *args)
    '''

    def __init__(self, use_iter=0):
        '''Create a client; with a true "use_iter", run() returns a generator
        instead of a list.'''
        self.user = ""        # User name ($P4USER)
        self.client = ""      # Client name ($P4CLIENT)
        self.port = ""        # Port ($P4PORT)
        self.password = ""    # Password ($P4PASSWD)
        self.errors = []      # Errors from last command
        self.warnings = []    # Warnings from last command
        self.cwd = "."        # Current working directory
        # Exception level:
        #   0 never raise
        #   1 raise on errors
        #   2 raise on errors and warnings
        self.exception_level = 1
        self._input = None    # Current input (for "-i" commands)
        # Return iterator or list
        if use_iter:
            self.run = self._run_iter
        else:
            self.run = self._run_list

    def _raise_error(self, error):
        '''Record "error" as the only current error and raise P4Error.'''
        self.errors = [error]
        raise P4Error(self.errors)

    def _assert(self, result):
        '''Record an error/warning result and raise per exception_level.

        "result" is one decoded object from p4; its "code" entry selects the
        handling and its "data" entry is the message.  Error messages are
        appended to self.errors and warnings to self.warnings, so both stay
        lists (as initialised in __init__ and as _raise_error leaves them).

        Raises P4Error(data) for an error when exception_level > 0 and for a
        warning when exception_level > 1.
        '''
        code = result["code"]
        if code == "error":
            self.errors.append(result["data"])
            threshold = 0
        elif code == "warning":
            self.warnings.append(result["data"])
            threshold = 1
        else:
            return
        if self.exception_level > threshold:
            raise P4Error(result["data"])

    def _run_iter(self, *args):
        '''Run command, iterator version.

        Yields each object p4 writes, in order.  An argument that follows
        "-i" is not passed on the command line; it is marshalled to p4's
        standard input instead.  When "-i" has no following argument (or is
        not given at all), self._input is used as the input if it is set.

        Raises P4Error when self.cwd cannot be entered, when the output
        cannot be decoded, for error/warning results (see _assert) and, if
        exception_level > 0, when p4 exits with a non-zero status.
        '''
        # "Errors from last command": forget what the previous command left
        self.errors = []
        self.warnings = []

        # NOTE(review): this changes the working directory of the whole
        # process, so a relative self.cwd is re-applied on every call.
        try:
            chdir(self.cwd)
        except OSError as e:
            self._raise_error("can't change directory to %s (%s)" %
                              (self.cwd, e))

        # NOTE(review): the password is passed as a command line argument
        command = [_P4EXE, "-G"]
        for attr, opt in (
                ("user", "-u"),
                ("client", "-c"),
                ("port", "-p"),
                ("password", "-P")):
            val = getattr(self, attr)
            if val:
                command.extend((opt, val))

        form = None
        pending = iter(args)
        for arg in pending:
            command.append(arg)
            if arg == "-i":
                # The next argument is the form itself, not a p4 argument;
                # a trailing "-i" leaves form as None for the fallback below
                form = next(pending, None)
        if form is None and self._input:
            form = self._input

        pipe = Popen(command, stdout=PIPE, stdin=PIPE)
        try:
            if form:
                # NOTE(review): Perforce's notes on -G input mention marshal
                # format version 0 - confirm whether dump(form, stdin, 0) is
                # required for the targeted p4/Python versions.
                dump(form, pipe.stdin)
            pipe.stdin.close()
            while 1:
                try:
                    result = load(pipe.stdout)
                except EOFError:
                    break
                except ValueError:
                    self._raise_error("bad return data")
                self._assert(result)
                yield result
        finally:
            # Also reached when an error is raised above or the consumer
            # abandons the generator: close both pipes and reap the child so
            # no process or file descriptor is left behind.
            for stream in (pipe.stdin, pipe.stdout):
                try:
                    stream.close()
                except EnvironmentError:
                    pass    # best-effort cleanup, e.g. p4 already exited
            pipe.wait()

        if (self.exception_level > 0) and (pipe.returncode != 0):
            self._raise_error("bad return code - %s" % pipe.returncode)

    def _run_list(self, *args):
        '''Run command, list version (collects everything _run_iter yields).'''
        return list(self._run_iter(*args))

    def __getattr__(self, attr):
        '''Hooks for all shortcut methods (such as save_ ...).

        Only called when normal attribute lookup fails, so real attributes
        never reach this method.  Names without a known prefix raise
        AttributeError carrying the requested name.
        '''
        if attr.startswith(_RUN):
            command = attr[len(_RUN):]

            def shortcut(*args):
                return self.run(command, *args)
        elif attr.startswith(_FETCH):
            command = attr[len(_FETCH):]

            def shortcut(*args):
                return list(self.run(command, "-o", *args))[0]
        elif attr.startswith(_SAVE):
            command = attr[len(_SAVE):]

            def shortcut(*args):
                return self.run(command, "-i", *args)
        elif attr.startswith(_DELETE):
            command = attr[len(_DELETE):]

            def shortcut(*args):
                return self.run(command, "-d", *args)
        else:
            raise AttributeError(attr)
        return shortcut
# High-level helper functions
def new_label(p4, name, path, description, revision=""):
    '''Label path@revision as "name", with the given description.

    The label spec is fetched, filled in and saved unlocked, the files are
    label-synced, and the spec is then saved again as locked.
    '''
    spec = p4.fetch_label(name)
    spec.update({
        "Description": description,
        "Owner": p4.user,
        "Options": "unlocked",
        "View": [path],
    })
    p4.save_label(spec)

    # The view keeps the bare path; only the sync target carries the revision
    target = path
    if revision:
        target = path + "@%s" % revision
    p4.run_labelsync("-l", name, target)

    spec["Options"] = "locked"
    p4.save_label(spec)
def new_change(p4, description=""):
    '''Create new changelist, return change number.

    The change spec is fetched, its "Files" entry (if any) is removed, the
    description is set and the spec is saved.  The change number is taken as
    the second word of the first save result (e.g. "Change 1234 created.").

    The save result may be a list or an iterator, and its first item may be
    either the message string itself or a result dictionary whose "data"
    entry holds the message.
    '''
    if not description:
        description = "Change list for %s" % p4.user
    change = p4.fetch_change()
    change.pop("Files", None)
    change["Description"] = description + "\n"

    # list() so that an iterator result can be indexed as well
    first = list(p4.save_change(change))[0]
    if isinstance(first, dict):
        first = first["data"]
    return first.split()[1]
def _timestamp():
return strftime("%Y/%m/%d %H:%M:%S")
def client_exists(p4, name):
    '''Return 1 if p4.run_clients() reports a client called "name", else 0.'''
    # any() stops at the first match, like an early return from a loop
    if any(entry["client"] == name for entry in p4.run_clients()):
        return 1
    return 0
def new_client(p4, name, root, mapping, description=""):
    '''Create new client.

    p4          -- client object used to query and save the spec
    name        -- name of the client to create
    root        -- stored as the "Root" field
    mapping     -- stored as the "View" field, as given
    description -- defaults to "Created by <p4.user>"

    Raises ValueError if a client called "name" already exists.
    '''
    if client_exists(p4, name):
        raise ValueError("client %s already exists" % name)
    if not description:
        # Credit the user, as the other helpers in this file do by default
        description = "Created by %s" % p4.user

    # The spec is built from scratch, so nothing is fetched from the server
    now = _timestamp()
    client = {
        "Description": description + "\n",
        "Host": "",
        "Update": now,
        "Access": now,
        "Client": name,
        "Owner": p4.user,
        "LineEnd": "local",
        "Root": root,
        "Options": "noallwrite noclobber nocompress unlocked nomodtime normdir",
        "View": mapping,
    }
    p4.save_client(client)
def branch_exists(p4, name):
    '''Return 1 if p4.run_branches() reports a branch called "name", else 0.'''
    # any() stops at the first match, like an early return from a loop
    if any(entry["branch"] == name for entry in p4.run_branches()):
        return 1
    return 0
def new_branch(p4, name, source, dest, description=""):
    '''Create a branch spec "name" whose view maps source to dest.

    The description defaults to "Created by <p4.user>".
    Raises ValueError if a branch called "name" already exists.
    '''
    if branch_exists(p4, name):
        raise ValueError("branch %s already exists" % name)

    text = (description or "Created by %s" % p4.user) + "\n"

    spec = p4.fetch_branch(name)
    spec["Owner"] = p4.user
    spec["View"] = ["%s %s" % (source, dest)]
    spec["Description"] = text
    p4.save_branch(spec)
if __name__ == "__main__":
    # Script use: hand the command line arguments to p4 and print every
    # result followed by a separator line.
    from sys import argv

    p4 = P4()
    try:
        for obj in p4.run(*argv[1:]):
            print(obj)
            print("-" * 40)
    except P4Error as err:
        raise SystemExit("error: %s" % err)