# Perforce Defect Tracking Integration Project # # # TEST_P4DTI.PY -- TEST THE P4DTI # # Gareth Rees, Ravenbrook Limited, 2001-03-14 # # # 1. INTRODUCTION # # This test script tests the P4DTI. The initial plan is described in # [GDR 2000-12-31] and some of the design is given in [GDR 2001-03-14]. # # This script contains a lot of tricky code (for example, using the # hostname or operating system name to pick a module to import or a # class to use; or overriding a method in another module in order to # snoop on its behaviour). Please take care when editing it. # # It uses the PyUnit unit test framework [PyUnit]. # # The intended readership is project developers. # # This document is not confidential. # # The test cases depend on features of the test databases. It is # important that the cases be kept in sync with the test database and # with the design of those databases [GDR 2001-03-14]. When you add a # test, be sure to change the design and refer back to the test. # # # 1.1. Useful info # # The script creates a series of test Perforce servers (one for each # test case). When a test case starts, it kills all the running # Perforce servers on the required port. Each of these has its own # directory, in the appropriate place for temporary files for your # system (typically /tmp on Unix, C:\Temp on Windows). The P4DTI log is # diverted from its real place and sent to the file p4dti.log. # # # 1.2. Regression tests in this script # # The section * means that the defect is tested throughout as a simple # consequence of running the script; there is no particular test for it. # # Job Section Title # ---------------------------------------------------------------------- import os import sys # Add the nearby "code" directory to the module loading path # to get the P4DTI modules. If P4DTI_PATH is set then add # that instead, so that other versions of P4DTI can be tested. 
try:
    # P4DTI_PATH, when set, selects the P4DTI version under test.
    p4dti_path = os.environ['P4DTI_PATH']
except KeyError:
    # Otherwise use the "code/replicator" directory next to the tests.
    p4dti_path = os.path.join(os.getcwd(), os.pardir, 'code', 'replicator')
if p4dti_path not in sys.path:
    sys.path.insert(0, p4dti_path)

import copy
import sgmllib
import imp
import logger
import message
import p4i
import p4dti_unittest
import popen2
import re
import socket
import string
import time
import types
import unittest
import urllib
import random
import shutil
import tempfile # http://www.python.org/doc/2.2p1/lib/module-tempfile.html
import inspect


# The stock temporary-file prefix begins with '@', which Perforce would
# read as a revision specification.  Substitute a prefix built from the
# process id, which Perforce accepts.
# [Why does that matter?  RB 2002-10-29]

def _perforce_safe_prefix():
    return '%d.' % os.getpid()

tempfile.gettempprefix = _perforce_safe_prefix


# 2. CONFIGURATION
#
#
# 2.1. Limitations
#
# This script can only support one basic configuration on each host (see
# section 2.2).  Tests of other configurations have to be made by
# changing configuration parameters in the script.
#
# This script can only test one defect tracker on each host (you have
# to change dt_name in config_HOSTNAME.py to test another defect
# tracker).
#
# This script can only have one current invocation on each host.
#
# Could it use an environment variable to work out which configuration
# to use?
#
#
# 2.2. Using a test configuration
#
# I want to be able to test different configurations, changes to
# configurations, and to check whether erroneous configurations are
# spotted correctly.
#
# I expect to find a working configuration either (1) in the file
# specified by the environment variable P4DTI_CONFIG, if set, or (2) in
# config_HOSTNAME.py, where HOSTNAME is the first component of the FQDN,
# converted to lower case (e.g. 'swan' for 'swan.ravenbrook.com').
#
# This configuration file is an ordinary P4DTI configuration file,
# except that it must include two additional configuration parameters:
# p4_license_file specifies the location of a Perforce license file
# suitable for use on the machine running the tests;
# p4_server_executable specifies the location of a suitable Perforce
# server executable.
#
# The loaded configuration module is copied when a test is started and
# the copy is installed in sys.modules['config'].  This has two effects:
#
# 1. The P4DTI itself won't load the config module; see the
# initialization code [RB 2000-12-08].
#
# 2. The unit tests can make changes to the copy of the configuration
# (e.g., to specify incorrect values, as in section 6) without
# affecting the original, which can be restored for the next test.

if os.environ.has_key('P4DTI_CONFIG'):
    config_filename = os.environ['P4DTI_CONFIG']
else:
    # First component of the host name, lower-cased.
    hostname = string.lower(string.split(socket.gethostname(), '.')[0])
    config_filename = 'config_' + hostname + '.py'

if not os.path.exists(config_filename):
    # Without a configuration nothing below can work.  Stop here with
    # the advice, rather than falling through to open() and burying the
    # advice under an IOError traceback.
    sys.stdout.write("Could not find config file %s\n" % config_filename)
    sys.stdout.write("Either create one, or set P4DTI_CONFIG to the "
                     "name of one.\n")
    sys.exit(1)

# Load the file under the module name 'config' so that a later
# "import config" (here and in the P4DTI) finds it in sys.modules.
config_file = open(config_filename)
try:
    imp.load_source('config', config_filename, config_file)
finally:
    config_file.close()

# Shallow snapshot of the pristine configuration namespace;
# reset_configuration() restores from it before each test.
original_configuration = copy.copy(sys.modules['config'].__dict__)
import config


# 2.3. Logging
#
# We need to log to a log file.  The P4DTI log will get redirected to
# this file, as will the output of various commands.
log_filename = config.log_file
if not log_filename:
    # No log file configured: invent a name from the current UTC time.
    log_filename = time.strftime('p4dti.%Y%m%dT%H%M%S.log',
                                 time.gmtime(time.time()))
log_filename = os.path.abspath(log_filename)
log_file = open(log_filename, "a")


def log_message(msg):
    # Write msg to the log file with a UTC timestamp and flush at once,
    # so the log is up to date if a test dies.
    stamp = time.strftime('%Y-%m-%d %H:%M:%S UTC',
                          time.gmtime(time.time()))
    log_file.write("%s %s\n" % (stamp, msg))
    log_file.flush()


def log_fn():
    # Log the name of the function that called log_fn().  Stack entry 1
    # is the immediate caller; field 3 of that entry is its name.
    caller = inspect.stack()[1]
    log_message("In function: %s" % caller[3])


sys.stdout.write("P4DTI test suite, logging to %s\n" % log_filename)
sys.stdout.flush()

# Domain appended to the dummy Perforce users' e-mail addresses.
email_suffix = "@ravenbrook.com"
if config.dt_name == 'Tracker':
    email_suffix = "@vaccaperna.co.uk"


def reset_configuration():
    # Return the live config module to the state captured in
    # original_configuration.
    namespace = sys.modules['config'].__dict__
    # Drop everything except __dunder__ names, in case a test added
    # parameters that were not in the original.
    for name in list(namespace.keys()):
        if re.match('^__.*__$', name):
            continue
        del namespace[name]
    # Put the original values back.
    for name, value in original_configuration.items():
        namespace[name] = value


class p4dti_html_parser(sgmllib.SGMLParser):
    # SGML parser with a helper for attribute lists.

    def attrs(self, attrs_list):
        # Turn a list of (name, value) pairs into a dictionary; a later
        # duplicate name overrides an earlier one.
        return dict(attrs_list)


# 3. DEFECT TRACKER AND OPERATING SYSTEM INTERFACES
#
# Many of the test cases are generic: they don't depend on a particular
# defect tracker or operating system.  But they need an interface to the
# defect tracker in order to restart it, and to the operating system in
# order to start a new Perforce server.
#
# Each interface class should be called DT_OS_interface (where DT is
# config.dt_name and OS is os.name) and must define these methods:
#
# restart_defect_tracker()
# restart_perforce()
#
# Note that there are no corresponding "stop" methods.  The tests leave
# Perforce and the defect tracker running so that failures can be
# investigated without all the evidence having disappeared.


# 3.1. Perforce mixin
#
# This class supplies the restart_perforce method for use by
# defect tracker interfaces.  It is suitable for use on both Posix
# and Windows.
# It also provides "system", which is like os.system,
# but captures output, checks for errors, and writes to the log.

class Perforce_mixin:

    # Temporary directory for Perforce server and associated files.
    # Set by start_perforce().
    p4dir = None

    # 3.1.1. Stop Perforce server
    #
    # If there are any Perforce servers running on the magic port,
    # use p4 admin to stop them.

    def stop_perforce(self):
        # Failure is ignored: there may simply be no server to stop.
        # NOTE(review): the client is hard-coded as "p4.exe" rather than
        # config.p4_client_executable -- confirm this is intended on
        # Posix hosts.
        self.system('p4.exe -p "%s" -u "%s" -P "%s" admin stop'
                    % (config.p4_port, config.p4_user, config.p4_password),
                    ignore_failure = 1)

    # 3.1.2. Start a new Perforce server
    #
    # Make a completely fresh Perforce server, with a new repository.

    def start_perforce(self):
        # Make a new repository directory.
        self.p4dir = tempfile.mktemp()
        os.mkdir(self.p4dir)
        log_message("Perforce repository directory %s." % self.p4dir)

        # Copy the license
        if config.p4_license_file:
            shutil.copyfile(config.p4_license_file,
                            os.path.join(self.p4dir, 'license'))

        # Work out Perforce's port number (the digits after the last
        # colon of config.p4_port; 1666 if there are none) and start a
        # Perforce server listening on it.
        match = re.match(".*:([0-9]+)$", config.p4_port)
        if match:
            port = int(match.group(1))
        else:
            port = 1666

        if os.name == 'nt':
            # p4d on Windows doesn't detach, so we can't use "system"
            # to start it.  RB 2002-10-28
            import win32api
            # The port must follow -p: the format previously had no
            # conversion for it, so the % operator raised TypeError.
            win32api.WinExec("%s -p %d -r %s"
                             % (config.p4_server_executable,
                                port, self.p4dir))
            # Give the server a moment to start.
            time.sleep(2)
        else:
            # For some reason, p4d doesn't detach properly when called
            # by os.popen4, so we have to use os.system here instead
            # of self.system.  RB 2002-10-29
            # (Same format fix as above: "-p %d" carries the port.)
            os.system('%s -d -p %d -r "%s" >> %s 2>&1'
                      % (config.p4_server_executable,
                         port, self.p4dir, log_filename))

    # 3.1.3. Restart Perforce
    #
    # By killing the old server and starting a new one.

    def restart_perforce(self):
        self.stop_perforce()
        self.start_perforce()

    # 3.1.4. Run command and check results
    #
    # Calls an external program, raising an exception upon any error
    # and returning standard output and standard error from the program,
    # as well as writing them to the log.
# Perforce_mixin, continued: the "system" method (section 3.1.4).

    # command: shell command line to run.
    # ignore_failure: if true, a non-zero exit status is not an error.
    # input_lines: optional list of strings written to the command's
    # standard input.
    # Returns the command's combined standard output and standard error.
    def system(self, command, ignore_failure = 0, input_lines = None):
        log_file.write('Executing %s\n' % repr(command))
        # popen4 merges the child's stderr into its stdout.
        (child_stdin, child_stdout) = os.popen4(command)
        if input_lines:
            child_stdin.writelines(input_lines)
        child_stdin.close()
        output = child_stdout.read()
        # Closing the output pipe yields the exit status (None on
        # success).
        result = child_stdout.close()
        log_file.write(output)
        if not ignore_failure and result:
            # NOTE(review): this local shadows the imported "message"
            # module, and the failure is raised as a string exception.
            message = ('Command "%s" failed with result code %d.'
                       % (command, result))
            log_file.write(message + '\n')
            raise message
        return output


# 3.2. Interface to TeamTrack on Windows NT: removed 2003-05-23.


# 3.3. Bugzilla interface mixin
#
# This is the interface to Bugzilla, suitable for use on both
# Posix and Windows.
#
# This class installs and initializes a complete new Bugzilla
# every time.
#
# The command lines used are carefully
# designed to work with both /bin/sh and CMD.EXE, so be careful
# when you edit it.  As far as possible, Python's shutil tools
# are used for cross-platform work.

