# VSStoP4 - Migrate Visual SourceSafe to Perforce # # TEST_vsstop4.PY -- TEST THE scripts # # Robert Cowham, Vaccaperna Systems Ltd # # 1. INTRODUCTION # # This test script tests the vsstop4. # The main method is by running different configurations and comparing the # results with a known good result. import os import sys # Add the nearby "code" directory to the module loading path if os.environ.has_key('VSSTOP4_PATH'): VSSTOP4_path = os.environ['VSSTOP4_PATH'] else: VSSTOP4_path = os.path.join(os.getcwd(), '..') if VSSTOP4_path not in sys.path: sys.path.append(VSSTOP4_path) sys.path.append('.') import copy import imp import p4 import popen2 import re import socket import string import time import types import unittest import shutil import tempfile # http://www.python.org/doc/2.2p1/lib/module-tempfile.html class TestCase(unittest.TestCase): def __call__(self, result=None): if result is None: result = self.defaultTestResult() self.result = result unittest.TestCase.__call__(self, result) def addFailure(self, msg): try: raise AssertionError, msg except AssertionError: self.result.addFailure(self,self._TestCase__exc_info()) hostname = string.lower(string.split(socket.gethostname(), '.')[0]) config_filename = 'config_' + hostname + '.py' if not os.path.exists(config_filename): print "Could not find config file", config_filename config_file = open(config_filename) try: imp.load_source('config', config_filename, config_file) finally: config_file.close() original_configuration = copy.copy(sys.modules['config'].__dict__) import config os.environ['PATH'] = os.environ['PATH'] + ";" + config.ss_path os.environ['SSDIR'] = config.SSDIR # The default temporary file prefix starts with an '@'. But # that would mean that temporary files will look like revision # specifications to Perforce. So use a prefix that's acceptable # to Perforce. tempfile.gettempprefix = lambda: '%d.' % os.getpid() try: os.remove('..\logfile.log') except: pass # We need to log to a log file. 
# The VSSTOP4 log will get redirected to
# this file, as will the output of various commands.
log_filename = time.strftime('VSSTOP4.%Y%m%dT%H%M%S.log', time.gmtime())
log_filename = os.path.abspath(log_filename)
log_file = open(log_filename, "a")


def log_message(msg):
    """Append msg to the log file, prefixed with the current UTC time."""
    date = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())
    log_file.write("%s %s\n" % (date, msg))
    log_file.flush()


sys.stdout.write("VSSTOP4 test suite, logging to %s\n" % log_filename)
sys.stdout.flush()


# Perforce
#
# This class supplies the restart_perforce method
# It also provides "system", which is like os.system,
# but captures output, checks for errors, and writes to the log.

