# test_p4python.PY -- Various tests for p4python - python API for Perforce
#
# Robert Cowham, Vaccaperna Systems Ltd
#
# Note that this set of tests demonstrates a variety of ways to use
# P4Python.
#
# The tests assume that you have a test Perforce repository setup and a server
# running. Please use a copy of the Perforce training repository to ensure
# that reference filenames are present (it also has appropriate history for
# some tests (commands) to run).

#---Imports

import os
import sys
import copy
import imp
import popen2
import re
import socket
import string
import time
import types
import shutil
import tempfile
import getopt
import fileinput
import unittest

# Add the nearby "code" directory to the module loading path to pick up p4
sys.path.insert(0, '..')
import p4       # Main class to test

#---Code

verbose = 0
slow = 1


class TestCase(unittest.TestCase):
    """unittest.TestCase that remembers the result object it is run with.

    Keeping the result on the instance lets addFailure() record a failure
    without aborting the running test method.
    """

    def __call__(self, result=None):
        """Run the test, storing the (possibly default) result on self."""
        if result is None:
            result = self.defaultTestResult()
        self.result = result
        unittest.TestCase.__call__(self, result)

    def addFailure(self, msg):
        """Record a failure with message msg on the current result object."""
        try:
            raise AssertionError(msg)
        except AssertionError:
            # sys.exc_info() is used instead of the name-mangled private
            # unittest helper, which newer unittest versions do not provide.
            self.result.addFailure(self, sys.exc_info())


# The configuration lives in a per-host file named config_<hostname>.py.
hostname = socket.gethostname().split('.')[0].lower()
config_filename = 'config_' + hostname + '.py'
if not os.path.exists(config_filename):
    # Stop here with the explanatory message; carrying on would only fail
    # in open() below with a less helpful IOError.
    print("Could not find config file %s" % config_filename)
    sys.exit(1)
config_file = open(config_filename)
try:
    imp.load_source('config', config_filename, config_file)
finally:
    config_file.close()
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config
# os.environ['PATH'] = os.environ['PATH'] + ";" + config.p4_path

# The default temporary file prefix starts with an '@'.  But
# that would mean that temporary files will look like revision
# specifications to Perforce.  So use a prefix that's acceptable
# to Perforce.
tempfile.gettempprefix = lambda: '%d.' % os.getpid()

# We need to log to a log file.  The P4OFC log will get redirected to
# this file, as will the output of various commands.
log_filename = time.strftime('P4OFC.%Y%m%dT%H%M%S.log',
                             time.gmtime(time.time()))
log_filename = os.path.abspath(log_filename)
log_file = open(log_filename, "a")


def log_exception():
    """Write the traceback of the exception being handled to the log."""
    import traceback
    exc_type, exc_value, exc_tb = sys.exc_info()
    try:
        log_message(''.join(
            traceback.format_exception(exc_type, exc_value, exc_tb)))
    finally:
        # Drop the traceback references so no reference cycle is left behind.
        del exc_type, exc_value, exc_tb


def log_message(msg):
    """Append msg, prefixed with a UTC timestamp, to the log file.

    When the module-level `verbose` flag is set the same line is also
    printed to standard output.
    """
    date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(time.time()))
    log_file.write("%s %s\n" % (date, msg))
    log_file.flush()
    if verbose:
        print("%s %s\n" % (date, msg))


sys.stdout.write("P4Python test suite, logging to %s.\n" % log_filename)
sys.stdout.flush()


# P4Python TEST CASE BASE CLASS
#
# The P4python_base class is a generic test case.  It defines methods
# for setting up the integration.
#
# Other test cases will inherit from this class.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".

class P4Python_base(TestCase):
    """Shared fixture: applies config overrides and builds a p4.P4 client."""

    # Set up everything so that test cases can run
    #
    def setup_everything(self, config_changes=None):
        """Apply each key/value in config_changes as an attribute of config.

        config_changes defaults to None (no overrides) rather than a shared
        mutable dict.
        """
        for key, value in (config_changes or {}).items():
            setattr(config, key, value)

    def edit_file(self, filename):
        """Prepend "A new line" to the local copy of depot file filename."""
        output = self.p4.run_where(filename)[0]
        localpath = output['path']
        f = open(localpath, 'r')
        try:
            lines = f.readlines()
        finally:
            f.close()
        lines.insert(0, "A new line\n")
        f = open(localpath, 'w')
        try:
            f.writelines(lines)
        finally:
            f.close()

    def setUp(self):
        """Create self.p4 and point it at the configured server/user/client."""
        self.setup_everything()
        self.p4 = p4.P4()
        self.p4.port = config.p4_port
        self.p4.user = config.p4_user
        self.p4.password = config.p4_password
        self.p4.client = config.p4_client

    def run_test(self):
        pass


