#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ #============================================================================== # Copyright and license info is available in the LICENSE file included with # the Server Deployment Package (SDP), and also available online: # https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE #------------------------------------------------------------------------------ This script ensures that read-only job fields are not changed by ordinary users. # # == Synopsis # # check_job_edit.rb # # == Sample Trigger Definition # # Using a trigger spec like this: # # check_job_edit form-save job "check_job_edit.rb -p %serverport% -u %user% %formfile%" # check_job_edit form-save job "ruby c:/perforce/scripts/check_job_edit.rb -p %serverport% -u perforce %user% %formname% %formfile% " ## This script can optionally: ## ## 1. Prevent creation of new jobs in Perforce by anyone other than the ## replication user. ## 2 Prevent modification of read-only fields of jobs in Perforce by anyone ## other than the replication user. ## 3. Create newly mirrored jobs using the same name as that in the defect ## tracker. ## ## To install, add a line to your triggers table like the following: ## ## jobId form-in job "/p4root/triggers/changeid.pl %user% %formfile%" ## ## or if you're on Windows, you need to prepend the Perl interpreter like this: ## ## jobId form-in job "c:\path\to\perl.exe changeid.pl %user% %formfile%" ## ## Also, don't forget to make the file executable, change the path to the Perl ## interpreter, and set the configuration variables below. ## ## To configure, read and modify the following lines up to the comment that ## reads "END OF CONFIGURATION BLOCK". You may also need to modify the ## definition of which fields constitute a new job based on your jobspec. This ## is in the allowed_job() function. """ # Python 2.7/3.3 compatibility. from __future__ import print_function import P4 import sys import os import re import argparse import logging import textwrap # The error message we give to the user USER_MESSAGE = """ You have changed one or more read-only job fields: """ # The list of writeable fields that users can change. # Changes to any other fields are rejected. WRITEABLE_FIELDS = ["Status", "Date"] # Replicator user - this user is allowed to change fields REPLICATOR_USER = "p4dtg" JIRA_USER = "jira" # If working on a server with the SDP, the 'LOGS' environment variable contains # the path the standard logging directory. The '-L ' argument shoudl be # specified in non-SDP environments. LOGDIR = os.getenv('LOGS', '/p4/1/logs') DEFAULT_LOG_FILE = "%s/p4triggers.log" % LOGDIR DEFAULT_VERBOSITY = 'DEBUG' LOGGER_NAME = 'P4Triggers' class CheckJobEditTrigger(object): """A subclass of P4Trigger. Use this trigger to ensure that only certain job fields are writable. """ def __init__(self, *args, **kwargs): kwargs['charset'] = 'none' # API Levels are defined here: http://answers.perforce.com/articles/KB/3197 # Ensure this matches the P4Python version used. # API Level 79 is for p4d 2015.2. kwargs['api_level'] = 79 parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent('''\ NAME P4Triggers.py DESCRIPTION Common trigger template class '''), epilog="Copyright (c) 2008-2016 Perforce Software, Inc." ) parser.add_argument('user', help="User carrying out the command - the %user% argument from trigger definition.") parser.add_argument('formfile', help="Formfile containing job definition - the %formfile% argument from trigger definition.") parser.add_argument('-p', '--port', default=None, help="Perforce server port: set using %serverport% or $P4PORT") parser.add_argument('-u', '--user', default=None, help="Perforce user: $P4USER") parser.add_argument('-L', '--log', default=DEFAULT_LOG_FILE, help="Default: " + DEFAULT_LOG_FILE) parser.add_argument('-v', '--verbosity', nargs='?', const="INFO", default=DEFAULT_VERBOSITY, choices=('DEBUG', 'WARNING', 'INFO', 'ERROR', 'FATAL'), help="Output verbosity level. Default is: " + DEFAULT_VERBOSITY) self.options = parser.parse_args() self.logger = logging.getLogger(LOGGER_NAME) self.logger.setLevel(self.options.verbosity) logformat = '%(levelname)s [%(asctime)s] [%(funcName)s : %(lineno)d] - %(message)s' logging.basicConfig(format=logformat, filename=self.options.log, level=self.options.verbosity) # self.logHandler = logging.FileHandler(self.options.log, mode='a') # # # df = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%m/%d/%Y %H:%M:%S') # bf = logging.Formatter('%(levelname)s: %(message)s') # self.logHandler.setFormatter(bf) # self.logger.addHandler(self.logHandler) self.logger.debug("Command Line Options: %s\n" % self.options) # # Method to send a message to the user. Just writes to stdout, but it's # nice to encapsulate that here. # def message(self, msg): print(msg) def errorMessage(self): return """ An error was encountered during trigger execution. Please contact your Perforce administrator and ask them to investigate the cause of this error """ def reportError(self, err): """Method to encapsulate error reporting to make sure all errors are reported in a consistent way""" self.logger.error("Error during trigger execution:") self.reportP4Errors() self.logger.error(err) # return message to the user self.message(self.errorMessage()) return 1 def reportP4Errors(self): lines = [] for e in self.p4.errors: lines.append("ERROR: %s" % e) for w in self.p4.warnings: lines.append("WARNING: %s" % w) print("\n".join(lines)) self.logger.error("\n".join(lines)) return 1 def run(self): """Runs trigger""" try: self.logger.debug("CheckJobEditTrigger firing") # Don't do anything if the user is replicator user if self.options.user in [REPLICATOR_USER, JIRA_USER]: return 0 # First fetch the original job and then compare all fields. self.p4 = P4.P4() if self.options.port: self.p4.port = self.options.port if self.options.user: self.p4.user = self.options.user self.logger.debug("port: '%s', user: '%s'" % (self.p4.port, self.p4.user)) self.p4.connect() with open(self.options.formfile, 'r') as f: content = f.read() self.logger.debug(content) new_job = self.p4.parse_job(content) self.logger.debug("Parsed: %s" % (str(new_job))) jobname = "new" if "Job" in new_job: jobname = new_job["Job"] if jobname == "new": self.message("You are not allowed to create new jobs!") return 1 else: orig_job = self.p4.fetch_job(jobname) self.logger.debug("Original: %s" % (str(orig_job))) fields_in_error = [] all_keys = set(orig_job.keys()) all_keys = all_keys.union(new_job.keys()) for key in all_keys: if key in WRITEABLE_FIELDS: continue if key in orig_job and key in new_job and orig_job[key] != new_job[key]: # if !((!orig_job[key] && new_job[key] == "") || (!new_job[key] && orig_job[key] == "")) then orig_val = new_val = None if key in orig_job: orig_val = orig_job[key].replace("\r\n", "\n") orig_val = orig_val.replace("\s+", " ") # TODO if key in new_job: new_val = new_job[key].replace("\r\n", "\n") new_val = re.sub("\s+", " ", new_val) if orig_val != new_val: # message("Orig #{key} field'\n#{orig_val.inspect}'\nNew:'\n#{new_val.inspect}'\n") fields_in_error.append(key) if fields_in_error: msg = USER_MESSAGE + str(fields_in_error) + "\n\n" self.logger.error(msg) self.message(msg) return 1 except P4.P4Exception as err: return self.reportError(err) except: print("Exception during trigger execution: %s %s %s" % sys.exc_info()) return 1 return 0 if __name__ == '__main__': """ Main Program""" trigger = CheckJobEditTrigger(*sys.argv[1:]) sys.exit(trigger.run())