class Bugzilla_mixin:

    # The Bugzilla server.
    server = None

    # 3.3.1. Install and configure Bugzilla
    #
    # install_bugzilla() creates a new Bugzilla installation by copying
    # the Bugzilla source code from its location relative to the test
    # suite into a temporary directory and then carrying out the initial
    # configuration steps that the Bugzilla administrator would carry
    # out; see [Barnson 2001-08-29, 3.2.14-15].
    #
    # Side effects: sets config.bugzilla_directory, recreates the
    # Bugzilla database, and changes the working directory while it
    # runs (restored on exit, even on error).

    def install_bugzilla(self):
        # Make a temporary directory.
        # (Only the name is chosen here; copytree below creates it.)
        config.bugzilla_directory = tempfile.mktemp()
        log_message("Bugzilla directory %s." % config.bugzilla_directory)

        # Create Bugzilla database.
        self.empty_bugzilla_database()

        # Copy Bugzilla sources.
        bz_path = os.path.abspath(os.path.join(
            os.getcwd(), os.pardir, 'code',
            'bugzilla-%s' % config.bugzilla_version))
        shutil.copytree(bz_path, config.bugzilla_directory)

        cwd = os.getcwd()
        try:
            os.chdir(config.bugzilla_directory)

            if os.name == 'nt':
                # Patch Bugzilla to make it suitable for running on
                # windows, using a pre-cooked patch derived from a
                # working Bugzilla on Windows installation.
                patch_path = os.path.abspath(os.path.join(cwd,
                    'bugzilla-%s-win32-patch' % config.bugzilla_version))
                self.system("patch < %s" % patch_path)
                # Copy processmail to processmail.pl, as this is expected
                # by the patched Bugzilla.
                os.rename("processmail", "processmail.pl")

            # Run checksetup.pl for the first time (no input required).
            self.system('perl checksetup.pl')

            # Edit the newly-created localconfig so that db_name is
            # correct.  Also, on Windows, edit the webservergroup
            # for some mysterious reason not explained in the Bugzilla
            # documentation.
            f = open('localconfig', 'r')
            localconfig = f.read()
            f.close()
            localconfig = re.sub('\\$db_name = "bugs"',
                                 '$db_name = "%s"' % config.dbms_database,
                                 localconfig)
            if os.name == 'nt':
                localconfig = (localconfig + "\n" +
                               '$webservergroup = "8";\n')
            f = open('localconfig', 'w')
            f.write(localconfig)
            f.close()

            # Run checksetup.pl for the second time.  This time Bugzilla
            # prompts us to enter some configuration parameters.  We
            # supply values from the test configuration, as follows.
            password = config.bugzilla_admin_password + '\n'
            replies = [
                # Enter the e-mail address of the administrator
                config.bugzilla_admin_user + '\n',
                # You entered ... Is this correct? [Y/n]
                '\n',
                # Enter the real name of the administrator
                'Bugzilla administrator\n',
                # Enter a password for the administrator account
                password,
                # Please retype the password to verify
                password,
                ]
            # The checksetup.pl script attempts to disable and enable
            # input echoing (when receiving a password) by calling stty.
            # These stty calls fail harmlessly here because
            # checksetup.pl's input is not a terminal.
            self.system('perl checksetup.pl', input_lines = replies)
        finally:
            os.chdir(cwd)

    # 3.3.2. Empty the Bugzilla database
    #
    # Drops (deletes) and recreates the Bugzilla test database on
    # the MySQL server.

    def empty_bugzilla_database(self):
        db = config.dbms_database
        user = config.dbms_user
        # The drop may fail because the database doesn't exist yet, so
        # its failure is ignored; the create must succeed.
        self.system('mysqladmin -u "%s" --force drop "%s"' % (user, db),
                    ignore_failure = 1)
        self.system('mysqladmin -u "%s" create "%s"' % (user, db))

    # 3.3.3. Restart Bugzilla
    #
    # Install Bugzilla.  If the configuration parameter
    # bugzilla_mysqldump is not None, then drop the existing Bugzilla
    # database and replace it with one from the MySQL dump file.

    def restart_defect_tracker(self):
        self.install_bugzilla()
        if config.bugzilla_mysqldump == None:
            return
        self.empty_bugzilla_database()
        self.system('mysql -u "%s" "%s" < "%s"'
                    % (config.dbms_user, config.dbms_database,
                       config.bugzilla_mysqldump))

    # 3.3.4. Run a Bugzilla CGI script
    #
    # run_script(script, params) runs a Bugzilla CGI script as if
    # invoked by a web server as a result of an HTTP request.  It
    # returns a string containing the output of the script.  The script
    # argument is the name of the script (relative to
    # config.bugzilla_directory) and the params argument is a dictionary
    # mapping form parameter name to value.

    def run_script(self, script, params):
        data = urllib.urlencode(params)
        # CGI environment for a form POST; the form data itself goes to
        # the script's standard input.
        # NOTE(review): REMOTE_ADDR is the empty string -- confirm that
        # is what the scripts expect.
        env_additions = {
            'REQUEST_METHOD': 'POST',
            'REMOTE_ADDR': '',
            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
            'CONTENT_LENGTH': str(len(data)),
            }
        cwd = os.getcwd()
        try:
            for k, v in env_additions.items():
                os.environ[k] = v
            os.chdir(config.bugzilla_directory)
            command = 'perl "%s"' % script
            log_file.write('Executing Bugzilla script %s\n' % repr(command))
            # popen2 returns (child's stdout, child's stdin).
            child_out, child_in = popen2.popen2(command)
            child_in.write(data)
            log_file.write(' ... with input %s ...' % data)
            child_in.close()
            result = child_out.read()
            log_file.write(' ... and output %s\n' % result)
            return result
        finally:
            # Remove the CGI variables again (they are deleted, not
            # restored to any earlier value) and go back to where we
            # were.
            for e in env_additions.keys():
                del os.environ[e]
            os.chdir(cwd)

    # 3.3.5.
Get Bugzilla server parameters # # Returns a dictionary mapping Bugzilla parameter name to value. # # It works by running the "editparams.cgi" CGI script and parsing # the output. class editparams_parser(p4dti_html_parser): def __init__(self): self.params = { } self.collecting_params = 0 self.textarea_name = None self.textarea_contents = None sgmllib.SGMLParser.__init__(self) def attrs(self, attrs_list): attrs = { } for a, v in attrs_list: attrs[a] = v return attrs def start_form(self, attrs_list): attrs = self.attrs(attrs_list) if attrs.get('action') == 'doeditparams.cgi': self.collecting_params = 1 def end_form(self): self.collecting_params = 0 def start_input(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) if attrs.get('type') == 'radio' and attrs.get('checked'): self.params[attrs['name']] = attrs['value'] elif attrs.get('type', 'text') == 'text': value = re.sub('&', '&', attrs['value']) self.params[attrs['name']] = value def start_textarea(self, attrs_list): if not self.collecting_params: return attrs = self.attrs(attrs_list) self.textarea_name = attrs['name'] self.textarea_contents = [] def end_textarea(self): if not self.collecting_params: return value = string.join(self.textarea_contents, '') self.params[self.textarea_name] = value self.textarea_name = None self.textarea_contents = None def handle_data(self, data): if self.collecting_params and self.textarea_name: self.textarea_contents.append(data) def server_parameters(self): params = { 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, } result = self.run_script('editparams.cgi', params) parser = self.editparams_parser() parser.feed(result) parser.close() return parser.params def edit_parameters(self, new_params): params = self.server_parameters() params.update({ 'Bugzilla_login': config.bugzilla_admin_user, 'Bugzilla_password': config.bugzilla_admin_password, }) params.update(new_params) 
self.run_script('doeditparams.cgi', params) # It takes a moment for the new table to become available to # other MySQL connections, so wait a bit. time.sleep(4) # 3.2.8. Run command and check results # # Calls an external program, raising an exception upon any error # and returning standard output and standard error from the program, # as well as writing them to the log. def system(self, command, ignore_failure = 0, input_lines = None): log_file.write('Executing %s\n' % repr(command)) (child_stdin, child_stdout) = os.popen4(command) if input_lines: child_stdin.writelines(input_lines) child_stdin.close() output = child_stdout.read() result = child_stdout.close() log_file.write(output) if not ignore_failure and result: message = ('Command "%s" failed with result code %d.' % (command, result)) log_file.write(message + '\n') raise message return output class Bugzilla_nt_interface(Bugzilla_mixin, Perforce_mixin): pass class Bugzilla_posix_interface(Bugzilla_mixin, Perforce_mixin): pass # 3.3.A Tracker interface mixin # # This is the interface to Tracker, suitable for use on # Windows only. # # This class resets a Tracker database # class Tracker_mixin: # The Tracker server. trk = None log_messages = [] def log(self, msg): log_message(msg) self.log_messages.append(msg) # 3.3.1. Install and configure Tracker # def install_tracker(self): import tracker config.logger = self trk = tracker.tracker(config, config.tracker_user, config.tracker_password, config.tracker_project, config.tracker_server) config.trk = trk import win32api win32api.WriteProfileVal('Tracker', 'Marker', str(0), trk.ini_file()) def empty_tracker_database(self): # Need to setup this as it is used in the call below config.tr_fields = { 'Id': ( 'int' ), } config.trk.delete_issues_before(99999999) # Restart Tracker def restart_defect_tracker(self): self.install_tracker() self.empty_tracker_database() class Tracker_nt_interface(Tracker_mixin, Perforce_mixin): pass # 4. 
# P4DTI TEST CASE BASE CLASS
#
# The p4dti_base class is a generic P4DTI test case.  It defines methods
# for setting up the integration and some utilities for recording and
# interrogating the output of the replicator.
#
# Other P4DTI test cases will inherit from p4dti_base.
#
# When a class implements several test cases, the methods that implement
# test cases (in the PyUnit sense) should have names starting "test_".
# When a class implements a single test case, the method should be
# called "runTest".

class p4dti_base(p4dti_unittest.TestCase):

    # Defect tracker interface: an instance of the section 3 class named
    # DT_OS_interface for the configured defect tracker and this
    # operating system.
    dti = eval('%s_%s_interface' % (config.dt_name, os.name))()

    # Messages written to the replicator's log.
    log_messages = []

    # Mail messages sent by the replicator, as recorded by the
    # sendmail() method of this class: each entry is a pair
    # (recipients, text).
    mail_messages = []

    # The replicator.
    r = None

    # 4.1. Snoop on the logger
    #
    # To check that the right messages reach the log, the log method of
    # each logger class is replaced by self.log, which records every
    # message as well as writing it to the test log file (needed so the
    # test engineer can analyze a failing test case).
    #
    # The drawback is that the real logging code is not exercised, so a
    # separate set of logging tests is needed.

    def snoop_logger(self):
        for logger_class in (logger.multi_logger,
                             logger.file_logger,
                             logger.sys_logger):
            logger_class.log = self.log

    def log(self, msg):
        log_message(msg)
        self.log_messages.append(msg)

    # Forget the messages and mail recorded so far; call this before
    # carrying out a test.

    def clear_log(self):
        self.log_messages, self.mail_messages = [], []

    # 4.2.