#--- Test Cases

# Normal operation

class normal(P4Python_base):

    def runTest(self):
        "Test Normal operations including checkin"
        self.p4.exception_level = 0
        self.p4.connect()
        output = self.p4.run("info")
        for e in self.p4.errors:
            log_message("Error: " + e)
        log_message(output)

        output = self.p4.run("fstat", "//depot/main/jam/jam.c")
        self.assert_(len(self.p4.errors) == 0, "found some errors")
        self.assert_(len(output) == 1, "wrong number of results")
        self.assert_('depotFile' in output[0], "dict key not found")
        log_message(output)

        output = self.p4.run("fstat", "//depot/main/jam/jam.*")
        self.assert_(len(self.p4.errors) == 0, "found some errors")
        self.assert_(len(output) == 6, "wrong number of results")
        self.assert_('depotFile' in output[0], "dict key not found")
        log_message(output)

        file = "//depot/main/jam/Makefile"
        output = self.p4.run("fstat", file)
        self.assert_(len(self.p4.errors) == 0, "found some errors")
        self.assert_(len(output) == 1, "wrong number of results")
        self.assertEquals(output[0]['depotFile'], file, "wrong depotfile")
        self.assert_(isinstance(output[0]['otherAction'], list),
                     "not got list type for outputstat")
        log_message(output)

        self.p4.disconnect()
        self.p4.tagged()
        self.p4.connect()

        output = self.p4.run("files", "//depot/main/jam/jam.*")
        self.assert_(len(self.p4.errors) == 0, "found unexpected errors")
        self.assert_(len(output) == 6, "wrong number of results")
        self.assertEquals(output[0]['depotFile'], "//depot/main/jam/jam.1",
                          "depotfile key not found")
        for key in ['rev', 'time', 'action', 'type', 'depotFile', 'change']:
            self.assert_(key in output[0], "expected key not present")
        log_message(output)

        # This is where things get more exciting with nested arrays etc.
        file = "//depot/main/p4-doc/man/p4d.1"
        output = self.p4.run("filelog", file)
        self.assertEquals(output[0]['rev'],
                          ['6', '5', '4', '3', '2', '1'],
                          "rev key not correct")
        self.assertEquals(output[0]['file'],
                          [['//depot/r97.3/p4-doc/man/p4d.1'],
                           ['//depot/r97.3/p4-doc/man/p4d.1'],
                           ['//depot/r97.2/p4-doc/man/p4d.1'],
                           None,
                           None,
                           ['//depot/main/p4-doc/man/p3d.1']],
                          "file key not correct")
        self.assertEquals(output[0]['user'],
                          ['seiwald', 'seiwald', 'seiwald',
                           'seiwald', 'seiwald', 'seiwald'],
                          "user key not correct")
        self.assertEquals(output[0]['srev'],
                          [['#1'], ['#none'], ['#none'],
                           None, None, ['#none']],
                          "srev key not correct")

        file = "//depot/main/p4-doc/man/p3d.1"
        output = self.p4.run("filelog", file)
        self.assertEquals(output[0]['rev'], ['4', '3', '2', '1'],
                          "rev key not correct")
        self.assertEquals(output[0]['file'],
                          [None,
                           ['//depot/main/p4-doc/man/p4d.1',
                            '//depot/main/p4-doc/man/p4d.1.c']],
                          "file key not correct")
        self.assertEquals(output[0]['user'],
                          ['seiwald', 'seiwald', 'peterk', 'seiwald'],
                          "user key not correct")
        self.assertEquals(output[0]['srev'],
                          [None, ['#none', '#none']],
                          "srev key not correct")

        output = self.p4.run("describe", "-s", "704")
        self.assertEquals(output[0]['rev'], ['5'], "rev key not correct")
        self.assertEquals(output[0]['client'], 'darn',
                          "client key not correct")
        self.assertEquals(output[0]['depotFile'],
                          ['//depot/main/www/pitch/page01.html'],
                          "depotFile key not correct")
        self.assertEquals(output[0]['desc'],
                          'Pitch: fixed typo, then rewrote first three paragraphs\n' +
                          'to make them EVEN MORE GRIPPING!\n',
                          "desc key not correct")

        output = self.p4.run("describe", "-s", "709")
        self.assertEquals(output[0]['rev'], ['134', '33'],
                          "rev key not correct")
        self.assertEquals(output[0]['client'], 'fork-beos',
                          "client key not correct")
        self.assertEquals(output[0]['depotFile'],
                          ['//depot/main/jam/Jambase',
                           '//depot/main/jam/jambase.c'],
                          "depotFile key not correct")
        self.assertEquals(output[0]['desc'],
                          'Upgrade to latest metrowerks on Beos -- the Intel one.\n',
                          "desc key not correct")

        # Try special form of run
        output2 = self.p4.run_describe("-s", "709")
        self.assertEquals(output, output2)

        # Check for diff2 working
        file1 = "//depot/main/jam/jam.c"
        file2 = "//depot/main/jam/jam.h"
        output = self.p4.run("diff2", file1, file2)[0]
        self.assertEquals(output["depotFile"], file1)
        self.assertEquals(output["depotFile2"], file2)
        self.assertEquals(output["status"], "content")

        file1 = "//depot/main/jam/jambase"
        # NOTE(review): this sets an attribute on the test case, not on
        # self.p4, so the P4 exception level is NOT changed here.  Probably
        # meant self.p4.exception_level; left as-is because raising on
        # warnings could make the sync below fail -- confirm intent.
        self.exception_level = 2
        output = self.p4.run_sync(file1)
        output = self.p4.run_edit(file1)
        output = self.p4.run("diff", file1)
        self.edit_file(file1)
        output = self.p4.run("diff", file1)
        self.assert_(re.search(file1, output[0], re.IGNORECASE))
        self.assert_(len(output[1:]) > 0)


