#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#==============================================================================
# Copyright and license info is available in the LICENSE file included with
# the Server Deployment Package (SDP), and also available online:
# https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE
#------------------------------------------------------------------------------

"""
NAME:
    CheckJobEditTrigger.py

DESCRIPTION:
    This trigger is intended for use with P4DTG (Defect Tracking Replication)
    installations.

    It can (optionally as configured):

    1. Prevent creation of new jobs in Perforce by anyone other than the
       replication user.

    2. Prevent modification of read-only fields of jobs in Perforce by anyone
       other than the replication user.

    3. Create newly mirrored jobs using the same name as that in the defect
       tracker.

    To install, add a line to your Perforce triggers table like the following:

        job_save_check form-save job "python /p4/common/bin/triggers/CheckJobEditTrigger.py -p %serverport% -u perforce %user% %formfile% "

    or (if server is standard SDP and has appropriate environment defaults for
    P4PORT and P4USER):

        job_save_check form-save job "python /p4/common/bin/triggers/CheckJobEditTrigger.py %user% %formfile% "

    You may need to provide the full path to the python executable, or edit the
    path to the trigger.

    Also, don't forget to make the file executable, and set the configuration
    variables below.

    To configure, read and modify the following lines up to the comment that
    reads "END OF CONFIGURATION BLOCK".  You may also need to modify the
    definition of which fields constitute a new job based on your jobspec.
    This is in the find_date_field() method and the new-job checks of the
    trigger class.
"""

# Python 2.7/3.3 compatibility.
from __future__ import print_function

import P4
import sys
import os
import re
import argparse
import logging
import textwrap
import P4Triggers

######################################################################
# CONFIGURATION BLOCK

# The error messages we give to the user
MSG_CANT_CREATE_JOBS = """ You are not allowed to create new jobs! """
MSG_CANT_CHANGE_FIELDS = """ You have changed one or more read-only job fields: """

# The list of writeable fields that users can change.
# Changes to any other fields are rejected.
# Please validate this against your jobspec.
WRITEABLE_FIELDS = ["Status", "Date"]

# Replicator user - this user is allowed to change fields
REPLICATOR_USER = "p4dtg"
JIRA_USER = "jira"

# END OF CONFIGURATION BLOCK
######################################################################

# Compiled once; used to collapse whitespace runs before comparing field values.
_WHITESPACE_RE = re.compile(r"\s+")