Snoop on e-mail def snoop_mail(self): sys.modules['smtplib'] = self def log_separator(self): log_file.write("-" * 72) log_file.write("\n") def SMTP(self, server): self.log_separator() return self def quit(self): self.log_separator() def sendmail(self, from_address, to, text): self.mail_messages.append((to, text)) log_file.write(text) # 4.3. Check that the log is as expected # # These methods make various checks on the messages recorded in the # log. def log_message_ids(self): return map(lambda m: m.id, self.log_messages) def expected_only(self, expected): for m in self.log_messages: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_only_in_range(self, min, max, expected): for m in self.log_messages: if m.id >= min and m.id <= max: assert m.id in expected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected_not(self, unexpected): for m in self.log_messages: assert m.id not in unexpected, \ ("Unexpected message %d (%s) found in log %s" % (m.id, str(m), self.log_message_ids())) def expected(self, expected): found = {} for m in self.log_messages: if found.has_key(m.id): found[m.id] = found[m.id] + 1 else: found[m.id] = 1 for id in expected: assert found.has_key(id) and found[id] > 0, \ ("Expected message %d not found in log %s" % (id, self.log_message_ids())) found[id] = found[id] - 1 def expectation(self, expected, maybe=[]): self.expected(expected) self.expected_only_in_range(800, 1000, expected + maybe) # 4.4. Set up everything so that test cases can run # # Get a fresh configuration, a new defect tracker and a new Perforce # server. 
def setup_everything(self, config_changes = {}): reset_configuration() for key, value in config_changes.items(): setattr(config, key, value) self.dti.restart_defect_tracker() self.dti.restart_perforce() self.snoop_logger() self.snoop_mail() # Get a temporary Perforce interface suitable for setting the # Perforce password. if config.p4_password: p4p = p4i.p4(port = config.p4_port, user = config.p4_user, client_executable = config.p4_client_executable, logger = self ) p4p.run('user -o') # Problem with quoting strings in arguments to p4 on # Windows using os.popen (as p4i.run does). NB 2002-10-28. if os.name == 'nt': if ' ' in config.p4_password: raise "space in p4_password on Windows" p4p.run('passwd -P %s' % config.p4_password) else: p4p.run('passwd -O "" -P "%s"' % config.p4_password) # Get a permanent Perforce interface using the password. self.p4 = p4i.p4(port = config.p4_port, user = config.p4_user, password = config.p4_password, client_executable = config.p4_client_executable, logger = self ) # Set the user's email address to the replicator_address user = self.p4.run('user -o')[0] user['Email'] = config.replicator_address self.p4.run('user -i', user) # Create a spec depot depot = self.p4.run('depot -o spec')[0] depot['Type'] = 'spec' self.p4.run('depot -i', depot) # 4.5. Initialize the replicator # # Load the init module. If it's already loaded, reload it: we must # reload it because we want the init module to run again and to pick # up on the new configuration. (Some other modules have the same # properties.) def initialize_replicator(self): log_fn() self.mail_messages = [] for m in ['init']: if sys.modules.has_key(m): del sys.modules[m] import init self.r = init.r # Regression test for job000199. assert self.mail_messages == [] # 4.6. Normal replicator startup # # This is a pseudo test case. It checks that the replicator can # start up normally. Other tests may depend on this having been # run; for example, the check_nothing test (5.1). 
This test must be # run before any other test cases, and therefore can't be part of a # test suite (the tests of which may be run singly or in any order). # It should therefore be run from the setUp() method of a subclass # of this class (unless of course that subclass has a different idea # of what should happen at startup). def check_startup(self): log_fn() self.initialize_replicator() # Expect to get a startup e-mail self.clear_log() self.r.prepare_to_run() self.expectation([800, 866], [867, 870, 910]) # Expect to set up issues and replicate them, but no jobs or # conflicts. self.poll() if config.dt_name == 'Tracker': # We have cleared DB so don't expect any issues self.expected_only_in_range(800, 1000, [911, 912]) else: self.expectation([803, 804, 812, 911, 912]) # There should be no further activity if we replicate again. self.poll() self.expected_only_in_range(800, 1000, [911, 912]) # 4.7. Exceptional replicator behaviour # # This method calls the given function but expects to get the # exception given by 'error', with message id 'msgid'. def check_exception(self, function, error, msgid): try: function() except: err, msg = sys.exc_info()[0:2] assert err == error, \ ("Expected error %s but got %s." % (error, err)) assert msg.id == msgid, \ ("Expected message %d but got %d (%s)" % (msgid, msg.id, str(msg))) else: self.fail("Expected error %s but didn't get it." % error) # check_startup_error() is a special case of check_exception for the # initialize_replicator method. def check_startup_error(self, error, msgid): self.check_exception(self.initialize_replicator, error, msgid) # 4.8. Check consistency # # This method checks that the databases are consistent. def check_consistency(self): log_fn() self.clear_log() self.r.check_consistency() self.expectation([871, 885], [883, 884, 890]) # 4.9. 
Check replication of a single issue def check_replication_dt_to_p4(self, first_time = 0): log_fn() self.poll() if first_time: self.expected([803]) # set up for replication self.expectation([804, 812, 911, 912], [803, 813]) # Nothing should come back from Perforce. self.poll() if config.dt_name == 'Tracker': # We expect a dummy replicate with no fields done self.expected_only_in_range(800, 1000, [804, 813, 911, 912]) else: self.expected_only_in_range(800, 1000, [911, 912]) # 4.10. Initialize Perforce repository # # This method sets up Perforce clients and workspaces for a set of # users, and adds a file to the repository. It sets up the # following members: # # p4i: Map from user to a Perforce interface for that user. # workspace: Map from user to the client workspace for that user. # # This is only needed for tests involving Perforce fixes: that's why # it's not called in setup_everything(). def setup_perforce(self, users): log_fn() # Create Perforce interfaces and workspaces for the dummy users. self.p4p = {} self.workspace = {} for user in users: self.workspace[user] = os.path.join(self.dti.p4dir, user) os.mkdir(self.workspace[user]) os.mkdir(os.path.join(self.workspace[user], 'depot')) self.p4p[user] = p4i.p4( port = config.p4_port, user = user, client = user + '_' + socket.gethostname(), client_executable = config.p4_client_executable, logger = self, ) # Make the Perforce user record. p4_user = self.p4p[user].run('user -o')[0] p4_user['Email'] = ('%s%s' % (user,email_suffix)) self.p4p[user].run('user -i', p4_user) # Make a Perforce client. client = self.p4p[user].run('client -o')[0] client['Root'] = self.workspace[user] self.p4p[user].run('client -i', client) # Add a file to the repository so that we have something with # which to make changes and fixes. 
user = users[0] filename = os.path.join(self.workspace[user], 'depot', 'test') open(filename, 'w') self.p4p[user].run('add -t ktext %s' % filename) change = self.p4p[user].run('change -o')[0] change['Description'] = 'Added test file' self.p4p[user].run('submit -i', change) # 4.11. Make changelist in Perforce # # This method makes a changelist in Perforce and edits a file in # that changelist. The changelist is returned without being # submitted. def edit_file(self, user): log_fn() change = self.p4p[user].run('change -o')[0] change['Description'] = 'Edited test file' result = self.p4p[user].run('change -i', change)[0] changelist = string.split(result)[1] filename = os.path.join(self.workspace[user], 'depot', 'test') # Perforce reports "files up to date" as an error, so ignore it. try: self.p4p[user].run('sync %s' % filename) except p4.error: pass self.p4p[user].run('edit -c %s %s' % (changelist, filename)) f = open(filename, 'a') f.write("Foo\n") f.close() return self.p4p[user].run('change -o %s' % changelist)[0] # 4.12 Variant tests # # This class has variant methods for each defect tracker (e.g., # Bugzilla) and calls the appropriate one. def run_variant(self): try: test = getattr(self, config.dt_name) except AttributeError: assert 0, "No test variant for " + config.dt_name + "." test() # 4.13. Poll the databases, expecting no errors expected_notices = [607] def poll(self): log_fn() self.clear_log() self.r.carefully_poll_databases() for m in self.log_messages: if (isinstance(m, message.message) and m.id not in self.expected_notices): assert m.priority >= message.INFO, \ ("Expected no errors, but message '%s' has " "priority %d." % (str(m), m.priority)) # 4.14. Create a job # # Fill in fields. Return the name of the job. def create_job(self, p4p, job): for k, v in job.items(): if string.find(v, '') == 0: job[k] = 'foo' result = p4p.run('job -i', job) return string.split(result[0])[1] # 5. 
# TEST CASES: NORMAL OPERATION
#
# If nothing has changed, then nothing happens when the replicator
# polls.  The databases are consistent.

class normal(p4dti_base):
    def setUp(self):
        self.setup_everything()
        self.check_startup()

    def runTest(self):
        "Startup, replication to Perforce (test_p4dti.normal)"
        self.check_consistency()


# 6. TEST CASES: INCORRECT CONFIGURATIONS
#
# This is a regression test of job000037, job000075 and job000116.

class bogus(p4dti_base):
    def setUp(self):
        self.setup_everything()

    # 6.1. An incorrect parameter generates an error
    #
    # Utility for a range of tests.  Resets the configuration, sets the
    # parameter named by 'param' to 'value', then tries to start the
    # replicator.  The start must raise a message whose id is 'msgid';
    # anything else is recorded as a failure.

    def check_param(self, param, value, msgid):
        reset_configuration()
        setattr(config, param, value)
        problem = None
        try:
            self.initialize_replicator()
        except:
            raised, detail = sys.exc_info()[:2]
            if not isinstance(detail, message.message):
                # Some other kind of exception altogether.
                problem = ("Set parameter %s to '%s': expected message "
                           "%d but got '%s: %s' instead."
                           % (param, value, msgid, raised, detail))
            elif detail.id != msgid:
                # A P4DTI message, but the wrong one.
                problem = ("Set parameter %s to '%s': expected message "
                           "%d but got %d (%s)"
                           % (param, value, msgid, detail.id, str(detail)))
        else:
            # The replicator started without complaint.
            problem = ("Set parameter %s to '%s': expected message "
                       "%d but didn't get it."
                       % (param, value, msgid))
        if problem is not None:
            self.addFailure(problem)

    # 6.2. Basic errors in parameters are caught quickly
    #
    # Basic errors in parameters (wrong type, wrong format) should be
    # caught quickly.
    #
    # This is a table of (parameter name, bogus value, message id of
    # expected error).
