#!/bin/env python3
'''
Perforce Baseline and Branch Import Tool
Test Suite
Copyright (c) 2012 Perforce Software, Inc.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE
SOFTWARE, INC. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
THIS SOFTWARE IS NOT OFFICIALLY SUPPORTED.
Please send any feedback to consulting@perforce.com.
Test driver for BBI. Run with '-h' for arguments.
- p4 and p4d must be available in path
- Must be run from the BBI test directory
'''
# imports
import sys
import os
import re
import platform
from optparse import OptionParser
from subprocess import *
import shutil
import stat
import tarfile
import tempfile
import time
# global vars
# Perforce connection settings; placeholders until initVars() fills them in.
P4USER = ""
P4PORT = ""
P4CLIENT = ""

# Command-line tunables, stored as strings (see parseoptions() and log()).
verbosity = "2"
mode = "0"

# Command prefixes for p4: "p4" includes the -s flag, "p4Raw" does not.
p4 = ""
p4Raw = ""

# Paths to the generated config files and the scripts under test.
p4cfg = ""
bbicfg = ""
bbipy = ""
bbipy_validator = ""

# Scratch directories created for the test run.
topdir = ""
P4ROOT = ""
WSDIR = ""

# Handle of the p4d process started by setupServer().
p4dProc = None

# Maps test name -> "SUCCESS" / "FAIL".
testresults = {}
# parse options
def parseoptions():
    """Parse the command line and update the ``mode``/``verbosity`` globals.

    Recognized options:
      -m/--mode       validation mode, an integer 0, 1 or 2
      -v/--verbosity  verbosity level, an integer 1, 2 or 3

    Accepted values are stored as normalized decimal strings (e.g. "03"
    becomes "3") because the rest of the script compares them with string
    literals.  An out-of-range or non-integer value is reported through
    ``log("ERROR", ...)``, which terminates the program with status 1.
    """
    global mode
    global verbosity

    usage = "usage: %prog [options]"
    parser = OptionParser(usage)
    parser.add_option("-m", "--mode", dest="mode",
                      help="Mode: 0 = no validation, 1 = check file metadata, 2 = check file contents.")
    parser.add_option("-v", "--verbosity", dest="level",
                      help="Specify the verbosity level, from 1-3. 1 is quiet mode;"
                           " only error and test output is displayed. 2 is normal verbosity; some messages"
                           " displayed. 3 is debug-level verbosity. The default for this program is 2.")
    (options, args) = parser.parse_args()

    if options.mode:
        # Only a failed integer conversion is caught; the range check is
        # explicit instead of relying on a bare except swallowing SystemExit.
        try:
            requested_mode = int(options.mode)
        except ValueError:
            requested_mode = None
        if requested_mode is not None and -1 < requested_mode < 3:
            mode = str(requested_mode)
            print("Mode level set to: {0}.".format(mode))
        else:
            log("ERROR", "Mode level must be an integer value of 0, 1 or 2.")

    if options.level:
        try:
            requested_level = int(options.level)
        except ValueError:
            requested_level = None
        if requested_level is not None and 0 < requested_level < 4:
            verbosity = str(requested_level)
            print("Verbosity level set to: {0}.".format(verbosity))
        else:
            log("ERROR", "Verbosity level must be an integer value of 1, 2 or 3.")
# log
def log(msglevel="DEBUG", message=""):
    """Print ``message`` according to ``msglevel`` and the global verbosity.

    - "TEST":  always printed, prefixed with a dry-run banner.
    - "ERROR": always printed, then the program exits with status 1.
    - other:   printed when verbosity is "3", or when verbosity is "2" and
               the level is "INFO".
    """
    if msglevel == "TEST":
        print("Running in test mode. Command run would have been:\n", message)
        return

    if msglevel == "ERROR":
        print(message)
        sys.exit(1)

    # Verbosity is compared as a string because that is how it is stored.
    show_everything = verbosity == "3"
    show_info = verbosity == "2" and msglevel == "INFO"
    if show_everything or show_info:
        print(message)
