#!/usr/bin/python
"""ReplicaTester: run simple health checks against Perforce replica servers.

The script reads an XML configuration file describing a master server and a
list of replicas, logs in to the master with the configured admin account,
and then runs one or more tests (ping, pull, meta, file) against every
replica whose TCP port is reachable. Tests for different replicas run in
parallel threads.

The code is written to run unchanged on Python 2.7 and Python 3.
"""

import os
import re
import sys
import stat
import time
import uuid
import socket
try:
    import thread  # Python 2 module name
except ImportError:  # Python 3 renamed the module to _thread
    import _thread as thread
import getopt
import marshal
import logging
import threading
from subprocess import Popen, PIPE, STDOUT
import xml.etree.ElementTree as ET


# global variables
DEFAULT_ATTEMPTS = 10     # polling attempts per replica when not configured
DEFAULT_TCP_TIMEOUT = 1   # TCP connect timeout (seconds) when not configured
g_config = {}             # settings filled in by parseConfigFile()
g_ticket = None           # login ticket obtained by main() via getTicket()

# Sample configuration printed by genConfigFile(). The element and attribute
# names are exactly the ones parseConfigFile() looks up; the parser does not
# check the name of the root element or of the children of <replicaList>.
SAMPLE_CONFIG = """\
<config>
    <p4>/usr/local/bin/p4</p4>
    <fileSize>10485760</fileSize>
    <depotFile>//depotName/testfile</depotFile>
    <metaTestKey>METADATA_TEST_KEY</metaTestKey>
    <adminUser>admin</adminUser>
    <adminPass>passwd</adminPass>
    <client>repl-client</client>
    <master name="master" timeout="1">master:1666</master>
    <replicaList>
        <replica name="replica1" attempts="10" timeout="1">replica1:1666</replica>
    </replicaList>
</config>
"""


def usage(exitCode=0):
    """Print the command-line help and terminate the process.

    exitCode -- process exit status (default 0, as when help is requested
                with -h). Callers reporting a command-line error pass a
                non-zero value so the failure is visible to the shell.
    """
    PROGRAM_USAGE = """\
Usage: ReplicaTester.py [-h] [-C] [-l logfile] [-v] [-t type] -c configFile

    -h             = show help (this message)
    -C             = generate sample configuration file
    -v             = verbose (debug mode)
    -l logfile     = specify logfile (default is STDOUT)
    -c configFile  = the XML configuration file
    -t type        = type of test to perform [ping|pull|meta|file|all]
                       ping - simple p4 ping test to each replica
                       pull - journal sequence comparison (p4 pull -lj)
                       meta - metadata (p4 key) update
                       file - distribution of file
                       all  - perform each of these in turn (default)

    ** NOTE: Tests are run in parallel and could result in increased
             network activity. Please use responsibly.

    ** NOTE: for file testing, a client workspace MUST be already defined.
             This script will not create one for you.
"""
    print(PROGRAM_USAGE)
    sys.exit(exitCode)


def genConfigFile():
    """Print a sample XML configuration file (a template) and exit."""
    print(SAMPLE_CONFIG)
    sys.exit()


def parseConfigFile(path):
    """Parse the XML configuration at *path* into the global g_config dict.

    Elements that are absent from the file leave g_config untouched.
    Raises whatever ElementTree raises for an unreadable or malformed file,
    ValueError for a non-numeric fileSize/timeout/attempts, and KeyError
    when <master> or a replica element has no "name" attribute.
    """
    config = ET.parse(path).getroot()

    # (XML element name, g_config key, converter or None)
    simpleSettings = (
        ('p4', 'p4', None),
        ('metaTestKey', 'metaTestKey', None),
        ('fileSize', 'fileSize', int),
        ('depotFile', 'depotFile', None),
        ('adminUser', 'admin.user', None),
        ('adminPass', 'admin.pass', None),
        ('client', 'client', None),
    )
    for tag, key, convert in simpleSettings:
        node = config.find(tag)
        if node is not None:
            g_config[key] = convert(node.text) if convert else node.text

    master = config.find('master')
    if master is not None:
        g_config['master.port'] = master.text
        g_config['master.name'] = master.attrib['name']
        g_config['master.timeout'] = int(
            master.attrib.get('timeout', DEFAULT_TCP_TIMEOUT))

    replicaList = config.find('replicaList')
    if replicaList is not None:
        g_config['replicas'] = [
            {
                'name': node.attrib['name'],
                'attempts': int(node.attrib.get('attempts', DEFAULT_ATTEMPTS)),
                'port': node.text,
                'timeout': int(
                    node.attrib.get('timeout', DEFAULT_TCP_TIMEOUT)),
            }
            for node in replicaList
        ]


