''' Created on Dec 28, 2017 @author: Charlie McLouth ''' import unittest from P4Server import P4TestcaseShared import os.path from p4rest.util import osenviron, mkdtemp, rmtree from p4rest.flask.application import create_app, APP_CONFIG_KEY from p4rest.session import SESSION_HEADER_NAME from werkzeug.datastructures import Headers from base64 import b64encode from flask import current_app, json import logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) debugHandler = logging.StreamHandler() debugHandler.setFormatter(logging.Formatter("%(levelname)s:%(filename)s:" "%(lineno)d:%(funcName)s:" "%(message)s")) logger.addHandler(debugHandler) class TestWWWCommands(P4TestcaseShared): '''Test P4 environment within the P4RestSessionP4''' def initTestData(self, _p4api): _p4api.run_password("", "Perf0rce") _p4api.run_configure("set", "security=3") _p4api.run_login(password="Perf0rce") name = "p4testadmin" spec = _p4api.fetch_user(name) _p4api.save_user(spec, "-f") _p4api.run_password("", "Perf0rce", name) _p4api.delete_depot("repo") _p4api.delete_depot("depot") def setUp(self): self.orgP4CONFIG = osenviron.get("P4CONFIG", None) self.orgP4TICKETS = osenviron.get("P4TICKETS", None) self.orgP4TRUST = osenviron.get("P4TRUST", None) self.testdir = os.path.abspath(".") if not os.path.isdir(self.testdir): raise Exception("Test directory ({}) does not exist." "".format(self.testdir)) self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir) self.configKey = "TestWWWCommandsConfig_key" configdefaults = {self.configKey: os.path.join(self.testdir, "test.json"), "WORKDIR": os.path.join(self.testdir, "workdir"), APP_CONFIG_KEY: self.configKey, "DEBUG": True} self.wwwapp = create_app(configdefaults) self.wwwapp.testing = True self.workdir = self.wwwapp.config["WORKDIR"] self.p4tickets = self.wwwapp.config["P4TICKETS"] P4TestcaseShared.setUp(self) def tearDown(self): if os.path.isdir(self.testdir): rmtree(self.testdir) else: logger.warn('not a directory?:' + self.testdir) if self.orgP4CONFIG is not None: osenviron["P4CONFIG"] = self.orgP4CONFIG elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) if self.orgP4TICKETS is not None: osenviron["P4TICKETS"] = self.orgP4TICKETS elif "P4TICKETS" in osenviron: del(osenviron["P4TICKETS"]) if self.orgP4TRUST is not None: osenviron["P4TRUST"] = self.orgP4TRUST elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) def __genAuthHeader(self, user, password): data = b"" if user is not None and len(user) > 0: data = user.encode() data = data + b":" if password is not None and len(password) > 0: data = data + password.encode() return b"Basic " + b64encode(data) def createSession(self, P4PORT, P4USER, P4PASSWORD): command = "createSession" with self.wwwapp.app_context(): with current_app.test_client() as client: headers = Headers() headers.add('Authorization', self.__genAuthHeader(P4USER, P4PASSWORD)) headers.add("Content-Type", "application/json") data = {"P4PORT": P4PORT} json_data = json.dumps(data) response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(200, response.status_code, command) self.assertEqual("application/json", response.headers.get("Content-Type"), command) response_data = json.loads(response.get_data()) self.assertIsInstance(response_data, dict, command) self.assertEqual(P4USER, response_data["userName"], command) sessionId = response_data.get(SESSION_HEADER_NAME, None) self.assertIsNotNone(sessionId, command) return sessionId def deleteSession(self, sessionId): requestPath = "/sessions/{}".format(sessionId) self.wwwrequest("DELETE", sessionId, requestPath, None, None, 204) def wwwrequest(self, method, sessionId, path, data, headers, httpstatuscode): # clone headers requestHeaders = Headers(headers) # add the sessionId requestHeaders.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId)) # convert data to json json_data = None if data is not None: json_data = json.dumps(data) # set the header that we have json data requestHeaders.add("Content-Type", "application/json") # issue the request with self.wwwapp.app_context(): with current_app.test_client() as client: if method.upper() == "POST": response = client.post(path=path, data=json_data, headers=requestHeaders) elif method.upper() == "GET": response = client.get(path=path, data=json_data, headers=requestHeaders) elif method.upper() == "PUT": response = client.put(path=path, data=json_data, headers=requestHeaders) elif method.upper() == "DELETE": response = client.delete(path=path, data=json_data, headers=requestHeaders) self.assertEqual(httpstatuscode, response.status_code, response.get_data()) return response def assert_limit_and_next(self, testname, response, limit, offset, currenttotal, expectedtotal): '''assert the current values and advance to next offset''' results = [] meta = None warnings = None errors = None if isinstance(response, list): results = response logger.debug(results) elif isinstance(response, (str, int)): results = [response] logger.debug(results) elif isinstance(response, dict): if "results" in response and response["results"] is not None: if isinstance(response["results"], list): results = response["results"] else: results = [response["results"]] if "meta" in response and response["meta"] is not None: meta = response["meta"] if "warnings" in response and response["warnings"] is not None: warnings = response["warnings"] if "errors" in response and response["errors"] is not None: errors = response["errors"] if "results" not in response \ and "meta" not in response \ and "warnings" not in response \ and "errors" not in response: results = [response] logger.debug(testname) #a if condition else b logger.debug("len(warnings):{!s}".format(len(warnings) if warnings else None)) logger.debug("len(results):{!s}".format(len(results) if results else None)) logger.debug("len(meta):{!s}".format(len(meta) if meta else None)) logger.debug("len(errors):{!s}".format(len(errors) if errors else None)) logger.debug("limit):{!s}".format(limit)) logger.debug("offset):{!s}".format(offset)) logger.debug("currenttotal):{!s}".format(currenttotal)) logger.debug("expectedtotal):{!s}".format(expectedtotal)) self.assertGreaterEqual(limit, len(results), testname) if len(results) < 1: self.assertEqual(2, len(warnings), testname) self.assertEqual(expectedtotal, warnings[-1]["count"], testname) self.assertEqual(offset, warnings[-1]["offset"], testname) newtotal = currenttotal + len(results) logger.debug("newtotal):{!s}".format(newtotal)) if newtotal > expectedtotal: logger.debug(results) logger.debug(response) self.assertGreaterEqual(expectedtotal, newtotal, testname) if newtotal < expectedtotal: self.assertEqual(limit, len(results), testname) self.assertEqual(2, len(meta), testname) self.assertEqual(limit, len(results), testname) self.assertTrue(meta["more_results"], testname) # advance offset newoffset = meta["next_offset"] self.assertGreaterEqual(expectedtotal, newoffset, testname) elif len(results) > 0: self.assertEqual(expectedtotal, newtotal, testname) self.assertIsNone(warnings, testname) # advance offset newoffset = offset + len(results) self.assertEqual(expectedtotal, newoffset, testname) else: # advance offset out of range newoffset = offset + 1 self.assertLessEqual(expectedtotal, newoffset, testname) logger.debug("newoffset):{!s}".format(newoffset)) return newtotal, newoffset def assert_fetch_user(self, response, name): command = "fetch_user" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("User", responseData, command) self.assertEqual(name, responseData["User"], command) # TODO: assert types return responseData def assert_save_user(self, response, name): command = "save_user" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("User {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_depot(self, response, name, depotType): command = "fetch_depot" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Depot", responseData, command) self.assertEqual(name, responseData["Depot"], command) self.assertEqual(depotType, responseData["Type"], command) # TODO: assert types return responseData def assert_save_depot(self, response, name, depotType): command = "save_depot" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Depot {} saved.".format(name), responseData[0], command) self.assertEqual(depotType, responseData[1]["Type"], command) # TODO: assert types return responseData def assert_fetch_group(self, response, name): command = "fetch_group" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Group", responseData, command) self.assertEqual(name, responseData["Group"], command) # TODO: assert types return responseData def assert_save_group(self, response, name): command = "save_group" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Group {} created.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_server(self, response, name): command = "fetch_server" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("ServerID", responseData, command) self.assertEqual(name, responseData["ServerID"], command) # TODO: assert types return responseData def assert_save_server(self, response, name): command = "save_server" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Server {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_remote(self, response, name): command = "fetch_remote" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("RemoteID", responseData, command) self.assertEqual(name, responseData["RemoteID"], command) # TODO: assert types return responseData def assert_save_remote(self, response, name): command = "save_remote" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Remote {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_ldap(self, response, name): command = "fetch_ldap" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Name", responseData, command) self.assertEqual(name, responseData["Name"], command) # TODO: assert types return responseData def assert_save_ldap(self, response, name): command = "save_ldap" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("LDAP configuration {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_job(self, response, name): command = "fetch_job" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Job", responseData, command) self.assertEqual(name, responseData["Job"], command) # TODO: assert types return responseData def assert_save_job(self, response, name): command = "save_job" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Job {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_branch(self, response, name): command = "fetch_branch" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Branch", responseData, command) self.assertEqual(name, responseData["Branch"], command) # TODO: assert types return responseData def assert_save_branch(self, response, name): command = "save_branch" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Branch {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_label(self, response, name): command = "fetch_label" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Label", responseData, command) self.assertEqual(name, responseData["Label"], command) # TODO: assert types return responseData def assert_save_label(self, response, name): command = "save_label" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Label {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_repo(self, response, name): command = "fetch_repo" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Repo", responseData, command) self.assertEqual(name, responseData["Repo"], command) # TODO: assert types return responseData def assert_save_repo(self, response, name): command = "save_repo" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Repo {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_stream(self, response, name): command = "fetch_stream" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Stream", responseData, command) self.assertEqual(name, responseData["Stream"], command) # TODO: assert types return responseData def assert_save_stream(self, response, name): command = "save_stream" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Stream {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_client(self, response, name): command = "fetch_client" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Client", responseData, command) self.assertEqual(name, responseData["Client"], command) # TODO: assert types return responseData def assert_save_client(self, response, name): command = "save_client" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) self.assertEqual("Client {} saved.".format(name), responseData[0], command) # TODO: assert types return responseData def assert_fetch_change(self, response, changeno): command = "fetch_change" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, dict, command) self.assertIn("Change", responseData, command) self.assertEqual(changeno, responseData["Change"], command) # TODO: assert types return responseData def assert_save_change(self, response): command = "save_change" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(2, len(responseData), command) self.assertIsInstance(responseData[0], str, command) self.assertIsInstance(responseData[1], dict, command) parts = responseData[0].split() self.assertEqual("Change", parts[0], command) self.assertEqual("created.", parts[2], command) self.assertEqual(parts[1], responseData[1]["Change"], command) # TODO: assert types return responseData def assert_run_where(self, response, filecount=None, depotFileMap=None): command = "run_where" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) if filecount is not None: self.assertEqual(filecount, len(responseData), command) self.assertIsInstance(responseData[0], dict, command) self.assertEqual(3, len(responseData[0]), command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_add(self, response, changeno, depotFileMap): command = "run_add" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(len(depotFileMap), len(responseData), command) for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_submit(self, response, changeno, depotFileMap): command = "run_submit" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(len(depotFileMap) + 2, len(responseData), command) self.assertEqual(changeno, responseData[0]["change"], command) for revision in responseData[1:len(responseData)-2]: self.assertIn(revision["depotFile"], depotFileMap, command) self.assertIn("submittedChange", responseData[-1], command) # TODO: assert types return responseData def assert_run_sync(self, response, filecount=None, depotFileMap=None): command = "run_sync" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) if filecount is not None: self.assertEqual(filecount, len(responseData), command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_edit(self, response, changeno, filecount=None, depotFileMap=None): command = "run_edit" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) if filecount is not None: self.assertEqual(filecount, len(responseData), command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_shelve(self, response, changeno, depotFileMap=None): command = "run_shelve" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_revert(self, response, filecount=None, depotFileMap=None): command = "run_revert" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) if filecount is not None: self.assertEqual(filecount, len(responseData), command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_have(self, response, filecount=None, depotFileMap=None): command = "run_have" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response # list or dict if isinstance(responseData, dict): self.assertIn("results", responseData, command) if filecount is not None: self.assertEqual(filecount, len(responseData["results"]), command) if depotFileMap is not None: for revision in responseData["results"]: self.assertIn(revision["depotFile"], depotFileMap, command) else: self.assertIsInstance(responseData, list, command) if filecount is not None: self.assertEqual(filecount, len(responseData), command) if depotFileMap is not None: for revision in responseData: self.assertIn(revision["depotFile"], depotFileMap, command) # TODO: assert types return responseData def assert_run_info(self, response, **kargs): command = "run_info" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(1, len(responseData), command) self.assertIsInstance(responseData[0], dict, command) for k,v in kargs.items(): self.assertIn(k, responseData[0], command) self.assertEqual(v, responseData[0][k], command) # TODO: assert types return responseData def assert_run_counter(self, response, name): command = "run_counter" self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) # validate response self.assertIsInstance(responseData, list, command) self.assertEqual(1, len(responseData), command) self.assertIn("counter", responseData[0], command) self.assertEqual(name, responseData[0]["counter"], command) # TODO: assert types return responseData def assert_iterate(self, response, name, itemcount=None): command = "iterate_{}".format(name) self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) logger.debug(responseData) # validate response # list or dict if isinstance(responseData, dict): self.assertIn("results", responseData, command) if itemcount is not None: self.assertEqual(itemcount, len(responseData["results"]), command) else: self.assertIsInstance(responseData, list, command) if itemcount is not None: self.assertEqual(itemcount, len(responseData), command) return responseData def rest_increment_counter(self, counterName, sessionId): command = "run_counter" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = ["-i", counterName] response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) counterValue = int(self.assert_run_counter(response, counterName)[0]["value"]) return counterValue def p4_increment_counter(self, counterName, p4api): counterValue = int(p4api.assert_run_counter("-i", counterName)[0]["value"]) return counterValue def rest_create_users(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId counterName = "rest_create_depots" if self.rest_increment_counter(counterName, superSessionId) < 2: # create users for i in range(0,10): name = "user_{!s}".format(i) command = "fetch_user" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", superSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_user(response, name) # save the user command = "save_user" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec, "-f"] response = self.wwwrequest("POST", superSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_user(response, name) # set the password self.p4server.p4api.run_password("", "Perf0rce", name) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_depots(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_depots" if self.rest_increment_counter(counterName, superSessionId) < 2: # create depots for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "depot_{!s}".format(i) command = "fetch_depot" requestHeaders = None requestData = None if i == 0: depotType = "stream" name = depotType elif i == 1: depotType = "graph" name = depotType elif i == 2: depotType = "local" name = depotType elif i == 3: depotType = "spec" name = depotType elif i == 4: depotType = "archive" name = depotType elif i == 4: depotType = "unload" name = depotType else: depotType = "local" requestPath = "/commands/{}?arg=-t&arg={}&arg={}".format(command, depotType, name) response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_depot(response, name, depotType) # save the depot command = "save_depot" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_depot(response, name, depotType) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_groups(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_groups" if self.rest_increment_counter(counterName, superSessionId) < 2: # create groups command = "iterate_users" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = None response = self.wwwrequest("GET", superSessionId, requestPath, requestData, requestHeaders, 200) responseData = json.loads(response.get_data()) self.assertIsInstance(responseData, list, command) userList = [] for userSpec in responseData: if userSpec["User"].startswith("user_"): userList.append(userSpec["User"]) for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "group_{!s}".format(i) command = "fetch_group" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_group(response, name) spec["Owners"] = ["user_{!s}".format(i)] spec["Users"] = userList # save the group command = "save_group" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_save_group(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_servers(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_servers" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "server_{!s}".format(i) command = "fetch_server" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_server(response, name) # save the server command = "save_server" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_server(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_remotes(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_remotes" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "remote_{!s}".format(i) command = "fetch_remote" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_remote(response, name) # save the remote command = "save_remote" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_remote(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_ldaps(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_ldaps" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "ldap_{!s}".format(i) command = "fetch_ldap" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_ldap(response, name) spec["SimplePattern"] = "%user%" # save the LDAP command = "save_ldap" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_ldap(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_jobs(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon users self.rest_create_users(superSessionId) counterName = "create_jobs" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "job_{!s}".format(i) command = "fetch_job" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_job(response, name) spec["Description"] = "description {!s}".format(i) # save the job command = "save_job" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_job(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_branches(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon depots and indirectly users self.rest_create_depots(superSessionId) counterName = "create_branches" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "branch_{!s}".format(i) command = "fetch_branch" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_branch(response, name) spec["View"].clear() spec["View"].append("//local/a/... //local/b/...") # save the branch command = "save_branch" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_branch(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_labels(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon depots and indirectly users self.rest_create_depots(superSessionId) counterName = "create_labels" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "label_{!s}".format(i) command = "fetch_label" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_label(response, name) spec["View"].clear() spec["View"].append("//local/...") # save the label command = "save_label" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_label(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_repos(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon depots and indirectly users self.rest_create_depots(superSessionId) counterName = "create_repos" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "//graph/repo_{!s}".format(i) command = "fetch_repo" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_repo(response, name) # save the repo command = "save_repo" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] logger.debug(self.p4server.p4api.run_depots()) response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_repo(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_streams(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon depots and indirectly users self.rest_create_depots(superSessionId) counterName = "create_streams" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "//stream/stream_{!s}".format(i) command = "fetch_stream" requestPath = "/commands/{}?arg=-t&arg=mainline&arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_stream(response, name) # save the stream command = "save_stream" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] logger.debug(self.p4server.p4api.run_depots()) response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_stream(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_clients(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon depots self.rest_create_depots(superSessionId) counterName = "create_clients" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "client_{!s}".format(i) command = "fetch_client" requestPath = "/commands/{}?arg={}".format(command, name) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_client(response, name) spec["Root"] = self.p4server.clientroot options = spec["Options"].split() options[-1] = "rmdir" spec["Options"] = " ".join(options) spec["View"].clear() spec["View"].append("//local/... //{}/...".format(name)) # save the client command = "save_client" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_client(response, name) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_pending_changes(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon clients self.rest_create_clients(superSessionId) counterName = "create_pending_changes" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "new" client = "client_{!s}".format(i) command = "fetch_change" requestPath = "/commands/{}?arg={}&client={}".format(command, name, client) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_change(response, name) spec["Description"] = "pending change {!s}".format(i) # save the change command = "save_change" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_change(response) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_submitted_changes(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon clients self.rest_create_clients(superSessionId) counterName = "create_submitted_changes" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "new" client = "client_{!s}".format(i) command = "fetch_change" requestPath = "/commands/{}?arg={}&client={}".format(command, name, client) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_change(response, name) spec["Description"] = "submitted change {!s}".format(i) # save the change command = "save_change" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_save_change(response) changeno = responseData[1]["Change"] fileList = [] whereMap = {} for ii in range(0,10): command = "run_where" depotFile = "//local/{!s}/file{!s}.txt".format(i, ii) requestPath = "/commands/{}?arg={}&client={}".format(command, depotFile, client) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_where(response, filecount=1, depotFileMap={depotFile: True})[0] filename = responseData["path"] fileList.append(filename) whereMap[responseData["depotFile"]] = responseData if not os.path.isdir(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) with open(filename, mode="a") as f: f.write("a line {!s}".format(ii)) command = "run_add" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["-c", changeno] + fileList response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_add(response, changeno, whereMap) command = "run_submit" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["-c", changeno] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_submit(response, changeno, whereMap) command = "run_sync" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["//...#0"] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=10, depotFileMap=whereMap) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def rest_create_shelved_changes(self, sessionId=None): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] p4port = self.p4server.p4api.port p4user = "p4testadmin" if sessionId is None or len(sessionId) < 1: superSessionId = self.createSession(p4port, p4user, "Perf0rce") else: superSessionId = sessionId # depends upon submitted changes self.rest_create_submitted_changes(superSessionId) counterName = "create_shelved_changes" if self.rest_increment_counter(counterName, superSessionId) < 2: for i in range(0,10): userSessionId = self.createSession(p4port, "user_{!s}".format(i), "Perf0rce") name = "new" client = "client_{!s}".format(i) command = "fetch_change" requestPath = "/commands/{}?arg={}&client={}".format(command, name, client) requestHeaders = None requestData = None response = self.wwwrequest("GET", userSessionId, requestPath, requestData, requestHeaders, 200) spec = self.assert_fetch_change(response, name) spec["Description"] = "shelved change {!s}".format(i) # save the change command = "save_change" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = [spec] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_save_change(response) changeno = responseData[1]["Change"] command = "run_sync" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["//local/{!s}/...".format(i)] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=10) depotFileMap = {} for revision in responseData: depotFileMap[revision["depotFile"]] = revision command = "run_edit" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["-c", changeno, "//local/{!s}/...".format(i)] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_edit(response, changeno, filecount=10, depotFileMap=depotFileMap) command = "run_shelve" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["-r", "-c", changeno] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_shelve(response, changeno, depotFileMap=depotFileMap) command = "run_revert" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["//..."] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_revert(response, filecount=10, depotFileMap=depotFileMap) command = "run_sync" requestPath = "/commands/{}?client={}".format(command, client) requestHeaders = None requestData = ["//...#0"] response = self.wwwrequest("POST", userSessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=10, depotFileMap=depotFileMap) self.deleteSession(userSessionId) if sessionId is None or len(sessionId) < 1: self.deleteSession(superSessionId) def create_all(self): self.rest_create_users() self.rest_create_depots() self.rest_create_groups() self.rest_create_servers() self.rest_create_remotes() self.rest_create_ldaps() self.rest_create_jobs() self.rest_create_branches() self.rest_create_labels() self.rest_create_repos() self.rest_create_streams() self.rest_create_clients() self.rest_create_pending_changes() self.rest_create_submitted_changes() self.rest_create_shelved_changes() def test_basic(self): self.rest_create_clients() p4port = self.p4server.p4api.port p4user = "user_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") # run_info command = "run_info" requestPath = "/commands/{}".format(command) requestData = None requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_info(response, clientName="*unknown*") # set a client requestPath = "/commands/{}?client=client_0".format(command) response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_info(response, clientName="client_0") # delete self.deleteSession(sessionId) def testLimits01(self): # testname = self.id().rsplit(sep=".", maxsplit=1)[-1] expectedtotal = 100 self.rest_create_submitted_changes() p4port = self.p4server.p4api.port p4user = "user_0" p4client = "client_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//..."] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=expectedtotal) depotFileMap = {} for revision in responseData: depotFileMap[revision["depotFile"]] = revision # have with no limits command = "run_have" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = None requestHeaders = None response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_have(response, filecount=expectedtotal, depotFileMap=depotFileMap) # cleanup command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//...#0"] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap) # delete self.deleteSession(sessionId) def testLimits02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] expectedtotal = 100 self.rest_create_submitted_changes() p4port = self.p4server.p4api.port p4user = "user_0" p4client = "client_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//..."] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=expectedtotal) depotFileMap = {} for revision in responseData: depotFileMap[revision["depotFile"]] = revision # limit on the command command = "run_have" for limit in [1, 2, 3, 5, 7, 11]: currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: requestPath = "/commands/{}?client={}&limit={!s}".format(command, p4client, limit) else: requestPath = "/commands/{}?client={}&limit={!s}&offset={!s}".format(command, p4client, limit, currentoffset) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_have(response, depotFileMap=depotFileMap) (currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # cleanup command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//...#0"] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap) # delete self.deleteSession(sessionId) def testLimits03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] expectedtotal = 100 self.rest_create_submitted_changes() p4port = self.p4server.p4api.port p4user = "user_0" p4client = "client_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//..."] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_sync(response, filecount=expectedtotal) depotFileMap = {} for revision in responseData: depotFileMap[revision["depotFile"]] = revision # delete self.deleteSession(sessionId) # limit on the object command = "run_have" orgAppLimit = self.wwwapp.config["MAXRESULTS"] for limit in [1, 2, 3, 5, 7, 11]: self.wwwapp.config["MAXRESULTS"] = limit sessionId = self.createSession(p4port, p4user, "Perf0rce") currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: requestPath = "/commands/{}?client={}".format(command, p4client) else: requestPath = "/commands/{}?client={}&offset={!s}".format(command, p4client, currentoffset) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_run_have(response, depotFileMap=depotFileMap) (currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # delete self.deleteSession(sessionId) # cleanup self.wwwapp.config["MAXRESULTS"] = orgAppLimit sessionId = self.createSession(p4port, p4user, "Perf0rce") command = "run_sync" requestPath = "/commands/{}?client={}".format(command, p4client) requestData = ["-k", "//...#0"] requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_sync(response, filecount=100, depotFileMap=depotFileMap) # delete self.deleteSession(sessionId) def run_iterator01(self, testname, plural, expectedtotal): p4port = self.p4server.p4api.port p4user = "user_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") # first pass with no limits requestData = None requestHeaders = None command = "iterate_{}".format(plural) requestPath = "/commands/{}".format(command) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_iterate(response, plural, itemcount=expectedtotal) # delete self.deleteSession(sessionId) def run_iterator02(self, testname, plural, expectedtotal): p4port = self.p4server.p4api.port p4user = "user_0" sessionId = self.createSession(p4port, p4user, "Perf0rce") # first pass with no limits requestData = None requestHeaders = None command = "iterate_{}".format(plural) for limit in [1, 2, 3, 5, 7, 11]: currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: requestPath = "/commands/{}?limit={!s}".format(command, limit) else: requestPath = "/commands/{}?limit={!s}&offset={!s}".format(command, limit, currentoffset) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_iterate(response, plural) (currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # delete self.deleteSession(sessionId) def run_iterator03(self, testname, plural, expectedtotal): p4port = self.p4server.p4api.port p4user = "user_0" # first pass with no limits requestData = None requestHeaders = None command = "iterate_{}".format(plural) orgAppLimit = self.wwwapp.config["MAXRESULTS"] for limit in [1, 2, 3, 5, 7, 11]: self.wwwapp.config["MAXRESULTS"] = limit sessionId = self.createSession(p4port, p4user, "Perf0rce") currentoffset = 0 currenttotal = 0 expectedCounter = expectedtotal // limit + 1 if expectedtotal % limit > 0: expectedCounter = expectedCounter + 1 self.assertGreaterEqual(expectedCounter, 2, testname) actualCounter = 0 while currentoffset <= expectedtotal: if currentoffset < 1: requestPath = "/commands/{}".format(command) else: requestPath = "/commands/{}?offset={!s}".format(command, currentoffset) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_iterate(response, plural) (currenttotal, currentoffset) = self.assert_limit_and_next(testname, responseData, limit, currentoffset, currenttotal, expectedtotal) actualCounter = actualCounter + 1 self.assertEqual(expectedCounter, actualCounter, testname) # delete self.deleteSession(sessionId) # cleanup self.wwwapp.config["MAXRESULTS"] = orgAppLimit def testIteratorbranches01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_branches() plural = "branches" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorbranches02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_branches() plural = "branches" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorbranches03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_branches() plural = "branches" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorrepos01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_repos() plural = "repos" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorrepos02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_repos() plural = "repos" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorrepos03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_repos() plural = "repos" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorchanges01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_pending_changes() self.rest_create_submitted_changes() self.rest_create_shelved_changes() plural = "changes" expectedtotal = 30 self.run_iterator01(testname, plural, expectedtotal) def testIteratorchanges02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_pending_changes() self.rest_create_submitted_changes() self.rest_create_shelved_changes() plural = "changes" expectedtotal = 30 self.run_iterator02(testname, plural, expectedtotal) def testIteratorchanges03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_pending_changes() self.rest_create_submitted_changes() self.rest_create_shelved_changes() plural = "changes" expectedtotal = 30 self.run_iterator03(testname, plural, expectedtotal) def testIteratorclients01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_clients() plural = "clients" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorclients02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_clients() plural = "clients" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorclients03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_clients() plural = "clients" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratordepots01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_depots() plural = "depots" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratordepots02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_depots() plural = "depots" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratordepots03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_depots() plural = "depots" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorgroups01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_groups() plural = "groups" expectedtotal = 100 self.run_iterator01(testname, plural, expectedtotal) def testIteratorgroups02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_groups() plural = "groups" expectedtotal = 100 self.run_iterator02(testname, plural, expectedtotal) def testIteratorgroups03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_groups() plural = "groups" expectedtotal = 100 self.run_iterator03(testname, plural, expectedtotal) def testIteratorjobs01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_jobs() plural = "jobs" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorjobs02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_jobs() plural = "jobs" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorjobs03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_jobs() plural = "jobs" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorlabels01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_labels() plural = "labels" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorlabels02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_labels() plural = "labels" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorlabels03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_labels() plural = "labels" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorldaps01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_ldaps() plural = "ldaps" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorldaps02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_ldaps() plural = "ldaps" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorldaps03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_ldaps() plural = "ldaps" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorremotes01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_remotes() plural = "remotes" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorremotes02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_remotes() plural = "remotes" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorremotes03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_remotes() plural = "remotes" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorservers01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_servers() plural = "servers" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorservers02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_servers() plural = "servers" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorservers03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_servers() plural = "servers" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorstreams01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_streams() plural = "streams" expectedtotal = 10 self.run_iterator01(testname, plural, expectedtotal) def testIteratorstreams02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_streams() plural = "streams" expectedtotal = 10 self.run_iterator02(testname, plural, expectedtotal) def testIteratorstreams03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_streams() plural = "streams" expectedtotal = 10 self.run_iterator03(testname, plural, expectedtotal) def testIteratorusers01(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_users() plural = "users" expectedtotal = 12 self.run_iterator01(testname, plural, expectedtotal) def testIteratorusers02(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_users() plural = "users" expectedtotal = 12 self.run_iterator02(testname, plural, expectedtotal) def testIteratorusers03(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_users() plural = "users" expectedtotal = 12 self.run_iterator03(testname, plural, expectedtotal) def test_update(self): testname = self.id().rsplit(sep=".", maxsplit=1)[-1] self.rest_create_labels() p4port = self.p4server.p4api.port p4user = "p4testadmin" sessionId = self.createSession(p4port, p4user, "Perf0rce") # run_info command = "run_info" requestPath = "/commands/{}".format(command) requestData = None requestHeaders = None response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_run_info(response, clientName="*unknown*") command = "iterate_labels" requestPath = "/commands/{}".format(command) response = self.wwwrequest("GET", sessionId, requestPath, requestData, requestHeaders, 200) responseData = self.assert_iterate(response, "labels", itemcount=10) for label in responseData: name = label["Label"] label["Description"] = label["Description"] + " updated by {}".format(p4user) command = "update_label" requestPath = "/commands/{}".format(command) requestData = [name, label] requestHeaders = None response = self.wwwrequest("PUT", sessionId, requestPath, requestData, requestHeaders, 200) self.assert_save_label(response, name) command = "delete_label" requestPath = "/commands/{}?arg={}".format(command, name) requestData = None requestHeaders = None response = self.wwwrequest("DELETE", sessionId, requestPath, requestData, requestHeaders, 200) self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) logger.debug(responseData) counterName = "create_labels" command = "run_counter" requestPath = "/commands/{}".format(command) requestHeaders = None requestData = ["-d", counterName] response = self.wwwrequest("POST", sessionId, requestPath, requestData, requestHeaders, 200) self.assertEqual("application/json", response.headers.get("Content-Type")) responseData = json.loads(response.get_data()) logger.debug(responseData) # delete self.deleteSession(sessionId) if __name__ == "__main__": #import sys;sys.argv = ['', 'Test.testName'] unittest.main()