# -*- encoding: UTF8 -*-
# Test harness for ControlStreamCreation.py

from __future__ import print_function

import sys
import unittest
import os
import re

import P4
from p4testutils import TestCase, P4Server, localDirectory, create_file, append_to_file

python3 = sys.version_info[0] >= 3

# Make the directory above this test directory importable so the trigger
# module under test can be found.
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parent_dir)
from ControlStreamCreation import ControlStreamCreation

os.environ["LOGS"] = "."
LOGGER_NAME = "TestControlStreamCreation"
LOG_FILE = "log-TestControlStreamCreation.log"

# Text the trigger is expected to report when stream creation is refused.
DENIED_MSG_REGEX = r"You are not allowed to create new streams"

# YAML configuration handed to the trigger via its -c option. Only the stream
# path pattern and the list of allowed users/groups vary between scenarios.
CONFIG_TEMPLATE = """
msg_cant_create_stream:
  - ""
  - "You are not allowed to create new streams."
  - "Check with the admins to be added to an appropriate group."

can_create_streams:
  - name: All projects
    stream_paths:
      - "{stream_path}"
    allowed_users_groups:
{allowed}
"""


class TestControlStreamCreation(TestCase):
    """Exercise ControlStreamCreation.py installed as a form-save trigger on stream specs."""

    def __init__(self, methodName='runTest'):
        super(TestControlStreamCreation, self).__init__(LOGGER_NAME, LOG_FILE, methodName=methodName)

    def setUp(self):
        """Create a test server with a stream depot and install the trigger under test."""
        self.server = P4Server()
        trigpath = os.path.join(parent_dir, "ControlStreamCreation.py")
        self.config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "~test_config.yaml")
        p4 = self.server.p4
        self.p4 = p4
        p4.logger = self.logger

        depot = p4.fetch_depot("streams")
        depot["Type"] = "stream"
        p4.save_depot(depot)

        # This works if no spaces in server root pathname!
        # Double quotes are stripped from the port because the whole trigger
        # command is itself wrapped in double quotes; %quote% is used around
        # the port value instead.
        port = p4.port.replace('"', '')
        self.logger.debug("port: |%s|" % port)
        triggers = p4.fetch_triggers()
        triggers['Triggers'] = ['control-stream-creation form-save stream " python {} -p %quote%{}%quote% '
                                '-u {} -c {} %user% %formfile% "'.format(trigpath, port, p4.user, self.config_path),
                                ]
        self.logger.debug(triggers)
        p4.save_triggers(triggers)
        # Reconnect to pick up changes
        p4.disconnect()
        p4.connect()

    def tearDown(self):
        """Remove the trigger configuration file written during the test."""
        if os.path.exists(self.config_path):
            os.remove(self.config_path)

    def _write_config(self, stream_path, allowed):
        """Overwrite the trigger's YAML config.

        stream_path -- pattern placed under 'stream_paths'
        allowed     -- names listed under 'allowed_users_groups'
        """
        entries = "\n".join("      - {}".format(name) for name in allowed)
        with open(self.config_path, "w") as f:
            f.write(CONFIG_TEMPLATE.format(stream_path=stream_path, allowed=entries))

    def _assert_create_denied(self, stream_name):
        """Saving a new mainline stream spec must fail with the configured message."""
        stream = self.p4.fetch_stream("-tmainline", stream_name)
        with self.assertRaises(P4.P4Exception) as ctx:
            self.p4.save_stream(stream)
        self.assertRegex(str(ctx.exception), DENIED_MSG_REGEX)

    def _assert_create_allowed(self, stream_name):
        """Saving a new mainline stream spec must succeed (any exception fails the test)."""
        stream = self.p4.fetch_stream("-tmainline", stream_name)
        self.p4.save_stream(stream)

    def testControlStreamCreation(self):
        """check that it works when called as a trigger"""
        # Path pattern matches every stream, but 'testuser' is not listed.
        self._write_config(".*", ["user1", "group1", "group2"])
        self._assert_create_denied("//streams/main")

        # Same pattern, now with 'testuser' listed.
        self._write_config(".*", ["testuser"])
        self._assert_create_allowed("//streams/main")

        # Narrower pattern matching //streams/main2; only another user is listed.
        self._write_config("//streams/.*2", ["anotheruser"])
        self._assert_create_denied("//streams/main2")

        # Same narrower pattern, now with 'testuser' listed.
        self._write_config("//streams/.*2", ["testuser"])
        self._assert_create_allowed("//streams/main2")

        # Pattern with an unbalanced parenthesis and a different user listed:
        # creation is still expected to be refused with the standard message.
        self._write_config("//streams/(.*", ["anotheruser"])
        self._assert_create_denied("//streams/main3")


if __name__ == '__main__':
    unittest.main()