def error(*msg):
    """Log *msg* at ERROR level (same arguments as logging.error)."""
    logging.error(*msg)


def errorExit(*msg):
    """Log *msg* at ERROR level, then terminate with exit status 1."""
    logging.error(*msg)
    # A bare SystemExit() would give exit status 0, hiding the failure.
    raise SystemExit(1)


def exit(*msg):
    """Log each message at INFO level, then raise SystemExit (status 0).

    NOTE: intentionally keeps its original name even though it shadows the
    interactive builtin, so existing callers keep working.
    """
    for m in msg:
        logging.info(m)
    raise SystemExit()


def _p4BaseCmd(p4user, p4port, marshalled=False):
    """Build the p4 argument prefix shared by every command helper."""
    baseCmd = [g_config['p4'], "-p", p4port, "-u", p4user,
               "-c", g_config['client']]
    if marshalled:
        baseCmd.append("-G")
    if g_ticket is not None:
        baseCmd.extend(["-P", g_ticket])
    return baseCmd


def _toText(value):
    """Convert marshalled bytes (keys and values) to native str on Python 3."""
    if str is bytes:
        # Python 2: marshal already produces native str objects.
        return value
    if isinstance(value, bytes):
        return value.decode('utf-8', 'replace')
    if isinstance(value, dict):
        return dict((_toText(k), _toText(v)) for k, v in value.items())
    return value


def p4MarshalCmd(cmd, p4user, p4port, quiet=False):
    """Run "p4 -G <cmd>" and return the marshalled records as a list.

    cmd    -- list of p4 arguments (command name first)
    p4user -- value for -u
    p4port -- value for -p
    quiet  -- when True, do not log the command at DEBUG level

    Returns a (possibly empty) list of the objects read from the command's
    standard output until end of stream.
    """
    if not quiet:
        logging.debug("p4 {0}".format(" ".join(cmd)))
    records = []
    proc = Popen(_p4BaseCmd(p4user, p4port, marshalled=True) + cmd,
                 stdout=PIPE)
    try:
        # NOTE: marshal is not hardened against malicious input; the data
        # read here comes from the p4 binary named in the configuration.
        while True:
            records.append(_toText(marshal.load(proc.stdout)))
    except EOFError:
        pass  # normal end of the record stream
    finally:
        proc.stdout.close()
        proc.wait()  # reap the child so it does not linger as a zombie
    return records


def p4InputCmd(data, cmd, p4user, p4port, quiet=False):
    """Run a p4 command, feeding *data* to its standard input.

    Returns the (stdout, stderr) tuple from Popen.communicate(). Because
    stderr is redirected into stdout, the second element is always None and
    the first contains both streams as text.
    """
    if not quiet:
        logging.debug("p4 {0}".format(" ".join(cmd)))
    proc = Popen(_p4BaseCmd(p4user, p4port) + cmd, stdout=PIPE, stdin=PIPE,
                 stderr=STDOUT, universal_newlines=True)
    return proc.communicate(input=data)


def p4Cmd(cmd, p4user, p4port, quiet=False):
    """Run a p4 command and return its (stdout, stderr) tuple.

    As with p4InputCmd, stderr is merged into stdout, so the second element
    of the returned tuple is always None.
    """
    if not quiet:
        logging.debug("p4 {0}".format(" ".join(cmd)))
    proc = Popen(_p4BaseCmd(p4user, p4port) + cmd, stdout=PIPE,
                 stderr=STDOUT, universal_newlines=True)
    return proc.communicate()


def containsError(results=None, logError=True):
    """Return True when any record in *results* has code == 'error'.

    results  -- iterable of record dicts (None or empty means "no error")
    logError -- when True, log the 'data' of each error record at ERROR

    Records with code == 'info' are logged at DEBUG level because that
    output can be important when troubleshooting.
    """
    foundError = False
    for record in results or ():
        code = record.get('code')
        if code == 'error':
            foundError = True
            if logError:
                error(record.get('data'))
        elif code == 'info':
            logging.debug(record.get('data'))
    return foundError