# Spec output

class specs(P4Python_base):

    def runTest(self):
        "Test spec operations"
        self.p4.exception_level = 1
        self.p4.parse_forms()
        self.p4.connect()
        output = self.p4.run("info")

        output = self.p4.run("change", "-o", "709")
        self.assertEquals(output[0]['Change'], '709')
        self.assertEquals(output[0]['Client'], 'fork-beos')
        self.assertEquals(output[0]['Date'], '2001/12/24 01:06:32')
        self.assertEquals(output[0]['Description'],
                          'Upgrade to latest metrowerks on Beos -- the Intel one.\n')

        output = self.p4.run("edit", "//depot/main/jam/jam.c")
        self.assert_(len(output) == 1, "file not opened for edit")
        output = self.p4.run("sync", "//depot/main/jam/jam.h")
        output = self.p4.run("edit", "//depot/main/jam/jam.h")
        self.assert_(len(output) == 1, "file not opened for edit")

        submit = self.p4.run("change", "-o")[0]
        submit["Description"] = "Test submit"
        self.assertEquals(len(submit["Files"]), 4)
        self.p4.input = submit
        output = self.p4.run("submit", "-i")
        self.assert_(re.match(r"Change \d+.* submitted\.", output[-1]),
                     "submit didn't work")

        # Now try the special options
        output = self.p4.run("edit", "//depot/main/jam/jam.c")
        self.assert_(len(output) == 1, "file not opened for edit")
        output = self.p4.run("edit", "//depot/main/jam/jam.h")
        self.assert_(len(output) == 1, "file not opened for edit")

        submit = self.p4.fetch_change()
        submit["Description"] = "Test submit"
        self.assert_(len(submit["Files"]) == 2, "files not opened for edit")
        output = self.p4.save_submit(submit)
        self.assert_(re.match(r"Change \d+.* submitted\.", output[-1]),
                     "submit didn't work")

        client1 = self.p4.fetch_client()
        client3 = self.p4.fetch_client("bruno_ws")
        client2 = self.p4.run("client", "-o")[0]
        self.assertEquals(client1, client2, "clients not equal")
        self.assertEquals(client1, client3, "more clients not equal")

        job_count = len(self.p4.run("jobs"))
        job = self.p4.fetch_job()
        job["Description"] = "my new job"   # Note \n will be tacked on anyway
        self.p4.save_job(job)
        jobs = self.p4.run("jobs")

        job = self.p4.fetch_job("newjob")
        job["Description"] = "another new job\n"
        self.p4.save_job(job)
        new_jobs = self.p4.run("jobs")
        # Find the job just saved; j is deliberately used after the loop.
        for j in new_jobs:
            if j["Job"] == job["Job"]:
                break
        self.assertEquals(j["Description"], job["Description"])
        self.assertEquals(j["Job"], "newjob")
        self.assertEquals(j["Job"], job["Job"])

        output = self.p4.run("job", "-d", "newjob")
        ## print output
        new_jobs = self.p4.run("jobs")
        self.assertEquals(jobs, new_jobs)

        # Clean up every job this test created.
        for j in new_jobs:
            if j["Description"] in ["another new job\n", "my new job\n"]:
                output = self.p4.delete_job("-f", j["Job"])
                self.assert_(re.match(r"Job %s deleted." % j["Job"], output))
        jobs = self.p4.run("jobs")
        self.assertEquals(job_count, len(jobs))

        # Check for diff2 working
        file1 = "//depot/main/jam/jam.c"
        file2 = "//depot/main/jam/jam.h"
        output = self.p4.run("diff2", file1, file2)[0]
        self.assertEquals(output["depotFile"], file1)
        self.assertEquals(output["depotFile2"], file2)
        self.assertEquals(output["status"], "content")

        file1 = "//depot/main/jam/jamgram.h"
        # NOTE(review): sets an attribute on the test case, not on self.p4;
        # the P4 exception level stays at 1.  Probably meant
        # self.p4.exception_level -- confirm intent before changing.
        self.exception_level = 2
        output = self.p4.run_sync(file1)
        output = self.p4.run_edit(file1)
        output = self.p4.run("diff", file1)
        self.edit_file(file1)
        output = self.p4.run("diff", file1)
        self.assert_(re.search(file1, output[0], re.IGNORECASE))
        self.assert_(len(output[1:]) > 0)