# run OS cmd
def runcmd(cmd):
    """Run ``cmd`` through the shell and return its standard output.

    Both stdout and stderr are captured.  On a non-zero exit status the
    captured stdout is logged at INFO and the captured stderr is reported via
    ``log("ERROR", ...)``, which terminates the program.  If the process
    cannot be started (OSError) that is reported the same way.

    cmd -- the complete command line, as a single string.
    """
    try:
        log("DEBUG", cmd)
        # stderr must be piped as well; otherwise communicate() returns None
        # for it and the error message below would only ever say "None".
        pipe = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                     universal_newlines=True)
        stdout, stderr = pipe.communicate()
        log("DEBUG", stdout)
        if pipe.returncode != 0:
            log("INFO", "{0} generated the following output: {1}".format(cmd, stdout))
            log("ERROR", "{0} generated the following error: {1}".format(cmd, stderr))
        else:
            # Keep diagnostics from successful commands visible on stderr,
            # as they were before stderr was captured.
            if stderr:
                sys.stderr.write(stderr)
            return stdout
    except OSError as err:
        log("ERROR", "Execution failed: {0}".format(err))
# run P4 cmd in tagged mode (use -s)
def runp4cmd(cmd, instr=None):
    """Run a p4 command using the global ``p4`` prefix (which includes -s).

    cmd   -- the p4 sub-command and its arguments, appended to the prefix.
    instr -- optional text fed to the command's standard input.

    Returns the captured stdout when it contains a line starting with
    "exit: 0"; otherwise the output is reported via ``log("ERROR", ...)``,
    which terminates the program.
    """
    try:
        log("DEBUG", cmd)
        proc = Popen(p4 + cmd, shell=True, stdin=PIPE, stdout=PIPE, universal_newlines=True)
        output = proc.communicate(instr)[0]
        log("DEBUG", output)
        # Success is judged from the "exit:" line in the output, not from
        # the process return code.
        if re.search(r"^exit: 0", output, re.M) is None:
            log("ERROR", "{0}{1} generated the following error: {2}".format(p4, cmd, output))
        else:
            return output
    except OSError as err:
        log("ERROR", "Execution failed: {0}".format(err))
# run P4 cmd without tagged output
def runp4cmdRaw(cmd, instr=None):
    """Run a p4 command using the global ``p4Raw`` prefix (no -s flag).

    cmd   -- the p4 sub-command and its arguments, appended to the prefix.
    instr -- optional text fed to the command's standard input.

    Returns the captured stdout when the process exits with status 0;
    otherwise the output is reported via ``log("ERROR", ...)``, which
    terminates the program.
    """
    try:
        log("DEBUG", cmd)
        proc = Popen(p4Raw + cmd, shell=True, stdin=PIPE, stdout=PIPE, universal_newlines=True)
        output = proc.communicate(instr)[0]
        log("DEBUG", output)
        if proc.returncode == 0:
            return output
        log("ERROR", "{0}{1} generated the following error: {2}".format(p4Raw, cmd, output))
    except OSError as err:
        log("ERROR", "Execution failed: {0}".format(err))
# delete tree
def rmtree(treedir):
    """Delete the directory tree rooted at ``treedir``.

    Every regular file is first given owner-write permission so that
    read-only files do not block the removal.  Any OSError is reported via
    ``log("ERROR", ...)``, which terminates the program.
    """
    try:
        file_paths = (
            os.path.join(parent, filename)
            for parent, _subdirs, filenames in os.walk(treedir)
            for filename in filenames
        )
        for file_path in file_paths:
            os.chmod(file_path, stat.S_IWRITE)
        shutil.rmtree(treedir)
    except OSError as e:
        log("ERROR", "Could not remove {0}.\nThe error was:\n{1}".format(treedir, e))
