#!/usr/bin/env python '''Pure Python Perforce client. This module tries to be API compatible to the excellent P4Python by Robert Cowham (see http://public.perforce.com/guest/robert_cowham/perforce/API/python/index.html) ''' __author__ = "Miki Tebeka " __version__ = "0.1.0" from sys import platform from os import environ, popen, chdir from os.path import join from marshal import load, dump, loads from subprocess import Popen, PIPE from time import strftime # Find p4 executable _REGKEY = r"SOFTWARE\perforce\environment" _REGVAL = "P4INSTROOT" if platform == "win32": from _winreg import OpenKey, CloseKey, QueryValueEx, HKEY_LOCAL_MACHINE key = OpenKey(HKEY_LOCAL_MACHINE, _REGKEY) instdir, key_type = QueryValueEx(key, _REGVAL) CloseKey(key) _P4EXE = join(instdir, "p4.exe") elif platform == "cygwin": regkey = "/HKLM/%s/%s" % (_REGKEY.replace("\\", "/"), _REGVAL) instdir = popen("regtool get %s" % regkey).read().strip() P4 = r"%s\%s" % (instdir, "p4.exe") _P4EXE = popen("cygpath -au \"%s\"" % P4).read().strip() else: # *Nix _P4EXE = popen("which p4").read().strip() _RUN = "run_" _FETCH = "fetch_" _DELETE = "delete_" _SAVE = "save_" class P4Error(Exception): '''Perforce Error''' def __init__(self, value=None): Exception.__init__(self) self.value = value def __str__(self): if self.value: return str(self.value) else: return "Perforce Error" class DummyP4Python: '''Dummy function to conform with original p4python Placed here away from the "real" code''' def connect(self): return 1 def disconnect(self): return 1 def dropped(self): return 0 def parse_forms(self): return 1 def tagged(self): raise NotImplementedError def translate(self, output, content=-2, fnames=-2, dialog=-2): raise NotImplementedError def login(self, password): raise NotImplementedError def passwd(self, password): raise NotImplementedError class P4(DummyP4Python): '''Perforce API client''' def __init__(self, use_iter=0): self.user = "" # User name ($P4USER) self.client = "" # Client name ($P4CLIENT) self.port = "" # Port ($P4PORT) self.password = "" # Password ($P4PASSWD) self.errors = [] # Errors from last command self.warnings = [] # Warnings from last command self.cwd = "." # Current working directory # Exception level: # 0 not raise # 1 raise on errors # 2 raise on warnings self.exception_level = 1 self._input = None # Current input (for "-i" commands) # Return iterator or list if use_iter: self.run = self._run_iter else: self.run = self._run_list def _raise_error(self, error): '''Raise and error''' self.errors = [error] raise P4Error(self.errors) def _assert(self, result): '''Check if result has error and raise according to exception_level''' do_raise = 0 if result["code"] == "error": self.errors = result["data"] if self.exception_level > 0: do_raise = 1 if result["code"] == "warning": self.warnings = result["data"] if self.exception_level > 1: do_raise = 1 if do_raise: raise P4Error(result["data"]) def _run_iter(self, *args): '''Run command, iterator version''' try: chdir(self.cwd) except OSError, e: self._raise_error("can't change directory to %s (%s)" % \ (self.cwd, e)) command = [_P4EXE, "-G"] for attr, opt in ( ("user", "-u"), ("client", "-c"), ("port", "-p"), ("password", "-P")): val = getattr(self, attr) if val: command.append(opt) command.append(val) args = list(args) input = None while args: arg = args.pop(0) command.append(arg) if arg == "-i": input = args.pop(0) if (input == None) and self._input: input = self._input pipe = Popen(command, stdout=PIPE, stdin=PIPE) if input: dump(input, pipe.stdin) pipe.stdin.close() while 1: try: result = load(pipe.stdout) self._assert(result) yield result except ValueError, e: self._raise_error("bad return data") except EOFError: break pipe.wait() if (self.exception_level > 0) and (pipe.returncode != 0): self._raise_error("bad return code - %s" % pipe.returncode) def _run_list(self, *args): '''Run command, list version''' return list(self._run_iter(*args)) def __getattr__(self, attr): '''Hooks for all shortcut methods (such as save_ ...)''' if attr in self.__dict__: return self.__dict__[attr] if attr.startswith(_RUN): command = attr[len(_RUN):] def f(*args): return self.run(command, *args) return f elif attr.startswith(_FETCH): command = attr[len(_FETCH):] def f(*args): return list(self.run(command, "-o", *args))[0] return f elif attr.startswith(_SAVE): command = attr[len(_SAVE):] def f(*args): return self.run(command, "-i", *args) return f elif attr.startswith(_DELETE): command = attr[len(_DELETE):] def f(*args): return self.run(command, "-d", *args) return f raise AttributeError # High level function def new_label(p4, name, path, description, revision=""): '''Do a label "name" on path@revision with desctiption''' lspec = p4.fetch_label(name) lspec["Description"] = description lspec["Owner"] = p4.user lspec["Options"] = "unlocked" lspec["View"] = [path] p4.save_label(lspec) if revision: path += "@%s" % revision p4.run_labelsync("-l", name, path) lspec["Options"] = "locked" p4.save_label(lspec) def new_change(p4, description=""): '''Create new changelist, return change number''' if not description: description = "Change list for %s" % p4.user change = p4.fetch_change() if "Files" in change: del change["Files"] change["Description"] = description + "\n" res = p4.save_change(change) return res[0].split()[1] def _timestamp(): return strftime("%Y/%m/%d %H:%M:%S") def client_exists(p4, name): for client in p4.run_clients(): if client["client"] == name: return 1 return 0 def new_client(p4, name, root, mapping, description=""): '''Create new client''' if client_exists(p4, name): raise ValueError("client %s already exists" % name) if not description: description = "Created by %s" % p4.client client = p4.fetch_client() client = { "Description" : description + "\n", "Host" : "", "Update" : _timestamp(), "Access" : _timestamp(), "Client" : name, "Owner" : p4.user, "LineEnd" : "local", "Root" : root, "Options" : "noallwrite noclobber nocompress unlocked nomodtime normdir", "View" : mapping } p4.save_client(client) def branch_exists(p4, name): for branch in p4.run_branches(): if branch["branch"] == name: return 1 return 0 def new_branch(p4, name, source, dest, description=""): '''Create new branch''' if branch_exists(p4, name): raise ValueError("branch %s already exists" % name) if not description: description = "Created by %s" % p4.user description += "\n" branch = p4.fetch_branch(name) branch["Owner"] = p4.user branch["View"] = ["%s %s" % (source, dest)] branch["Description"] = description p4.save_branch(branch) if __name__ == "__main__": from sys import argv p4 = P4() try: ret = p4.run(*argv[1:]) for obj in ret: print obj print "-" * 40 except P4Error, e: raise SystemExit("error: %s" % e)