# Exceptions

class exceptions(P4Python_base):

    def runTest(self):
        "Test exceptions being generated for errors and warnings"
        # Running a command before connect() must raise.
        try:
            output = self.p4.run("info")
        except p4.P4Error:
            log_exception()
            for e in self.p4.errors:
                log_message(e)
        else:
            self.fail("expected an error exception")

        # An unknown command must raise.
        self.p4.connect()
        try:
            output = self.p4.run("fred")
        except p4.P4Error:
            log_exception()
            for e in self.p4.errors:
                log_message(e)
        else:
            self.fail("expected an error exception")
        self.p4.disconnect()

        # With exception_level 0 the error is only recorded, not raised.
        self.p4.exception_level = 0
        self.p4.connect()
        output = self.p4.run("fred")
        self.assert_(len(self.p4.errors) > 0, "no errors found")
        for e in self.p4.errors:
            log_message(e)
        self.p4.disconnect()

        self.p4.exception_level = 1     # Default
        self.p4.connect()
        # Repeated sync should generate a warning - but we don't expect exception
        try:
            output = self.p4.run_sync("//depot/main/jam/jam.c#0")
            output = self.p4.run_sync("//depot/main/jam/jam.c")
            output = self.p4.run_sync("//depot/main/jam/jam.c")
        except p4.P4Error:
            log_exception()
            for e in self.p4.errors:
                log_message("Error:" + e)
            for e in self.p4.warnings:
                log_message("Warning:" + e)
                log_message(e)
            self.fail("Didn't expect an exception")
        self.p4.disconnect()

        self.p4.exception_level = 2
        self.p4.connect()
        # Repeated sync should generate a warning - this time we expect exception
        try:
            output = self.p4.run_sync("//depot/main/jam/jam.c#0")
            output = self.p4.run_sync("//depot/main/jam/jam.c")
            output = self.p4.run_sync("//depot/main/jam/jam.c")
        except p4.P4Error:
            log_exception()
            for e in self.p4.errors:
                log_message("Error:" + e)
            for e in self.p4.warnings:
                log_message("Warning:" + e)
                log_message(e)
        else:
            self.fail("Expected an exception")

        # Check for correct exception being raised.
        try:
            fred = self.p4.bad_attribute
        except AttributeError:
            pass
        else:
            self.fail("Didn't get AttributeError")

        # Check for correct exception being raised.
        try:
            output = self.p4.run("info", {'key': 'value'})
        except (KeyboardInterrupt, SystemExit):
            # Never swallow an interrupt or an exit request.
            raise
        except:
            # Any error type is accepted here; the exact class raised for a
            # non-string argument is not pinned down by this test.
            # print "Unexpected error:", sys.exc_info()[0]
            pass
        else:
            self.fail("Didn't get TypeError")

        # Check for correct exception being raised.
        try:
            fred = self.p4.user = ['user']
        except TypeError:
            pass
        else:
            self.fail("Didn't get TypeError")

        # Stopping the server and then disconnecting should raise.
        try:
            self.p4.run("admin", "stop")
            time.sleep(2)
            self.p4.disconnect()
        except p4.P4Error:
            pass
        else:
            self.fail("Didn't get P4Error")

        self.p4.exception_level = 2
        # Check for correct exception being raised if we can't connect
        # Do this test last!!
        try:
            self.p4.port = "192.168.255.255:2555"
            self.p4.connect()
        except p4.P4Error:
            pass
        else:
            self.fail("Didn't get P4Error")