# start P4D
def setupServer():
    """Create the server root, seed it, and launch p4d.

    Steps:
      1. create the P4ROOT directory;
      2. replay the "checkpoint" file from the current directory (p4d -jr);
      3. unpack "depot.tar.gz" from the current directory into P4ROOT;
      4. start p4d on P4PORT with journaling off, keeping the process
         handle in the global ``p4dProc``.
    """
    global p4dProc
    os.mkdir(P4ROOT)
    runcmd("p4d -r {0} -jr {1}".format(P4ROOT, os.path.join(os.getcwd(), "checkpoint")))
    # The context manager closes the archive even if extraction fails.
    with tarfile.open("depot.tar.gz") as tar:
        tar.extractall(path=P4ROOT)
    p4dProc = Popen(["p4d", "-d", "-r", P4ROOT, "-p", P4PORT, "-J", "off"])
    log("INFO", "p4d launched with pid {0}".format(p4dProc.pid))
# make workspace
def setupWorkspace():
    """Create the workspace directory, the client spec and a p4config file.

    The client spec is produced by ``p4 client -o`` and fed back unchanged
    to ``p4 client -i``.  Both commands run with WSDIR as the current
    directory (presumably so the generated spec is rooted there -- confirm
    against p4 client defaults).  A ``p4config.txt`` holding P4PORT, P4USER
    and P4CLIENT is written into WSDIR.  The original working directory is
    restored afterwards, even on failure.
    """
    os.mkdir(WSDIR)
    cwd = os.getcwd()
    os.chdir(WSDIR)
    try:
        client = runp4cmdRaw("client -o")
        # The pipe is opened in text mode, so the spec must be passed as str;
        # bytes input is rejected by communicate() in text mode.
        runp4cmd("client -i", client)
        with open("p4config.txt", "w") as cfgfile:
            cfgfile.write("P4PORT={0}\n".format(P4PORT))
            cfgfile.write("P4USER={0}\n".format(P4USER))
            cfgfile.write("P4CLIENT={0}\n".format(P4CLIENT))
    finally:
        os.chdir(cwd)
# delete server
def cleanup():
    """Stop the p4d server and delete all directories created for the run.

    Issues ``p4 admin stop``, waits for the p4d process to exit, pauses three
    more seconds, then removes P4ROOT, WSDIR and topdir in that order.
    """
    runp4cmd("admin stop")
    p4dProc.wait()
    log("DEBUG", "Waiting for P4D to shutdown...")
    time.sleep(3)
    for doomed in (P4ROOT, WSDIR, topdir):
        rmtree(doomed)
# validates against expected data and (optionally) runs the validator
def validate(testname, rjd=False):
    """Compare a test's depot output with the expected data.

    Runs ``p4 diff2 -q`` between //depot/test/<testname>/... and
    //depot/expected/<testname>/...; any line starting with "info:" in the
    output counts as a failure.  When the global ``mode`` is not "0" the
    validator script is run as well (with "-r rjd" when ``rjd`` is true),
    and any output line starting with optional whitespace followed by "DIFF"
    also fails the test.

    Returns "SUCCESS" or "FAIL".
    """
    diffs = runp4cmd('diff2 -q "//depot/test/{0}/..." "//depot/expected/{1}/..."'.format(testname, testname))
    has_diffs = re.search(r"^info:", diffs, re.M) is not None
    if has_diffs:
        log("INFO", "\tFAIL:")
        log("INFO", "\t\t" + diffs)
    else:
        log("INFO", "\tSUCCESS")
    tstresult = "FAIL" if has_diffs else "SUCCESS"

    # Mode "0" means no validator run; the diff2 verdict is final.
    if mode == "0":
        return tstresult

    if rjd:
        validator_cmd = "python {0} -v {1} -r rjd -m {2} {3} {4}"
    else:
        validator_cmd = "python {0} -v {1} -m {2} {3} {4}"
    validation = runcmd(validator_cmd.format(bbipy_validator, verbosity, mode, p4cfg, bbicfg))
    log("INFO", "\tValidator says: {0}".format(validation))
    if re.search(r"^\s*DIFF", validation, re.M):
        tstresult = "FAIL"
    return tstresult