# Class bogus, continued: table for section 6.2.

    # Each entry is (parameter name, bogus value, expected message id).
    # test_basic_parameters() passes every entry to check_param().
    bogus_basic_parameters = [
        # Regression for job000170:
        ('administrator_address', 'invalid e-mail address', 202),
        ('changelist_url', -1, 207),
        ('changelist_url', "http://invalid/%d/%s", 210),
        ('changelist_url', "http://invalid/no/format/specifier", 210),
        ('changelist_url', "http://invalid/%d/%%/%%%", 210),
        ('closed_state', -1, 208),
        ('configure_name', -1, 207),
        ('job_url', 42, 207),
        ('job_url', "http://invalid/%d/%s", 211),
        ('job_url', "http://invalid/no/format/specifier", 211),
        ('job_url', "http://invalid/trailing/percent/%d/%%/%%%", 211),
        ('log_file', -1, 208),
        ('log_level', 'not an int', 204),
        ('migrate_p', 'not a function', 203),
        ('p4_client_executable', -1, 207),
        # Regression test for job000158:
        ('p4_client_executable', 'no such file', 705),
        ('p4_user', None, 207),
        ('p4_password', -1, 207),
        # ('p4_password', 'incorrectpassword', 706),
        ('p4_port', None, 207),
        # Regression test for job000158, job000202:
        ('p4_port', '', 707),
        ('p4_server_description', -1, 207),
        ('poll_period', 'not an int', 204),
        ('prepare_issue', 'not a function', 203),
        ('replicator_address', 'invalid@e-mail@address', 202),
        ('replicate_p', 'not a function', 203),
        ('replicate_job_p', 'not a function', 203),
        ('replicated_fields', 'not a list', 205),
        ('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206),
        ('replicated_fields', ['not', ('list', 'of'), 'strings', 0], 206),
        ('rid', -1, 207),
        ('rid', '0abc', 209),
        ('rid', 'ab-c', 209),
        ('sid', -1, 207),
        ('sid', 'abcdefg+z', 209),
        ('smtp_server', -1, 207),
        ('start_date', '2001-02-03 24-00-00', 201),
        ('use_deleted_selections', 'neither 0 nor 1', 200),
        ('use_deleted_selections', -1, 200),
        ('use_deleted_selections', 2, 200),
        ('use_perforce_jobnames', 'neither 0 nor 1', 200),
        ('use_perforce_jobnames', -1, 200),
        ('use_perforce_jobnames', 2, 200),
        ('use_stdout_log', 'neither 0 nor 1', 200),
        ('use_stdout_log', -1, 200),
        ('use_stdout_log', 2, 200),
        ]

    # Test case: run check_param() on every row of the table above.
    def test_basic_parameters(self):
        for param, value, msgid in self.bogus_basic_parameters:
            self.check_param(param, value, msgid)

    # 6.3. Basic errors in DT parameters are caught quickly
    #
    # As 6.2, but picks a set of tests based on config.dt_name.

    # Rows used when config.dt_name is 'Bugzilla'.
    bogus_Bugzilla_parameters = [
        ('bugzilla_directory', 'not a directory', 303),
        ('bugzilla_directory', '/', 304),
        ('closed_state', 'not a Bugzilla state', 301),
        ('dbms_database', -1, 207),
        ('dbms_host', -1, 207),
        # Regression for job000168:
        ('dbms_port', '1234', 204),
        ('dbms_user', -1, 207),
        ('dbms_password', -1, 207),
        ('migrated_user_password', -1, 207),
        ('replicated_fields', 'not a list', 205),
        ('replicated_fields', ['not', 'list', 'of', 'strings', 0], 206),
        ('replicated_fields', ['bug_status'], 311),
        ('replicated_fields', ['assigned_to'], 311),
        ('replicated_fields', ['short_desc'], 311),
        ('replicated_fields', ['resolution'], 311),
        ('replicated_fields', ['longdesc', 'longdesc'], 312),
        ('replicated_fields', ['not a Bugzilla field'], 307),
        ]

    # Rows used when config.dt_name is 'Tracker'.
    bogus_Tracker_parameters = [
        ('tracker_user', -1, 207),
        ('tracker_password', -1, 207),
        ('tracker_project', -1, 207),
        ('tracker_server', -1, 207),
        ]

    # Test case: look up the table named bogus_<dt_name>_parameters on
    # this class and run check_param() on every row.
    def test_dt_parameters(self):
        dt_params = getattr(self, 'bogus_%s_parameters' % config.dt_name)
        for param, value, msgid in dt_params:
            self.check_param(param, value, msgid)

    # 6.4. Parameter errors are caught by the defect tracker
    #
    # Like 6.2 and 6.3 this test sets a parameter to an incorrect value.
    # In this case the error is caught by the defect tracker, so a
    # message object isn't returned, but rather a string, which we must
    # test directly rather than by message id.
mysql_errors = [ '_mysql.OperationalError', '_mysql_exceptions.OperationalError', ] erroneous_Bugzilla_parameters = [ ('dt_name', 'not a defect tracker', 'exceptions.ImportError', 'No module named configure_not a defect tracker'), ('dbms_host', 'host.invalid', mysql_errors, '(2005, "Unknown MySQL Server Host'), ('dbms_database', 'invalid', mysql_errors, ['(1049, "Unknown database \'invalid\'")', '(1044, "Access denied for user:' ]), ('dbms_password', 'not the Bugzilla password', mysql_errors, ['(1045, "Access denied for user:', '(1044, "Access denied for user:']), ('dbms_user', 'not the Bugzilla user', mysql_errors, ['(1045, "Access denied for user:', '(1044, "Access denied for user:']), ] erroneous_Tracker_parameters = [ ] def test_dt_errors(self): params = getattr(self, 'erroneous_%s_parameters' % config.dt_name) for param, value, errors, message_texts in params: reset_configuration() setattr(config, param, value) try: self.initialize_replicator() except: (err, msg, _) = sys.exc_info() if not isinstance(errors, types.ListType): errors = [errors] if str(err) not in errors: self.addFailure("Set parameter %s to '%s': " "expected error in %s but got " "error '%s'." % (param, value, errors, str(err))) if isinstance(message_texts, types.ListType): texts = message_texts else: texts = [message_texts] found = 0 for text in texts: if str(msg)[0:len(text)] == text: found = 1 break if not found: self.addFailure("Set parameter %s to '%s': " "expected error message in %s but " "got '%s'." % (param, value, texts, msg)) else: self.addFailure("Set parameter %s to %s: expected " "error in %s but there was no error." % (param, value, errors)) # 6.5. OS-specific parameter tests # # As 6.3, but picks a set of tests based on os.name. 
bogus_nt_parameters = [ ('use_windows_event_log', -1, 200), ('use_windows_event_log', 2, 200), ('use_windows_event_log', 'neither 0 nor 1', 200), ] bogus_posix_parameters = [ # By using fake Perforce client executables we can check that # unsupported client and server versions are detected. # Regression test for job000173. ('p4_client_executable', './fake_p4.py', 704), ('p4_client_executable', './fake_p4d_old_changelevel.py', 834), ('p4_client_executable', './fake_p4d_no_changelevel.py', 835), ] def test_os_parameters(self): os_params = getattr(self, 'bogus_%s_parameters' % os.name) for param, value, msgid in os_params: self.check_param(param, value, msgid) def runTest(self): "Illegal configuration parameters (test_p4dti.bogus)" self.test_basic_parameters() self.test_dt_parameters() self.test_dt_errors() self.test_os_parameters() # 7. TEST CASE: EXISTING JOB IN PERFORCE # # The replicator should refuse to start if there's a job in Perforce. # # This is a regression test for job000219 and job000240. class existing(p4dti_base): def setUp(self): self.setup_everything() self.initialize_replicator() def runTest(self): "Startup with an existing job (test_p4dti.existing)" j = self.p4.run('job -o')[0] j['Description'] = 'Test job' self.p4.run('job -i', j) self.check_exception(self.r.poll, self.r.error, 914) # 8. TEST CASE: MOVING THE START DATE # # # 8.1. Moving the start date backwards in time # # When start_date is set to the current time, no issues should be # replicated when the replicator starts. Similarly, refreshing Perforce # has no effect. But you can set start_date back in time and refresh # Perforce, this time with effect. # # This is a regression test for job000047, job000050, job000221. 
class start_1(p4dti_base):
    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Moving the start_date backwards in time (test_p4dti.start_1)"
        # Start the replicator with start_date set to the current
        # local time.
        reset_configuration()
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        config.start_date = now
        self.initialize_replicator()

        # When we poll, nothing should happen.  Nor when we refresh.
        for quiet_action in (self.r.poll, self.r.refresh_perforce_jobs):
            self.clear_log()
            quiet_action()
            self.expected_only_in_range(800, 1000, [911, 912])

        # The databases should report consistent.
        self.check_consistency()

        # Now set start date back in time and try refreshing again.
        reset_configuration()
        config.start_date = "1971-01-01 00:00:00"
        self.initialize_replicator()
        self.clear_log()
        self.r.refresh_perforce_jobs()
        self.expectation([803, 804, 812])

        # The databases should still report consistent.
        self.check_consistency()


# 8.2. Moving the start date forwards in time
#
# Start up with an old start date as normal, then move the start date
# forwards in time.  The databases should still report consistent,
# because issues that were recorded as being replicated in the first
# poll should still be recorded as replicated, even though they haven't
# changed since the start date.
#
# This is a regression test for job000340.

class start_2(p4dti_base):
    def setUp(self):
        self.setup_everything()
        self.check_startup()

    def runTest(self):
        "Moving the start_date forwards in time (test_p4dti.start_2)"
        # Set start date forward in time (to the current local time)
        # and restart the replicator.
        reset_configuration()
        now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        config.start_date = now
        self.initialize_replicator()

        # When we poll, nothing should happen.
        self.poll()
        self.expected_only_in_range(800, 1000, [911, 912])

        # The databases should report consistent.
        self.check_consistency()


# 9. TEST CASE: REPLICATING BY PROJECT
#
# This is a regression test for job000107, job000112, job000311.
class project(p4dti_base):
    def setUp(self):
        self.setup_everything()

    def runTest(self):
        "Replicate by project (test_p4dti.project)"
        self.run_variant()

    # Bugzilla variant: replicate only issues whose product is
    # 'product 1' and check that every job in Perforce has that product.
    def Bugzilla(self):
        config.replicated_fields = ['product']
        config.replicate_p = lambda self: self['product'] == 'product 1'
        self.check_startup()
        jobs = self.p4.run('jobs')
        for j in jobs:
            assert j['Product'] == 'product 1\n', \
                   ("Job %s has product %s; shouldn't be replicated."
                    % (j['Job'], j['Product']))
        self.check_consistency()


# 10. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.

class lifecycle(p4dti_base):
    # Perforce users used by the test; the Tracker configuration uses
    # a different pair.
    user = 'rb'
    user1 = 'gdr'
    if config.dt_name == 'Tracker':
        user = 'fred'
        user1 = 'Admin'

    # NOTE(review): submit/assign/close dispatch on config.dt_name to
    # methods named '<dt_name>_submit', '<dt_name>_assigned', etc.  This
    # class defines the full set for Bugzilla only; Tracker_assigned,
    # Tracker_closed and Tracker_illegal_changes are not defined here.

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user, self.user1])
        self.check_startup()

    # Submit a new issue to the defect tracker.  Run a replication
    # cycle; check that the issue gets replicated to perforce.  Return a
    # pair of the defect tracker issue and the Perforce job.

    def submit(self, user):
        issue_id = getattr(self, config.dt_name + '_submit')(user)

        # It gets replicated to Perforce.  This is a regression test for
        # job000233.
        self.check_replication_dt_to_p4(first_time = 1)

        # Check that the job has been created in Perforce.
        issue = self.r.dt.issue(issue_id)
        assert issue
        jobname = issue.corresponding_id()
        job = self.p4.run('job -o %s' % jobname)[0]

        # Defect-tracker specific checks.
        getattr(self, config.dt_name + '_submitted')(issue, job)
        return issue, job

    # Assign the issue to the user in the defect tracker.  Run a
    # replication cycle; check that the assignment gets replicated.
    # Return the updated defect tracker issue and Perforce job.

    def assign(self, issue, job, user):
        getattr(self, config.dt_name + '_assign')(issue, job, user)

        # It gets replicated to Perforce.
        self.check_replication_dt_to_p4()

        # Defect-tracker specific checks.
        issue = self.r.dt.issue(issue.id())
        job = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_assigned')(issue, job)
        return issue, job

    # The job has been closed in Perforce.  Check that the closure is
    # replicated to the defect tracker and that the expected messages
    # appear.  'expected' is a list of message ids; 911 and 912 are
    # always added to it.

    def close(self, issue, job, expected):
        self.poll()
        self.expectation(expected + [911, 912])
        issue1 = self.r.dt.issue(issue.id())
        job1 = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_closed')(issue, job, issue1, job1)

    # Post a new bug through Bugzilla's post_bug.cgi and return the bug
    # id (as a string) parsed from the reply.

    def Bugzilla_submit(self, user):
        fields = {
            'reporter': 'nb@ravenbrook.com',
            'product': 'product 1',
            'version': 'unspecified',
            'component': 'component 1.1',
            'rep_platform': 'All',
            'op_sys': 'All',
            'priority': 'P1',
            'bug_severity': 'critical',
            'assigned_to': '',
            'cc': '',
            'bug_file_loc': '',
            'short_desc': 'Life cycle # test bug',
            'comment': 'Life cycle # test long description.',
            'submit': ' Commit ',
            'form_name': 'enter_bug',
            'Bugzilla_login': config.bugzilla_admin_user,
            'Bugzilla_password': config.bugzilla_admin_password,
            }
        result = self.dti.run_script('post_bug.cgi', fields)
        # NOTE(review): the pattern below is reconstructed from a
        # literal that had lost its surrounding text (apparently HTML
        # markup around the heading); confirm against the Bugzilla
        # reply format before relying on it.
        match = re.search('Bug ([0-9]+) posted', result)
        if not match:
            self.fail("Tried to submit a bug to Bugzilla, but got the "
                      "following in reply: %s." % result)
        bugid = match.group(1)

        # Sleep for a second so that we can be sure to pick this
        # up on the next poll.
        time.sleep(1)
        return bugid

    # Check the fields of the newly created job against the values
    # posted by Bugzilla_submit.

    def Bugzilla_submitted(self, bug, job):
        for field, expected in [
            ('Status', 'bugzilla_new'),
            ('Summary', 'Life cycle # test bug\n'),
            ('Priority', 'P1'),
            ('Severity', 'critical'),
            ]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found "
                    "'%s'." % (job['Job'], field, expected, job[field]))

    def Bugzilla_assign(self, bug, job, user):
        # Assign the issue in Bugzilla.  Ideally this should go through
        # the Bugzilla web interface, but that's horrible.  So cheat for
        # now by going through the very low-level Bugzilla interface.
        # (This is horrible too!)
        bz_user = self.r.config.user_translator.translate_p4_to_dt(
            user, self.r.dt, self.r.dt_p4)
        changes = {'bug_status': 'ASSIGNED',
                   'assigned_to': bz_user}
        bug_id = bug['bug_id']
        self.r.dt.bugzilla.update_row('bugs', changes,
                                      'bug_id = %d' % bug_id)

        # Record one bugs_activity row per changed field.
        for k, v in changes.items():
            activity = {
                'bug_id': bug_id,
                'who': bz_user,
                'bug_when': self.r.dt.bugzilla.now(),
                'fieldid': self.r.dt.bugzilla.fieldid(k),
                'removed': str(bug[k]),
                'added': str(v),
                }
            self.r.dt.bugzilla.insert_row('bugs_activity', activity)

        # Wait two seconds so that the bug will be picked up in the next
        # poll, not the one after.
        time.sleep(2)

    def Bugzilla_assigned(self, bug, job):
        # The status should now be assigned.
        job = self.p4.run('job -o %s' % job['Job'])[0]
        assert job[self.r.config.job_status_field] == 'assigned', \
               ("Expected assigned job %(Job)s to have status "
                "'assigned' but it has state %(Status)s." % job)

    def Bugzilla_closed(self, bug, job, bug1, job1):
        pass

    # Create a bug through the Tracker interface and return its id.

    def Tracker_submit(self, user):
        fields = {
            'Title': 'Auto-created bug',
            'Description': 'Auto-created description',
            'Severity': '3-Medium',
            'Owner': 'Admin',
            }
        bugid = self.r.dt.tracker.create_bug(fields, user)

        # Sleep for a second so that we can be sure to pick this
        # up on the next poll.
        time.sleep(1)
        return bugid

    def Tracker_submitted(self, bug, job):
        for field, expected in [
            (self.r.config.job_status_field, 'Open'),
            ('Severity', 'Medium'),
            ]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found "
                    "'%s'." % (job['Job'], field, expected, job[field]))

    def Tracker_assign(self, bug, job, user):
        # Assign the issue in Tracker.
        tr_user = self.r.config.user_translator.translate_p4_to_dt(
            user, self.r.dt, self.r.dt_p4)
        changes = {'Owner': tr_user}
        self.r.dt.tracker.update_bug(changes, bug, user)

        # Wait two seconds so that the bug will be picked up in the next
        # poll, not the one after.
        time.sleep(2)

    def runTest(self):
        "Issue life cycle (test_p4dti.lifecycle)"

        # 10.1. Simple issue lifecycle
        #
        # This is a simple issue cycle:
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. It is closed in Perforce (by editing the job).
        # 5. The closure gets replicated back to the defect tracker.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)

        # Close the job in Perforce.  This is a regression test for
        # job000118.
        job[self.r.config.job_status_field] = 'closed'
        self.p4.run('job -i', job)
        self.close(issue, job, [805, 824, 826])

        # 10.2. Issue lifecycle (fix in Perforce)
        #
        # This is an issue cycle in which the issue is associated with a
        # changelist in Perforce:
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. It is closed in Perforce (by making a fix).
        # 5. Closure and fix get replicated back to the defect
        #    tracker.
        #
        # This is a regression test for job000133.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        self.p4.run('fix -c 1 %s' % job['Job'])
        self.close(issue, job, [802, 805, 819, 820, 824, 826])

        # 10.3. Issue lifecycle (fix on submission in Perforce)
        #
        # This tests an issue lifecycle in which the issue is associated
        # with a pending changelist and closed on submission.
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. The job description is edited in Perforce (this is a
        #    regression test for job000362).
        # 5. A fix is made with a pending changelist.
        # 6. The change is submitted.
        # 7. Job, fix get replicated back to the defect tracker.
        #
        # This is a regression test for job000225.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        job['Description'] = job['Description'] + '\nEdited.'
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912], [826])
        change = self.edit_file(self.user)
        self.p4p[self.user].run('fix -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4p[self.user].run('submit -c %s' % change['Change'])
        self.close(issue, job, [802, 805, 819, 820, 824, 826])

        # Delete fix in Perforce and check that the deletion is
        # replicated (regression test for job000013 and job000222).
        self.p4p[self.user].run('fix -d -c %s %s'
                                % (change['Change'], job['Job']))
        self.poll()
        self.expected([818])
        fixes = self.p4.run('fixes -j %s' % job['Job'])
        assert len(fixes) == 0, ("Expected no fixes for %s, but found "
                                 "%s." % (job['Job'], fixes))

        # 10.4. Issue lifecycle (fix to assigned on submission in
        # Perforce)
        #
        # As section 10.3, but the fix is to the job's current state
        # rather than "closed", so the job state doesn't change.
        # Regression test for job000007.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        change = self.edit_file(self.user)
        status = job[self.r.config.job_status_field]
        self.p4p[self.user].run('fix -s %s -c %s %s'
                                % (status, change['Change'], job['Job']))
        self.p4p[self.user].run('submit -c %s' % change['Change'])
        self.poll()
        self.expectation([802, 805, 819, 820, 825, 911, 912])

        # Change fix status (only -- note that we set the job status
        # back); check that it's replicated.
        self.p4p[self.user].run('fix -s closed -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 819, 821, 825, 911, 912])

        # 10.5. Simultaneous edit
        #
        # Provoke a conflict by changing an issue simultaneously in both
        # systems.  Then try again, but with a user other than the job
        # owner.  (Check that mail goes to both.)
        #
        # Make, update and delete a fix in Perforce simultaneously with
        # the change.  Make sure the fixes are respectively deleted,
        # updated and restored.  Ditto for filespecs.

        change1 = self.edit_file(self.user) # make
        change2 = self.edit_file(self.user) # update
        change3 = self.edit_file(self.user) # delete
        self.poll()
        for u in [self.user, self.user1]:
            issue, job = self.submit(self.user)
            status = job[self.r.config.job_status_field]

            # Make the fixes that we are going to update and delete.
            self.p4p[self.user].run('fix -s %s -c %s %s'
                                    % (status, change2['Change'],
                                       job['Job']))
            self.p4p[self.user].run('fix -s %s -c %s %s'
                                    % (status, change3['Change'],
                                       job['Job']))
            job['P4DTI-filespecs'] = 'filespec_1\n'
            self.p4p[self.user].run('job -i', job)
            self.poll()

            # Now edit the job simultaneously in DT and Perforce.
            getattr(self, config.dt_name + '_assign')(issue, job,
                                                      self.user)
            job[self.r.config.job_status_field] = 'closed'
            job['P4DTI-filespecs'] = 'filespec_2\n'
            self.p4p[u].run('job -i', job)

            # Make, update and delete those fixes.
            self.p4p[u].run('fix -c %s %s'
                            % (change1['Change'], job['Job']))
            self.p4p[u].run('fix -c %s %s'
                            % (change2['Change'], job['Job']))
            self.p4p[u].run('fix -d -c %s %s'
                            % (change3['Change'], job['Job']))
            self.clear_log()
            self.r.carefully_poll_databases()
            self.expectation([800, 806, 811, 814, 815, 816, 817, 841,
                              860, 861, 853, 862, 812, 910, 911, 912])
            issue = self.r.dt.issue(issue.id())
            job = self.p4.run('job -o %s' % job['Job'])[0]
            getattr(self, config.dt_name + '_assigned')(issue, job)
            self.poll()
            fixes = self.p4.run('fixes -j %s' % job['Job'])
            assert job['P4DTI-filespecs'] == 'filespec_1\n'
            assert len(fixes) == 2
            assert fixes[0]['Change'] == change3['Change']
            assert fixes[0]['Status'] == status
            assert fixes[1]['Change'] == change2['Change']
            assert fixes[1]['Status'] == status

            # The conflict mail (message 800) must mention the
            # administrator, the job owner and the editing user.
            # string.find returns -1 when the text is absent and 0 when
            # it is at the very start, so every check must compare
            # against -1 rather than rely on truthiness.
            for m in self.log_messages:
                if m.id == 800:
                    assert string.find(
                        m.text, config.administrator_address) != -1
                    assert string.find(m.text, self.user) != -1
                    assert string.find(m.text, u) != -1

        # 10.6. Illegal changes
        #
        # Regression test for job000429.

        tests = getattr(self, config.dt_name + '_illegal_changes')
        for field, value, expected in tests:
            issue, job = self.submit(self.user)
            issue, job = self.assign(issue, job, self.user)
            job[field] = value
            self.p4p[self.user].run('job -i', job)
            self.clear_log()
            self.r.carefully_poll_databases()
            self.expectation(expected + [800, 805, 811, 851, 860, 861,
                                         862, 812, 852, 853, 910, 911,
                                         912], [824, 923])
            self.poll()

        # We're done; one last check for luck.
        self.check_consistency()

    # Rows of (job field, illegal value, extra message ids passed to
    # expectation) used by section 10.6.
    Bugzilla_illegal_changes = [
        ('Product', 'no_such_product', [504]),
        ('Description', 'foo', [505]),
        ('Status', 'verified', [503]),
        ]


# 10a. ISSUE LIFE CYCLE TEST CASES
#
# This test creates issues and takes them through various kinds of
# lifecycle, checking that they are replicated correctly at each step.
# Specific to Tracker

class tracker_lifecycle(p4dti_base):
    # Perforce users used by the test.  user2 is the user that
    # Tracker_assign assigns issues to.
    user = 'fred'
    user1 = 'Admin'
    user2 = 'jim'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user, self.user1, self.user2])
        self.check_startup()

    # Submit a new issue to the defect tracker.  Run a replication
    # cycle; check that the issue gets replicated to perforce.  Return a
    # pair of the defect tracker issue and the Perforce job.

    def submit(self, user):
        log_fn()
        issue_id = getattr(self, config.dt_name + '_submit')(user)

        # It gets replicated to Perforce.  This is a regression test for
        # job000233.
        self.check_replication_dt_to_p4(first_time = 1)

        # Check that the job has been created in Perforce.
        config.trk.login(config.tracker_user, config.tracker_password)
        issue = self.r.dt.issue(issue_id)
        config.trk.logout()
        assert issue
        jobname = issue.corresponding_id()
        job = self.p4.run('job -o %s' % jobname)[0]

        # Defect-tracker specific checks.
        getattr(self, config.dt_name + '_submitted')(issue, job)
        return issue, job

    # Assign the issue to the user in the defect tracker.  Run a
    # replication cycle; check that the assignment gets replicated.
    # Return the updated defect tracker issue and Perforce job.

    def assign(self, issue, job, user):
        log_fn()
        getattr(self, config.dt_name + '_assign')(issue, job, user)

        # It gets replicated to Perforce.
        self.check_replication_dt_to_p4()

        # Defect-tracker specific checks.
        # NOTE(review): unlike submit, this logs in without a matching
        # config.trk.logout() -- confirm whether that is intended.
        config.trk.login(config.tracker_user, config.tracker_password)
        issue = self.r.dt.issue(issue.id())
        job = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_assigned')(issue, job)
        return issue, job

    # The job has been closed in Perforce.  Check that the closure is
    # replicated to the defect tracker and that the expected messages
    # appear.  'expected' is a list of message ids; 911 and 912 are
    # always added to it.

    def close(self, issue, job, expected):
        log_fn()
        self.poll()
        self.expectation(expected + [911, 912])
        # NOTE(review): logs in without a matching config.trk.logout()
        # -- confirm whether that is intended.
        config.trk.login(config.tracker_user, config.tracker_password)
        issue1 = self.r.dt.issue(issue.id())
        job1 = self.p4.run('job -o %s' % job['Job'])[0]
        getattr(self, config.dt_name + '_closed')(issue, job, issue1, job1)

    # Create a bug through the Tracker interface and return its id.

    def Tracker_submit(self, user):
        log_fn()
        fields = {
            'Title': 'Auto-created bug',
            'Description': 'Auto-created description',
            'Severity': '3-Medium',
            'Owner': 'Admin',
            }
        config.trk.login(config.tracker_user, config.tracker_password)
        bugid = self.r.dt.tracker.create_bug(fields, user)
        config.trk.logout()

        # Sleep for a second so that we can be sure to pick this
        # up on the next poll.
        time.sleep(1)
        # Single pre-formatted argument: prints the same text as the
        # statement form 'print "Bug:", bugid' and also parses as a
        # function call.
        print("Bug: %s" % bugid)
        return bugid

    def Tracker_submitted(self, bug, job):
        log_fn()
        for field, expected in [
            (self.r.config.job_status_field, 'open'),
            ('Severity', '3-medium'),
            ]:
            assert job[field] == expected, \
                   ("Expected new job %s to have %s '%s', but found "
                    "'%s'." % (job['Job'], field, expected, job[field]))

    def Tracker_assign(self, bug, job, user):
        log_fn()
        # Assign the issue in Tracker.  Note that the issue is always
        # assigned to self.user2; 'user' is only the user making the
        # update.
        tr_user = self.r.config.user_translator.translate_p4_to_dt(
            self.user2, self.r.dt, self.r.dt_p4)
        changes = {'Assigned To': tr_user}
        self.update_bug(changes, bug, user)

        # Wait two seconds so that the bug will be picked up in the next
        # poll, not the one after.
        time.sleep(2)

    def Tracker_assigned(self, bug, job):
        # The job should now be assigned to self.user2.  The failure
        # message uses positional formats only: mixing '%s' with
        # '%(Status)s' against a tuple raises TypeError instead of
        # reporting the assertion failure.
        job = self.p4.run('job -o %s' % job['Job'])[0]
        assert job['Assigned_to'] == self.user2, \
               ("Expected assigned job %s to be assigned to '%s' but it "
                "is assigned to '%s'."
                % (job['Job'], self.user2, job['Assigned_to']))

    def Tracker_closed(self, bug, job, bug1, job1):
        pass

    # Re-read the issue from Tracker and check its fields.  value_list
    # is a list of (field, expected) pairs; 'expected' is either a
    # string to compare with, or a predicate applied to the field value.

    def Tracker_check_p4_to_dt(self, issue, job, value_list):
        config.trk.login(config.tracker_user, config.tracker_password)
        issue = self.r.dt.issue(issue.id())
        config.trk.logout()
        for field, expected in value_list:
            if isinstance(expected, types.StringType):
                assert issue[field] == expected, \
                       ("Expected replicated issue %s to have %s '%s', but found "
                        "'%s'." % (issue['Id'], field, expected, issue[field]))
            else:
                # assume function
                assert expected(issue[field]), \
                       ("Expected replicated issue %s to have valid field %s."
                        % (issue['Id'], field))

    # Re-read the job from Perforce and check that each (field,
    # expected) pair in value_list matches.

    def check_job_fields(self, job, value_list):
        j = self.p4.run('job -o %s' % job['Job'])[0]
        for field, expected in value_list:
            assert j[field] == expected, \
                   ("Expected replicated job %s to have %s '%s', but found "
                    "'%s'." % (j['Job'], field, expected, j[field]))

    # Apply 'changes' to the issue in Tracker inside a login/logout
    # pair.

    def update_bug(self, changes, issue, user):
        config.trk.login(config.tracker_user, config.tracker_password)
        self.r.dt.tracker.update_bug(changes, issue, user)
        config.trk.logout()

    def runTest(self):
        "Issue life cycle (test_p4dti.tracker_lifecycle)"

        # 10.1. Simple issue lifecycle
        #
        # This is a simple issue cycle:
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. It is closed in Perforce (by editing the job).
        # 5. The closure gets replicated back to the defect tracker.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)

        # Change the status and check for replication back
        log_message("----Test for status change being replicated back")
        job[self.r.config.job_status_field] = 'evaluated'
        temp = self.p4p[self.user].run('job -o')
        self.p4p[self.user].run('job -i', job)
