#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ==============================================================================
# Copyright and license info is available in the LICENSE file included with
# the Server Deployment Package (SDP), and also available online:
# https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE
# ------------------------------------------------------------------------------

"""
NAME:
    WorkflowTriggers.py

DESCRIPTION:
    Base class with utility functions for interfacing with Swarm and parsing
    the workflow yaml file.
"""

# Python 2.7/3.3 compatibility.
from __future__ import print_function

import sys
import os
import P4Triggers
import P4
import requests
import yaml
from string import Template
from collections import OrderedDict, defaultdict


def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """Load YAML from `stream`, building every mapping with `object_pairs_hook`.

    A private subclass of `Loader` is created on each call so that the custom
    mapping constructor is registered on that subclass only, not on the
    loader class passed in by the caller.

    NOTE(security): the default `yaml.Loader` is not safe for untrusted
    input; pass `yaml.SafeLoader` (as WorkflowTrigger.load_config does)
    whenever the YAML does not come from a fully trusted source.

    Returns whatever `yaml.load` returns for the document (None for an
    empty stream).
    """
    class OrderedLoader(Loader):
        pass

    def construct_mapping(loader, node):
        # Resolve YAML merge keys ('<<') before collecting the pairs.
        loader.flatten_mapping(node)
        return object_pairs_hook(loader.construct_pairs(node))

    OrderedLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
        construct_mapping)
    return yaml.load(stream, OrderedLoader)


class GroupMemberChecker:
    "Checks for user in any of the listed groups"

    def __init__(self, p4):
        self.p4 = p4
        # group name -> list of direct users/owners of that group
        self.groups = defaultdict(list)
        # group name -> list of its direct subgroups
        self.supergroups = defaultdict(list)

    def IsMember(self, user, users_or_groups):
        """Return True if `user` is listed directly in `users_or_groups`, or
        is a user/owner of any group named there (including its subgroups).

        The group tables are rebuilt from `p4 groups` on every call.
        """
        if user in users_or_groups:
            return True
        gresults = self.p4.run_groups()
        # [{'user': 'user1', 'group': 'G1', 'isSubGroup': '0', 'isOwner': '0', 'isUser': '1'},
        #  {'user': 'user2', 'group': 'G1', 'isSubGroup': '0', 'isOwner': '0', 'isUser': '1'},
        # Next record means G1 is a subgroup of G2
        #  {'user': 'G1', 'group': 'G2', 'isSubGroup': '1', 'isOwner': '0', 'isUser': '0'}]
        self.groups = defaultdict(list)
        self.supergroups = defaultdict(list)
        for rec in gresults:
            try:
                if rec['isSubGroup'] == '0' and (rec['isOwner'] == '1' or rec['isUser'] == '1'):
                    self.groups[rec['group']].append(rec['user'])
                elif rec['isSubGroup'] == '1':
                    self.supergroups[rec['group']].append(rec['user'])
            except (KeyError, TypeError):
                # Best effort: skip records that are not dicts or that lack
                # one of the expected fields.
                continue
        # If a name matches a group then check group membership for users
        return self.IsSuperGroupMember(user, users_or_groups)

    def IsSuperGroupMember(self, user, groupmembers):
        """Return True if `user` belongs to any group named in `groupmembers`
        or to any subgroup reachable from them.

        Uses the tables built by the most recent IsMember() call. The walk is
        iterative and remembers visited names, so a cyclic subgroup
        definition cannot cause unbounded recursion.
        """
        if not groupmembers:
            return False
        pending = list(groupmembers)
        seen = set()
        while pending:
            name = pending.pop()
            if name in seen:
                continue
            seen.add(name)
            if name in self.groups and user in self.groups[name]:
                return True
            if name in self.supergroups:
                pending.extend(self.supergroups[name])
        return False


class WorkflowTrigger(P4Triggers.P4Trigger):
    """See module doc string for details"""

    def load_config(self, config_file=None):
        """Load the workflow YAML configuration.

        `config_file` defaults to /p4/common/config/Workflow.yaml.
        Returns an empty dict if the file does not exist, is empty, or cannot
        be read/parsed (the error is logged in the last case).
        """
        if not config_file:
            config_file = "/p4/common/config/Workflow.yaml"
        config = {}
        if not os.path.exists(config_file):
            return config
        try:
            with open(config_file, 'r') as f:
                # An empty document loads as None; normalise it to {} so
                # callers can always use "key in config".
                config = ordered_load(f, yaml.SafeLoader) or {}
        except Exception as e:
            self.logger.error(str(e))
        return config

    def get_swarm_url(self):
        """Read the Swarm URL from the Perforce server property P4.Swarm.URL.

        Returns the URL with a trailing '/'. In test mode a fixed dummy URL
        is returned without contacting the server.
        Raises IndexError if the property command returns no records.
        """
        if self.options.test_mode:
            return "http://swarm.dev/"
        prop = self.p4.run('property', '-l', '-n', 'P4.Swarm.URL')
        self.logger.debug("Property: %s" % str(prop))
        url = prop[0]['value']
        if not url.endswith('/'):
            url += '/'
        return url

    def get_project(self, config, change):
        """Return first matching project for a file in the change"""
        files = []
        if change.shelved and not change.files:
            # Shelved change with no file list on the change object: ask the
            # server for the shelved files instead.
            try:
                files = [x['depotFile'] for x in self.p4.run_files("@=%s" % change.change)]
            except Exception:
                self.logger.warning("No shelved files found for change @=%s" % change.change)
        else:
            files = [x.depotFile for x in change.files]
        return self.get_project_by_files(config, files)

    def get_project_by_files(self, config, files):
        """Return first matching project for a list of files.

        Returns an empty dict when there is no 'projects' section, when no
        project's depot_paths include any of `files`, or when a project entry
        lacking 'name' or 'depot_paths' is reached before a match is found.
        """
        if 'projects' not in config:
            return {}
        for prj in config['projects']:
            if 'name' not in prj or 'depot_paths' not in prj:
                # Malformed entry: stop searching. An empty dict (not False)
                # keeps the return type consistent so callers can safely
                # test "key in result".
                return {}
            path_map = P4.Map()
            for p in prj['depot_paths']:
                path_map.insert(p)
            for df in files:
                if path_map.includes(df):
                    return prj
        return {}

    def project_flag_true(self, config, change, key):
        """Returns True if specified field (key) has a value of 'y' for a matching project"""
        if 'projects' not in config:
            return False
        prj = self.get_project(config, change)
        self.logger.debug("prj: %s" % prj)
        if not prj:
            # No matching project
            return False
        return key in prj and prj[key] == 'y'

    def formatReviewDescription(self, review_description, **kwargs):
        """Format using specified format options - see call below

        Assumes review_description is an array of lines in config file.
        The lines are joined with newlines, literal backslash-n sequences are
        turned into real newlines, and $name placeholders are substituted
        from kwargs (unknown placeholders are left untouched)."""
        desc = "\n".join(review_description)
        desc = desc.replace("\\n", "\n")
        t = Template(desc)
        result = t.safe_substitute(**kwargs)
        return result