#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#==============================================================================
# Copyright and license info is available in the LICENSE file included with
# the Server Deployment Package (SDP), and also available online:
# https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE
#------------------------------------------------------------------------------

"""
NAME:
    CheckJobEditTrigger.py

DESCRIPTION:
    This trigger is intended for use with P4DTG (Defect Tracking Replication) installations.

    It can (optionally as configured):

    1. Prevent creation of new jobs in Perforce by anyone other than the replication user.
    2. Prevent modification of read-only fields of jobs in Perforce by anyone other than the replication user.
    3. Create newly mirrored jobs using the same name as that in the defect tracker.

    To install, add a line to your Perforce triggers table like the following:

        job_save_check form-in job "python /p4/common/bin/triggers/CheckJobEditTrigger.py -p %serverport% -u perforce %specdef% %user% %formfile% "

    or (if server is standard SDP and has appropriate environment defaults for P4PORT and P4USER):

        job_save_check form-in job "python /p4/common/bin/triggers/CheckJobEditTrigger.py %specdef% %user% %formfile% "

    You may need to provide the full path to python executable, or edit the path to the trigger.

    Also, don't forget to make the file executable, and set the configuration variables below.

    To configure, read and modify the following lines up to the comment that
    reads "END OF CONFIGURATION BLOCK". You may also need to modify the
    definition of which fields constitute a new job based on your jobspec. This is
    in the find_date_field() method.
"""

# Python 2.7/3.3 compatibility.
from __future__ import print_function import P4 import sys import re import argparse import textwrap import P4Triggers import traceback import shutil import os ###################################################################### # CONFIGURATION BLOCK # The error messages we give to the user MSG_CANT_CREATE_JOBS = """ You are not allowed to create new jobs! """ MSG_CANT_CHANGE_FIELDS = """ You have changed one or more read-only job fields: """ # The list of writeable fields that users can change. # Changes to any other fields are rejected. # Please validate this against your jobspec. WRITEABLE_FIELDS = ["Status", "Date"] # Replicator user - this user is allowed to change fields REPLICATOR_USER = "p4dtg" JIRA_USER = "jira" # END OF CONFIGURATION BLOCK ###################################################################### class CheckJobEditTrigger(P4Triggers.P4Trigger): """See module doc string for details""" def __init__(self, *args, **kwargs): P4Triggers.P4Trigger.__init__(self, **kwargs) desc = textwrap.dedent(__doc__) parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=desc, epilog="Copyright (c) 2008-2017 Perforce Software, Inc." ) self.addParseArgs(parser) self.options = parser.parse_args(args=args) self.initLogger() self.logger.debug("Command Line Options: %s\n" % self.options) def addParseArgs(self, parser): """Specific args for this trigger - also calls super class to add common trigger args""" parser.add_argument('-r', '--rename-job', default=False, action="store_true", help="Whether to rename-jobs to P4DTG name. Default: False") parser.add_argument('--prefix', default="", help="Prefix for jobnames when renaming. 
Default: ''") parser.add_argument('specdef', help="Job spec - %%specdef%% argument from triggers entry.") parser.add_argument('user', help="User carrying out the command - %%user%% argument from triggers entry.") parser.add_argument('formfile', help="Formfile containing job definition - %%formfile%% argument from triggers entry.") super(CheckJobEditTrigger, self).addParseArgs(parser) def find_date_field(self, job_spec): """Find's the specific field in job spec which is a date modified field. If this field is blank it indicates the job is a new job. :param job_spec: """ self.logger.debug("Jobspec: %s" % str(job_spec)) # Format: # 104 Date date 20 always for f in job_spec["Fields"]: parts = f.split() if parts[2] == "date" and parts[4] == "always": self.logger.debug("Date field: %s" % parts[1]) return parts[1] return None def fields_in_error(self, new_job, orig_job): """Return the names of all fields that have been modified when they shouldn't have been""" fields_in_error = [] all_keys = set(orig_job.keys()) all_keys = all_keys.union(new_job.keys()) for key in all_keys: if key in WRITEABLE_FIELDS: continue if key in orig_job and key in new_job and orig_job[key] != new_job[key]: orig_val = new_val = None if key in orig_job: orig_val = orig_job[key].replace("\r\n", "\n") orig_val = re.sub("\s+", " ", orig_val) if key in new_job: new_val = new_job[key].replace("\r\n", "\n") new_val = re.sub("\s+", " ", new_val) if orig_val != new_val: self.logger.debug("Orig_val: '%s', new_val: '%s'" % (orig_val, new_val)) fields_in_error.append(key) return fields_in_error def run(self): """Runs trigger""" try: self.logger.debug("CheckJobEditTrigger firing") self.setupP4() self.p4.connect() self.p4.logger = self.logger job_spec = self.p4.fetch_jobspec() self.logger.debug(job_spec) # This seems very hacky! 
self.p4.define_spec('job', self.options.specdef) with open(self.options.formfile, 'r') as f: content = f.read() self.logger.debug(content) new_job = self.p4.parse_job(content) self.logger.debug("Parsed: %s" % (str(new_job))) jobname = "new" if "Job" in new_job: jobname = new_job["Job"] if not self.options.user in [REPLICATOR_USER, JIRA_USER]: if jobname == "new": self.message(MSG_CANT_CREATE_JOBS) return 1 # Search for the field set when job created as it also indicates a new job when not present date_field = self.find_date_field(job_spec) if date_field and date_field not in new_job: self.message(MSG_CANT_CREATE_JOBS) return 1 orig_job = self.p4.fetch_job(jobname) self.logger.debug("Original: %s" % (str(orig_job))) fields_in_error = self.fields_in_error(new_job, orig_job) if fields_in_error: msg = MSG_CANT_CHANGE_FIELDS + str(fields_in_error) + "\n\n" self.logger.error(msg) self.message(msg) return 1 elif jobname == "new": # Decide if we want to change the jobname if "P4DTG_DTISSUE" in new_job: dtName = new_job["P4DTG_DTISSUE"] if self.options.rename_job: new_job["Job"] = dtName contents = self.p4.format_job(new_job) tempfile = "%s~" % self.options.formfile shutil.move(self.options.formfile, tempfile) with open(self.options.formfile, 'w') as f: f.write(contents) os.remove(tempfile) except P4.P4Exception as err: return self.reportError(err) except: exc_type, exc_value, exc_tb = sys.exc_info() print("Exception during trigger execution: %s %s %s" % (exc_type, exc_value, exc_tb)) self.logger.error("called from:\n%s", "".join(traceback.format_exception(exc_type, exc_value, exc_tb))) return 1 return 0 if __name__ == '__main__': """ Main Program""" trigger = CheckJobEditTrigger(*sys.argv[1:]) sys.exit(trigger.run())