#!/usr/bin/env python3 # -*- coding: utf-8 -*- #============================================================================== # Copyright and license info is available in the LICENSE file included with # the Server Deployment Package (SDP), and also available online: # https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE #------------------------------------------------------------------------------ """ NAME: CheckJobEditTrigger.py DESCRIPTION: This trigger is intended for use with P4DTG (Defect Tracking Replication) installations. It can (optionally as configured): 1. Prevent creation of new jobs in Perforce by anyone other than the replication user. 2 Prevent modification of read-only fields of jobs in Perforce by anyone other than the replication user. 3. Create newly mirrored jobs using the same name as that in the defect tracker. To install, add a line to your Perforce triggers table like the following: job_save_check form-save job "python /p4/common/bin/triggers/CheckJobEditTrigger.py -p %serverport% -u perforce %user% %formfile% " or (if server is standard SDP and has appropriate environment defaults for P4PORT and P4USER): job_save_check form-save job "python /p4/common/bin/triggers/CheckJobEditTrigger.py %user% %formfile% " You may need to provide the full path to python executable, or edit the path to the trigger. Also, don't forget to make the file executable, and set the configuration variables below. To configure, read and modify the following lines up to the comment that reads "END OF CONFIGURATION BLOCK". You may also need to modify the definition of which fields constitute a new job based on your jobspec. This is in the allowed_job() function. """ # Python 2.7/3.3 compatibility. from __future__ import print_function import P4 import sys import os import re import argparse import logging import textwrap # CONFIGURATION BLOCK # The error messages we give to the user MSG_CANT_CREATE_JOBS = """ You are not allowed to create new jobs! """ MSG_CANT_CHANGE_FIELDS = """ You have changed one or more read-only job fields: """ # The list of writeable fields that users can change. # Changes to any other fields are rejected. # Please validate this against your jobspec. WRITEABLE_FIELDS = ["Status", "Date"] # Replicator user - this user is allowed to change fields REPLICATOR_USER = "p4dtg" JIRA_USER = "jira" # If working on a server with the SDP, the 'LOGS' environment variable contains # the path the standard logging directory. The '-L ' argument shoudl be # specified in non-SDP environments. LOGDIR = os.getenv('LOGS', '/p4/1/logs') DEFAULT_LOG_FILE = "%s/p4triggers.log" % LOGDIR DEFAULT_VERBOSITY = 'DEBUG' LOGGER_NAME = 'P4Triggers' # END OF CONFIGURATION BLOCK class CheckJobEditTrigger(object): """A subclass of P4Trigger. Use this trigger to ensure that only certain job fields are writable. """ def __init__(self, *args, **kwargs): kwargs['charset'] = 'none' # API Levels are defined here: http://answers.perforce.com/articles/KB/3197 # Ensure this matches the P4Python version used. # API Level 79 is for p4d 2015.2. kwargs['api_level'] = 79 desc = textwrap.dedent(__doc__) parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=desc, epilog="Copyright (c) 2008-2017 Perforce Software, Inc." ) parser.add_argument('user', help="User carrying out the command - the %%user%% argument from trigger definition.") parser.add_argument('formfile', help="Formfile containing job definition - the %%formfile%% argument from trigger definition.") parser.add_argument('-p', '--port', default=None, help="Perforce server port - set using %%serverport%%. Default: $P4PORT") parser.add_argument('-u', '--user', default=None, help="Perforce user. Default: $P4USER") parser.add_argument('-L', '--log', default=DEFAULT_LOG_FILE, help="Default: " + DEFAULT_LOG_FILE) parser.add_argument('-v', '--verbosity', nargs='?', const="INFO", default=DEFAULT_VERBOSITY, choices=('DEBUG', 'WARNING', 'INFO', 'ERROR', 'FATAL'), help="Output verbosity level. Default is: " + DEFAULT_VERBOSITY) self.options = parser.parse_args() self.logger = logging.getLogger(LOGGER_NAME) self.logger.setLevel(self.options.verbosity) logformat = '%(levelname)s [%(asctime)s] [%(funcName)s : %(lineno)d] - %(message)s' logging.basicConfig(format=logformat, filename=self.options.log, level=self.options.verbosity) self.logger.debug("Command Line Options: %s\n" % self.options) def message(self, msg): """Method to send a message to the user. Just writes to stdout, but it's nice to encapsulate that here.""" print(msg) def errorMessage(self): return """ An error was encountered during trigger execution. Please contact your Perforce administrator and ask them to investigate the cause of this error """ def reportError(self, err): """Method to encapsulate error reporting to make sure all errors are reported in a consistent way :param err: """ self.logger.error("Error during trigger execution:") self.reportP4Errors() self.logger.error(err) # return message to the user self.message(self.errorMessage()) return 1 def reportP4Errors(self): lines = [] for e in self.p4.errors: lines.append("ERROR: %s" % e) for w in self.p4.warnings: lines.append("WARNING: %s" % w) print("\n".join(lines)) self.logger.error("\n".join(lines)) return 1 def find_date_field(self, job_spec): """Find's the specific field in job spec which is a date modified field :param job_spec: """ self.logger.debug("Jobspec: %s" % str(job_spec)) # Format: # 104 Date date 20 always for f in job_spec["Fields"]: parts = f.split() if parts[2] == "date" and parts[4] == "always": self.logger.debug("Date field: %s" % parts[1]) return parts[1] return None def run(self): """Runs trigger""" try: self.logger.debug("CheckJobEditTrigger firing") # Don't do anything if the user is replicator user if self.options.user in [REPLICATOR_USER, JIRA_USER]: return 0 # First fetch the original job and then compare all fields. self.p4 = P4.P4() if self.options.port: self.p4.port = self.options.port if self.options.user: self.p4.user = self.options.user self.logger.debug("port: '%s', user: '%s'" % (self.p4.port, self.p4.user)) self.p4.connect() with open(self.options.formfile, 'r') as f: content = f.read() self.logger.debug(content) new_job = self.p4.parse_job(content) self.logger.debug("Parsed: %s" % (str(new_job))) jobname = "new" if "Job" in new_job: jobname = new_job["Job"] if jobname == "new": self.message(MSG_CANT_CREATE_JOBS) return 1 job_spec = self.p4.fetch_jobspec() # Search for the field set when job created as it also indicates a new job when not present date_field = self.find_date_field(job_spec) if date_field and date_field not in new_job: self.message(MSG_CANT_CREATE_JOBS) return 1 orig_job = self.p4.fetch_job(jobname) self.logger.debug("Original: %s" % (str(orig_job))) fields_in_error = [] all_keys = set(orig_job.keys()) all_keys = all_keys.union(new_job.keys()) for key in all_keys: if key in WRITEABLE_FIELDS: continue if key in orig_job and key in new_job and orig_job[key] != new_job[key]: orig_val = new_val = None if key in orig_job: orig_val = orig_job[key].replace("\r\n", "\n") orig_val = orig_val.replace("\s+", " ") # TODO if key in new_job: new_val = new_job[key].replace("\r\n", "\n") new_val = re.sub("\s+", " ", new_val) if orig_val != new_val: self.logger.debug("Orig_val: '%s', new_val: '%s'" % (orig_val, new_val)) fields_in_error.append(key) if fields_in_error: msg = MSG_CANT_CHANGE_FIELDS + str(fields_in_error) + "\n\n" self.logger.error(msg) self.message(msg) return 1 except P4.P4Exception as err: return self.reportError(err) except: print("Exception during trigger execution: %s %s %s" % sys.exc_info()) return 1 return 0 if __name__ == '__main__': """ Main Program""" trigger = CheckJobEditTrigger(*sys.argv[1:]) sys.exit(trigger.run())