class CheckJobEditTrigger(P4Triggers.P4Trigger):
    """A subclass of P4Trigger.

    Use this trigger to ensure that only certain job fields are writable.
    Users listed as replicator users bypass every check; everyone else may
    only change the fields named in WRITEABLE_FIELDS and may not create jobs.
    """

    def __init__(self, *args, **kwargs):
        """Parse the trigger command line and set up logging.

        :param args: command line arguments (normally ``sys.argv[1:]``),
            handed to argparse.
        :param kwargs: forwarded unchanged to ``P4Triggers.P4Trigger.__init__``.
        """
        P4Triggers.P4Trigger.__init__(self, **kwargs)
        # __doc__ is None when the module has no docstring (e.g. when run with
        # "python -OO"); dedent(None) would raise a TypeError.
        desc = textwrap.dedent(__doc__ or "")
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=desc,
            epilog="Copyright (c) 2008-2017 Perforce Software, Inc."
        )
        self.addParseArgs(parser)
        self.options = parser.parse_args(args=args)
        self.initLogger()
        self.logger.debug("Command Line Options: %s\n" % self.options)

    def addParseArgs(self, parser):
        """Specific args for this trigger - also calls super class to add common ones"""
        parser.add_argument('user',
                            help="User carrying out the command - %%user%% argument from triggers entry.")
        parser.add_argument('formfile',
                            help="Formfile containing job definition - %%formfile%% argument from triggers entry.")
        super(CheckJobEditTrigger, self).addParseArgs(parser)

    def find_date_field(self, job_spec):
        """Find the job spec field that is a date-modified field.

        :param job_spec: job spec whose "Fields" entry is a list of
            whitespace separated field definitions.
        :return: name of the first field with data type "date" and field
            type "always", or None when there is no such field.
        """
        self.logger.debug("Jobspec: %s" % str(job_spec))
        # Format:
        #   104 Date date 20 always
        for field_def in job_spec["Fields"]:
            parts = field_def.split()
            if len(parts) < 5:
                # Not a complete field definition - nothing to inspect.
                continue
            if parts[2] == "date" and parts[4] == "always":
                self.logger.debug("Date field: %s" % parts[1])
                return parts[1]
        return None

    @staticmethod
    def _normalise(value):
        """Collapse every run of whitespace (including CR/LF) to one space."""
        return _WHITESPACE_RE.sub(" ", value)

    def _changed_readonly_fields(self, orig_job, new_job):
        """Return the sorted names of read-only fields whose value differs.

        Only fields present in both jobs are compared, and differences that
        are purely whitespace or line endings are ignored.
        NOTE(review): a read-only field that is added or removed outright is
        not reported - this matches the existing behaviour; confirm it is
        intended.
        """
        changed = []
        for key in sorted(set(orig_job.keys()).intersection(new_job.keys())):
            if key in WRITEABLE_FIELDS:
                continue
            if orig_job[key] == new_job[key]:
                continue
            orig_val = self._normalise(orig_job[key])
            new_val = self._normalise(new_job[key])
            if orig_val != new_val:
                self.logger.debug("Orig_val: '%s', new_val: '%s'" % (orig_val, new_val))
                changed.append(key)
        return changed

    def _check_job_edit(self):
        """Connect to the server and validate the job form being saved.

        :return: 0 when the save is acceptable, 1 when it must be rejected.
        """
        # First fetch the original job and then compare all fields.
        self.p4 = P4.P4()
        if self.options.port:
            self.p4.port = self.options.port
        if self.options.user:
            self.p4.user = self.options.user
        self.logger.debug("port: '%s', user: '%s'" % (self.p4.port, self.p4.user))
        self.p4.connect()

        with open(self.options.formfile, 'r') as f:
            content = f.read()
        self.logger.debug(content)
        new_job = self.p4.parse_job(content)
        self.logger.debug("Parsed: %s" % (str(new_job)))

        jobname = new_job.get("Job", "new")
        if jobname == "new":
            self.message(MSG_CANT_CREATE_JOBS)
            return 1

        job_spec = self.p4.fetch_jobspec()
        # Search for the field set when job created as it also indicates a
        # new job when not present
        date_field = self.find_date_field(job_spec)
        if date_field and date_field not in new_job:
            self.message(MSG_CANT_CREATE_JOBS)
            return 1

        orig_job = self.p4.fetch_job(jobname)
        self.logger.debug("Original: %s" % (str(orig_job)))

        fields_in_error = self._changed_readonly_fields(orig_job, new_job)
        if fields_in_error:
            msg = MSG_CANT_CHANGE_FIELDS + str(fields_in_error) + "\n\n"
            self.logger.error(msg)
            self.message(msg)
            return 1
        return 0

    def _disconnect(self):
        """Close the server connection if one is currently open."""
        p4 = getattr(self, "p4", None)
        if p4 is not None and p4.connected():
            p4.disconnect()

    def run(self):
        """Runs trigger.

        :return: 0 when the job save is allowed, otherwise a non-zero value
            (1, or whatever reportError() returns for a P4 error).
        """
        try:
            self.logger.debug("CheckJobEditTrigger firing")
            # Don't do anything if the user is replicator user
            if self.options.user in (REPLICATOR_USER, JIRA_USER):
                return 0
            return self._check_job_edit()
        except P4.P4Exception as err:
            return self.reportError(err)
        except Exception:
            print("Exception during trigger execution: %s %s %s" % sys.exc_info())
            return 1
        finally:
            # Runs after any error reporting above, so the connection is
            # still available while the error is being reported.
            self._disconnect()


if __name__ == '__main__':
    # Main program
    trigger = CheckJobEditTrigger(*sys.argv[1:])
    sys.exit(trigger.run())