class Perforce:

    # Temporary directory for Perforce server and associated files.
    p4dir = None

    # Server version to start; a key of config.p4_server_executable.
    p4server_version = "2006.1"     # Default value

    # Port used when no port number can be found in config.p4_port.
    default_port = 1999

    # Stop Perforce server
    #
    # If there are any Perforce servers running on the magic port,
    # use p4 admin to stop them.

    def stop_perforce(self):
        # Failure is expected when no server is running, so ignore it.
        self.system('p4.exe -p "%s" -u "%s" -P "%s" admin stop'
                    % (config.p4_port, config.p4_user, config.p4_password),
                    ignore_failure = 1)

    # Work out the TCP port number from a Perforce port setting
    #
    # Accepts "host:port" as well as a bare "port".  Anything that does
    # not end in a port number gives default_port.

    def _port_number(self, p4_port):
        match = re.match(r"(?:.*:)?([0-9]+)$", p4_port)
        if match:
            return int(match.group(1))
        return self.default_port

    # Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.
    # server_ver, when given, selects the server version and becomes
    # the version used by later calls on this object.

    def start_perforce(self, server_ver = None):
        if server_ver:
            self.p4server_version = server_ver

        # Make a new repository directory.  mkdtemp creates the
        # directory itself, so another process cannot claim the name
        # between choosing it and creating it.
        self.p4dir = tempfile.mkdtemp()
        log_message("Perforce repository directory %s." % self.p4dir)
        log_message("Perforce server version %s." % self.p4server_version)

        # Copy the license
        if config.p4_license_file:
            shutil.copyfile(config.p4_license_file,
                            os.path.join(self.p4dir, 'license'))

        # Work out Perforce's port number and start a Perforce server.
        port = self._port_number(config.p4_port)

        # p4d on Windows doesn't detach, so we can't use "system"
        # to start it.
        import win32api
        win32api.WinExec("%s -p 0.0.0.0:%d -r %s -L p4d.log"
                         % (config.p4_server_executable[self.p4server_version],
                            port, self.p4dir))
        # Give the server a moment to start before anything connects.
        time.sleep(2)

    # Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self, server_ver = None):
        self.stop_perforce()
        self.start_perforce(server_ver)

    # Run command and check results
    #
    # Calls an external program through the shell and returns its
    # standard output and standard error (combined) as one string, as
    # well as writing them to the log.
    #
    # input_lines, when given, is written to the program's standard
    # input.  Unless ignore_failure is true, a non-zero exit code
    # raises RuntimeError.

    def system(self, command, ignore_failure = 0, input_lines = None):
        # Imported here so this method does not depend on the
        # file-level import list.
        import subprocess

        log_file.write('Executing %s\n' % repr(command))
        child = subprocess.Popen(command, shell = True,
                                 stdin = subprocess.PIPE,
                                 stdout = subprocess.PIPE,
                                 stderr = subprocess.STDOUT,
                                 universal_newlines = True)
        if input_lines:
            stdin_text = ''.join(input_lines)
        else:
            stdin_text = None
        # communicate() closes the child's stdin and reads everything
        # it prints, so the child cannot block on either pipe.
        output = child.communicate(stdin_text)[0]
        result = child.returncode
        log_file.write(output)
        if not ignore_failure and result:
            message = ('Command "%s" failed with result code %d.'
                       % (command, result))
            log_file.write(message + '\n')
            # A real exception object: a bare string cannot be raised.
            raise RuntimeError(message)
        return output

    # Create a new param file
    #
    # Writes config.convert_params to a freshly named file in the
    # parent directory and stores that name in the VSSTOP4_CONFIG
    # environment variable.

    def write_param_file(self):
        ini_filename = time.strftime('config.ini.%Y%m%dT%H%M%S',
                                     time.gmtime())
        os.environ['VSSTOP4_CONFIG'] = ini_filename
        param_file = open(os.path.join(os.pardir, ini_filename), "w")
        try:
            param_file.write("# This is a test version of config file\n# This is not the real config.ini!!\n\n")
            for name, value in config.convert_params.items():
                param_file.write("%s: %s\n" % (name, value))
        finally:
            param_file.close()