def runtest(testname, bbicfglines, rjd=False):
    """Run one BBI test case and record its result.

    testname    -- key under which the result is stored in ``testresults``.
    bbicfglines -- lines (each with its own newline) written to the BBI
                   config file before the run.
    rjd         -- when true, "-r rjd" is passed to BBI and to the validator.

    The config file is deleted once validation has finished.  Returns the
    output of the BBI run.
    """
    for banner in ("---------------------------------------------------",
                   "TEST: " + testname,
                   "\n"):
        log("INFO", banner)

    with open(bbicfg, "w") as bbicfg_f:
        bbicfg_f.writelines(bbicfglines)

    if rjd:
        bbi_cmd = "python {0} -v {1} -r rjd {2} {3}"
    else:
        bbi_cmd = "python {0} -v {1} {2} {3}"
    bbiout = runcmd(bbi_cmd.format(bbipy, verbosity, p4cfg, bbicfg))
    log("INFO", "\tRan BBI")

    tstresult = validate(testname, rjd)
    os.unlink(bbicfg)
    testresults[testname] = tstresult
    return bbiout
# test basic snapshot import
def testUpdate():
    """Run the "update" test: one BASELINE (FGS-1.0 directory) and one UPDATE."""
    testname = "update"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname))
    runtest(testname, cfglines)
# test renaming
def testRename():
    """Run the "rename" test: import into rename_orig, then RENAME to rename."""
    testname = "rename"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname, testname))
    cfglines.append("RENAME|//depot/test/{0}_orig/...|//depot/test/{0}/...|Rename FGS-1.0 for {1}\n".format(testname, testname))
    runtest(testname, cfglines)
# test snapshot import with empty directory (do not populate)
def testUpdateEmptyNoPopulate():
    """Run "empty_no_populate": import FGS-1.0-DEV.tar.gz with no UPDATE options."""
    testname = "empty_no_populate"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0-DEV.tar.gz")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname))
    runtest(testname, cfglines)
# test snapshot import with empty directory (populate)
def testUpdateEmptyPopulate():
    """Run "empty_populate": import FGS-1.0-DEV.tar.gz with the EMPTYDIRS option."""
    testname = "empty_populate"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0-DEV.tar.gz")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|EMPTYDIRS\n".format(testname, testname))
    runtest(testname, cfglines)
# test snapshot import with symlinks (preserve)
def testUpdateLinksPreserve():
    """Run "links_preserve": import the FGS-0.1-DEV baseline with SYMLINKS."""
    testname = "links_preserve"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.1-DEV")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|SYMLINKS\n".format(testname, testname))
    runtest(testname, cfglines)
# test snapshot import with symlinks (do not preserve)
def testUpdateLinksNoPreserve():
    """Run "links_no_preserve": import the FGS-0.1-DEV baseline without SYMLINKS."""
    testname = "links_no_preserve"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.1-DEV")
    cfglines = []
    cfglines.append("BASELINE|FGS-1.0|{0}\n".format(baseline))
    cfglines.append("UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname))
    runtest(testname, cfglines)
# test branch
def testBranch():
    """Run the "branch" test: import into branch_orig, then BRANCH to branch."""
    testname = "branch"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(baseline),
        "UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname, testname),
        "BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test branch with a named branch spec (BRANCHSPEC option)
def testBranchWithSpec():
    """Run "branch_with_spec": branch with BRANCHSPEC and check the spec exists.

    After the usual run/validate cycle, the output of ``p4 branches`` is
    searched for a line starting with "Branch <testname>" followed by
    whitespace; if there is none the recorded result is forced to "FAIL".
    """
    testname = "branch_with_spec"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(baseline),
        "UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname, testname),
        "BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}|BRANCHSPEC={2}\n".format(testname, testname, testname),
    ]
    runtest(testname, cfglines)

    branches = runp4cmdRaw("branches")
    spec_line = re.compile(r"^Branch %s\s+" % testname, re.M)
    if spec_line.search(branches) is None:
        log("INFO", "\tNo branch spec named {0}".format(testname))
        testresults[testname] = "FAIL"
