# test_p4python.PY -- Various tests for p4python - python API for Perforce # # Robert Cowham, Vaccaperna Systems Ltd # # Note that this set of tests demonstrates a variety of ways to use # P4Python. # # The tests assume that you have a test Perforce repository setup and a server # running. Please use a copy of the Perforce training repository to ensure # that reference filenames are present (also has appropriate history for some # tests (commands) to run. #---Imports import os import sys import copy import imp import popen2 import re import socket import string import time import types import shutil import tempfile import getopt import unittest # Add the nearby "code" directory to the module loading path to pick up p4 sys.path.insert(0, '..') import p4 # Main class to test #---Code verbose = 0 slow = 1 class TestCase(unittest.TestCase): def __call__(self, result=None): if result is None: result = self.defaultTestResult() self.result = result unittest.TestCase.__call__(self, result) def addFailure(self, msg): try: raise AssertionError, msg except AssertionError: self.result.addFailure(self,self._TestCase__exc_info()) hostname = string.lower(string.split(socket.gethostname(), '.')[0]) config_filename = 'config_' + hostname + '.py' if not os.path.exists(config_filename): print "Could not find config file", config_filename config_file = open(config_filename) try: imp.load_source('config', config_filename, config_file) finally: config_file.close() original_configuration = copy.copy(sys.modules['config'].__dict__) import config # os.environ['PATH'] = os.environ['PATH'] + ";" + config.p4_path # The default temporary file prefix starts with an '@'. But # that would mean that temporary files will look like revision # specifications to Perforce. So use a prefix that's acceptable # to Perforce. tempfile.gettempprefix = lambda: '%d.' % os.getpid() # We need to log to a log file. The P4OFC log will get redirected to # this file, as will the output of various commands. log_filename = (time.strftime('P4OFC.%Y%m%dT%H%M%S.log', time.gmtime(time.time()))) log_filename = os.path.abspath(log_filename) log_file = open(log_filename, "a") def log_exception(): import sys, traceback, string type, val, tb = sys.exc_info() log_message(string.join(traceback.format_exception(type, val, tb), '')) del type, val, tb def log_message(msg): date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time())) log_file.write("%s %s\n" % (date, msg)) log_file.flush() if verbose: print "%s %s\n" % (date, msg) sys.stdout.write("P4Python test suite, logging to %s.\n" % log_filename) sys.stdout.flush() # P4Python TEST CASE BASE CLASS # # The P4python_base class is a generic test case. It defines methods # for setting up the integration. # # Other test cases will inherit from this class. # # When a class implements several test cases, the methods that implement # test cases (in the PyUnit sense) should have names starting "test_". # When a class implements a single test case, the method should be # called "runTest". class P4Python_base(TestCase): # Set up everything so that test cases can run # def setup_everything(self, config_changes = {}): for key, value in config_changes.items(): setattr(config, key, value) def setUp(self): self.setup_everything() self.p4 = p4.P4() self.p4.port = config.p4_port self.p4.user = config.p4_user self.p4.password = config.p4_password self.p4.client = config.p4_client def run_test(self): pass #--- Test Cases # Normal operation class normal(P4Python_base): def setUp(self): super(normal, self).setUp() def runTest(self): "Test Normal operations including checkin" self.p4.exception_level = 0 self.p4.connect() output = self.p4.run("info") for e in self.p4.errors: log_message("Error: " + e) log_message(output) output = self.p4.run("fstat", "//depot/main/jam/jam.c") self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 1, "wrong number of results") self.assert_(output[0].has_key('depotFile'), "dict key not found") log_message(output) output = self.p4.run("fstat", "//depot/main/jam/jam.*") self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 6, "wrong number of results") self.assert_(output[0].has_key('depotFile'), "dict key not found") log_message(output) file = "//depot/main/jam/Makefile" output = self.p4.run("fstat", file) self.assert_(len(self.p4.errors) == 0, "found some errors") self.assert_(len(output) == 1, "wrong number of results") self.assertEquals(output[0]['depotFile'], file, "wrong depotfile") self.assert_(isinstance(output[0]['otherAction'], list), "not got list type for outputstat") log_message(output) self.p4.disconnect() self.p4.tagged() self.p4.connect() output = self.p4.run("files", "//depot/main/jam/jam.*") self.assert_(len(self.p4.errors) == 0, "found unexpected errors") self.assert_(len(output) == 6, "wrong number of results") self.assertEquals(output[0]['depotFile'], "//depot/main/jam/jam.1", "depotfile key not found") for key in ['rev', 'time', 'action', 'type', 'depotFile', 'change']: self.assert_(output[0].has_key(key), "expected key not present") log_message(output) # This is where things get more exciting with nested arrays etc. file = "//depot/main/p4-doc/man/p4d.1" output = self.p4.run("filelog", file) self.assertEquals(output[0]['rev'], ['6', '5', '4', '3', '2', '1'], "rev key not correct") self.assertEquals(output[0]['file'], [['//depot/r97.3/p4-doc/man/p4d.1'], ['//depot/r97.3/p4-doc/man/p4d.1'], ['//depot/r97.2/p4-doc/man/p4d.1'], None, None, ['//depot/main/p4-doc/man/p3d.1']], "file key not correct") self.assertEquals(output[0]['user'], ['seiwald', 'seiwald', 'seiwald', 'seiwald', 'seiwald', 'seiwald'], "user key not correct") self.assertEquals(output[0]['srev'], [['#1'], ['#none'], ['#none'], None, None, ['#none']], "srev key not correct") file = "//depot/main/p4-doc/man/p3d.1" output = self.p4.run("filelog", file) self.assertEquals(output[0]['rev'], ['4', '3', '2', '1'], "rev key not correct") self.assertEquals(output[0]['file'], [None, ['//depot/main/p4-doc/man/p4d.1', '//depot/main/p4-doc/man/p4d.1.c']], "file key not correct") self.assertEquals(output[0]['user'], ['seiwald', 'seiwald', 'peterk', 'seiwald'], "user key not correct") self.assertEquals(output[0]['srev'], [None, ['#none', '#none']], "srev key not correct") output = self.p4.run("describe", "-s", "704") self.assertEquals(output[0]['rev'], ['5'], "rev key not correct") self.assertEquals(output[0]['client'], 'darn', "client key not correct") self.assertEquals(output[0]['depotFile'], ['//depot/main/www/pitch/page01.html'], "depotFile key not correct") self.assertEquals(output[0]['desc'], 'Pitch: fixed typo, then rewrote first three paragraphs\n' + 'to make them EVEN MORE GRIPPING!\n', "desc key not correct") output = self.p4.run("describe", "-s", "709") self.assertEquals(output[0]['rev'], ['134', '33'], "rev key not correct") self.assertEquals(output[0]['client'], 'fork-beos', "client key not correct") self.assertEquals(output[0]['depotFile'], ['//depot/main/jam/Jambase', '//depot/main/jam/jambase.c'], "depotFile key not correct") self.assertEquals(output[0]['desc'], 'Upgrade to latest metrowerks on Beos -- the Intel one.\n', "desc key not correct") # Try special form of run output2 = self.p4.run_describe("-s", "709") self.assertEquals(output, output2) # Spec output class specs(P4Python_base): def setUp(self): super(specs, self).setUp() def runTest(self): "Test spec operations" self.p4.exception_level = 1 self.p4.parse_forms() self.p4.connect() output = self.p4.run("info") output = self.p4.run("change", "-o", "709") self.assertEquals(output[0]['Change'], '709') self.assertEquals(output[0]['Client'], 'fork-beos') self.assertEquals(output[0]['Date'], '2001/12/24 01:06:32') self.assertEquals(output[0]['Description'], 'Upgrade to latest metrowerks on Beos -- the Intel one.\n') output = self.p4.run("edit", "//depot/main/jam/jam.c") self.assert_(len(output) == 1, "file not opened for edit") output = self.p4.run("edit", "//depot/main/jam/jam.h") self.assert_(len(output) == 1, "file not opened for edit") submit = self.p4.run("change", "-o")[0] submit["Description"] = "Test submit" self.assert_(len(submit["Files"]) == 2, "files not opened for edit") self.p4.input = submit output = self.p4.run("submit", "-i") self.assert_(re.match("Change \d+.* submitted\.", output[-1]), "submit didn't work") # Now try the special options output = self.p4.run("edit", "//depot/main/jam/jam.c") self.assert_(len(output) == 1, "file not opened for edit") output = self.p4.run("edit", "//depot/main/jam/jam.h") self.assert_(len(output) == 1, "file not opened for edit") submit = self.p4.fetch_change() submit["Description"] = "Test submit" self.assert_(len(submit["Files"]) == 2, "files not opened for edit") output = self.p4.save_submit(submit) self.assert_(re.match("Change \d+.* submitted\.", output[-1]), "submit didn't work") client1 = self.p4.fetch_client() client3 = self.p4.fetch_client("bruno_ws") client2 = self.p4.run("client", "-o")[0] self.assertEquals(client1, client2, "clients not equal") self.assertEquals(client1, client3, "more clients not equal") job_count = len(self.p4.run("jobs")) job = self.p4.fetch_job() job["Description"] = "my new job" # Note \n will be tacked on anyway self.p4.save_job(job) jobs = self.p4.run("jobs") job = self.p4.fetch_job("newjob") job["Description"] = "another new job\n" self.p4.save_job(job) new_jobs = self.p4.run("jobs") for j in new_jobs: if j["Job"] == job["Job"]: break self.assertEquals(j["Description"], job["Description"]) self.assertEquals(j["Job"], "newjob") self.assertEquals(j["Job"], job["Job"]) output = self.p4.run("job", "-d", "newjob") ## print output new_jobs = self.p4.run("jobs") self.assertEquals(jobs, new_jobs) for j in new_jobs: if j["Description"] in ["another new job\n", "my new job\n"]: output = self.p4.delete_job("-f", j["Job"]) self.assert_(re.match(r"Job %s deleted." % j["Job"], output)) jobs = self.p4.run("jobs") self.assertEquals(job_count, len(jobs)) # Exceptions class exceptions(P4Python_base): def setUp(self): super(exceptions, self).setUp() def runTest(self): "Test exceptions being generated for errors and warnings" try: output = self.p4.run("info") except: log_exception() for e in self.p4.errors: log_message(e) else: fail("expected an error exception") self.p4.connect() try: output = self.p4.run("fred") except: log_exception() for e in self.p4.errors: log_message(e) else: fail("expected an error exception") self.p4.disconnect() self.p4.exception_level = 0 self.p4.connect() output = self.p4.run("fred") self.assert_(len(self.p4.errors) > 0, "no errors found") for e in self.p4.errors: log_message(e) class warnings(P4Python_base): def setUp(self): super(warnings, self).setUp() def runTest(self): "Test exceptions being generated for warnings" self.p4.connect() # Repeated sync should generate a warning - but we don't expect exception try: output = self.p4.run_sync("//depot/main/jam/jam.c#0") output = self.p4.run_sync("//depot/main/jam/jam.c") output = self.p4.run_sync("//depot/main/jam/jam.c") except: log_exception() for e in self.p4.errors: log_message("Error:" + e) for e in self.p4.warnings: log_message("Warning:" + e) log_message(e) self.fail("Didn't expect an exception") self.p4.disconnect() self.p4.exception_level = 2 self.p4.connect() # Repeated sync should generate a warning - this time we expect exception try: output = self.p4.run_sync("//depot/main/jam/jam.c#0") output = self.p4.run_sync("//depot/main/jam/jam.c") output = self.p4.run_sync("//depot/main/jam/jam.c") except: log_exception() for e in self.p4.errors: log_message("Error:" + e) for e in self.p4.warnings: log_message("Warning:" + e) log_message(e) else: self.fail("Expected an exception") # Ensure login works OK class login(P4Python_base): def setUp(self): super(login, self).setUp() def runTest(self): "Test login" self.p4.exception_level = 1 self.p4.connect() output = self.p4.run("login", "-s")[0] print output if re.match("Perforce password (P4PASSWD) invalid or unset.", output): self.p4.input = config.p4_password print self.p4.run("login") # RUNNING THE TESTS def tests(): suite = unittest.TestSuite() # Decide which tests to run tests = [login, normal, warnings, exceptions, specs] for t in tests: suite.addTest(t()) return suite if __name__ == "__main__": # Filter out our arguments before handing unknown ones on argv = [] argv.append(sys.argv[0]) try: opts, args = getopt.getopt(sys.argv[1:], "hv", ["help", "verbose"]) except getopt.GetoptError: print "Usage: -v/verbose" sys.exit(2) for opt, arg in opts: if opt in ("-v", "--verbose"): verbose = 1 argv.append("-v") elif opt in ("-h", "--help"): argv.append("-h") # pass it through else: # If unknown pass them on argv.append(opt) argv.append(arg) unittest.main(defaultTest="tests", argv=argv) # # $Id: //depot/projects/p4ofc/main/test/test_p4ofc.py#43 $