# VSSTOP4 TEST CASE BASE CLASS
#
# The VSSTOP4_base class is a generic VSSTOP4 test case.  It defines methods
# for setting up the integration.
#
# Other VSSTOP4 test cases will inherit from VSSTOP4_base.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".
class VSSTOP4_base(TestCase):

    # Perforce server controller, shared by every test case.
    p4d = Perforce()

    # Marks a conversion parameter that did not exist before a test
    # overrode it.
    _NOT_SET = object()

    # Set up everything so that test cases can run
    #
    # Apply config_changes (a mapping of attribute name to value) to
    # the config module, start a fresh Perforce server (of version
    # server_ver, when given), write a new param file and connect to
    # the server.

    def setup_everything(self, server_ver = None, config_changes = None):
        for key, value in (config_changes or {}).items():
            setattr(config, key, value)
        self.p4d.restart_perforce(server_ver)
        self.p4d.write_param_file()
        self.config = config

        # Get a temporary Perforce interface suitable for setting the
        # Perforce password.
        # NOTE(review): this connection is opened but no command is run
        # on it and it is never closed -- confirm whether a password
        # step is missing here.
        if config.p4_password:
            p4i = p4.P4()
            p4i.port = config.p4_port
            p4i.user = config.p4_user
            p4i.connect()

        # Get a permanent Perforce interface using the password.
        self.p4 = p4.P4()
        self.p4.port = config.p4_port
        self.p4.user = config.p4_user
        self.p4.connect()

    def set_convert_param(self, name, value):
        """Override one conversion parameter for this test only.

        The previous value is remembered and put back by tearDown, so
        the override does not leak into the tests that run afterwards.
        """
        saved = self.__dict__.setdefault('_saved_convert_params', {})
        if name not in saved:
            saved[name] = config.convert_params.get(name, self._NOT_SET)
        config.convert_params[name] = value

    def tearDown(self):
        """Undo every override made through set_convert_param."""
        saved = self.__dict__.pop('_saved_convert_params', {})
        for name, previous in saved.items():
            if previous is self._NOT_SET:
                config.convert_params.pop(name, None)
            else:
                config.convert_params[name] = previous

    def write_to_file(self, fname, results):
        """Write each item of results to fname, one per line."""
        temp_file = open(fname, "w")
        try:
            for line in results:
                temp_file.write(line)
                temp_file.write("\n")
        finally:
            temp_file.close()

    def compare_results(self, good_file, results):
        """Fail the test unless results match the contents of good_file.

        The results are written to "_" + good_file and the two files
        are compared with the Windows "fc" command.
        """
        temp_fname = "_" + good_file
        self.write_to_file(temp_fname, results)
        # fc exits with a non-zero code when the files differ.  That is
        # the case reported below, so it must not be turned into an
        # error by system() first.
        result = self.p4d.system("fc %s %s" % (good_file, temp_fname),
                                 ignore_failure = 1)
        if not re.search("FC: no differences encountered", result):
            self.fail("Expected no differences comparing %s and %s."
                      % (good_file, temp_fname))

    def check_result(self, file, command):
        """Run a Perforce command and compare its output with file.txt."""
        good_file = file + ".txt"
        results = self.p4.run(command.split(' '))
        self.compare_results(good_file, results)

    def check_labels(self):
        """Compare every label, and the files in it, with labels.txt."""
        # A separate connection is used so that output can be tagged.
        p4i = p4.P4()
        p4i.port = self.config.p4_port
        p4i.user = self.config.p4_user
        p4i.tagged()
        p4i.connect()
        try:
            results = []
            for label in p4i.run('labels'):
                results.append('Label: ' + label['label'] + '\n')
                results.append(label['Description'] + '\n')
                contents = p4i.run(('files //...@' + label['label']).split(' '))
                results.append('Files:\n')
                for f in contents:
                    results.append(f['depotFile'] + '#' + f['rev'])
        finally:
            p4i.disconnect()
        self.compare_results("labels.txt", results)

    def check_consistency(self):
        """Check changes, file verification and labels against known results."""
        self.check_result('changes', 'changes -l')
        self.check_result('verify', 'verify //...')
        self.check_labels()

    def run_converter(self):
        """Run the converter script from the parent directory."""
        self.p4d.system("cd .. && perl convert.pl")


# NORMAL OPERATION

class normal(VSSTOP4_base):

    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Check output of p4 verify"
        self.run_converter()
        self.check_consistency()


# Server Compatibility - same as normal test just uses different server versions

class server_versions(VSSTOP4_base):

    def setUp(self):
        # Each version gets its own setup inside runTest.
        pass

    def runTest(self):
        "Test all server version (except current one)"
        # A shorter list for quick runs: ["2005.2", "2005.1"]
        for ver in ["2005.2", "2005.1", "2004.2", "2003.2"]:
            self.setup_everything(ver)
            self.run_converter()
            self.check_consistency()


# TEST CASE: Use Ole Automation

class ole_auto(VSSTOP4_base):

    def setUp(self):
        self.set_convert_param('vss_use_ole', 'yes')
        self.setup_everything()

    def runTest(self):
        "Use OLE Automation"
        self.run_converter()
        self.check_consistency()


# TEST CASE: Use vsscorrupt

class vss_corrupt(VSSTOP4_base):

    def setUp(self):
        self.set_convert_param('vsscorrupt', 'yes')
        self.setup_everything()

    def runTest(self):
        "Use VSS Corrupt"
        self.run_converter()
        self.check_consistency()


# RUNNING THE TESTS

def tests():
    """Build the suite to run.

    With no command-line argument, every test case is included.
    Otherwise the first argument names the test to load from this
    script.
    """
    if len(sys.argv) == 1:
        suite = unittest.TestSuite()
        # Decide which tests to run
        test_classes = [normal, ole_auto, vss_corrupt, server_versions]
        for test_class in test_classes:
            suite.addTest(test_class())
        return suite
    # Unfortunately the following doesn't work with pdb, but is OK otherwise
    loader = unittest.defaultTestLoader
    module = __import__('__main__')
    return loader.loadTestsFromName(sys.argv[1], module)


def main():
    """Run the selected tests with verbose text output."""
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(tests())


if __name__ == "__main__":
    main()

#
# $Id: //depot/projects/p4ofc/main/test/test_p4ofc.py#43 $