''' Created on Dec 2, 2017 @author: Charlie McLouth ''' import unittest from p4rest.p4 import P4 from P4Server import P4TestcaseShared import os import logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) debugHandler = logging.StreamHandler() debugHandler.setFormatter(logging.Formatter("%(levelname)s:%(filename)s:" "%(lineno)d:%(funcName)s:" "%(message)s")) logger.addHandler(debugHandler) class TestP4(P4TestcaseShared): def initTestData(self, _p4api): # create 10 of every object p4api = P4() p4api.port = _p4api.port p4api.user = _p4api.user p4api.client = _p4api.client p4api.exception_level = P4.RAISE_ERROR p4api.connect() for plural in ["users", "depots", "streams", "clients", "repos", "labels", "branches", "changes", "jobs", "groups", "ldaps", "remotes", "servers"]: single, key = p4api.specfields.get(plural) for i in range(0,10): args = ["{}_{!s}".format(single, i)] p4api.user = "user_{!s}".format(i) if single == "depot": if i == 0: args = ["-t" "stream", "{}_{!s}".format(single, i)] elif i == 1: args = ["-t" "graph", "{}_{!s}".format(single, i)] elif single == "change": args = [] p4api.client = "client_{!s}".format(i) elif single == "user": args.clear() elif single == "stream": args = ["-t" "mainline", "//depot_0/{}{}".format(single, i)] elif single == "repo": args = ["//depot_1/{}{}".format(single, i)] func = getattr(p4api, "fetch_{}".format(single)) spec = func(args) args = [spec] if single == "branch": spec["View"][0] = "//depot_2/a/... //depot_2/b/..." elif single == "client": spec["Root"] = self.p4server.clientroot options = spec["Options"].split() options[-1] = "rmdir" spec["Options"] = " ".join(options) elif single in ["change", "job"]: spec["Description"] = "description {!s}".format(i) elif single == "ldap": spec["SimplePattern"] = "%user%" elif single == "group": spec["Owners"] = [p4api.user] spec["Users"] = [p4api.user] func = getattr(p4api, "save_{}".format(single)) spec = func(args) logger.debug("{}:({},{})".format(plural, single, key)) p4api.user = _p4api.user p4api.client = _p4api.client for i in range(0,10): p4api.user = "user_{!s}".format(i) p4api.client = "client_{!s}".format(i) spec = p4api.fetch_change() spec["Description"] = "submitted change" changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1] for ii in range(0,10): filename = p4api.run_where("//depot_2/{!s}/file{!s}.txt".format(i, ii))[0]["path"] if not os.path.isdir(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, mode="a") as f: f.write("a line {!s}".format(ii)) p4api.run_add("-c", changeno, filename) for line in p4api.run_submit("-c", changeno): if isinstance(line, dict) and "submittedChange" in line: changeno = line["submittedChange"] break spec = p4api.fetch_change() spec["Description"] = "shelved change" changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1] p4api.run_edit("-c", changeno, "//depot_2/{!s}/...".format(i)) p4api.run_shelve("-r", "-c", changeno) spec = p4api.fetch_change() spec["Description"] = "pending change" changeno = p4api.save_change(spec)[0].split(maxsplit=2)[1] # p4api.run_reopen("-c", changeno, "//depot_2/{!s}/...".format(i)) p4api.run_revert("//...") p4api.disconnect() def assertLimitAndNext(self, testname, warnings, results, limit, offset, currenttotal, expectedtotal): '''assert the current values and advance to next offset''' if not isinstance(results, list): results = [results] logger.debug(testname) logger.debug("len(warnings):{!s}".format(len(warnings))) logger.debug("len(results):{!s}".format(len(results))) logger.debug("limit):{!s}".format(limit)) logger.debug("offset):{!s}".format(offset)) logger.debug("currenttotal):{!s}".format(currenttotal)) logger.debug("expectedtotal):{!s}".format(expectedtotal)) self.assertGreaterEqual(limit, len(results), testname) if len(results) < 1: self.assertEqual(2, len(warnings), testname) self.assertEqual(expectedtotal, warnings[-1]["count"], testname) self.assertEqual(offset, warnings[-1]["offset"], testname) newtotal = currenttotal + len(results) logger.debug("newtotal):{!s}".format(newtotal)) self.assertGreaterEqual(expectedtotal, newtotal, testname) if newtotal < expectedtotal: self.assertEqual(limit, len(results), testname) self.assertEqual(2, len(warnings), testname) self.assertEqual(limit, len(results), testname) # advance offset newoffset = warnings[-1]["next_offset"] self.assertGreaterEqual(expectedtotal, newoffset, testname) elif len(results) > 0: self.assertEqual(expectedtotal, newtotal, testname) self.assertEqual(0, len(warnings), testname) # advance offset newoffset = offset + len(results) self.assertEqual(expectedtotal, newoffset, testname) else: # advance offset out of range newoffset = offset + 1 self.assertLessEqual(expectedtotal, newoffset, testname) logger.debug("newoffset):{!s}".format(newoffset)) return newtotal, newoffset def testassert(self): testname = "testassert" data = ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"] # fits in limit results = list(data) limit = len(data) warnings = [] currentoffset = 0 currenttotal = 0 expectedtotal = len(data) (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(expectedtotal, currenttotal, testname) self.assertEqual(expectedtotal, currentoffset, testname) warnings = ["", {"count": expectedtotal, "offset": currentoffset}] results = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertGreater(currentoffset, expectedtotal, testname) # even across limits results = data[0:5] limit = 5 warnings = ["", {"next_offset": limit}] currentoffset = 0 currenttotal = 0 expectedtotal = len(data) (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(limit, currenttotal, testname) self.assertEqual(limit, currentoffset, testname) results = data[5:] warnings = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(expectedtotal, currenttotal, testname) self.assertEqual(expectedtotal, currentoffset, testname) warnings = ["", {"count": expectedtotal, "offset": currentoffset}] results = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertGreater(currentoffset, expectedtotal, testname) # odd across limits results = data[0:4] limit = 4 warnings = ["", {"next_offset": limit}] currentoffset = 0 currenttotal = 0 expectedtotal = len(data) (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(limit, currenttotal, testname) self.assertEqual(limit, currentoffset, testname) results = data[4:8] warnings = ["", {"next_offset": 8}] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(8, currenttotal, testname) self.assertEqual(8, currentoffset, testname) results = data[8:] warnings = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(expectedtotal, currenttotal, testname) self.assertEqual(expectedtotal, currentoffset, testname) warnings = ["", {"count": expectedtotal, "offset": currentoffset}] results = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertGreater(currentoffset, expectedtotal, testname) # odd across limits results = data[0:3] limit = 3 warnings = ["", {"next_offset": limit}] currentoffset = 0 currenttotal = 0 expectedtotal = len(data) (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(limit, currenttotal, testname) self.assertEqual(limit, currentoffset, testname) results = data[3:6] warnings = ["", {"next_offset": 6}] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(6, currenttotal, testname) self.assertEqual(6, currentoffset, testname) results = data[6:9] warnings = ["", {"next_offset": 9}] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(9, currenttotal, testname) self.assertEqual(9, currentoffset, testname) results = data[9:] warnings = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertEqual(expectedtotal, currenttotal, testname) self.assertEqual(expectedtotal, currentoffset, testname) warnings = ["", {"count": expectedtotal, "offset": currentoffset}] results = [] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) self.assertGreater(currentoffset, expectedtotal, testname) # all possibilities expectedtotal = len(data) for limit in range(1, expectedtotal + 1): currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset == expectedtotal: results = [] warnings = ["", {"count": expectedtotal, "offset": currentoffset}] elif currentoffset + limit >= expectedtotal: results = data[currentoffset:] warnings = [] else: results = data[currentoffset:currentoffset + limit] warnings = ["", {"next_offset": currentoffset + limit}] (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) def testLimits(self): testname = "testLimits" logger.debug(testname) # first pass with no limits p4api = P4() p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() p4api.run_sync("//...") results = p4api.run_have() expectedtotal = len(results) self.assertEqual(100, expectedtotal) self.assertEqual(0, len(p4api.warnings)) # limit on the command for limit in [1, 2, 3, 5, 7, 11]: currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: results = p4api.run_have(limit=limit) else: results = p4api.run_have(limit=limit, offset=currentoffset) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # confirm we ignore limits on sync p4files = p4api.run_sync("//...#0", limit=limit) self.assertGreater(len(p4files), limit) p4files = p4api.run_sync("//...", limit=limit) self.assertGreater(len(p4files), limit) p4api.disconnect() del(p4api) # limit on the object for limit in [1, 2, 3, 5, 7, 11]: p4api = P4(limit=limit) p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() p4api.run_sync("//...") currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: results = p4api.run_have() else: results = p4api.run_have(offset=currentoffset) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testname, warnings, results, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # confirm we ignore limits on sync p4files = p4api.run_sync("//...#0") self.assertGreater(len(p4files), limit) p4files = p4api.run_sync("//...") self.assertGreater(len(p4files), limit) p4api.disconnect() del(p4api) # cleanup p4api = P4() p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() p4api.run_sync("//...#0") p4api.disconnect() del(p4api) def testIterator(self): testname = "testIterator" logger.debug(testname) # first pass with no limits p4api = P4() p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() # first pass with no limits expectedMap = {"branches": 10, "changes": 40, "clients": 10, "depots": 12, "groups": 10, "jobs": 10, "labels": 10, "ldaps": 10, "remotes": 10, "repos": 10, "servers": 10, "streams": 10, "users": 11 } plurals = list(p4api.specfields.keys()) plurals.sort() for plural in plurals: func = getattr(p4api, "run_{}".format(plural)) p4results = func() actualcount = len(p4results) self.assertEqual(expectedMap[plural], actualcount, plural) self.assertEqual(0, len(p4api.warnings), plural) # limit on the command for limit in [1, 2, 3, 5, 7, 11]: for plural in plurals: func = getattr(p4api, "iterate_{}".format(plural)) testinfo = "{}-{}".format(testname, plural) currentoffset = 0 currenttotal = 0 expectedCounter = expectedMap[plural] // limit + 1 if expectedMap[plural] % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedMap[plural]: results = [] if currentoffset < 1: for spec in func(limit=limit): results.append(spec) else: for spec in func(limit=limit, offset=currentoffset): results.append(spec) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural]) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) p4api.disconnect() del(p4api) # limit on the object for limit in [1, 2, 3, 5, 7, 11]: p4api = P4(limit=limit) p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() for plural in plurals: testinfo = "{}-{}".format(testname, plural) func = getattr(p4api, "iterate_{}".format(plural)) currentoffset = 0 currenttotal = 0 expectedCounter = expectedMap[plural] // limit + 1 if expectedMap[plural] % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedMap[plural]: results = [] if currentoffset < 1: for spec in func(): results.append(spec) else: for spec in func(offset=currentoffset): results.append(spec) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural]) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) p4api.disconnect() del(p4api) def testMLimits(self): testname = "testMLimits" logger.debug(testname) # first pass with no limits p4api = P4() p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() # first pass with no limits expectedMap = {"branches": 10, "changes": 40, "clients": 10, "depots": 12, "groups": 10, "jobs": 10, "labels": 10, "ldaps": 10, "remotes": 10, "repos": 10, "servers": 10, "streams": 10, "users": 11 } plurals = list(p4api.specfields.keys()) plurals.sort() for plural in plurals: func = getattr(p4api, "run_{}".format(plural)) p4results = func() actualcount = len(p4results) self.assertEqual(expectedMap[plural], actualcount, plural) self.assertEqual(0, len(p4api.warnings), plural) # limit on the command for limit in [1, 2, 3, 5, 7, 11]: for plural in plurals: func = getattr(p4api, "run_{}".format(plural)) testinfo = "{}-{}".format(testname, plural) currentoffset = 0 currenttotal = 0 expectedCounter = expectedMap[plural] // limit + 1 if expectedMap[plural] % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedMap[plural]: results = [] if currentoffset < 1: results = func(limit=limit) else: results = func(limit=limit, offset=currentoffset) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural]) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) p4api.disconnect() del(p4api) # limit on the object for limit in [1, 2, 3, 5, 7, 11]: p4api = P4(limit=limit) p4api.port = self.p4server.p4api.port p4api.user = "user_0" p4api.client = "client_0" p4api.exception_level = P4.RAISE_ERROR p4api.connect() for plural in plurals: testinfo = "{}-{}".format(testname, plural) func = getattr(p4api, "run_{}".format(plural)) currentoffset = 0 currenttotal = 0 expectedCounter = expectedMap[plural] // limit + 1 if expectedMap[plural] % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedMap[plural]: results = [] if currentoffset < 1: results = func() else: results = func(offset=currentoffset) warnings = p4api.warnings (currenttotal, currentoffset) = self.assertLimitAndNext(testinfo, warnings, results, limit, currentoffset, currenttotal, expectedMap[plural]) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) p4api.disconnect() del(p4api) if __name__ == "__main__": #import sys;sys.argv = ['', 'Test.testName'] unittest.main()