# -*- encoding: UTF8 -*- # Test harness for swarm_rename_users.py from __future__ import print_function import sys import unittest import os import json sys.path.append(os.path.join('..', '..', 'triggers', 'tests')) sys.path.append('..') from p4testutils import TestCase, P4Server, localDirectory, create_file, append_to_file # parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # sys.path.insert(0, parent_dir) import swarm_rename_users as sru os.environ["LOGS"] = "." LOGGER_NAME = "TestSwarmRenameUsers" LOG_FILE = "log-SwarmRenameUsers.log" python3 = sys.version_info[0] >= 3 class TestSwarmRenameUsersDetails(TestCase): def __init__(self, methodName='runTest'): super(TestSwarmRenameUsersDetails, self).__init__(LOGGER_NAME, LOG_FILE, methodName=methodName) def setUp(self): pass def tearDown(self): pass def testActivities(self): json1 = """ {"type":"change","link":["change",{"change":2072659}], "user":"olduser","action":"committed", "behalfOf":null,"projects":[]} """ json2 = """ {"type":"change","link":["change",{"change":2072659}], "user":"fred","action":"committed", "behalfOf":"olduser","projects":[]} """ json3 = """ {"type":"change","link":["change",{"change":2072659}], "user":"fred","action":"committed", "behalfOf":null,"projects":[]} """ usersToRename = {'olduser': 'newuser'} obj = sru.Activity(json.loads(json1)) self.assertEqual('olduser', obj.user()) obj.Update(usersToRename) self.assertEqual('newuser', obj.user()) self.assertTrue(obj.updated) obj = sru.Activity(json.loads(json2)) self.assertEqual('fred', obj.user()) self.assertEqual('olduser', obj.behalfOf()) obj.Update(usersToRename) self.assertEqual('fred', obj.user()) self.assertEqual('newuser', obj.behalfOf()) self.assertTrue(obj.updated) obj = sru.Activity(json.loads(json3)) self.assertEqual('fred', obj.user()) self.assertEqual(None, obj.behalfOf()) obj.Update(usersToRename) self.assertEqual('fred', obj.user()) self.assertEqual(None, obj.behalfOf()) self.assertFalse(obj.updated) def 
testComments(self): json1 = """ {"id":null,"topic":"changes\/577295","context":null, "user":"olduser","time":1358803378,"body":"Adding comment"} """ json2 = """ {"id":null,"topic":"changes\/577295","context":null, "user":"olduser","readBy":[],"time":1358803378,"body":"Adding comment"} """ json3 = """ {"id":null,"topic":"changes\/577295","context":null, "user":"jim","readBy":["olduser","fred"],"time":1358803378,"body":"Adding comment"} """ usersToRename = {'olduser': 'newuser'} obj = sru.Comment(json.loads(json1)) self.assertEqual('olduser', obj.user()) self.assertEqual(None, obj.readBy()) obj.Update(usersToRename) self.assertEqual('newuser', obj.user()) self.assertEqual(None, obj.readBy()) self.assertTrue(obj.updated) obj = sru.Comment(json.loads(json2)) self.assertEqual('olduser', obj.user()) self.assertEqual([], obj.readBy()) obj.Update(usersToRename) self.assertEqual('newuser', obj.user()) self.assertEqual([], obj.readBy()) self.assertTrue(obj.updated) obj = sru.Comment(json.loads(json3)) self.assertEqual('jim', obj.user()) self.assertEqual(['olduser', 'fred'], obj.readBy()) obj.Update(usersToRename) self.assertEqual('jim', obj.user()) self.assertEqual(['newuser', 'fred'], obj.readBy()) self.assertTrue(obj.updated) def testProjects(self): # Test project with no renames for owners/moderators json1 = """ {"name": "Test Project", "defaults": { "reviewers": [] }, "description": "Test for demo purposes", "owners": [ "fred", "bill" ], "moderators": [ "fred", "bill" ] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json1)) self.assertEqual(['fred', 'bill'], obj.owners()) self.assertEqual(['fred', 'bill'], obj.moderators()) obj.Update(usersToRename) self.assertEqual(['fred', 'bill'], obj.owners()) self.assertEqual(['fred', 'bill'], obj.moderators()) self.assertFalse(obj.updated) # Test owners/moderators updated json2 = """ {"name": "Test Project", "defaults": { "reviewers": [] }, "description": "Test for demo purposes", "owners": [ "olduser", 
"bill" ], "moderators": [ "olduser", "bill" ] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json2)) self.assertEqual(['olduser', 'bill'], obj.owners()) self.assertEqual(['olduser', 'bill'], obj.moderators()) obj.Update(usersToRename) self.assertEqual(['newuser', 'bill'], obj.owners()) self.assertEqual(['newuser', 'bill'], obj.moderators()) self.assertTrue(obj.updated) # Test default:reviewers not updated json3 = """ {"name": "Test Project", "defaults": { "reviewers": ["jim"] }, "description": "Test for demo purposes", "owners": ["fred"], "moderators": ["jim"] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json3)) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(['jim'], obj.defaultReviewers()) obj.Update(usersToRename) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(['jim'], obj.defaultReviewers()) self.assertFalse(obj.updated) # Test default:reviewers are updated json4 = """ {"name": "Test Project", "defaults": { "reviewers": ["olduser", "jim"] }, "description": "Test for demo purposes", "owners": ["fred"], "moderators": ["jim"] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json4)) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(['olduser', 'jim'], obj.defaultReviewers()) obj.Update(usersToRename) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(['newuser', 'jim'], obj.defaultReviewers()) self.assertTrue(obj.updated) # Test branches:defaults:reviewers not updated json5 = """ {"name": "Test Project", "description": "Test for demo purposes", "owners": ["fred"], "moderators": ["jim"], "branches": [ { "id": "main", "defaults": { "reviewers": [] }, "moderators": [] } ] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json5)) self.assertEqual(['fred'], 
obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': []}], obj.branchDefaultReviewers()) self.assertEqual([{'main': []}], obj.branchModerators()) obj.Update(usersToRename) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': []}], obj.branchModerators()) self.assertEqual([{'main': []}], obj.branchDefaultReviewers()) self.assertFalse(obj.updated) json6 = """ {"name": "Test Project", "description": "Test for demo purposes", "owners": ["fred"], "moderators": ["jim"], "branches": [ { "id": "main", "defaults": { "reviewers": ["jim"] }, "moderators": ["john"] } ] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json6)) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': ["jim"]}], obj.branchDefaultReviewers()) self.assertEqual([{'main': ["john"]}], obj.branchModerators()) obj.Update(usersToRename) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': ["jim"]}], obj.branchDefaultReviewers()) self.assertEqual([{'main': ["john"]}], obj.branchModerators()) self.assertFalse(obj.updated) # Test branches:defaults:reviewers and branches:moderators are updated json7 = """ {"name": "Test Project", "description": "Test for demo purposes", "owners": ["fred"], "moderators": ["jim"], "branches": [ { "id": "main", "defaults": { "reviewers": ["jim", "olduser"] }, "moderators": ["john", "olduser"] } ] } """ usersToRename = {'olduser': 'newuser'} obj = sru.Project(json.loads(json7)) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': ["john", "olduser"]}], 
obj.branchModerators()) self.assertEqual([{'main': ["jim", "olduser"]}], obj.branchDefaultReviewers()) obj.Update(usersToRename) self.assertEqual(['fred'], obj.owners()) self.assertEqual(['jim'], obj.moderators()) self.assertEqual(None, obj.defaultReviewers()) self.assertEqual([{'main': ["john", "newuser"]}], obj.branchModerators()) self.assertEqual([{'main': ["jim", "newuser"]}], obj.branchDefaultReviewers()) self.assertTrue(obj.updated) def testReviews(self): # Test review with no renames for versions/author/approvals/participants json1 = """ { "changes": [12345], "commits": [123456], "versions": [{ "difference": 1, "stream": null, "user": "fred", "time": 1611858237, "pending": true, "addChangeMode": "replace"} ], "author": "fred", "approvals": { "mike": [1, 3], "fred": [3] }, "participants": { "mike": {"vote": {"value": 1,"version": 3}}, "fred": [], "swarm-group-test": { "minimumRequired": "0" } } } """ usersToRename = {'olduser': 'newuser'} obj = sru.Review(json.loads(json1)) self.assertEqual('fred', obj.author()) self.assertEqual(['mike', 'fred'], obj.approvals()) self.assertEqual(['mike', 'fred', 'swarm-group-test'], obj.participants()) self.assertEqual(['fred'], obj.versionUsers()) obj.Update(usersToRename) self.assertEqual('fred', obj.author()) self.assertEqual(['mike', 'fred'], obj.approvals()) self.assertEqual(['mike', 'fred', 'swarm-group-test'], obj.participants()) self.assertEqual(['fred'], obj.versionUsers()) self.assertFalse(obj.updated) # Test review with renames for versions/author/approvals/participants json2 = """ { "changes": [12345], "commits": [123456], "versions": [{ "difference": 1, "stream": null, "user": "olduser", "time": 1611858237, "pending": true, "addChangeMode": "replace"} ], "author": "olduser", "approvals": { "olduser": [1, 3], "fred": [3] }, "participants": { "olduser": {"vote": {"value": 1,"version": 3}}, "fred": [], "swarm-group-test": { "minimumRequired": "0" } } } """ obj = sru.Review(json.loads(json2)) 
self.assertEqual('olduser', obj.author()) self.assertEqual(['olduser', 'fred'], obj.approvals()) self.assertEqual(['olduser', 'fred', 'swarm-group-test'], obj.participants()) self.assertEqual(['olduser'], obj.versionUsers()) obj.Update(usersToRename) self.assertEqual('newuser', obj.author()) self.assertEqual(['fred', 'newuser'], obj.approvals()) self.assertEqual(['fred', 'swarm-group-test', 'newuser'], obj.participants()) self.assertEqual(['newuser'], obj.versionUsers()) self.assertEqual([1, 3], obj.json['approvals']['newuser']) self.assertEqual({'vote': {'value': 1, 'version': 3}}, obj.json['participants']['newuser']) self.assertTrue(obj.updated) def testFileInfo(self): # Test review with no renames for readBy json1 = """ {"readBy":{ "jim":{"version":28,"digest":"EEFF792157ADBB2311938D7358F0588B"}, "mike":{"version":30,"digest":"1E43325205374FBBE0E72DAB5930F8DB"} } } """ usersToRename = {'olduser': 'newuser'} obj = sru.FileInfo(json.loads(json1)) self.assertEqual(['jim', 'mike'], obj.readBy()) obj.Update(usersToRename) self.assertEqual(['jim', 'mike'], obj.readBy()) self.assertFalse(obj.updated) # Test review with no renames for readBy json1 = """ {"readBy":{ "olduser":{"version":28,"digest":"EEFF792157ADBB2311938D7358F0588B"}, "mike":{"version":30,"digest":"1E43325205374FBBE0E72DAB5930F8DB"} } } """ usersToRename = {'olduser': 'newuser'} obj = sru.FileInfo(json.loads(json1)) self.assertEqual(['olduser', 'mike'], obj.readBy()) obj.Update(usersToRename) self.assertEqual(['mike', 'newuser'], obj.readBy()) self.assertEqual({'version': 28, 'digest': 'EEFF792157ADBB2311938D7358F0588B'}, obj.json['readBy']['newuser']) self.assertTrue(obj.updated) def testWorkflow(self): # Test workflow with no renames for owners json1 = """ {"on_submit":{"with_review":{"rule":"approved","mode":"inherit"}, "without_review":{"rule":"reject","mode":"inherit"}}, "owners":["swarm-group-dev","dave", "scott"], "end_rules":{"update":{"rule":"no_checking","mode":"inherit"}} } """ usersToRename = 
{'olduser': 'newuser'} obj = sru.Workflow(json.loads(json1)) self.assertEqual(['swarm-group-dev', 'dave', 'scott'], obj.owners()) obj.Update(usersToRename) self.assertEqual(['swarm-group-dev', 'dave', 'scott'], obj.owners()) self.assertFalse(obj.updated) # Test workflow with renames for owners json1 = """ {"on_submit":{"with_review":{"rule":"approved","mode":"inherit"}, "without_review":{"rule":"reject","mode":"inherit"}}, "owners":["swarm-group-dev","olduser", "scott"], "end_rules":{"update":{"rule":"no_checking","mode":"inherit"}} } """ usersToRename = {'olduser': 'newuser'} obj = sru.Workflow(json.loads(json1)) self.assertEqual(['swarm-group-dev', 'olduser', 'scott'], obj.owners()) obj.Update(usersToRename) self.assertEqual(['swarm-group-dev', 'newuser', 'scott'], obj.owners()) self.assertTrue(obj.updated) class TestSwarmRenameUsers(TestCase): def __init__(self, methodName='runTest'): super(TestSwarmRenameUsers, self).__init__(LOGGER_NAME, LOG_FILE, methodName=methodName) def setUp(self): pass def tearDown(self): pass def setupServer(self): self.server = P4Server() p4 = self.server.p4 p4.logger = self.logger # This works if no spaces in server root pathname! 
port = p4.port.replace('"', '') self.logger.debug("port: |%s|" % port) return p4 def testRename(self): """Test renaming of users""" p4 = self.setupServer() usersToRename = {'olduser': 'newuser'} data = [{"key": "swarm-activity-ffe71695", "val":'{"type":"change","link":["change",{"change":2072657}],"user":"olduser","action": "committed","behalfOf":null,"projects":[]}'}, {"key": "swarm-activity-ffe71696", "val": '{"type":"change","link":["change",{"change":2072658}],"user":"fred","action": "committed","behalfOf":"olduser","projects":[]}'}, {"key": "swarm-activity-ffe71697", "val": '{"type":"change","link":["change",{"change":2072659}],"user":"fred","action":"committed","behalfOf":null,"projects":[]}'}, {"key": 'swarm-comment-0000010435', "val": '{"id":null,"topic":"changes/577295","context":null,"user":"olduser","time":1358803378,"body":"Adding comment"}'}, {"key": 'swarm-comment-0000010437', "val": '{"id":null,"topic":"changes/577295","context":null,"user":"jim","readBy":["olduser","fred"],"time":1358803378,"body":"Adding comment"}'}, {"key": 'swarm-project-test', "val": '{"name":"Test project","defaults":{"reviewers":[]},"description":"test 
desc","owners":["fred","olduser"],"branches":[{"id":"main","name":"Main","workflow":null,"paths":["//test/main/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]},{"id":"p1","name":"p1","workflow":null,"paths":["//test/p16.1/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]},{"id":"r16-1","name":"r16.1","workflow":null,"paths":["//test/r1/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]}],"jobview":"","emailFlags":{"change_email_project_users":"1","review_email_project_members":"1"},"tests":{"enabled":false,"url":"","postBody":"","postFormat":"url"},"deploy":{"enabled":false,"url":""},"deleted":false,"private":false,"workflow":null,"retainDefaultReviewers":false,"minimumUpVotes":null,"upgrade":1,"id":"test-proj"}'}, {"key": 'swarm-user-fred', "val": '{"follows":[],"delayedComments":null,"user_notification_settings":null'}, {"key": 'swarm-user-olduser', "val": '{"follows":[],"delayedComments":null,"user_notification_settings":null'}, ] for d in data: p4.run_key(d["key"], d["val"]) obj = sru.SwarmRenameUsers() obj.renameUsers(p4, usersToRename) expected = {"swarm-activity-ffe71695": '{"type":"change","link":["change",{"change":2072657}],"user":"newuser","action":"committed","behalfOf":null,"projects":[]}', "swarm-activity-ffe71696": '{"type":"change","link":["change",{"change":2072658}],"user":"fred","action":"committed","behalfOf":"newuser","projects":[]}', "swarm-activity-ffe71697": '{"type":"change","link":["change",{"change":2072659}],"user":"fred","action":"committed","behalfOf":null,"projects":[]}' } fdata = p4.run_keys("-e", "swarm-activity-*") for d in fdata: self.assertTrue((d["key"] in expected)) self.assertEqual(expected[d["key"]], d["value"]) expected = {'swarm-comment-0000010435': 
'{"id":null,"topic":"changes/577295","context":null,"user":"newuser","time":1358803378,"body":"Adding comment"}', 'swarm-comment-0000010437': '{"id":null,"topic":"changes/577295","context":null,"user":"jim","readBy":["newuser","fred"],"time":1358803378,"body":"Adding comment"}', } fdata = p4.run_keys("-e", "swarm-comment-*") for d in fdata: self.assertTrue((d["key"] in expected)) self.assertEqual(expected[d["key"]], d["value"]) self.maxDiff = None expected = {'swarm-project-test': '{"name":"Test project","defaults":{"reviewers":[]},"description":"test desc","owners":["fred","newuser"],"branches":[{"id":"main","name":"Main","workflow":null,"paths":["//test/main/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]},{"id":"p1","name":"p1","workflow":null,"paths":["//test/p16.1/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]},{"id":"r16-1","name":"r16.1","workflow":null,"paths":["//test/r1/..."],"defaults":{"reviewers":[]},"minimumUpVotes":null,"retainDefaultReviewers":false,"moderators":[],"moderators-groups":[]}],"jobview":"","emailFlags":{"change_email_project_users":"1","review_email_project_members":"1"},"tests":{"enabled":false,"url":"","postBody":"","postFormat":"url"},"deploy":{"enabled":false,"url":""},"deleted":false,"private":false,"workflow":null,"retainDefaultReviewers":false,"minimumUpVotes":null,"upgrade":1,"id":"test-proj"}'} fdata = p4.run_keys("-e", "swarm-project-*") for d in fdata: self.assertTrue((d["key"] in expected)) self.assertEqual(expected[d["key"]], d["value"]) expected = {'swarm-user-fred': '{"follows":[],"delayedComments":null,"user_notification_settings":null', 'swarm-user-newuser':'{"follows":[],"delayedComments":null,"user_notification_settings":null'} fdata = p4.run_keys("-e", "swarm-user-*") for d in fdata: self.assertTrue((d["key"] in expected)) self.assertEqual(expected[d["key"]], 
d["value"]) if __name__ == '__main__': unittest.main()
# Change history
#
# | #  | Change | User            | Description                                                                                      | Committed |
# |----|--------|-----------------|--------------------------------------------------------------------------------------------------|-----------|
# | #1 | 27354  | C. Thomas Tyler | Released SDP 2020.1.27351 (2021/01/31). Copy Up using 'p4 copy -r -b perforce_software-sdp-dev'. |           |
#
# //guest/perforce_software/sdp/dev/Unsupported/Samples/bin/test/test_swarm_rename_users.py
#
# | #2 | 27338  | Robert Cowham   | Handle swarm-user-* keys                                                                         |           |
# | #1 | 27336  | Robert Cowham   | Implement rename of users for Swarm - backdoor version updating 'p4 keys' info                   |           |