##        self.close(issue, job, [805, 824, 826])
        self.poll()
        self.expectation([805, 824, 911, 912], [826])
        value_list = [('Status', 'Evaluated')]
        self.Tracker_check_p4_to_dt(issue, job, value_list)

        # Now check for Date Fixed field being set
        log_message("----Test for Date Fixed field being set")
        job[self.r.config.job_status_field] = 'closed'
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912], [826])
        value_list = [('Status', 'Fixed'),
                      ('Date Fixed', lambda x: x != '')]
        self.Tracker_check_p4_to_dt(issue, job, value_list)

        # Test updates of individual fields
        # with auto-login
        self.p4.run('logout')
        changes = {'Status': 'Resubmitted'}
        self.update_bug(changes, issue, self.user)
        self.poll()
        self.expectation([804, 812, 911, 912])
        # Changed fields
        value_list = [(self.r.config.job_status_field, 'open')]
        self.check_job_fields(job, value_list)

        # Test closing issue in Tracker - should be closed in p4
        changes = {'State': 'Closed',
                   'Title': 'I am a title with " a single quote'}
        self.update_bug(changes, issue, self.user)
        self.poll()
        self.expectation([804, 812, 911, 912])
        # Changed fields
        value_list = [(self.r.config.job_status_field, 'closed')]
        self.check_job_fields(job, value_list)

        # Update a field and replicate back.
        job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
        job["FIXED_IN_REL.VER.BLD[PATCH]"] = "Rel 1.2"
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912])

        # Test Perforce Note field
        job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
        job["PERFORCE_NOTE"] = "Some test notes\nwith some more stuff"
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912])

        # Update an existing note
        job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
        job["PERFORCE_NOTE"] += "Some changed stuff in this note"
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912])

        # Test updates of individual fields
        changes = {'Perforce Note': 'This is a new note\r\nwith some trailing blank lines\r\n\r\n\r\n'}
        self.update_bug(changes, issue, self.user)
        self.poll()
        self.expectation([804, 812, 911, 912])
        # Changed fields

        # 10.2. Issue lifecycle (fix in Perforce)
        #
        # This is an issue cycle in which the issue is associated with a
        # changelist in Perforce:
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. It is closed in Perforce (by making a fix).
        # 5. Closure and fix get replicated back to the defect
        #    tracker.

        log_message("Test for lifecyle - fix in Perforce")
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        self.p4p[self.user].run('fix -c 1 %s' % job['Job'])
        self.close(issue, job, [805, 819, 820, 824])

        # 10.3. Issue lifecycle (fix on submission in Perforce)
        #
        # This tests an issue lifecycle in which the issue is associated
        # with a pending changelist and closed on submission.
        #
        # 1. An issue is submitted to the defect tracker.
        # 2. It is replicated to Perforce.
        # 3. It gets assigned to a developer.
        # 4. The job description is edited in Perforce (this is a
        #    regression test for job000362).
        # 5. A fix is made with a pending changelist.
        # 6. The change is submitted.
        # 7. Job, fix get replicated back to the defect tracker.

        log_message("----test for fix on submission")
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        job['Description'] = job['Description'] + '\nEdited.'
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 824, 911, 912], [826])
        change = self.edit_file(self.user)
        self.p4p[self.user].run('fix -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4p[self.user].run('submit -c %s' % change['Change'])
        self.close(issue, job, [805, 819, 820, 824])

        log_message("----test for fix replicated before submission")
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
#        job['Description'] = job['Description'] + '\nEdited.'
#        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([911, 912], [826])
        change = self.edit_file(self.user)
        self.p4p[self.user].run('fix -c %s %s'
                                % (change['Change'], job['Job']))
        # Poll and check that things are replicated OK.
        self.poll()
        self.expectation([805, 819, 820, 824, 911, 912])

        # Now submit and poll again
        self.p4p[self.user].run('submit -c %s' % change['Change'])
        self.close(issue, job, [805, 824])
        tr_user = self.r.config.user_translator.translate_p4_to_dt(
            self.user, self.r.dt, self.r.dt_p4)
        value_list = [('Fixed By', tr_user)]
        self.Tracker_check_p4_to_dt(issue, job, value_list)

        log_message("----test for deleted fix")
        # Delete fix in Perforce and check that the deletion is
        # replicated.
        self.p4p[self.user].run('fix -d -c %s %s'
                                % (change['Change'], job['Job']))
        self.poll()
        self.expected([818])
        fixes = self.p4.run('fixes -j %s' % job['Job'])
        assert len(fixes) == 0, ("Expected no fixes for %s, but found "
                                 "%s." % (job['Job'], fixes))

        # 10.4. Issue lifecycle (fix to assigned on submission in
        # Perforce)
        #
        # As section 10.3, but the fix is to the job's current state
        # rather than "closed", so the job state doesn't change.

        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        change = self.edit_file(self.user)
        status = job[self.r.config.job_status_field]
        self.p4p[self.user].run('fix -s %s -c %s %s'
                                % (status, change['Change'], job['Job']))
        self.p4p[self.user].run('submit -c %s' % change['Change'])
        self.poll()
        self.expectation([805, 819, 820, 824, 911, 912])

        # Change fix status (only -- note that we set the job status
        # back); check that it's replicated.
        job = self.p4p[self.user].run('job -o %s' % job['Job'])[0]
        self.p4p[self.user].run('fix -s closed -c %s %s'
                                % (change['Change'], job['Job']))
        self.p4p[self.user].run('job -i', job)
        self.poll()
        self.expectation([805, 819, 821, 825, 911, 912])

##        # 10.5. Simultaneous edit
##        #
##        # Provoke a conflict by changing an issue simultaneously in both
##        # systems.  Then try again, but with a user other than the job
##        # owner.  (Check that mail goes to both.)
##        #
##        # Make, update and delete a fix in Perforce simultaneously with
##        # the change.  Make sure the fixes are respectively deleted,
##        # updated and restored.  Ditto for filespecs.
##
##        change1 = self.edit_file(self.user) # make
##        change2 = self.edit_file(self.user) # update
##        change3 = self.edit_file(self.user) # delete
##        self.poll()
##        for u in [self.user, self.user1]:
##            issue, job = self.submit(self.user)
##            status = job[self.r.config.job_status_field]
##            # Make the fixes that we are going to update and delete.
##
##            self.p4p[self.user].run('fix -s %s -c %s %s'
##                                    % (status, change2['Change'],
##                                       job['Job']))
##            self.p4p[self.user].run('fix -s %s -c %s %s'
##                                    % (status, change3['Change'],
##                                       job['Job']))
##            job['P4DTI-filespecs'] = 'filespec_1\n'
##            self.p4p[self.user].run('job -i', job)
##            self.poll()
##            # Now edit the job simultaneously in DT and Perforce.
##            getattr(self, config.dt_name + '_assign')(issue, job,
##                                                      self.user)
##            job[self.r.config.job_status_field] = 'closed'
##            job['P4DTI-filespecs'] = 'filespec_2\n'
##            self.p4p[u].run('job -i', job)
##            # Make, update and delete those fixes.
##            self.p4p[u].run('fix -c %s %s'
##                            % (change1['Change'], job['Job']))
##            self.p4p[u].run('fix -c %s %s'
##                            % (change2['Change'], job['Job']))
##            self.p4p[u].run('fix -d -c %s %s'
##                            % (change3['Change'], job['Job']))
##            self.clear_log()
##            self.r.carefully_poll_databases()
##            self.expectation([800, 806, 811, 814, 815, 816, 817, 841,
##                              860, 861, 853, 862, 812, 910, 911, 912])
##            issue = self.r.dt.issue(issue.id())
##            job = self.p4.run('job -o %s' % job['Job'])[0]
##            getattr(self, config.dt_name + '_assigned')(issue, job)
##            self.poll()
##            fixes = self.p4.run('fixes -j %s' % job['Job'])
##            assert job['P4DTI-filespecs'] == 'filespec_1\n'
##
##            assert len(fixes) == 2
##            assert fixes[0]['Change'] == change3['Change']
##            assert fixes[0]['Status'] == status
##            assert fixes[1]['Change'] == change2['Change']
##            assert fixes[1]['Status'] == status
##            for m in self.log_messages:
##                if m.id == 800:
##                    assert string.find(m.text,
##                                       config.administrator_address)
##                    assert string.find(m.text, self.user) != -1
##                    assert string.find(m.text, u) != -1

        # We're done; one last check for luck.
        config.trk.login(config.tracker_user, config.tracker_password)
        self.check_consistency()
        config.trk.logout()


# 11. P4DTI CONFIGURATION DATABASE
#
# This checks that parameters get added and removed from the
# configuration database.  This is a regression test for job000169 and
# job000351.
class configdb(p4dti_base):
    # Only the basic test environment is needed; each check() call
    # builds its own replicator.
    def setUp(self):
        self.setup_everything()

    # Return the configuration stored in the Bugzilla database.
    def Bugzilla_config(self):
        return self.r.dt.bugzilla.get_config()

    # Apply the parameters in the dictionary cf1 to a fresh
    # configuration, start the replicator, and record a failure for
    # each parameter whose value in the configuration database differs
    # from the one we set.
    def check(self, cf1):
        reset_configuration()
        for k, v in cf1.items():
            setattr(config, k, v)
        self.initialize_replicator()
        # The accessor is defect-tracker specific (e.g. Bugzilla_config),
        # so look it up by name.
        cf2 = getattr(self, config.dt_name + '_config')()
        for k, v in cf1.items():
            if v != cf2.get(k):
                self.addFailure("Set parameter %s to '%s', but found "
                                "'%s' in the config database."
                                % (k, v, cf2.get(k)))

    def runTest(self):
        "Replicator configuration database (test_p4dti.configdb)"
        cf1 = { 'p4_server_description': 'spong',
                'changelist_url': 'http://spong/changelist?%d',
                'job_url': 'http://spong/job?%s' }
        cf2 = { 'p4_server_description': 'spong',
                'changelist_url': None,
                'job_url': None }
        # Set the parameters, remove the optional ones, then set them
        # again.
        self.check(cf1)
        self.check(cf2)
        self.check(cf1)


# 12. INCONSISTENCIES
#
# This test case checks that each kind of inconsistency that can be
# reported by the consistency checking script is detected and reported.

class inconsistencies(p4dti_base):
    user = 'rb'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Consistency check failures (test_p4dti.inconsistencies)"

        # 12.1. Unreplicated fix in Perforce
        change = self.edit_file(self.user)
        job1 = self.p4.run('jobs')[0]
        self.p4p[self.user].run('fix -s %s -c %s %s'
                                % (job1[config.job_status_field],
                                   change['Change'], job1['Job']))
        self.clear_log()
        self.r.check_consistency()
        if not self.r.dt.supports('fixes'):
            self.expectation([871, 890], [883, 884, 885])
        else:
            self.expectation([871, 878, 886, 890], [883, 884])
        self.poll()

        # 12.2. Fix with wrong status in Perforce
        job1status = job1[config.job_status_field]
        assert job1status != 'assigned'
        self.p4p[self.user].run('fix -s assigned -c %s %s'
                                % (change['Change'], job1['Job']))
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 880, 886, 890], [883, 884])
        # Put the fix status back.
        self.p4p[self.user].run('fix -s %s -c %s %s'
                                % (job1status, change['Change'],
                                   job1['Job']))

        # 12.3. Changed job in Perforce - not relevant to Tracker
        if config.dt_name != 'Tracker':
            job1['Description'] = job1['Description'] + '...\n'
            self.p4p[self.user].run('job -i', job1)
            self.clear_log()
            self.r.check_consistency()
            self.expectation([871, 875, 886, 890], [883, 884])
            self.poll()

        # 12.4. Unreplicated job in Perforce
        #
        # This is a regression test for job000372.
        job4 = self.p4p[self.user].run('job -o')[0]
        job4['P4DTI-rid'] = config.rid
        job4['P4DTI-issue-id'] = "999999"
        job4name = self.create_job(self.p4p[self.user], job4)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 882, 886, 890], [883, 884])

        # 12.5. Unreplicated job pointing to replicated issue
        issueid = self.p4.run('jobs')[0]['P4DTI-issue-id']
        job5 = self.p4p[self.user].run('job -o %s' % job4name)[0]
        job5['P4DTI-issue-id'] = issueid
        self.p4p[self.user].run('job -i', job5)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 881, 886, 890], [883, 884])
        self.p4p[self.user].run('job -d %s' % job5['Job'])

        # 12.6. Missing job in Perforce
        job6 = self.p4.run('jobs')[0]
        self.p4p[self.user].run('job -d %s' % job6['Job'])
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 873, 886, 890], [883, 884])

        # 12.7. Job pointing to wrong issue
        #
        # We also get "(P4DTI-875X) Job '%s' would need the following
        # set of changes ..." because P4DTI-issue-id is wrong, and
        # "(P4DTI-8793) Change %d fixes issue '%s' but there is no
        # corresponding fix for job '%s'." because we added a fix and
        # replicated it in section 12.1 above but now it's missing in
        # Perforce.
        issue_id = job6['P4DTI-issue-id']
        job6['P4DTI-issue-id'] = '999999'
        self.p4p[self.user].run('job -i', job6)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 874, 875, 879, 887, 890], [883, 884])
        job6['P4DTI-issue-id'] = issue_id

        # Put things back to rights, and add a filespec in preparation
        # for section 12.8 below.
        job6['P4DTI-filespecs'] = 'filespec_1\n'
        self.p4p[self.user].run('job -i', job6)
        self.poll()
        self.check_consistency()

        # 12.8. Incorrect filespecs
        #
        # By changing the filespec we can provoke messages about
        # filespecs being missing in both sides.
        job6['P4DTI-filespecs'] = 'filespec_2\n'
        self.p4p[self.user].run('job -i', job6)
        self.clear_log()
        self.r.check_consistency()
        self.expectation([871, 876, 877, 887, 890], [883, 884])

        # Polling should sort everything out.
        self.poll()
        self.check_consistency()


