''' Created on Nov 20, 2017 @author: Charlie McLouth ''' import unittest import os.path from p4rest.util import osenviron, mkdtemp, rmtree, osremovefile, oslistdir from p4rest.session import P4RestSession, P4RestSessionAdmin, P4RestSessionP4, \ SESSION_HEADER_NAME from p4rest.exceptions import P4RestSessionError from p4rest.flask.config import P4RestAppConfig from P4Server import P4Testcase, testConfiguration from P4 import P4Exception from datetime import datetime from p4rest.flask.application import create_app, APP_CONFIG_KEY from werkzeug.datastructures import Headers from base64 import b64encode from flask import current_app, json import logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) debugHandler = logging.StreamHandler() debugHandler.setFormatter(logging.Formatter("%(levelname)s:%(filename)s:" "%(lineno)d:%(funcName)s:" "%(message)s")) logger.addHandler(debugHandler) class TestP4RestSession(unittest.TestCase): '''Testing P4RestSession and P4RestSessionP4''' def setUp(self): self.orgEnviron = {} for k,v in osenviron.items(): if k.startswith("P4"): self.orgEnviron[k] = v self.testdir = os.path.abspath(".") if not os.path.isdir(self.testdir): raise Exception("Test directory ({}) does not exist.".format(self.testdir)) self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir) def tearDown(self): if os.path.isdir(self.testdir): rmtree(self.testdir) else: logger.warn('not a directory?:' + self.testdir) osenviron.update(self.orgEnviron) todelete = [] for k in osenviron.keys(): if k.startswith("P4"): if k not in self.orgEnviron: todelete.append(k) for k in todelete: del(osenviron[k]) def testdump(self): session = P4RestSession() self.assertEqual(0, len(session)) with self.assertRaises(P4RestSessionError) as cm: session.load() expected = "undefined session" actual = cm.exception.args[0] self.assertEqual(expected, actual) filename = os.path.join(self.testdir, ".conf") with self.assertRaises(P4RestSessionError) as cm: session.load(filename) expected = "error reading from file ({})".format(filename) actual = cm.exception.args[0] self.assertEqual(expected, actual) session["sessionfile"] = filename with self.assertRaises(P4RestSessionError) as cm: session.load() expected = "error reading from file ({})".format(filename) actual = cm.exception.args[0] self.assertEqual(expected, actual) session.dump(filename) session.clear() session.load(filename, True) self.assertEqual(2, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSession", session["SESSIONTYPE"]) testdict = {} testdict.update(session) session.validate(testdict) session.clear() session["sessionfile"] = filename session.load(None, True) self.assertEqual(2, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSession", session["SESSIONTYPE"]) session["A"] = "A" session["#A"] = "A" session["a"] = "a" session["#a"] = "a" session.dump() session.load() self.assertEqual(6, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSession", session["SESSIONTYPE"]) session.clear() filename2 = os.path.join(mkdtemp(suffix=None, prefix=__name__, dir=self.testdir), ".conf") session.dump(filename2) session.load(filename2) self.assertEqual(2, len(session)) self.assertEqual(filename2, session["sessionfile"]) self.assertEqual("P4RestSession", session["SESSIONTYPE"]) def testdumpP4(self): session = P4RestSession() self.assertEqual(0, len(session)) filename = os.path.join(self.testdir, ".session") session.dump(filename) testdict = {} testdict.update(session) session = P4RestSessionP4() with self.assertRaises(P4RestSessionError) as cm: session.validate(testdict) expected = "undefined session" actual = cm.exception.args[0] self.assertEqual(expected, actual) testdict["sessionfile"] = filename for name in ["P4PORT", "P4USER", "P4CLIENT", "P4TICKETS", "P4TRUST"]: for x in range(0,5): if x == 0: if name in testdict: testdict.pop(name) elif x == 1: testdict[name] = None elif x == 2: testdict[name] = 35 elif x == 3: testdict[name] = "" elif x == 4: if name not in ["P4TICKETS", "P4TRUST"]: continue testdict[name] = os.path.join(self.testdir, "baddir", ".file") with self.assertRaises(P4RestSessionError) as cm: session.validate(testdict) expected = "Invalid {}".format(name) actual = cm.exception.args[0] self.assertEqual(expected, actual) if name == "P4PORT": testdict[name] = "something:something:something" elif name == "P4USER": testdict[name] = "someuser" elif name == "P4CLIENT": testdict[name] = "someclient" elif name == "P4TICKETS": testdict[name] = os.path.join(self.testdir, ".file") elif name == "P4TRUST": testdict[name] = os.path.join(self.testdir, ".file") session.validate(testdict) session.update(testdict) session.validate() session.dump() session.clear() session.load(filename, True) self.assertEqual(7, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) session.clear() session["sessionfile"] = filename session.load(None, True) self.assertEqual(7, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) session["A"] = "A" session["#A"] = "A" session["a"] = "a" session["#a"] = "a" session.dump() session.load() self.assertEqual(11, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) filename2 = os.path.join(mkdtemp(suffix=None, prefix=__name__, dir=self.testdir), ".session") session.dump(filename2) session.load(filename2, True) self.assertEqual(11, len(session)) self.assertEqual(filename2, session["sessionfile"]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) def teststatics(self): filename = os.path.join(self.testdir, ".conf") with self.assertRaises(P4RestSessionError) as cm: P4RestSession.createFromFile(filename) expected = "error reading from file ({})".format(filename) actual = cm.exception.args[0] self.assertEqual(expected, actual) with open(filename, 'w'): pass #create from empty file session = P4RestSession.createFromFile(filename) self.assertEqual(2, len(session)) self.assertEqual(filename, session["sessionfile"]) self.assertEqual("P4RestSession", session["SESSIONTYPE"]) self.assertEqual("P4RestSession", getattr(session, "__class__").__name__) self.assertIsInstance(session, P4RestSession) session["A"] = "A" session["#A"] = "A" session["a"] = "a" session["#a"] = "a" session.validate() session.dump(filename) testdict = {} testdict.update(session) session = P4RestSession.createFromFile(filename) # dict contains 2 lower-case entries self.assertEqual(len(testdict)-2, len(session)) self.assertEqual(2, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) osremovefile(filename) session = P4RestSession.createFromDict(testdict) self.assertEqual(len(testdict), len(session)) self.assertEqual(0, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) osremovefile(filename) testdict.clear() testdict["sessionfile"] = filename testdict["SESSIONTYPE"] = "P4RestSessionP4" testdict["P4PORT"] = "something:something:something" testdict["P4USER"] = "someuser" testdict["P4CLIENT"] = "someclient" testdict["P4TICKETS"] = os.path.join(self.testdir, ".p4tickets") testdict["P4TRUST"] = os.path.join(self.testdir, ".p4trust") session = P4RestSession.createFromDict(testdict) self.assertIsInstance(session, P4RestSessionP4) self.assertEqual(len(testdict), len(session)) self.assertEqual(0, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) session = P4RestSession.createFromFile(filename) self.assertIsInstance(session, P4RestSessionP4) self.assertEqual(len(testdict), len(session)) self.assertEqual(0, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) self.assertEqual("P4RestSessionP4", session["SESSIONTYPE"]) osremovefile(filename) testdict["SESSIONTYPE"] = "P4RestSessionAdmin" session = P4RestSession.createFromDict(testdict) self.assertIsInstance(session, P4RestSessionAdmin) self.assertEqual(len(testdict), len(session)) self.assertEqual(0, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) self.assertEqual("P4RestSessionAdmin", session["SESSIONTYPE"]) session = P4RestSession.createFromFile(filename) self.assertIsInstance(session, P4RestSessionAdmin) self.assertEqual(len(testdict), len(session)) self.assertEqual(0, len(set(testdict.keys()) - set(session.keys()))) for k in session: self.assertEqual(session[k], testdict[k]) self.assertEqual("P4RestSessionAdmin", session["SESSIONTYPE"]) osremovefile(filename) class TestP4RestSessionP4(P4Testcase): '''Test P4 environment within the P4RestSessionP4''' def setUp(self): self.orgP4CONFIG = osenviron.get("P4CONFIG", None) self.orgP4TICKETS = osenviron.get("P4TICKETS", None) self.orgP4TRUST = osenviron.get("P4TRUST", None) self.testdir = os.path.abspath(".") if not os.path.isdir(self.testdir): raise Exception("Test directory ({}) does not exist." "".format(self.testdir)) self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir) self.configKey = "TestP4RestAppConfig_key" configdefaults = {self.configKey: os.path.join(self.testdir, "test.json"), "WORKDIR": os.path.join(self.testdir, "workdir")} self.appconfig = P4RestAppConfig(os.path.abspath("."), configdefaults) configfile = self.appconfig.get_config_file(self.configKey) # add/update program defaults self.appconfig.set_program_defaults(os.path.dirname(configfile)) self.appconfig.from_json(configfile, silent=True) # validate application configuration self.appconfig.validate(self.configKey) # dump config file self.appconfig.dump_to_json(configfile, silent=True) testConfiguration["TESTDIR"] = self.testdir P4Testcase.setUp(self) def tearDown(self): P4Testcase.tearDown(self) if os.path.isdir(self.testdir): rmtree(self.testdir) else: logger.warn('not a directory?:' + self.testdir) if self.orgP4CONFIG is not None: osenviron["P4CONFIG"] = self.orgP4CONFIG elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) if self.orgP4TICKETS is not None: osenviron["P4TICKETS"] = self.orgP4TICKETS elif "P4TICKETS" in osenviron: del(osenviron["P4TICKETS"]) if self.orgP4TRUST is not None: osenviron["P4TRUST"] = self.orgP4TRUST elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) def initTestData(self, p4api): testname = self.id().rsplit(".", 1)[1] if testname == "testP4Sessions": p4api = self.p4server.p4api p4user = p4api.user for x in range(0, 5): newuser = "{}{!s}".format(testname, x) userdata = p4api.fetch_user(newuser) p4api.save_user(userdata, "-f") p4api.user = newuser p4api.run_password("Perf0rce", "Perf0rce") p4api.user = p4user p4api.run_configure("set", "security=3") def countTickets(self): linecount = 0 with open(self.appconfig["P4TICKETS"]) as f: for line in f: if len(line.strip()) > 0: linecount = linecount + 1 return linecount def countSessionDirs(self): dircount = 0 for directory in oslistdir(self.appconfig["WORKDIR"]): if os.path.isdir(os.path.join(self.appconfig["WORKDIR"], directory)): dircount = dircount + 1 return dircount def testP4Sessions(self): testname = self.id().rsplit(".", 1)[1] with open(self.appconfig["P4TICKETS"]) as f: linecount = 0 for line in f: if len(line.strip()) > 0: linecount = linecount + 1 self.assertEqual(0, self.countTickets()) self.assertEqual(0, self.countSessionDirs()) sessions = [] users = [] for cmdresult in self.p4server.p4api.run_users(): if cmdresult["User"].startswith(testname): sessionData = {"P4USER": cmdresult["User"], "P4PORT": self.p4server.p4api.port, "P4CLIENT": self.appconfig["P4CLIENT"], "P4TICKETS": self.appconfig["P4TICKETS"], "P4TRUST": self.appconfig["P4TRUST"], "SESSIONTYPE": "P4RestSessionP4"} sessionKey = P4RestSession.hashkey(sessionData) sessionData["sessionfile"] = os.path.join(self.appconfig["WORKDIR"], sessionKey, self.appconfig["P4CONFIG"]) session = P4RestSession.createFromDict(sessionData) self.assertIsNotNone(session) session.run_login("Perf0rce") sessions.append(session) users.append(cmdresult["User"]) p4api = session.getP4API() cmdresult = p4api.fetch_user() self.assertEqual(users[-1], cmdresult["User"]) cmdresult = p4api.run_info() self.assertEqual(users[-1], cmdresult[0]["userName"]) self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"]) self.assertFalse(session.isExpired()) session.dispose() self.assertEqual(5, len(sessions)) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) for i in range(0, len(sessions)): session = sessions[i] user = users[i] p4api = session.getP4API() cmdresult = p4api.fetch_user() self.assertEqual(user, cmdresult["User"]) cmdresult = p4api.run_info() self.assertEqual(user, cmdresult[0]["userName"]) self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"]) self.assertFalse(session.isExpired()) session.expire() p4api = session.getP4API() with self.assertRaises(P4Exception) as cm: print(p4api.fetch_user()) self.assertEqual(1, len(cm.exception.errors)) expected = "Perforce password (P4PASSWD) invalid or unset." actual = cm.exception.errors[0] self.assertEqual(expected, actual) self.assertEqual(5, len(sessions)) self.assertEqual(0, self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) for i in range(0, len(sessions)): session = sessions[i] user = users[i] self.assertTrue(session.isExpired()) session.run_login("Perf0rce") expiresin = (session.get("EXPIRES", datetime.utcnow()) - datetime.utcnow()).total_seconds() self.assertGreater(expiresin, 0) self.assertLess(expiresin, 300) p4api = session.getP4API() cmdresult = p4api.fetch_user() self.assertEqual(user, cmdresult["User"]) cmdresult = p4api.run_info() self.assertEqual(user, cmdresult[0]["userName"]) self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"]) session.dispose() self.assertEqual(5, len(sessions)) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) for i in range(0, len(sessions)): session = sessions[i] user = users[i] p4api = session.getP4API() cmdresult = p4api.fetch_user() self.assertEqual(user, cmdresult["User"]) cmdresult = p4api.run_info() self.assertEqual(user, cmdresult[0]["userName"]) self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"]) session.dispose() self.assertEqual(5, len(sessions)) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) for i in range(0, len(sessions)): session = P4RestSession.createFromFile(sessions[i].getfilename()) user = users[i] p4api = session.getP4API() cmdresult = p4api.fetch_user() self.assertEqual(user, cmdresult["User"]) cmdresult = p4api.run_info() self.assertEqual(user, cmdresult[0]["userName"]) self.assertEqual(session.getdirname(), cmdresult[0]["clientCwd"]) session.expire() session.dispose() self.assertFalse(os.path.exists(session.getfilename())) self.assertFalse(os.path.exists(session.getdirname())) self.assertEqual(5, len(sessions)) self.assertEqual(0, self.countTickets()) self.assertEqual(0, self.countSessionDirs()) class TestWWWP4Sessions(P4Testcase): '''Test WWW environment within the P4RestSessionP4''' def setUp(self): self.orgP4CONFIG = osenviron.get("P4CONFIG", None) self.orgP4TICKETS = osenviron.get("P4TICKETS", None) self.orgP4TRUST = osenviron.get("P4TRUST", None) self.testdir = os.path.abspath(".") if not os.path.isdir(self.testdir): raise Exception("Test directory ({}) does not exist." "".format(self.testdir)) self.testdir = mkdtemp(suffix=None, prefix=__name__, dir=self.testdir) self.configKey = "TestP4RestAppConfig_key" configdefaults = {self.configKey: os.path.join(self.testdir, "test.json"), "WORKDIR": os.path.join(self.testdir, "workdir"), APP_CONFIG_KEY: self.configKey, "DEBUG": True} self.wwwapp = create_app(configdefaults) self.wwwapp.testing = True self.workdir = self.wwwapp.config["WORKDIR"] self.p4tickets = self.wwwapp.config["P4TICKETS"] testConfiguration["TESTDIR"] = self.testdir P4Testcase.setUp(self) def tearDown(self): P4Testcase.tearDown(self) if os.path.isdir(self.testdir): rmtree(self.testdir) else: logger.warn('not a directory?:' + self.testdir) if self.orgP4CONFIG is not None: osenviron["P4CONFIG"] = self.orgP4CONFIG elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) if self.orgP4TICKETS is not None: osenviron["P4TICKETS"] = self.orgP4TICKETS elif "P4TICKETS" in osenviron: del(osenviron["P4TICKETS"]) if self.orgP4TRUST is not None: osenviron["P4TRUST"] = self.orgP4TRUST elif "P4CONFIG" in osenviron: del(osenviron["P4CONFIG"]) def genAuthHeader(self, user, password): data = b"" if user is not None and len(user) > 0: data = user.encode() data = data + b":" if password is not None and len(password) > 0: data = data + password.encode() return b"Basic " + b64encode(data) def initTestData(self, p4api): testname = self.id().rsplit(".", 1)[1] if testname == "testP4SessionsAuthFail": p4api = self.p4server.p4api p4user = p4api.user newuser = "{}{!s}".format(testname, 0) userdata = p4api.fetch_user(newuser) p4api.save_user(userdata, "-f") p4api.user = newuser p4api.run_password("Perf0rce", "Perf0rce") p4api.user = p4user p4api.run_configure("set", "security=3") elif testname == "testP4Sessions": p4api = self.p4server.p4api p4user = p4api.user for x in range(0, 5): newuser = "{}{!s}".format(testname, x) userdata = p4api.fetch_user(newuser) p4api.save_user(userdata, "-f") p4api.user = newuser p4api.run_password("Perf0rce", "Perf0rce") p4api.user = p4user p4api.run_configure("set", "security=3") def testP4SessionsAuthFail(self): testname = self.id().rsplit(".", 1)[1] users = [] for cmdresult in self.p4server.p4api.run_users(): if cmdresult["User"].startswith(testname): users.append(cmdresult["User"]) self.assertEqual(1, len(users)) with self.wwwapp.app_context(): with current_app.test_client() as client: # success user = users[0] headers = Headers() headers.add('Authorization', self.genAuthHeader(user, "Perf0rce")) headers.add("Content-Type", "application/json") data = {"P4PORT": self.p4server.p4api.port} json_data = json.dumps(data) response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(200, response.status_code) self.assertEqual("application/json", response.headers.get("Content-Type")) response_data = json.loads(response.get_data()) self.assertIsInstance(response_data, dict) self.assertEqual(user, response_data["userName"]) sessionId = response_data.get(SESSION_HEADER_NAME, None) self.assertIsNotNone(sessionId) self.assertEqual(sessionId, response_data.get("SESSIONID", None)) # test get headers.clear() headers.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId)) json_data = None response = client.get(path='/sessions/{}'.format(sessionId), data=json_data, headers=headers) self.assertEqual(200, response.status_code) self.assertEqual("application/json", response.headers.get("Content-Type")) response_data = json.loads(response.get_data()) self.assertIsInstance(response_data, dict) self.assertEqual(user, response_data["userName"]) sessionId = response_data.get(SESSION_HEADER_NAME, None) self.assertIsNotNone(sessionId) self.assertEqual(sessionId, response_data.get("SESSIONID", None)) # test delete headers.clear() headers.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId)) json_data = None response = client.delete(path='/sessions/{}'.format(sessionId), data=json_data, headers=headers) self.assertEqual(204, response.status_code) with self.wwwapp.app_context(): with current_app.test_client() as client: # test no P4PORT headers.clear() headers.add('Authorization', self.genAuthHeader(user, "Perf0rce")) json_data = None response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(401, response.status_code) # test no P4PORT headers = Headers() headers.add('Authorization', self.genAuthHeader(user, "Perf0rce")) headers.add("Content-Type", "application/json") json_data = None response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(400, response.status_code) # test no P4PORT headers.clear() headers.add('Authorization', self.genAuthHeader(user, "Perf0rce")) headers.add("Content-Type", "application/json") data = {"BADP4PORT": self.p4server.p4api.port} json_data = json.dumps(data) response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(401, response.status_code) # test no password headers.clear() headers.add('Authorization', self.genAuthHeader(user, None)) headers.add("Content-Type", "application/json") data = {"P4PORT": self.p4server.p4api.port} json_data = json.dumps(data) response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(401, response.status_code) # test no USER headers.clear() headers.add('Authorization', self.genAuthHeader(None, "Perf0rce")) headers.add("Content-Type", "application/json") data = {"P4PORT": self.p4server.p4api.port} json_data = json.dumps(data) response = client.post(path="/sessions/", data=json_data, headers=headers) self.assertEqual(401, response.status_code) def postSession(self, client, P4PORT, P4USER, P4PASSWORD): headers = Headers() headers.add('Authorization', self.genAuthHeader(P4USER, P4PASSWORD)) headers.add("Content-Type", "application/json") data = {"P4PORT": P4PORT} json_data = json.dumps(data) return client.post(path="/sessions/", data=json_data, headers=headers) def getSession(self, client, sessionId): headers = Headers() headers.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId)) return client.get(path="/sessions/{}".format(sessionId), headers=headers) def deleteSession(self, client, sessionId): headers = Headers() headers.add('Authorization', "{}={}".format(SESSION_HEADER_NAME, sessionId)) return client.delete(path="/sessions/{}".format(sessionId), headers=headers) def countTickets(self): linecount = 0 with open(self.p4tickets) as f: for line in f: if len(line.strip()) > 0: linecount = linecount + 1 return linecount def countSessionDirs(self): dircount = 0 for directory in oslistdir(self.workdir): if os.path.isdir(os.path.join(self.workdir, directory)): dircount = dircount + 1 return dircount def testP4Sessions(self): testname = self.id().rsplit(".", 1)[1] users = [] sessions = {} self.assertEqual(0, len(users)) self.assertEqual(0, len(sessions)) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) for cmdresult in self.p4server.p4api.run_users(): if cmdresult["User"].startswith(testname): users.append(cmdresult["User"]) with self.wwwapp.app_context(): with current_app.test_client() as client: response = self.postSession(client, self.p4server.p4api.port, users[-1], "Perf0rce") self.assertEqual(200, response.status_code) self.assertEqual("application/json", response.headers.get("Content-Type")) response_data = json.loads(response.get_data()) self.assertIsInstance(response_data, dict) self.assertEqual(users[-1], response_data["userName"]) sessionId = response_data.get(SESSION_HEADER_NAME, None) self.assertIsNotNone(sessionId) self.assertEqual(sessionId, response_data.get("SESSIONID", None)) sessions[users[-1]] = sessionId self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) self.assertEqual(5, len(users)) self.assertEqual(len(users), len(sessions)) for user in users: sessionId = sessions.pop(user) with self.wwwapp.app_context(): with current_app.test_client() as client: response = self.getSession(client, sessionId) self.assertEqual(200, response.status_code) self.assertEqual("application/json", response.headers.get("Content-Type")) response_data = json.loads(response.get_data()) self.assertIsInstance(response_data, dict) self.assertEqual(user, response_data["userName"]) self.assertEqual(sessionId, response_data.get("SESSIONID", None)) with self.wwwapp.app_context(): with current_app.test_client() as client: response = self.deleteSession(client, sessionId) self.assertEqual(204, response.status_code) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) with self.wwwapp.app_context(): with current_app.test_client() as client: response = self.getSession(client, sessionId) self.assertEqual(401, response.status_code) response = self.deleteSession(client, sessionId) self.assertEqual(401, response.status_code) self.assertEqual(5, len(users)) self.assertEqual(0, len(sessions)) self.assertEqual(len(sessions), self.countTickets()) self.assertEqual(len(sessions), self.countSessionDirs()) if __name__ == "__main__": #import sys;sys.argv = ['', 'Test.testName'] unittest.main()