def generateRandomFile(file, size):
    """Create (or replace) *file* with *size* random bytes.

    Missing parent directories are created. An existing file is made
    writable and removed first, to get around read-only settings on Windows.
    Raises Exception("Cannot create random file ...") on any I/O failure.
    """
    logging.debug("creating {0} with size {1}...".format(file, size))
    try:
        parent = os.path.dirname(file)
        # A bare filename has no directory part; makedirs('') would fail.
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        if os.path.exists(file):
            os.chmod(file, stat.S_IWRITE)
            os.unlink(file)
        with open(file, 'wb') as fout:
            fout.write(os.urandom(size))
    except (IOError, OSError, ValueError) as exc:
        raise Exception(
            "Cannot create random file {0}: {1}".format(file, exc))


def createChange(description):
    """Create a pending changelist on the master and return its number.

    Raises Exception("Error creating changelist ...") when the server's
    reply does not contain the "Change N created" confirmation.
    """
    indented = "\n".join("\t" + line
                         for line in str(description).splitlines())
    spec = (
        "Change: new\n\n"
        "Client: {0}\n\n"
        "User: {1}\n\n"
        "Status: new\n\n"
        "Description:\n{2}\n"
    ).format(g_config['client'], g_config['admin.user'], indented)

    output = p4InputCmd(spec, ["change", "-i"], g_config['admin.user'],
                        g_config['master.port'])[0] or ""
    # stderr is merged into stdout, so the confirmation text is the only
    # reliable success signal.
    found = re.search(r"Change (\d+) created", output)
    if not found:
        raise Exception(
            "Error creating changelist: {0}".format(output.strip()))
    return int(found.group(1))


def depotFileExists(depotFile):
    """Return True when "p4 files" on the master reports no error."""
    result = p4MarshalCmd(["files", depotFile], g_config['admin.user'],
                          g_config['master.port'])
    # No output at all means the command did not run; that is not "exists".
    return bool(result) and not containsError(result)


def getTicket():
    """Run "p4 login -a -p" on the master and return the printed ticket.

    The admin password is supplied on standard input. Raises
    Exception("Unable to get ticket") when the output does not start with
    the password prompt.
    """
    output = p4InputCmd(g_config['admin.pass'], ['login', '-a', '-p'],
                        g_config['admin.user'], g_config['master.port'])[0]
    if not output.startswith("Enter password:"):
        raise Exception("Unable to get ticket")
    return output.replace("Enter password:", "").strip()


def tcpPing(p4port, timeout=1):
    """Return True when a TCP connection to *p4port* can be opened.

    p4port  -- "host:port"; a leading protocol component ("ssl:host:port")
               is ignored and a bare "port" is tried on localhost
    timeout -- connect timeout in seconds

    Never raises for a malformed or unreachable address; it logs at DEBUG
    level and returns False instead.
    """
    parts = p4port.split(":")
    host = parts[-2] if len(parts) > 1 else "localhost"
    try:
        port = int(parts[-1])
    except ValueError:
        logging.debug("[tcp ping]: %s unreachable" % p4port)
        return False

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((host, port))
        return True
    except (socket.error, OverflowError):
        # OverflowError: port number outside 0-65535
        logging.debug("[tcp ping]: %s unreachable" % p4port)
        return False
    finally:
        if sock is not None:
            sock.close()


def _runOnReachableReplicas(target, extraArgs=()):
    """Run target(replica, *extraArgs) in one thread per reachable replica
    and wait for all of them to finish."""
    workers = []
    for replica in g_config['replicas']:
        if replica.get('reachable'):
            worker = threading.Thread(name=replica['name'], target=target,
                                      args=(replica,) + tuple(extraArgs))
            workers.append(worker)
            worker.start()
    if workers:
        logging.debug("waiting for active threads to complete...")
    for worker in workers:
        worker.join()


def _commandFailed(replica, result):
    """Log and return True when *result* is empty or contains an error."""
    if not result:
        logging.error("%s: no output from p4" % replica['name'])
        return True
    if containsError(result):
        logging.info(result[0].get('data'))
        return True
    return False


def p4Ping(replica):
    """Run "p4 ping" against *replica* and log the reported count/times."""
    logging.debug("pinging %s" % replica['name'])
    result = p4MarshalCmd(["ping"], g_config["admin.user"], replica['port'],
                          True)
    if _commandFailed(replica, result):
        return
    r = result[0]
    logging.info("{0}: count:{1} time: {2}".format(
        replica['name'], r.get('pingCount'), r.get('pingTimes')))