# 13. RACE DURING REPLICATION OF FIXES
#
# This is a regression test for job000385.

class race_385(p4dti_base):
    user = 'rb'
    # The pending change that race() submits.
    change = None
    # The replicator's real fixes_differences method, saved while
    # race() stands in for it.
    original_fixes_differences = None
    # 0 until race() has submitted the change; 1 afterwards.
    race_flag = None

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    # Stand-in for the replicator's fixes_differences method: compute
    # the differences using the original method, then submit the
    # pending change before returning them.
    def race(self, dt_fixes, p4_fixes):
        fix_diffs = self.original_fixes_differences(dt_fixes, p4_fixes)
        # Submit the change (this is the race).  But only once.
        if self.race_flag == 0:
            self.p4p[self.user].run('submit -c %s'
                                    % self.change['Change'])
            self.race_flag = 1
        return fix_diffs

    def runTest(self):
        "Race during replication of fixes (test_p4dti.race_385)"

        # Create a pending change in Perforce and make a fix to that
        # change.
        self.change = self.edit_file(self.user)
        job = self.p4.run('jobs')[0]
        self.p4p[self.user].run('fix -s %s -c %s %s'
                                % (job[config.job_status_field],
                                   self.change['Change'],
                                   job['Job']))

        # Create a second pending change in Perforce (so that the first
        # change will get renumbered when submitted).
        self.edit_file(self.user)

        # We'll make sure that the change gets submitted after
        # fixes_differences gets called -- this is our last opportunity
        # to run the race before the possibly illegal call to p4 change
        # -o.
        self.original_fixes_differences = self.r.fixes_differences
        self.r.fixes_differences = self.race
        self.race_flag = 0

        # Replicate.
        self.poll()

        # Restore the original fixes_differences.
        self.r.fixes_differences = self.original_fixes_differences

        # Check that the replication succeeded.
        self.expectation([802, 805, 819, 820], [825, 911, 912])
        self.check_consistency()


