# test_p4python.PY -- Various tests for p4python - python integration with office. # # Robert Cowham, Vaccaperna Systems Ltd # #---Imports import os import sys # Add the nearby "code" directory to the module loading path sys.path.insert(0, '.') import copy import imp import popen2 import re import socket import string import time import types import shutil import tempfile import getopt import unittest import p4 # Main class to test #---Code verbose = 0 slow = 1 class TestCase(unittest.TestCase): def __call__(self, result=None): if result is None: result = self.defaultTestResult() self.result = result unittest.TestCase.__call__(self, result) def addFailure(self, msg): try: raise AssertionError, msg except AssertionError: self.result.addFailure(self,self._TestCase__exc_info()) hostname = string.lower(string.split(socket.gethostname(), '.')[0]) config_filename = 'config_' + hostname + '.py' if not os.path.exists(config_filename): print "Could not find config file", config_filename config_file = open(config_filename) try: imp.load_source('config', config_filename, config_file) finally: config_file.close() original_configuration = copy.copy(sys.modules['config'].__dict__) import config # os.environ['PATH'] = os.environ['PATH'] + ";" + config.p4_path # The default temporary file prefix starts with an '@'. But # that would mean that temporary files will look like revision # specifications to Perforce. So use a prefix that's acceptable # to Perforce. tempfile.gettempprefix = lambda: '%d.' % os.getpid() # We need to log to a log file. The P4OFC log will get redirected to # this file, as will the output of various commands. log_filename = (time.strftime('P4OFC.%Y%m%dT%H%M%S.log', time.gmtime(time.time()))) log_filename = os.path.abspath(log_filename) log_file = open(log_filename, "a") def log_exception(): import sys, traceback, string type, val, tb = sys.exc_info() log_message(string.join(traceback.format_exception(type, val, tb), '')) del type, val, tb def log_message(msg): date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time())) log_file.write("%s %s\n" % (date, msg)) log_file.flush() if verbose: print "%s %s\n" % (date, msg) sys.stdout.write("P4Python test suite, logging to %s.\n" % log_filename) sys.stdout.flush() # Perforce # # This class supplies the restart_perforce method # It also provides "system", which is like os.system, # but captures output, checks for errors, and writes to the log. class Perforce: # Temporary directory for Perforce server and associated files. p4dir = None # Stop Perforce server # # We used to be able to stop via "p4 admin stop", but due to password stuff, # just kill the process. def stop_perforce(self): self.system("stop_p4d.py", ignore_failure = 0) # Start a new Perforce server # Make a completely fresh Perforce server, with a new repository. def start_perforce(self, clean=True): if os.name == "nt": import win32api if clean: # Make a new repository directory. self.tempdir = tempfile.mktemp() os.mkdir(self.tempdir) log_message("Perforce repository directory %s." % self.tempdir) self.p4dir = os.path.join(self.tempdir, 'p4root') os.mkdir(self.p4dir) # Copy the license if config.p4_license_file: shutil.copyfile(config.p4_license_file, os.path.join(self.p4dir, 'license')) # Copy depot and checkpoint to appropriate place unzipper = unzip.unzip() zipsource = "data/depot.zip" zipdest = os.path.join(self.p4dir, 'depot') unzipper.extract(zipsource, zipdest) # Recreate db files from checkpoint self.system('""%s"" -r %s -jr %s' % (config.p4_server_2004_2_executable, self.p4dir, os.path.abspath(r'data\checkpoint')), ignore_failure = 0) # Now change to use full path name if os.name == "nt": self.tempdir = os.path.dirname(win32api.GetLongPathName(os.path.join(self.p4dir, 'db.have'))) log_message("Reset tempdir to '%s'." % (self.tempdir)) # Work out Perforce's port number and start a Perforce server. match = re.match(".*:([0-9]+)$", config.p4_port) if match: port = int(match.group(1)) else: port = 1999 # p4d on Windows doesn't detach, to we can't use "system" # to start it. if os.name == "nt": win32api.WinExec('"%s" -p 0.0.0.0:%d -r %s' % (config.p4_server_2004_2_executable, port, self.p4dir)) time.sleep(3) # Restart Perforce # # By killing the old server and starting a new one. def restart_perforce(self, clean=False): self.stop_perforce() self.start_perforce(clean) # Run command and check results # # Calls an external program, raising an exception upon any error # and returning standard output and standard error from the program, # as well as writing them to the log. def system(self, command, ignore_failure = 0, input_lines = None): log_file.write('Executing %s\n' % repr(command)) (child_stdin, child_stdout) = os.popen4(command) if input_lines: child_stdin.writelines(input_lines) child_stdin.close() output = child_stdout.read() result = child_stdout.close() log_file.write(output) if not ignore_failure and result: message = ('Command "%s" failed with result code %d.' % (command, result)) log_file.write(message + '\n') raise message return output # P4Python TEST CASE BASE CLASS # # The P4python_base class is a generic test case. It defines methods # for setting up the integration. # # Other test cases will inherit from this class. # # When a class implements several test cases, the methods that implement # test cases (in the PyUnit sense) should have names starting "test_". # When a class implements a single test case, the method should be # called "runTest". class P4Python_base(TestCase): # Set up everything so that test cases can run # def setup_everything(self, config_changes = {}): for key, value in config_changes.items(): setattr(config, key, value) def setUp(self): self.setup_everything() self.p4 = p4.P4() self.p4.port = config.p4_port self.p4.user = config.p4_user self.p4.password = config.p4_password self.p4.client = config.p4_client def run_test(self): pass #--- Test Cases # Normal operation class normal(P4Python_base): def setUp(self): super(normal, self).setUp() def runTest(self): "Test Normal operations including checkin" self.p4.exception_level = 0 self.p4.connect() output = self.p4.run("info") for e in self.p4.errors: log_message("Error: " + e) log_message(output) output = self.p4.run("fstat", "//depot/main/jam/jam.c") self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 1, "wrong number of results") self.assert_(output[0].has_key('depotFile'), "dict key not found") log_message(output) output = self.p4.run("fstat", "//depot/main/jam/jam.*") self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 6, "wrong number of results") self.assert_(output[0].has_key('depotFile'), "dict key not found") log_message(output) file = "//depot/main/jam/Makefile" output = self.p4.run("fstat", file) self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 1, "wrong number of results") self.assertEquals(output[0]['depotFile'], file, "wrong depotfile") self.assert_(isinstance(output[0]['otherAction'], list), "not got list type for outputstat") log_message(output) self.p4.disconnect() self.p4.tagged() self.p4.connect() output = self.p4.run("files", "//depot/main/jam/jam.*") self.assert_(len(self.p4.errors) == 0, "found unexpected errors") self.assert_(len(output) == 6, "wrong number of results") self.assertEquals(output[0]['depotFile'], "//depot/main/jam/jam.1", "depotfile key not found") for key in ['rev', 'time', 'action', 'type', 'depotFile', 'change']: self.assert_(output[0].has_key(key), "expected key not present") log_message(output) # This is where things get more exciting with nested arrays etc. file = "//depot/main/p4-doc/man/p4d.1" output = self.p4.run("filelog", file) self.assertEquals(output[0]['rev'], ['6', '5', '4', '3', '2', '1'], "rev key not correct") self.assertEquals(output[0]['file'], [['//depot/r97.3/p4-doc/man/p4d.1'], ['//depot/r97.3/p4-doc/man/p4d.1'], ['//depot/r97.2/p4-doc/man/p4d.1'], None, None, ['//depot/main/p4-doc/man/p3d.1']], "file key not correct") self.assertEquals(output[0]['user'], ['seiwald', 'seiwald', 'seiwald', 'seiwald', 'seiwald', 'seiwald'], "user key not correct") self.assertEquals(output[0]['srev'], [['#1'], ['#none'], ['#none'], None, None, ['#none']], "srev key not correct") file = "//depot/main/p4-doc/man/p3d.1" output = self.p4.run("filelog", file) self.assertEquals(output[0]['rev'], ['4', '3', '2', '1'], "rev key not correct") self.assertEquals(output[0]['file'], [None, ['//depot/main/p4-doc/man/p4d.1', '//depot/main/p4-doc/man/p4d.1.c']], "file key not correct") self.assertEquals(output[0]['user'], ['seiwald', 'seiwald', 'peterk', 'seiwald'], "user key not correct") self.assertEquals(output[0]['srev'], [None, ['#none', '#none']], "srev key not correct") output = self.p4.run("describe", "-s", "704") self.assertEquals(output[0]['rev'], ['5'], "rev key not correct") self.assertEquals(output[0]['client'], 'darn', "client key not correct") self.assertEquals(output[0]['depotFile'], ['//depot/main/www/pitch/page01.html'], "depotFile key not correct") self.assertEquals(output[0]['desc'], 'Pitch: fixed typo, then rewrote first three paragraphs\n' + 'to make them EVEN MORE GRIPPING!\n', "desc key not correct") output = self.p4.run("describe", "-s", "709") self.assertEquals(output[0]['rev'], ['134', '33'], "rev key not correct") self.assertEquals(output[0]['client'], 'fork-beos', "client key not correct") self.assertEquals(output[0]['depotFile'], ['//depot/main/jam/Jambase', '//depot/main/jam/jambase.c'], "depotFile key not correct") self.assertEquals(output[0]['desc'], 'Upgrade to latest metrowerks on Beos -- the Intel one.\n', "desc key not correct") # Try special form of run output2 = self.p4.run_describe("-s", "709") self.assertEquals(output, output2) # Spec output class specs(P4Python_base): def setUp(self): super(specs, self).setUp() def runTest(self): "Test spec operations" self.p4.exception_level = 1 self.p4.parse_forms() self.p4.connect() output = self.p4.run("info") output = self.p4.run("change", "-o", "709") self.assertEquals(output[0]['Change'], '709') self.assertEquals(output[0]['Client'], 'fork-beos') self.assertEquals(output[0]['Date'], '2001/12/24 01:06:32') self.assertEquals(output[0]['Description'], 'Upgrade to latest metrowerks on Beos -- the Intel one.\n') output = self.p4.run("edit", "//depot/main/jam/jam.c") self.assert_(len(output) == 1, "file not opened for edit") output = self.p4.run("edit", "//depot/main/jam/jam.h") self.assert_(len(output) == 1, "file not opened for edit") submit = self.p4.run("change", "-o")[0] submit["Description"] = "Test submit" self.assert_(len(submit["Files"]) == 2, "files not opened for edit") self.p4.input = submit output = self.p4.run("submit", "-i") self.assert_(re.match("Change \d+.* submitted\.", output[-1]), "submit didn't work") # Now try the special options output = self.p4.run("edit", "//depot/main/jam/jam.c") self.assert_(len(output) == 1, "file not opened for edit") output = self.p4.run("edit", "//depot/main/jam/jam.h") self.assert_(len(output) == 1, "file not opened for edit") submit = self.p4.fetch_change() submit["Description"] = "Test submit" self.assert_(len(submit["Files"]) == 2, "files not opened for edit") output = self.p4.save_submit(submit) self.assert_(re.match("Change \d+.* submitted\.", output[-1]), "submit didn't work") client1 = self.p4.fetch_client() client3 = self.p4.fetch_client("bruno_ws") client2 = self.p4.run("client", "-o")[0] self.assertEquals(client1, client2, "clients not equal") self.assertEquals(client1, client3, "more clients not equal") job_count = len(self.p4.run("jobs")) job = self.p4.fetch_job() job["Description"] = "my new job" # Note \n will be tacked on anyway self.p4.save_job(job) jobs = self.p4.run("jobs") job = self.p4.fetch_job("newjob") job["Description"] = "another new job\n" self.p4.save_job(job) new_jobs = self.p4.run("jobs") for j in new_jobs: if j["Job"] == job["Job"]: break self.assertEquals(j["Description"], job["Description"]) self.assertEquals(j["Job"], "newjob") self.assertEquals(j["Job"], job["Job"]) output = self.p4.run("job", "-d", "newjob") ## print output new_jobs = self.p4.run("jobs") self.assertEquals(jobs, new_jobs) for j in new_jobs: if j["Description"] in ["another new job\n", "my new job\n"]: output = self.p4.delete_job("-f", j["Job"]) self.assert_(re.match(r"Job %s deleted." % j["Job"], output)) jobs = self.p4.run("jobs") self.assertEquals(job_count, len(jobs)) # Exceptions class exceptions(P4Python_base): def setUp(self): super(exceptions, self).setUp() def runTest(self): "Test exceptions being generated for errors and warnings" try: output = self.p4.run("info") except: log_exception() for e in self.p4.errors: log_message(e) else: fail("expected an error exception") self.p4.connect() try: output = self.p4.run("fred") except: log_exception() for e in self.p4.errors: log_message(e) else: fail("expected an error exception") self.p4.disconnect() self.p4.exception_level = 0 self.p4.connect() output = self.p4.run("fred") self.assert_(len(self.p4.errors) > 0, "no errors found") for e in self.p4.errors: log_message(e) class warnings(P4Python_base): def setUp(self): super(warnings, self).setUp() def runTest(self): "Test exceptions being generated for warnings" self.p4.connect() try: output = self.p4.run("fred") except: log_exception() for e in self.p4.errors: log_message(e) else: fail("expected an error exception") self.p4.disconnect() self.p4.exception_level = 0 # RUNNING THE TESTS def tests(): suite = unittest.TestSuite() # Decide which tests to run tests = [normal, exceptions, specs] for t in tests: suite.addTest(t()) return suite if __name__ == "__main__": # Filter out our arguments before handing unknown ones on argv = [] argv.append(sys.argv[0]) try: opts, args = getopt.getopt(sys.argv[1:], "hv", ["help", "verbose"]) except getopt.GetoptError: print "Usage: -v/verbose" sys.exit(2) for opt, arg in opts: if opt in ("-v", "--verbose"): verbose = 1 argv.append("-v") elif opt in ("-h", "--help"): argv.append("-h") # pass it through else: # If unknown pass them on argv.append(opt) argv.append(arg) unittest.main(defaultTest="tests", argv=argv) # # $Id: //depot/projects/p4ofc/main/test/test_p4ofc.py#43 $