def pingTest():
    """Check responsiveness of every reachable replica using p4 ping."""
    logging.info("--- pingTest start")
    _runOnReachableReplicas(p4Ping)
    logging.info("--- pingTest complete")


def p4Pull(replica):
    """Run "p4 pull -lj" on *replica* and log how far it lags the master."""
    logging.debug("checking journal sequence on %s" % replica['name'])
    result = p4MarshalCmd(["pull", "-lj"], g_config["admin.user"],
                          replica['port'], True)
    if _commandFailed(replica, result):
        return
    r = result[0]
    if r['replicaJournalSequence'] == r['masterJournalSequence']:
        logging.info("{0}: in sync with master".format(replica['name']))
    else:
        diff = (int(r['masterJournalSequence'])
                - int(r['replicaJournalSequence']))
        logging.info("{0}: sequence difference: {1}".format(
            replica['name'], diff))


def pullTest():
    """Compare journal sequences on every reachable replica (p4 pull -lj)."""
    logging.info("--- pullTest start")
    _runOnReachableReplicas(p4Pull)
    logging.info("--- pullTest complete")


def p4CheckKey(replica, key, value, begin):
    """Poll *replica* until the p4 key *key* has *value*.

    Makes at most replica['attempts'] attempts and stops early on a command
    error. *begin* is the time.time() at which the key was set on the
    master; the elapsed time is logged on success.
    """
    logging.debug("checking for key on %s" % replica['name'])
    found = False
    for _ in range(replica['attempts']):
        result = p4MarshalCmd(["key", key], g_config["admin.user"],
                              replica['port'], True)
        if _commandFailed(replica, result):
            break
        if result[0].get('value') == value:
            found = True
            logging.info("%s: key updated %f sec"
                         % (replica['name'], time.time() - begin))
            break
    if not found:
        logging.error("%s: key not updated in %d attempts "
                      % (replica['name'], replica['attempts']))


def metaTest():
    """Set a p4 key on the master and verify it reaches every replica."""
    logging.info("--- metaTest start")
    key = g_config['metaTestKey']
    value = uuid.uuid4().hex
    result = p4Cmd(["key", key, value], g_config['admin.user'],
                   g_config['master.port'])
    begin = time.time()
    # NOTE(review): stdout is always piped, so result[0] is never None and
    # the else branch is effectively unreachable.
    if result[0] is not None:
        logging.debug("[TIMESTAMP %f]: key set on master" % begin)
        _runOnReachableReplicas(p4CheckKey, (key, value, begin))
    else:
        logging.error("unable to set key %s on master" % key)
    logging.info("--- metaTest complete")


def p4VerifyFile(replica, fstat, begin):
    """Poll *replica* until "p4 verify" reports the submitted revision.

    fstat -- fstat record from the master; its 'headRev' is the revision
             expected on the replica
    begin -- time.time() taken after the submit, used to log elapsed time

    Makes at most replica['attempts'] attempts, one second apart, and stops
    early on a command error.
    """
    logging.debug("verifying file on replica...")
    cmd = ["verify", "-s", "-m", "1", g_config['depotFile']]
    found = False
    for _ in range(replica['attempts']):
        result = p4MarshalCmd(cmd, g_config["admin.user"], replica['port'],
                              True)
        if _commandFailed(replica, result):
            break
        r = result[0]
        # A "status" field in the record means verify flagged the revision.
        if r.get('rev') == fstat['headRev'] and "status" not in r:
            found = True
            logging.info("%s: file updated %f sec"
                         % (replica['name'], time.time() - begin))
            break
        time.sleep(1)
    if not found:
        logging.error("%s: file not updated in %d attempts "
                      % (replica['name'], replica['attempts']))


