# -*- encoding: UTF8 -*- # tests missing # - maximum # - preflight check from __future__ import print_function import sys import time import P4 import logging import io import codecs import logutils import unittest, os, types, shutil, stat from subprocess import Popen, PIPE if sys.version_info[0] >= 3: from configparser import ConfigParser from io import StringIO else: from ConfigParser import ConfigParser from StringIO import StringIO import P4Transfer P4D = "p4d" P4USER = "testuser" P4CLIENT = "test_ws" TRANSFER_CLIENT = "transfer" TRANSFER_CONFIG = "transfer.cfg" TEST_COUNTER_NAME = "P4Transfer" INTEG_ENGINE = 3 def onRmTreeError(function, path, exc_info): os.chmod(path, stat.S_IWRITE) os.remove(path) def ensureDirectory(directory): if not os.path.isdir(directory): os.mkdir(directory) class P4Server: def __init__(self, root): self.root = root self.server_root = os.path.join(root, "server") self.client_root = os.path.join(root, "client") ensureDirectory(self.root) ensureDirectory(self.server_root) ensureDirectory(self.client_root) self.p4d = P4D self.port = "rsh:%s -r \"%s\" -L log -i" % (self.p4d, self.server_root) self.p4 = P4.P4() self.p4.port = self.port self.p4.user = P4USER self.p4.client = P4CLIENT self.p4.connect() self.p4.run_depots() # triggers creation of the user self.p4.run_configure('set', 'dm.integ.engine=%d' % INTEG_ENGINE) self.p4.disconnect() # required to pick up the configure changes self.p4.connect() self.client_name = P4CLIENT client = self.p4.fetch_client(self.client_name) client._root = self.client_root self.p4.save_client(client) def shutDown(self): if self.p4.connected(): self.p4.disconnect() def createTransferClient(self, name, root): pass def enableUnicode(self): cmd = [self.p4d, "-r", self.server_root, "-L", "log", "-vserver=3", "-xi"] f = Popen(cmd, stdout=PIPE).stdout for s in f.readlines(): pass f.close() def getCounter(self): "Returns value of counter as integer" result = self.p4.run('counter', TEST_COUNTER_NAME) if result and 'counter' in result[0]: return int(result[0]['value']) return 0 class TestP4Transfer(unittest.TestCase): def setUp(self): self.setDirectories() self.stdoutput = StringIO() logger = logutils.getLogger(P4Transfer.LOGGER_NAME, self.stdoutput) def tearDown(self): self.source.shutDown() self.target.shutDown() time.sleep(1) self.cleanupTestTree() def setDirectories(self): self.startdir = os.getcwd() self.transfer_root = os.path.join(self.startdir, 'transfer') self.cleanupTestTree() ensureDirectory(self.transfer_root) self.source = P4Server(os.path.join(self.transfer_root, 'source')) self.target = P4Server(os.path.join(self.transfer_root, 'target')) self.transfer_client_root = os.path.join(self.transfer_root, 'transfer_client') ensureDirectory(self.transfer_client_root) def cleanupTestTree(self): os.chdir(self.startdir) if os.path.isdir(self.transfer_root): shutil.rmtree(self.transfer_root, False, onRmTreeError) def setupTransfer(self): """Creates client workspaces on source and target and a config file""" source_client = self.source.p4.fetch_client(TRANSFER_CLIENT) source_client._root = self.transfer_client_root source_client._view = ['//depot/inside/... //%s/...' % TRANSFER_CLIENT] self.source.p4.save_client(source_client) target_client = self.target.p4.fetch_client('transfer') target_client._root = self.transfer_client_root target_client._view = ['//depot/import/... //%s/...' % TRANSFER_CLIENT] self.target.p4.save_client(target_client) # create the config file self.parser = ConfigParser() self.parser.add_section('source') self.parser.set('source', 'p4port', self.source.port) self.parser.set('source', 'p4user', P4USER) self.parser.set('source', 'p4client', TRANSFER_CLIENT) self.parser.add_section('target') self.parser.set('target', 'p4port', self.target.port) self.parser.set('target', 'p4user', P4USER) self.parser.set('target', 'p4client', TRANSFER_CLIENT) self.parser.add_section('general') self.parser.set('general', 'logfile', os.path.join(self.transfer_root, 'temp', 'test.log')) if not os.path.exists(os.path.join(self.transfer_root, 'temp')): os.mkdir(os.path.join(self.transfer_root, 'temp')) self.parser.set('general', 'counter_name', TEST_COUNTER_NAME) # write the config file self.transfer_cfg = os.path.join(self.transfer_root, TRANSFER_CONFIG) with open(self.transfer_cfg, 'w') as f: self.parser.write(f) def run_P4Transfer(self, *args): base_args = ['-c', self.transfer_cfg, '-s'] if args: base_args.extend(args) pt = P4Transfer.P4Transfer(*base_args) result = pt.replicate() return result def assertCounters(self, sourceValue, targetValue): sourceCounter = self.target.getCounter() targetCounter = len(self.target.p4.run("changes")) self.assertEqual(sourceCounter, sourceValue, "Source counter is not {} but {}".format(sourceValue, sourceCounter)) self.assertEqual(targetCounter, targetValue, "Target counter is not {} but {}".format(targetValue, targetCounter)) def testArgParsing(self): self.setupTransfer() args = ['-c', self.transfer_cfg, '-s'] pt = P4Transfer.P4Transfer(*args) self.assertEqual(pt.options.config, self.transfer_cfg) self.assertTrue(pt.options.stoponerror) self.assertFalse(pt.options.preflight) args = ['-c', self.transfer_cfg] pt = P4Transfer.P4Transfer(*args) def testAdd(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: f.write('Test content') self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') self.run_P4Transfer() changes = self.target.p4.run_changes() self.assertEqual(len(changes), 1, "Target does not have exactly one change") self.assertEqual(changes[0]['change'], "1", "Target change is not 1") files = self.target.p4.run_files('//depot/...') self.assertEqual(len(files), 1, "Target does not have exactly one file") self.assertEqual(files[0]['depotFile'], '//depot/import/inside_file1', "File not transferred to correct place") self.assertCounters(1, 1) def testUnicode(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = u"inside_file1uåäö".encode('mbcs') localinside_file1 = os.path.join(inside, inside_file1) with open(localinside_file1, 'w') as f: f.write('Test content') self.source.p4.run_add('-f', localinside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') self.run_P4Transfer() changes = self.target.p4.run_changes() self.assertEqual(len(changes), 1, "Target does not have exactly one change") self.assertEqual(changes[0]['change'], "1", "Target change is not 1") files = self.target.p4.run_files('//depot/...') self.assertEqual(len(files), 1, "Target does not have exactly one file") self.assertEqual(files[0]['depotFile'], '//depot/import/%s' % inside_file1) self.assertCounters(1, 1) def testAddWildcardChars(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "@inside_file1") with open(inside_file1, 'w') as f: f.write('Test content') self.source.p4.run_add('-f', inside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') self.run_P4Transfer() changes = self.target.p4.run_changes() self.assertEqual(len(changes), 1, "Target does not have exactly one change") self.assertEqual(changes[0]['change'], "1", "Target change is not 1") files = self.target.p4.run_files('//depot/...') self.assertEqual(len(files), 1, "Target does not have exactly one file") self.assertEqual(files[0]['depotFile'], '//depot/import/%40inside_file1', "File not transferred to correct place") self.assertCounters(1, 1) inside_file1Fixed = inside_file1.replace("@", "%40") self.source.p4.run_edit(inside_file1Fixed) with open(inside_file1, 'w') as f: f.write('Different stuff') self.source.p4.run_submit('-d', 'inside_file1 modified') self.run_P4Transfer() changes = self.target.p4.run_changes() self.assertEqual(len(changes), 2) self.assertEqual(changes[0]['change'], "2") def testEditAndDelete(self): self.setupTransfer() # add inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: f.write('Test content') self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', "inside_file1 added") # edit self.source.p4.run_edit(inside_file1) with open(inside_file1, 'a+') as f: f.write('More content') self.source.p4.run_submit('-d', "inside_file1 edited") self.run_P4Transfer() changes = self.target.p4.run_changes() self.assertEqual(len(changes), 2) self.assertEqual(changes[0]['change'], "2") self.assertCounters(2, 2) # delete self.source.p4.run_delete(inside_file1) self.source.p4.run_submit('-d', "inside_file1 deleted") self.run_P4Transfer() self.assertCounters(3, 3) changes = self.target.p4.run_changes() self.assertEqual(len(changes), 3, "Target does not have exactly three changes") filelog = self.target.p4.run_filelog('//depot/import/inside_file1') self.assertEqual(filelog[0].revisions[0].action, 'delete', "Target has not been deleted") # re-add with open(inside_file1, 'w') as f: f.write('New content') self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', "Re-added") self.run_P4Transfer() self.assertCounters(4, 4) filelog = self.target.p4.run_filelog('//depot/import/inside_file1') self.assertEqual(filelog[0].revisions[0].action, 'add', "Target has not been re-added") def testFileTypes(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: print("Test content", file=f) self.source.p4.run_add('-tbinary', inside_file1) self.source.p4.run_submit('-d', "inside_file1 added") self.run_P4Transfer() self.assertCounters(1, 1) filelog = self.target.p4.run_filelog('//depot/import/inside_file1') self.assertEqual(filelog[0].revisions[0].type, 'binary', "File type is not binary, but %s" % filelog[0].revisions[0].type) # change type to binary+x self.source.p4.run_edit('-t+x', inside_file1) with open(inside_file1, 'a+') as f: print("More content", file=f) self.source.p4.run_submit('-d', "Type changed") self.run_P4Transfer() self.assertCounters(2, 2) filelog = self.target.p4.run_filelog('//depot/import/inside_file1') self.assertEqual(filelog[0].revisions[0].type, 'xbinary', "File type is not xbinary, but %s" % filelog[0].revisions[0].type) # add ktext file inside_file2 = os.path.join(inside, "inside_file2") with open(inside_file2, 'w') as f: print("$Id$", file=f) print("$DateTime$", file=f) self.source.p4.run_add('-t+k', inside_file2) self.source.p4.run_submit('-d', "Ktext added") self.run_P4Transfer() self.assertCounters(3, 3) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].type, 'ktext', "File type is not ktext, but %s" % filelog[0].revisions[0].type) verifyResult = self.target.p4.run_verify('-q', '//depot/import/inside_file2') self.assertEqual(len(verifyResult), 0) # just to see that ktext gets transferred properly content = self.target.p4.run_print('//depot/import/inside_file2')[1] lines = content.split("\n") self.assertEqual(lines[0], '$Id: //depot/import/inside_file2#1 $', "Content does not match : %s" % lines[0]) def testSimpleIntegrate(self): self.setupTransfer() # seed the integration inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: print("Test content", file=f) self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') inside_file2 = os.path.join(inside, "inside_file2") self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_submit('-d', 'inside_file1 -> inside_file2') result = self.run_P4Transfer() # test integration self.assertCounters(2, 2) changes = self.target.p4.run_changes() self.assertEqual(len(changes), 2, "Target does not have exactly two changes") # seed edit/copy self.source.p4.run_edit(inside_file1) with open(inside_file1, 'a+') as f: print("More content", file=f) self.source.p4.run_submit('-d', 'inside_file1 edited') self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_resolve('-at') self.source.p4.run_submit('-d', 'inside_file1 -> inside_file2 (copy)') result = self.run_P4Transfer() self.assertCounters(4, 4) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(len(filelog[0].revisions), 2, "Not exactly 2 target revisions") self.assertEqual(len(filelog[0].revisions[1].integrations), 1, "Not exactly 1 integration into target") self.assertEqual(filelog[0].revisions[0].integrations[0].how, "copy from", "'How' is not copy from") def testComplexIntegrate(self): self.setupTransfer() # seed the integration content = """ Line 1 Line 2 Line 3 """ content1 = """ Line 1 Line 2 - changed Line 3 """ content2 = """ Line 1 Line 2 Line 3 - changed """ content4 = """ Line 1 Line 2 - changed Line 3 - differs """ content5 = """ Line 1 Line 2 - changed Line 3 - edited """ inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: print(content, file=f) self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') inside_file2 = os.path.join(inside, "inside_file2") self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_submit('-d', 'inside_file1 -> inside_file2') # Prepare merge self.source.p4.run_edit(inside_file1, inside_file2) with open(inside_file1, 'w') as f: print(content1, file=f) with open(inside_file2, 'w') as f: print(content2, file=f) self.source.p4.run_submit('-d', "Changed both contents") # Integrate with merge self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_resolve('-am') self.source.p4.run_submit('-d', "Merged contents") contentMerged = self.source.p4.run_print(inside_file2)[1] sourceCounter = 4 targetCounter = 4 self.run_P4Transfer() self.assertCounters(sourceCounter, targetCounter) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].integrations[0].how, 'merge from', "How is not merge from") self.assertEqual(self.target.p4.run_print('//depot/import/inside_file2')[1], contentMerged, "Content not the same") # Prepare integrate with edit self.source.p4.run_edit(inside_file1, inside_file2) with open(inside_file1, 'w') as f: print(content1, file=f) self.source.p4.run_submit('-d', "Created a conflict") # Integrate with edit self.source.p4.run_integrate(inside_file1, inside_file2) class EditResolve(P4.Resolver): def resolve(self, mergeData): with open(mergeData.result_path, 'w') as f: f.write(content5) return 'ae' self.source.p4.run_resolve(resolver=EditResolve()) self.source.p4.run_submit('-d', "Merge with edit") sourceCounter += 2 targetCounter += 2 self.run_P4Transfer() self.assertCounters(sourceCounter, targetCounter) # Prepare ignore self.source.p4.run_edit(inside_file1) with open(inside_file1, 'a+') as f: print("For your eyes only", file=f) self.source.p4.run_submit('-d', "Edit source again") self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_resolve('-ay') # ignore self.source.p4.run_submit('-d', "Ignored change in inside_file1") sourceCounter += 2 targetCounter += 2 self.run_P4Transfer() self.assertCounters(sourceCounter, targetCounter) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].integrations[0].how, 'ignored', "How is not ignored") content = self.target.p4.run_print('-a', '//depot/import/inside_file2') self.assertEqual(content[1], content[3], "Content of #1 not equal to #2") # Prepare delete self.source.p4.run_delete(inside_file1) self.source.p4.run_submit('-d', "Delete file 1") self.source.p4.run_merge(inside_file1, inside_file2) # to trigger resolve self.source.p4.run_resolve('-at') self.source.p4.run_submit('-d', "Propagated delete") sourceCounter += 2 targetCounter += 2 self.run_P4Transfer() self.assertCounters(sourceCounter, targetCounter) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(len(filelog[0].revisions[0].integrations), 1, "No integration for delete") self.assertEqual(filelog[0].revisions[0].integrations[0].how, 'delete from', "How is not delete from") # Prepare re-add with open(inside_file1, 'w') as f: print(content1, file=f) self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', 'inside_file1 re-added') self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_submit('-d', "inside_file2 re-added") sourceCounter += 2 targetCounter += 2 self.run_P4Transfer() self.assertCounters(sourceCounter, targetCounter) filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].integrations[0].how, 'branch from', "How is not branch from") def testMultipleIntegrate(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) inside_file1 = os.path.join(inside, "inside_file1") with open(inside_file1, 'w') as f: print("Some content", file=f) self.source.p4.run_add(inside_file1) self.source.p4.run_submit('-d', 'inside_file1 added') inside_file2 = os.path.join(inside, "inside_file2") self.source.p4.run_integrate(inside_file1, inside_file2) self.source.p4.run_submit('-d', 'inside_file1 -> inside_file2') file3 = os.path.join(inside, "file3") self.source.p4.run_integrate(inside_file2, file3) self.source.p4.run_submit('-d', 'inside_file2 -> File3') self.run_P4Transfer() self.assertCounters(3, 3) filelog1 = self.target.p4.run_filelog('//depot/import/inside_file1') filelog2 = self.target.p4.run_filelog('//depot/import/inside_file2') filelog3 = self.target.p4.run_filelog('//depot/import/file3') self.assertEqual(len(filelog1[0].revisions), 1, "Not exactly one inside_file1 revision") self.assertEqual(len(filelog2[0].revisions), 1, "Not exactly one inside_file2 revision") self.assertEqual(len(filelog3[0].revisions), 1, "Not exactly one file3 revision") self.assertEqual(len(filelog1[0].revisions[0].integrations), 1, "Not exactly one inside_file1 integ record") self.assertEqual(len(filelog2[0].revisions[0].integrations), 2, "Not exactly two inside_file2 integ records") self.assertEqual(len(filelog3[0].revisions[0].integrations), 1, "Not exactly one file3 integ record") def testInsideOutside(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) outside = os.path.join(self.source.client_root, "outside") ensureDirectory(outside) # add from outside, integrate in outside_file = os.path.join(outside, 'outside_file') with open(outside_file, 'w') as f: print("Some content", file=f) self.source.p4.run_add(outside_file) self.source.p4.run_submit('-d', "Outside outside_file") inside_file2 = os.path.join(inside, 'inside_file2') self.source.p4.run_integrate(outside_file, inside_file2) self.source.p4.run_submit('-d', "Integrated from outside to inside") self.run_P4Transfer() self.assertCounters(2, 1) changes = self.target.p4.run_changes() self.assertEqual(len(changes), 1, "Not exactly one change on target") filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].action, "add", "inside_file2 action is not add") # edit from outside, integrated in self.source.p4.run_edit(outside_file) with open(outside_file, 'a+') as f: print("More content", file=f) self.source.p4.run_submit('-d', "Outside outside_file edited") self.run_P4Transfer() self.assertCounters(2, 1) # counters will not move, no change within the client workspace's scope self.source.p4.run_integrate(outside_file, inside_file2) self.source.p4.run_resolve('-at') self.source.p4.run_submit('-d', "Copied outside_file -> inside_file2") self.run_P4Transfer() self.assertCounters(4, 2) changes = self.target.p4.run_changes() self.assertEqual(len(changes), 2, "Not exactly two changes on target") filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].action, "edit", "inside_file2 action is not edit") # delete from outside, integrate in self.source.p4.run_delete(outside_file) self.source.p4.run_submit('-d', "outside_file deleted") self.source.p4.run_integrate(outside_file, inside_file2) self.source.p4.run_submit('-d', "inside_file2 deleted from outside_file") self.run_P4Transfer() self.assertCounters(6, 3) changes = self.target.p4.run_changes() self.assertEqual(len(changes), 3, "Not exactly three changes on target") filelog = self.target.p4.run_filelog('//depot/import/inside_file2') self.assertEqual(filelog[0].revisions[0].action, "delete", "inside_file2 action is not delete") def testObliteratedSource(self): "File has been integrated and then source obliterated" self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) outside = os.path.join(self.source.client_root, "outside") ensureDirectory(outside) # add from outside, integrate in outside_file1 = os.path.join(outside, 'outside_file1') with open(outside_file1, 'w') as f: print("Some content", file=f) self.source.p4.run_add(outside_file1) self.source.p4.run_submit('-d', "Outside outside_file1") inside_file2 = os.path.join(inside, 'inside_file2') self.source.p4.run_integrate(outside_file1, inside_file2) self.source.p4.run_submit('-d', "Integrated from outside to inside") self.source.p4.run_obliterate('-y', outside_file1) self.run_P4Transfer('--stoponerror') self.assertCounters(2, 1) def testKeywords(self): self.setupTransfer() inside = os.path.join(self.source.client_root, "inside") ensureDirectory(inside) files = [] for f in range(1,5): fname = "file{}".format(f) files.append(os.path.join(inside, fname)) for fname in files: with open(fname, 'w') as f: f.write('Test content') self.source.p4.run_add(fname) self.source.p4.run_reopen("-t", "ktext", files[0]) self.source.p4.run_reopen("-t", "kxtext", files[1]) self.source.p4.run_reopen("-t", "text+kmx", files[2]) self.source.p4.run_reopen("-t", "ktext+xkm", files[3]) self.source.p4.run_submit('-d', 'File(s) added') self.run_P4Transfer("--nokeywords") newFiles = self.target.p4.run_files("//...") self.assertEqual(len(newFiles), 4, "Target does not have exactly 4 files") self.assertEqual(newFiles[0]['type'], "text", "File1 does not have type text, but {}".format(newFiles[0]['type'])) self.assertEqual(newFiles[1]['type'], "xtext", "File2 does not have type xtext, but {}".format(newFiles[1]['type'])) self.assertEqual(newFiles[2]['type'], "text+mx", "File3 does not have type text+mx, but {}".format(newFiles[2]['type'])) self.assertEqual(newFiles[3]['type'], "text+mx", "File4 does not have type text+xm, but {}".format(newFiles[3]['type'])) fname = files[0] self.source.p4.run_edit("-t", "+k", fname) with open(fname,"a") as f: f.write("More stuff") self.source.p4.run_submit('-d', 'File edited') self.run_P4Transfer("--nokeywords") files = self.target.p4.run_files("//depot/import/file1") self.assertEqual(files[0]['type'], "text", "File1 does not have type text, but {}".format(files[0]['type'])) if __name__ == '__main__': unittest.main()