# 14. CAN CONFIRM AN UNCONFIRMED BUG
#
# This is a regression test for job000262 and job000410.  Bugzilla only.

class unconfirmed(p4dti_base):
    bug_id = 27  # this bug is UNCONFIRMED in our test database
    user = 'rb'

    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Confirm an UNCONFIRMED bug (test_p4dti.unconfirmed)"
        job = self.p4.run('job -o bug%d' % self.bug_id)[0]
        assert job[self.r.config.job_status_field] == 'unconfirmed'
        job[self.r.config.job_status_field] = 'bugzilla_new'
        self.p4p[self.user].run('job -i', job)

        # Replicate.
        self.poll()

        # Check that the replication succeeded.
        self.expectation([805, 824], [911, 912])
        self.check_consistency()


# 15. REPLICATE NEW JOBS FROM PERFORCE
#
# This checks that a new job in Perforce can be submitted to the defect
# tracker and successfully replicated thereafter.  This is a regression
# test for job000036.

class new_p4_job(p4dti_base):
    user = 'rb'

    # Field values for the job that runTest creates in Perforce.
    Bugzilla_job = {
        'Summary': 'test summary',
        'User': user,
        'Description': 'test description',
        'Priority': 'P3',
        'Severity': 'normal',
        'Product': 'product 1',
        }

    # Installed as config.prepare_issue by setUp.  Fills in the
    # product, component and version of the issue when they are empty.
    def Bugzilla_prepare_issue(self, issue, job):
        default_component = {
            'product 1': 'component 1.1',
            'product 2': 'component 2.1',
            'unconfirmed': 'unconf1',
            'group': 'group1',
            }
        if issue['product'] == '':
            issue['product'] = 'product 1'
        if issue['component'] == '':
            issue['component'] = default_component[issue['product']]
        if issue['version'] == '':
            issue['version'] = 'unspecified'

    def setUp(self):
        self.setup_everything()
        config.prepare_issue = getattr(self, config.dt_name
                                       + '_prepare_issue')
        # Replicate every job.
        config.replicate_job_p = lambda job: 1
        self.setup_perforce([self.user])
        self.check_startup()

    def runTest(self):
        "Replicate a new job from Perforce (test_p4dti.new_p4_job)"
        # Add a job to Perforce.
        job = self.p4p[self.user].run('job -o')[0]
        for k, v in getattr(self, config.dt_name + '_job').items():
            job[k] = v
        jobname = self.create_job(self.p4p[self.user], job)

        # Replicate it.
        self.poll()
        self.expected([892, 894])
        self.expected_only_in_range(800, 914, [826, 892, 894, 911, 912])

        # Check that the job is replicated.
        self.check_consistency()
        job = self.p4.run('job -o %s' % jobname)[0]
        assert job['P4DTI-rid'] == config.rid


# 16. WINDOWS NT SERVICE AND EVENT LOG
#
# This is a regression test for job000046 and job000149.

class nt_service(p4dti_base):
    # Service name in the Windows registry.
    _svc_name_ = 'p4dti_service'

    # If the service can't replicate its initial batch of issues in five
    # minutes, we'll consider it to have failed (this depends on the
    # test database not having too many issues).
    timeout = 300

    # Number of polls before we expect consistency to be achieved.
    polls_for_consistency = 3

    def setUp(self):
        self.setup_everything()
        self.initialize_replicator()

        # Handle on Service Manager.
        import win32service
        access_level = win32service.SC_MANAGER_ALL_ACCESS
        self.hscm = win32service.OpenSCManager(None, None, access_level)

        # Handle on Event Log.
        import win32evtlog
        self.hevt = win32evtlog.OpenEventLog(None, 'Application')

        # Establish configuration environment for service.  We want the
        # current configuration with the following changes: NT Event
        # logging must be set; logging level is DEBUG; email from the
        # replicator is suppressed.
        controls = (('P4DTI_CONFIG', os.path.abspath(config_filename)),
                    ('P4DTI_EVTLOG', ''),
                    ('P4DTI_LOGLEVEL', str(message.DEBUG)),
                    ('P4DTI_ADMINADDR', ''),
                    )
        for key, value in controls:
            os.environ[key] = value

        # If the service is already there, remove it.
        if self.query_service():
            self.remove_service()

    # Close the Service Manager and Event Log handles opened by setUp.
    def tearDown(self):
        import win32service
        import win32evtlog
        win32service.CloseServiceHandle(self.hscm)
        win32evtlog.CloseEventLog(self.hevt)

    # Return None if service not currently installed.  Otherwise
    # return currentState field as reported by Service Manager.
    def query_service(self):
        import win32service
        type_filter = win32service.SERVICE_WIN32
        state_filter = win32service.SERVICE_STATE_ALL
        query = win32service.EnumServicesStatus

        # List all registered services
        services = query(self.hscm, type_filter, state_filter)

        # Is ours there?
        for svc_name, description, status in services:
            if svc_name == self._svc_name_:
                # Looks like it was.
                currentState = status[1]
                return currentState

        # Looks like it wasn't.
        return None

    # Call function once a second until it returns true or
    # self.timeout calls have been made.  Return 1 if it returned
    # true, 0 if we gave up.
    def wait(self, function):
        timeout = self.timeout
        while timeout > 0:
            if function():
                return 1
            time.sleep(1)
            timeout = timeout - 1
        return 0

    # Wait until query_service() reports the given status.  Return 1
    # if it did so before the timeout, 0 otherwise.
    def wait_for_status(self, status):
        query = self.query_service
        return self.wait(lambda query=query, status=status:
                         query() == status)

    # Position the Event Log handle at the newest record, so that
    # later sequential reads only return records written after this
    # call.
    def read_event_log_first_time(self):
        import win32evtlog
        hevt = self.hevt
        oldest_record = win32evtlog.GetOldestEventLogRecord(hevt)
        record_count = win32evtlog.GetNumberOfEventLogRecords(hevt)
        newest_record = oldest_record + record_count - 1

        # Random access into the Event Log.
        read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ
                      + win32evtlog.EVENTLOG_SEEK_READ)

        # Read newest event and ignore it.  This has the required
        # side-effect of setting the handle's reading position in the
        # log, ready for future reads.
        win32evtlog.ReadEventLog(hevt, read_flags, newest_record)

    # Wait until the message with catalog id msg appears in the Event
    # Log.  Return 1 if it appeared before the timeout, 0 otherwise.
    def wait_for_event_log(self, msg):
        return self.wait(lambda self=self, msg=msg:
                         self.match_event_log(msg))

    # Read all the log entries that have been written since last
    # time.  It's remotely possible that we might flush the message
    # we're looking for twice in one call to ReadEventLog, but this is
    # acceptable as it is harmless to poll again.
    #
    # Return 1 if one of the entries matches the catalog message with
    # id msg, 0 otherwise.
    def match_event_log(self, msg):
        import win32evtlog
        hevt = self.hevt

        # Sequential access from where we left off, last time we read
        # on this handle.
        read_flags = (win32evtlog.EVENTLOG_FORWARDS_READ
                      + win32evtlog.EVENTLOG_SEQUENTIAL_READ)

        import catalog
        # This is the string we're looking for.
        wanted = str(catalog.msg(msg))

        # We have to make an undeterminable number of reads to exhaust
        # the log.  Each read will return some undeterminable number of
        # records.  (I can't tell from documentation whether zero is a
        # possible number in cases where there are records to be
        # read.  This doesn't matter, as we're prepared to come back
        # several times.)
        records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
        while records:
            for record in records:
                # Named 'text' so as not to hide the imported 'message'
                # module.
                text = record.StringInserts[0]
                if text == wanted:
                    return 1
            records = win32evtlog.ReadEventLog(hevt, read_flags, 0)
        return 0

    # Simplify hook into service.main()
    def main(self, *args):
        import service
        service.main([''] + list(args))

    # The next four methods have to use service.py, as that's what
    # we're testing.
    def install_service(self):
        self.main()

    def remove_service(self):
        self.main('remove')

    def start_service(self):
        self.main('start')

    def halt_service(self):
        self.main('stop')

    def runTest(self):
        "Manage NT service (test_p4dti.nt_service)"
        import win32service

        # Install the service and ensure that it's there.
        self.install_service()
        assert self.query_service()

        # Start it running.
        self.read_event_log_first_time()
        self.start_service()
        assert self.wait_for_status(win32service.SERVICE_RUNNING)

        # Allow the replication to happen.
        for i in range(self.polls_for_consistency):
            # (message.DEBUG, "Poll finished.")
            assert self.wait_for_event_log(912)

        # Halt the service.
        self.halt_service()
        assert self.wait_for_status(win32service.SERVICE_STOPPED)

        # Remove service and ensure that it's gone away.
        self.remove_service()
        assert not self.query_service()

        # Check that replication succeeded.
        self.check_consistency()


# 17. MIGRATION
#
# This tests that jobs in Perforce can be migrated from the default
# Perforce jobspec to the defect tracker and replicated thereafter.
# This is a regression test for job000022, job000249, job000422 and
# job000426.