# Ensure login works OK

class login(P4Python_base):

    def _change_user(self, user):
        """Reconnect to the server as user."""
        self.p4.disconnect()
        self.p4.user = user
        self.p4.connect()

    def _delete_user(self, new_user):
        """Save new_user's spec, then delete it as the configured user."""
        user = self.p4.fetch_user(new_user)
        self.p4.save_user(user)
        self._change_user(config.p4_user)
        self.p4.login(config.p4_password)
        print(self.p4.delete_user("-f", new_user))

    def runTest(self):
        "Test login"
        self.p4.exception_level = 1
        self.p4.connect()
        try:
            output = self.p4.run("login", "-s")[0]
        except p4.P4Error:
            err = sys.exc_info()[1]
            if re.match("Password must be set before access can be granted.",
                        str(err)):
                self.p4.input = self.p4.password
                output = self.p4.run("passwd")

        # Alternative forms of login
        output = self.p4.run("logout")[0]
        self.assert_(re.search(r"User \w* logged out.", output))
        self.p4.input = ""
        output = self.p4.login(config.p4_password)[0]
        self.assert_(re.search(r"User \w* logged in.", output))

        # Check we get an error if no password supplied
        output = self.p4.run("logout")[0]
        self.assert_(re.search(r"User \w* logged out.", output))
        self.p4.input = ""
        try:
            output = self.p4.login()[0]
        except p4.P4Error:
            pass
        else:
            self.assert_(False, "Expected exception and didn't get one")

        self.p4.input = ""
        output = self.p4.run("login", "-s")[0]
        # The parentheses are escaped so the pattern matches the literal
        # server message instead of being read as a regex group.
        if re.match(r"Perforce password \(P4PASSWD\) invalid or unset\.",
                    output):
            self.p4.input = config.p4_password
            output = self.p4.run("login")[0]
        self.assert_(re.search(r"User \w* logged in.", output))

        self.p4.disconnect()
        self.p4.parse_forms()
        self.p4.connect()

        # Tests to ensure passwd stuff is OK
        new_user = self.p4.user + "1"
        self._change_user(new_user)
        try:
            output = self.p4.run("login", "-s")[0]
        except p4.P4Error:
            err = sys.exc_info()[1]
            if re.match("Password must be set before access can be granted.",
                        str(err)):
                self.p4.input = config.p4_password
                output = self.p4.run("passwd")
        self._delete_user(new_user)

        self._change_user(new_user)
        try:
            output = self.p4.run("login", "-s")[0]
        except p4.P4Error:
            err = sys.exc_info()[1]
            if re.match("Password must be set before access can be granted.",
                        str(err)):
                output = self.p4.passwd(config.p4_password)
        self._delete_user(new_user)


# RUNNING THE TESTS

def tests():
    """Build the suite of test cases to run, in execution order."""
    suite = unittest.TestSuite()
    # Decide which tests to run - note exceptions last as it stops server!
    for test_class in [login, normal, specs, exceptions]:
        suite.addTest(test_class())
    return suite


if __name__ == "__main__":
    # Filter out our arguments before handing unknown ones on
    argv = []
    argv.append(sys.argv[0])
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hv", ["help", "verbose"])
    except getopt.GetoptError:
        print("Usage: -v/verbose")
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-v", "--verbose"):
            verbose = 1
            argv.append("-v")
        elif opt in ("-h", "--help"):
            argv.append("-h")   # pass it through
        else:
            # If unknown pass them on
            argv.append(opt)
            argv.append(arg)
    unittest.main(defaultTest="tests", argv=argv)

#
# $Id: //depot/projects/p4ofc/main/test/test_p4ofc.py#43 $