# test basic snapshot import with NODELETE option
def testUpdateNoDelete():
    """Run "update_no_delete": import FGS-1.1, then FGS-1.0 with NODELETE."""
    testname = "update_no_delete"
    baselines = os.path.join(os.getcwd(), "baselines")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(os.path.join(baselines, "FGS-1.0")),
        "BASELINE|FGS-1.1|{0}\n".format(os.path.join(baselines, "FGS-1.1")),
        "UPDATE|FGS-1.1|//depot/test/{0}/...|Import FGS-1.1 for {1}\n".format(testname, testname),
        "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|NODELETE\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test wildcards (allow)
def testWildcardsAllow():
    """Run "wildcards_allow": import FGS-0.2-DEV with ALLOWWILDCARDS."""
    testname = "wildcards_allow"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.2-DEV")
    cfglines = [
        "BASELINE|FGS-0.2-DEV|{0}\n".format(baseline),
        "UPDATE|FGS-0.2-DEV|//depot/test/{0}/...|Import FGS-0.2 for {1}|ALLOWWILDCARDS\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test basic snapshot import using a command
def testUpdateCmd():
    """Run "update_cmd": the baseline is a CMD: entry copying one makefile.

    The copy command is "copy" on Windows and "cp" elsewhere.
    """
    testname = "update_cmd"
    makefile = os.path.join(os.getcwd(), "baselines", "FGS-1.0", "src", "makefile")
    if platform.system() == "Windows":
        baseline_line = "BASELINE|FGS-1.0|CMD:copy {0} .\n"
    else:
        baseline_line = "BASELINE|FGS-1.0|CMD:cp {0} .\n"
    cfglines = [
        baseline_line.format(makefile),
        "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test basic snapshot import with 'detect case' flag
def testCaseDetect():
    """Run "case_detect": BBI is invoked with "-c 1" on FGS-0.3-DEV.tar.

    This test does not go through runtest()/validate(); it passes when the
    BBI output contains a line starting with
    "Potential case sensitivity conflict".
    """
    testname = "case_detect"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.3-DEV.tar")
    cfglines = [
        "BASELINE|FGS-0.3-DEV|{0}\n".format(baseline),
        "UPDATE|FGS-0.3-DEV|//depot/test/{0}/...|Import FGS-0.3 for {1}\n".format(testname, testname),
    ]

    for banner in ("---------------------------------------------------",
                   "TEST: " + testname,
                   "\n"):
        log("INFO", banner)

    with open(bbicfg, "w") as bbicfg_f:
        bbicfg_f.writelines(cfglines)

    cmdout = runcmd("python {0} -v {1} -c 1 {2} {3}".format(bbipy, verbosity, p4cfg, bbicfg))
    log("INFO", "\tRan BBI")
    os.unlink(bbicfg)

    if not re.search(r"^Potential case sensitivity conflict", cmdout, re.M):
        log("INFO", "\tFAIL:")
        log("INFO", "\t\t" + cmdout)
        testresults[testname] = "FAIL"
    else:
        log("INFO", "\tSUCCESS")
        testresults[testname] = "SUCCESS"
# test rmtop
def testRmtop():
    """Run "rmtop": import FGS-1.0.tar.gz with the RMTOP option."""
    testname = "rmtop"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.0.tar.gz")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(baseline),
        "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}|RMTOP\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test junk (no remove)
def testJunkNoRemove():
    """Run "junk_no_remove": import FGS-0.4-DEV with rjd disabled."""
    testname = "junk_no_remove"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.4-DEV")
    cfglines = [
        "BASELINE|FGS-0.4-DEV|{0}\n".format(baseline),
        "UPDATE|FGS-0.4-DEV|//depot/test/{0}/...|Import FGS-0.4-DEV for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines, rjd=False)
# test junk (remove)
def testJunkRemove():
    """Run "junk_remove": import FGS-0.4-DEV with rjd enabled ("-r rjd")."""
    testname = "junk_remove"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-0.4-DEV")
    cfglines = [
        "BASELINE|FGS-0.4-DEV|{0}\n".format(baseline),
        "UPDATE|FGS-0.4-DEV|//depot/test/{0}/...|Import FGS-0.4-DEV for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines, rjd=True)
# test 'record as merged'
def testRecordMerge():
    """Run "record_merge": import, branch, update the source, RECORD_AS_MERGED."""
    testname = "record_merge"
    baselines = os.path.join(os.getcwd(), "baselines")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(os.path.join(baselines, "FGS-1.0")),
        "BASELINE|FGS-1.1|{0}\n".format(os.path.join(baselines, "FGS-1.1")),
        "UPDATE|FGS-1.0|//depot/test/{0}_orig/...|Import FGS-1.0 for {1}\n".format(testname, testname),
        "BRANCH|//depot/test/{0}_orig/...|//depot/test/{0}/...|Branch FGS-1.0 for {1}\n".format(testname, testname),
        "UPDATE|FGS-1.1|//depot/test/{0}_orig/...|Update FGS-1.1 for {1}\n".format(testname, testname),
        "RECORD_AS_MERGED|//depot/test/{0}_orig/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test import zip file
def testUpdateZip():
    """Run "update_zip": import FGS-1.1.zip with the RMTOP option."""
    testname = "update_zip"
    baseline = os.path.join(os.getcwd(), "baselines", "FGS-1.1.zip")
    cfglines = [
        "BASELINE|FGS-1.1|{0}\n".format(baseline),
        "UPDATE|FGS-1.1|//depot/test/{0}/...|Import FGS-1.1 for {1}|RMTOP\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test 'copy merge'
def testCopyMerge():
    """Run "copy_merge": import, branch to _dev, update _dev, COPY_MERGE back."""
    testname = "copy_merge"
    baselines = os.path.join(os.getcwd(), "baselines")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(os.path.join(baselines, "FGS-1.0")),
        "BASELINE|FGS-1.1|{0}\n".format(os.path.join(baselines, "FGS-1.1")),
        "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname),
        "BRANCH|//depot/test/{0}/...|//depot/test/{0}_dev/...|Branch FGS-1.0 for {1}\n".format(testname, testname),
        "UPDATE|FGS-1.1|//depot/test/{0}_dev/...|Update FGS-1.1 for {1}\n".format(testname, testname),
        "COPY_MERGE|//depot/test/{0}_dev/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# test 'copy merge' with full copy
def testCopyMergeFull():
    """Run "copy_merge_full": like copy_merge, ending in COPY_MERGE with FULLCOPY.

    Sequence: import FGS-1.0, branch to _dev, update _dev to FGS-1.1,
    RECORD_AS_MERGED, update the original to FGS-1.1.p1, then COPY_MERGE
    from _dev with the FULLCOPY option.
    """
    testname = "copy_merge_full"
    baselines = os.path.join(os.getcwd(), "baselines")
    cfglines = [
        "BASELINE|FGS-1.0|{0}\n".format(os.path.join(baselines, "FGS-1.0")),
        "BASELINE|FGS-1.1|{0}\n".format(os.path.join(baselines, "FGS-1.1")),
        "BASELINE|FGS-1.1.p1|{0}\n".format(os.path.join(baselines, "FGS-1.1.p1")),
        "UPDATE|FGS-1.0|//depot/test/{0}/...|Import FGS-1.0 for {1}\n".format(testname, testname),
        "BRANCH|//depot/test/{0}/...|//depot/test/{0}_dev/...|Branch FGS-1.0 for {1}\n".format(testname, testname),
        "UPDATE|FGS-1.1|//depot/test/{0}_dev/...|Update FGS-1.1 for {1}\n".format(testname, testname),
        "RECORD_AS_MERGED|//depot/test/{0}_dev/...|//depot/test/{0}/...|Record merge for {1}\n".format(testname, testname),
        "UPDATE|FGS-1.1.p1|//depot/test/{0}/...|Update FGS-1.1.p1 for {1}\n".format(testname, testname),
        "COPY_MERGE|//depot/test/{0}_dev/...|//depot/test/{0}/...|Copy merge for {1}|FULLCOPY\n".format(testname, testname),
    ]
    runtest(testname, cfglines)
# set global variables
def initVars():
    """Populate the module-level globals and create the scratch directory.

    Side effects: creates a fresh temporary directory inside the current
    working directory (``topdir``) and writes ``p4cfg.txt`` into it holding
    P4PORT, P4CLIENT and P4USER.  The BBI scripts are expected one level
    above the current directory.
    """
    global P4USER, P4PORT, P4CLIENT
    global verbosity, mode
    global p4, p4Raw
    global p4cfg, bbicfg, bbipy, bbipy_validator
    global topdir, P4ROOT, WSDIR

    P4USER = "import_user"
    P4PORT = "127.0.0.1:9000"
    P4CLIENT = "import_ws"
    verbosity = "2"
    mode = "0"

    # make working directories
    here = os.getcwd()
    topdir = tempfile.mkdtemp(dir=here)
    P4ROOT = os.path.join(topdir, "p4root")
    WSDIR = os.path.join(topdir, "ws")

    # set P4 and config
    p4_binary = "p4.exe" if platform.system() == "Windows" else "p4"
    p4Raw = "{0} -p {1} -c {2} -u {3} ".format(p4_binary, P4PORT, P4CLIENT, P4USER)
    p4 = "{0} -p {1} -c {2} -u {3} -s ".format(p4_binary, P4PORT, P4CLIENT, P4USER)

    p4cfg = os.path.join(topdir, "p4cfg.txt")
    with open(p4cfg, "w") as p4cfg_f:
        p4cfg_f.write("P4PORT=" + P4PORT + "\n")
        p4cfg_f.write("P4CLIENT=" + P4CLIENT + "\n")
        p4cfg_f.write("P4USER=" + P4USER + "\n")

    bbicfg = os.path.join(topdir, "bbi.cfg")
    bbipy = os.path.join(here, "..", "p4bbi.py")
    bbipy_validator = os.path.join(here, "..", "p4bbi_validate.py")
# main routine
if __name__ == "__main__":
    # Prepare globals, apply command-line overrides, then bring up the
    # server and the client workspace.
    initVars()
    parseoptions()
    setupServer()
    setupWorkspace()

    # Run every test case, in this fixed order.
    for testcase in (
        testUpdate,
        testUpdateEmptyPopulate,
        testUpdateEmptyNoPopulate,
        testUpdateLinksPreserve,
        testUpdateLinksNoPreserve,
        testRename,
        testBranch,
        testBranchWithSpec,
        testUpdateNoDelete,
        testWildcardsAllow,
        testUpdateCmd,
        testCaseDetect,
        testRmtop,
        testJunkNoRemove,
        testJunkRemove,
        testRecordMerge,
        testUpdateZip,
        testCopyMerge,
        testCopyMergeFull,
    ):
        testcase()

    cleanup()

    # Report per-test outcomes sorted by test name, then the pass rate.
    log("INFO", "---------------------------------------------------")
    log("INFO", "Summary of results:")
    for key, outcome in sorted(testresults.items()):
        log("INFO", "\t{0:20}:\t{1:20}".format(key, outcome))
    goodResultCnt = sum(1 for result in testresults.values() if result == 'SUCCESS')
    totalResultCnt = len(testresults)
    badResultCnt = totalResultCnt - goodResultCnt
    log("INFO", "{0} of {1} test passed: {2}%".format(goodResultCnt, totalResultCnt, 100 * goodResultCnt / totalResultCnt))