def fileTest():
    """Submit a random file of the configured size to the master and verify
    that it is sent to each reachable replica.

    On any failure the pending changelist (if one was created) is reverted
    and deleted, and the original exception is re-raised.
    """
    logging.info("--- fileTest start")
    user = g_config['admin.user']
    master = g_config['master.port']
    depotFile = g_config['depotFile']
    change = 0
    try:
        # NOTE(review): p4Cmd merges stderr into stdout, so result[1] is
        # always None and the "result[1] is not None" checks below can never
        # fire; they are kept in case p4Cmd ever stops merging the streams.
        result = p4Cmd(["sync", "-k"], user, master)
        if result[1] is not None:
            raise Exception("error while synchronizing the client")

        result = p4MarshalCmd(["where", depotFile], user, master)
        if not result or containsError(result):
            raise Exception("error while retrieving client spec")
        if 'path' not in result[0]:
            raise Exception("unable to determine file path")
        localFile = result[0]['path']

        change = createChange("this is a test")

        adding = not depotFileExists(depotFile)
        if not adding:
            result = p4Cmd(["edit", "-k", "-c", str(change), depotFile],
                           user, master)
            if (result[1] is not None) or \
                    ("opened for edit" not in result[0]):
                raise Exception("error while editing file")

        generateRandomFile(localFile, g_config['fileSize'])

        if adding:
            result = p4Cmd(["add", "-c", str(change), "-t", "binary+S",
                            depotFile], user, master)
            if result[1] is not None:
                raise Exception("error while adding file")

        result = p4Cmd(["submit", "-c", str(change)], user, master)
        if result[1] is not None:
            raise Exception(
                "error while submitting changelist {0}".format(change))
        begin = time.time()

        result = p4MarshalCmd(["fstat", depotFile], user, master)
        if not result or containsError(result):
            raise Exception("Error getting fstat on file from master")
        fstat = result[0]

        _runOnReachableReplicas(p4VerifyFile, (fstat, begin))
    except Exception:
        if change > 0:
            # revert the changelist
            p4Cmd(["revert", "-c", str(change), "//..."], user, master)
            # delete the pending changelist
            p4Cmd(["change", "-d", str(change)], user, master)
        raise  # bare raise keeps the original traceback
    logging.info("--- fileTest complete")


def main(argv=None):
    """Parse the command line, connect to the servers and run the tests.

    argv -- argument list without the program name (defaults to
            sys.argv[1:])

    Exits with status 0 on success, 2 for an unknown option and 3 when the
    mandatory -c option is missing. Other failures are raised as Exception
    for the caller to report.
    """
    global g_ticket
    if argv is None:
        argv = sys.argv[1:]

    verbose = False
    logFile = None
    configFile = None
    scantype = "all"

    try:
        opts, _args = getopt.getopt(argv, "hl:c:vCt:")
    except getopt.GetoptError:
        print("ERROR: unknown argument\n")
        usage(2)

    for opt, arg in opts:
        if opt == "-v":
            verbose = True
        elif opt == "-h":
            usage()
        elif opt == "-l":
            logFile = arg
        elif opt == "-c":
            configFile = arg
        elif opt == "-C":
            genConfigFile()
        elif opt == "-t":
            scantype = arg

    if configFile is None:
        print("ERROR: configFile is required\n")
        usage(3)

    tests = {
        "ping": (pingTest,),
        "pull": (pullTest,),
        "meta": (metaTest,),
        "file": (fileTest,),
        "all": (pingTest, pullTest, metaTest, fileTest),
    }
    # Reject a bad test type before doing any network work.
    if scantype not in tests:
        raise Exception("Scan type not recognized")

    logLevel = logging.DEBUG if verbose else logging.INFO
    if logFile is not None:
        logging.basicConfig(filename=logFile,
                            format='%(asctime)s [%(levelname)s] %(message)s',
                            datefmt='%m/%d/%Y %I:%M:%S %p',
                            level=logLevel)
        logging.info("------ STARTING RUN ------")
    else:
        logging.basicConfig(format='[%(levelname)s] %(message)s',
                            datefmt='%m/%d/%Y %I:%M:%S %p',
                            level=logLevel)

    parseConfigFile(configFile)

    # (XML element, g_config key) pairs every run depends on
    required = (('p4', 'p4'), ('client', 'client'),
                ('adminUser', 'admin.user'), ('adminPass', 'admin.pass'),
                ('master', 'master.port'), ('replicaList', 'replicas'))
    missing = [tag for tag, key in required if key not in g_config]
    if missing:
        raise Exception("Missing configuration element(s): "
                        + ", ".join(missing))

    if not tcpPing(g_config['master.port'], g_config['master.timeout']):
        raise Exception("Cannot ping master server")

    g_ticket = getTicket()
    if "invalid" in g_ticket:
        raise Exception("Password invalid")

    for replica in g_config['replicas']:
        # if we can't even get to the TCP/IP port, no bother trying p4
        replica['reachable'] = tcpPing(replica['port'], replica['timeout'])
        if not replica['reachable']:
            # log the unreachable server as an error
            logging.error("%s: unreachable" % replica['name'])

    for test in tests[scantype]:
        test()

    sys.exit(0)


if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except Exception as e:
        errorExit(e)