class migrate(p4dti_base):
    n_jobs = 20
    # include a non-existent user for migration to create.
    users = ['rb', 'nb', 'gdr', 'spong']
    states = ['open', 'closed', 'suspended']
    # Maps job name to the status of the fix made for it by
    # create_jobs (which replaces this with a fresh dictionary).
    fixes = {}

    def setUp(self):
        self.setup_everything()
        # Install the defect-tracker specific migration hooks defined
        # below in the configuration.
        for param in ["migrate_p", "translate_jobspec", "prepare_issue"]:
            setattr(config, param,
                    getattr(self, config.dt_name + "_" + param))
        self.setup_perforce(self.users)
        self.initialize_replicator()
        self.create_jobs()
        # delete the replicator user from the Bugzilla database, so
        # we can test that migration creates it.
        self.dti.system('mysql -u "%s" "%s" -e "delete from profiles where login_name = \'%s\'"'
                        % (config.dbms_user, config.dbms_database,
                           config.replicator_address))

    # Create n_jobs jobs in Perforce, cycling through the users and
    # states, and fix each one against change 1.  The status of each
    # fix is recorded in self.fixes.
    def create_jobs(self):
        self.fixes = {}
        # Create some jobs in Perforce.
        for i in range(self.n_jobs):
            job = self.p4.run('job -o')[0]
            user = self.users[i % len(self.users)]
            status = self.states[i % len(self.states)]
            changes = {
                'Description': ("First line of job %d\n"
                                "Remainder of job %d\n"
                                "Blah blah blah...\n" % (i, i)),
                'User': user,
                self.r.config.job_status_field: status,
                }
            self.r.update_job(job, changes)
            self.p4p[user].run('fix -s %s -c 1 %s'
                               % (status, job['Job']))
            self.fixes[job['Job']] = status

    def runTest(self):
        "Migration from Perforce jobs (test_p4dti.migrate)"
        self.clear_log()
        self.r.migrate_users()
        self.r.migrate()
        self.expected([892, 895])
        self.expected_only_in_range(800, 914, [802, 819, 820, 892, 895])
        self.r.refresh_perforce_jobs()
        self.check_consistency()

        # Check that no fixes have been lost, changed or added.
        # Regression test for job000271.
        for f in self.p4.run('fixes'):
            if not self.fixes.has_key(f['Job']):
                self.addFailure("Found unexpected fix for job '%s'."
                                % f['Job'])
            else:
                if self.fixes[f['Job']] != f['Status']:
                    self.addFailure("Expected fix for job '%s' to have "
                                    "status '%s', but found '%s'."
                                    % (f['Job'], self.fixes[f['Job']],
                                       f['Status']))
                # Tick this fix off; whatever is left at the end was
                # lost.
                del self.fixes[f['Job']]
        for j in self.fixes.keys():
            self.addFailure("Expected a fix for job '%s', but didn't "
                            "find it." % j)

    # Migrate every job.
    def Bugzilla_migrate_p(self, job):
        return 1

    def Bugzilla_prepare_issue(self, issue, job):
        issue["product"] = "product 1"
        issue["component"] = "component 1.1"
        issue["version"] = "unspecified"

    # Rewrite the fields of the job in place: the text of the
    # description before its first newline becomes the summary and the
    # text after it the description; the status is mapped to a
    # (status, resolution) pair.  Return the job.
    def Bugzilla_translate_jobspec(self, job):
        desc = job.get("Description", "")
        newline = string.find(desc, "\n")
        job["Summary"] = desc[:newline]
        job["Description"] = desc[newline+1:]
        job["User"] = job.get("User", "")
        if 'reporter' in config.replicated_fields:
            job["Reporter"] = job.get("User", "")
        if 'qa_contact' in config.replicated_fields:
            job["QA_Contact"] = "None"
        status_map = {
            "open": ("assigned", ""),
            "closed": ("closed", "FIXED"),
            "suspended": ("closed", "LATER"),
            }
        (status, resolution) = status_map[
            job.get(self.r.config.job_status_field, "open")]
        job["Status"] = status
        job["Resolution"] = resolution
        job["Severity"] = "blocker"
        job["Priority"] = "P1"
        return job


# 18. BUGZILLA PARAMETERS
#
# Check that editing Bugzilla parameters causes the
# p4dti_bugzilla_parameters table to be created in the Bugzilla
# database, and that the parameters we expect to be there really are.

class bugzilla_params(p4dti_base):
    expected_parameters = [
        'emailregexp',
        'emailregexpdesc',
        'emailsuffix',
        'p4dti',
        ]

    def runTest(self):
        "Bugzilla parameters (test_p4dti.bugzilla_params)"
        # Use the 'random' module, which this script imports; the
        # 'whrandom' module is not imported here.
        p4dti_param = random.randint(1, 1000000)

        # Confirm that the replicator spots the absence of the
        # p4dti_bugzilla_parameters table.
        self.setup_everything({ 'bugzilla_mysqldump': None })
        self.clear_log()
        self.initialize_replicator()
        self.expected([129])

        # Follow the instructions in [RB 2000-08-10, 5.4.3]: create the
        # p4dti_bugzilla_parameters table by editing the parameters.  We
        # take the opportunity to set the 'p4dti' parameter to a random
        # number which we will read back later to check that the table
        # is being updated correctly.
        self.dti.edit_parameters({ 'p4dti': str(p4dti_param), })

        # Confirm that replicator finds the p4dti_bugzilla_parameters
        # table and the required parameters are present.
        self.clear_log()
        reset_configuration()
        self.initialize_replicator()
        self.expected_not([129, 130])
        for p in self.expected_parameters:
            assert config.bugzilla.params.has_key(p)

        # Confirm that the p4dti_parameter matches.
        assert int(config.bugzilla.params['p4dti']) == p4dti_param


# 19. ENUM KEYWORDS WITH SPACES
#
# This is a regression test for job000445.  Bugzilla only.

class enum_spaces(p4dti_base):
    def runTest(self):
        "Bugzilla enums containing spaces (test_p4dti.enum_spaces)"
        self.setup_everything({
            'bugzilla_mysqldump': 'job000445-mysqldump',
            })
        self.check_startup()
        self.check_consistency()


# 20. BUGZILLA EMAILSUFFIX
#
# This checks that when the Bugzilla "emailsuffix" parameter is set,
# that the P4DTI correctly translates users between Bugzilla and
# Perforce.  Bugzilla only.  Regression test for job000352.

class emailsuffix(p4dti_base):
    users = ['gdr', 'nb', 'ndl', 'rb']

    def runTest(self):
        "Bugzilla 'emailsuffix' parameter (test_p4dti.emailsuffix)"
        self.setup_everything({
            'bugzilla_mysqldump': 'job000352-mysqldump',
            })
        self.setup_perforce(self.users)
        self.dti.edit_parameters({
            'p4dti': 1,
            'emailregexp': '^.*$',
            'emailsuffix': email_suffix,
            })
        self.check_startup()
        self.expected_not([129, 130, 516, 536, 867])


# 21. PERFORCE JOB/FIX CONSISTENCY
#
# Check that Perforce updates the 'P4DTI-user' field whenever someone
# changes a job by fixing it.  Regression test for job000086 and
# job000276.

class fix_update(p4dti_base):
    # Fetch the named job and record a failure if either its
    # 'P4DTI-user' field or the replicator's idea of the job modifier
    # differs from the expected user.  'case' describes the action just
    # taken, for the failure message.  Return the job.
    def check_modifier(self, jobname, expected, case):
        job = self.p4p['a'].run('job -o %s' % jobname)[0]
        for found, desc in ((job['P4DTI-user'], "job['P4DTI-user']"),
                            (self.r.job_modifier(job), "job modifier")):
            if found != expected:
                self.addFailure("After %s, expected %s to be "
                                "'%s', but found '%s'."
                                % (case, desc, expected, found))
        return job

    def runTest(self):
        "Perforce updates jobs when fix (test_p4dti.fix_update)"
        self.setup_everything()
        self.setup_perforce(['a', 'b', 'c', 'd', 'e', 'f'])
        self.initialize_replicator()
        self.r.prepare_to_run()

        # Check that the "P4DTI-user" field has the correct value after
        # a series of actions:

        # 21.1. Job creation
        job = self.p4p['a'].run('job -o')[0]
        jobname = self.create_job(self.p4p['a'], job)
        job = self.check_modifier(jobname, 'a', "job creation")
        status = job[config.job_status_field]

        # 21.2. Job editing
        job['Description'] = 'test job 2'
        self.p4p['b'].run('job -i', job)
        job = self.check_modifier(jobname, 'b', "job editing")

        # 21.3. Fixing (with change in status)
        self.p4p['c'].run('fix -c 1 %s' % jobname)
        job = self.check_modifier(jobname, 'c', "fixing (with change "
                                  "in status)")

        # 21.4. Fixing a submitted changelist (no change in status)
        change = self.edit_file('d')
        self.p4p['d'].run('submit -c %s' % change['Change'])
        self.p4p['d'].run('fix -c %s %s' % (change['Change'], jobname))
        job = self.check_modifier(jobname, 'd', "fixing (without "
                                  "change in status)")

        # 21.5. Changing the status of an existing fix
        self.p4p['e'].run('fix -s %s -c 1 %s' % (status, jobname))
        job = self.check_modifier(jobname, 'e', "changing fix status ")

        # 21.6. Submitting pending changelist
        change = self.edit_file('f')
        self.p4p['f'].run('fix -c %s %s' % (change['Change'], jobname))
        self.p4p['f'].run('submit -c %s' % change['Change'])
        job = self.check_modifier(jobname, 'f', "submitting a pending "
                                  "changelist")


# 22. FREQUENT EDITS
#
# Check that frequent edits (at least one per poll) in the defect
# tracker don't cause conflicts.  Regression test for job000016,
# job000042.

class frequent_edits(lifecycle):
    def runTest(self):
        "Frequent edits cause no conflicts (test_p4dti.frequent_edits)"
        # Submit an issue to the defect tracker and replicate it.
        issue_id = getattr(self, config.dt_name + '_submit')(self.user)
        self.poll()
        self.expectation([803, 804, 812, 911, 912])

        # Check that it was replicated correctly.
        issue = self.r.dt.issue(issue_id)
        jobname = issue.corresponding_id()
        job = self.p4.run('job -o %s' % jobname)[0]
        getattr(self, config.dt_name + '_submitted')(issue, job)

        # Assign it in the defect tracker and replicate the assignment.
        getattr(self, config.dt_name + '_assign')(issue, job, self.user)
        self.poll()
        self.expectation([804, 812, 911, 912])


# 23. BUGZILLA PERFORCE SECTION
#
# Check that the Bugzilla patch is correctly producing a Perforce
# section in the bug form.

class perforce_section(lifecycle):
    def setUp(self):
        self.setup_everything()
        self.setup_perforce([self.user])
        self.dti.edit_parameters({ 'p4dti': 1 })
        self.check_startup()

    def runTest(self):
        "Perforce section in Bugzilla (test_p4dti.perforce_section)"
        # Submit a new job to Bugzilla and fix it in Perforce.
        issue, job = self.submit(self.user)
        issue, job = self.assign(issue, job, self.user)
        self.p4.run('fix -c 1 %s' % job['Job'])
        self.close(issue, job, [802, 805, 819, 820, 824, 826])

        # Get the bug form.
        bug_form = self.dti.run_script("show_bug.cgi", {
            'bugzilla_login': config.bugzilla_admin_user,
            'bugzilla_password': config.bugzilla_admin_password,
            'id': issue.id(),
            })
        parser = self.bug_form_parser()
        parser.feed(bug_form)
        parser.close()
        parser.check()

    # 23.1. Parse Bugzilla bug form
    #
    # Counts the <div class="p4dti"> elements in the form; check()
    # asserts that there was exactly one.

    class bug_form_parser(p4dti_html_parser):
        def __init__(self):
            # Number of <div class="p4dti"> start tags seen so far.
            self.seen_p4dti = 0
            # 1 after such a start tag; reset to 0 by any </div>.
            self.in_p4dti = 0
            sgmllib.SGMLParser.__init__(self)

        def check(self):
            assert self.seen_p4dti == 1

        def start_div(self, attrs_list):
            attrs = self.attrs(attrs_list)
            if attrs.get('class') == 'p4dti':
                self.seen_p4dti = self.seen_p4dti + 1
                self.in_p4dti = 1

        def end_div(self):
            self.in_p4dti = 0


# RUNNING THE TESTS

# Build and return the suite of test cases appropriate to the operating
# system and to the defect tracker named by config.dt_name.
def tests():
    suite = unittest.TestSuite()
    tests = [start_1, bogus, configdb, existing, fix_update,
             frequent_edits, inconsistencies, lifecycle, migrate,
             new_p4_job, normal, project, race_385, start_2]
    if os.name == 'nt':
        tests.extend([nt_service])
    if config.dt_name == 'Bugzilla':
        tests.extend([bugzilla_params, emailsuffix, enum_spaces,
                      perforce_section, unconfirmed])
    if config.dt_name == 'Tracker':
        # Only required tests - so overwrite above lists
        tests = [bogus, existing, fix_update, frequent_edits, normal]
        # NOTE(review): this second assignment discards the list just
        # built, so only tracker_lifecycle runs for Tracker -- confirm
        # that this is intended.
        tests = [tracker_lifecycle]
##        tests = [normal]
        # optional tests - commented out for now
        # tests.extend([start_1, race_385, start_2, lifecycle, migrate])
    for t in tests:
        suite.addTest(t())
    return suite

if __name__ == "__main__":
    unittest.main(defaultTest="tests")


# A.