WorkflowTriggers.py #3

  • //
  • guest/
  • perforce_software/
  • sdp/
  • main/
  • Server/
  • Unix/
  • p4/
  • common/
  • bin/
  • triggers/
  • WorkflowTriggers.py
  • View
  • Commits
  • Open Download .zip Download (6 KB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# ==============================================================================
# Copyright and license info is available in the LICENSE file included with
# the Server Deployment Package (SDP), and also available online:
# https://swarm.workshop.perforce.com/projects/perforce-software-sdp/view/main/LICENSE
# ------------------------------------------------------------------------------

"""
NAME:
    WorkflowTriggers.py

DESCRIPTION:
    Base class with utility functions for interfacing with Swarm and parsing workflow yaml file.
"""

# Python 2.7/3.3 compatibility.
from __future__ import print_function

import sys
import os
import P4Triggers
import P4
import requests
import yaml
from string import Template
from collections import OrderedDict, defaultdict

def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
    """Parse a YAML stream, building every mapping with object_pairs_hook.

    A private subclass of Loader is created and a mapping constructor is
    registered on it for the default mapping tag, so each YAML mapping is
    built from its (key, value) pairs via object_pairs_hook (OrderedDict
    by default) instead of a plain dict.

    :param stream: YAML text or open file object handed to yaml.load.
    :param Loader: PyYAML loader class to derive from.
    :param object_pairs_hook: callable receiving the list of (key, value)
        pairs of each mapping and returning the mapping object.
    :return: whatever yaml.load returns for the document.

    NOTE(review): the default Loader is the full yaml.Loader, which is not
    a safe loader; for untrusted input pass yaml.SafeLoader explicitly
    (load_config in this file does so).
    """
    def _build_ordered_mapping(loader, node):
        # Flatten first, then build from the pairs list so the hook sees
        # the entries in the order the loader produced them.
        loader.flatten_mapping(node)
        pairs = loader.construct_pairs(node)
        return object_pairs_hook(pairs)

    class OrderedLoader(Loader):
        """Throwaway subclass that carries the mapping constructor."""

    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
    OrderedLoader.add_constructor(mapping_tag, _build_ordered_mapping)
    return yaml.load(stream, OrderedLoader)


class GroupMemberChecker:
    """Checks for user in any of the listed groups.

    Group data is read with p4.run_groups() on every IsMember() call and
    stored on the instance as:
      self.groups      - group name -> list of direct users/owners
      self.supergroups - group name -> list of its direct subgroups
    """

    def __init__(self, p4):
        # p4 only needs to provide run_groups() returning a list of dicts.
        self.p4 = p4

    def IsMember(self, user, users_or_groups):
        """Return True if user is listed directly in users_or_groups, or is a
        user/owner of any listed group or of any of its (nested) subgroups.

        :param user: user name to look for.
        :param users_or_groups: collection of user and/or group names.
        :return: True or False.
        """
        if user in users_or_groups:
            return True
        gresults = self.p4.run_groups()
        # [{'user': 'user1', 'group': 'G1', 'isSubGroup': '0', 'isOwner': '0', 'isUser': '1'},
        #  {'user': 'user2', 'group': 'G1', 'isSubGroup': '0', 'isOwner': '0', 'isUser': '1'},
        # Next record means G1 is a subgroup of G2
        #  {'user': 'G1', 'group': 'G2', 'isSubGroup': '1', 'isOwner': '0', 'isUser': '0'}]
        self.groups = defaultdict(list)
        self.supergroups = defaultdict(list)
        for rec in gresults:
            try:
                if rec['isSubGroup'] == '0' and (rec['isOwner'] == '1' or rec['isUser'] == '1'):
                    self.groups[rec['group']].append(rec['user'])
                elif rec['isSubGroup'] == '1':
                    self.supergroups[rec['group']].append(rec['user'])
            except (KeyError, TypeError):
                # Best effort: skip records that lack the expected fields,
                # without also swallowing KeyboardInterrupt/SystemExit.
                continue

        # If a name matches a group then check group membership for users
        for rec in users_or_groups:
            if rec in self.supergroups:
                if self.IsSuperGroupMember(user, self.supergroups[rec]):
                    return True
            if rec in self.groups:
                if user in self.groups[rec]:
                    return True
        return False

    def IsSuperGroupMember(self, user, groupmembers, _visited=None):
        """Recursively check whether user belongs to any group in groupmembers
        or to any subgroup nested below them.

        Relies on self.groups / self.supergroups as populated by IsMember().

        :param user: user name to look for.
        :param groupmembers: list of group names to descend into.
        :param _visited: internal - set of group names already examined in
            this search, so that a cyclic subgroup definition cannot cause
            unbounded recursion. Callers should not pass it.
        :return: True or False.
        """
        if not groupmembers:
            return False
        if _visited is None:
            _visited = set()
        for g in groupmembers:
            if g in _visited:
                # Already examined (or currently being examined higher up
                # the call stack, where its direct members get checked).
                continue
            _visited.add(g)
            if g in self.supergroups:
                if self.IsSuperGroupMember(user, self.supergroups[g], _visited):
                    return True

            if g in self.groups and user in self.groups[g]:
                return True
        return False

class WorkflowTrigger(P4Triggers.P4Trigger):
    """Base trigger class with helpers for Swarm and the workflow yaml file.

    See module doc string for details. The methods use self.logger,
    self.p4 and self.options, which are not defined in this class
    (presumably provided by P4Triggers.P4Trigger - confirm there).
    """

    def load_config(self, config_file=None):
        """Load the workflow configuration from a yaml file.

        :param config_file: path of the yaml file; when not given,
            "/p4/common/config/Workflow.yaml" is used.
        :return: the parsed configuration, or {} when the file does not
            exist, is empty, or cannot be read/parsed (the error is logged).
        """
        if not config_file:
            config_file = "/p4/common/config/Workflow.yaml"
        config = {}
        if not os.path.exists(config_file):
            return config
        try:
            with open(config_file, 'r') as f:
                # An empty yaml document loads as None; normalise to {} so
                # callers can always test "'projects' in config".
                config = ordered_load(f, yaml.SafeLoader) or {}
        except Exception as e:
            self.logger.error(str(e))
        return config

    def get_swarm_url(self):
        """Read the Swarm URL from the Perforce server property P4.Swarm.URL.

        :return: the URL, always ending in '/'. In test mode the fixed value
            "http://swarm.dev/" is returned without contacting the server.

        NOTE(review): if the property query returns no records, prop[0]
        raises IndexError - callers are assumed to handle that.
        """
        if self.options.test_mode:
            return "http://swarm.dev/"
        prop = self.p4.run('property', '-l', '-n', 'P4.Swarm.URL')
        self.logger.debug("Property: %s" % str(prop))
        url = prop[0]['value']
        # endswith() also copes with an empty value, where url[-1] would raise.
        if not url.endswith('/'):
            url += '/'
        return url

    def get_project(self, config, change):
        """Return first matching project for a file in the change.

        For a shelved change with no files attached, the file list is
        fetched from the server with 'p4 files @=<change>'; otherwise the
        depotFile of each entry in change.files is used.

        :param config: configuration as returned by load_config().
        :param change: change object providing .shelved, .files and .change.
        :return: the matching project mapping, or {} if none matches.
        """
        files = []
        if change.shelved and not change.files:
            try:
                files = [x['depotFile'] for x in self.p4.run_files("@=%s" % change.change)]
            except Exception:
                # Best effort: treat any failure as "no shelved files", but
                # let KeyboardInterrupt/SystemExit propagate.
                self.logger.warning("No shelved files found for change @=%s" % change.change)
        else:
            files = [x.depotFile for x in change.files]
        return self.get_project_by_files(config, files)

    def get_project_by_files(self, config, files):
        """Return first matching project for a list of files.

        A project matches when any of the files is included in a P4.Map
        built from the project's 'depot_paths'. Projects lacking 'name' or
        'depot_paths' are logged and skipped.

        :param config: configuration mapping; projects are read from
            config['projects'].
        :param files: list of depot file paths.
        :return: the first matching project mapping, or {} if none matches.
        """
        if 'projects' not in config:
            return {}
        for prj in config['projects']:
            if 'name' not in prj or 'depot_paths' not in prj:
                # Skip the malformed entry rather than returning False,
                # which broke callers doing "key in prj" and hid any valid
                # projects listed after it.
                self.logger.warning("Ignoring project without name/depot_paths: %s" % str(prj))
                continue
            depot_map = P4.Map()
            for p in prj['depot_paths']:
                depot_map.insert(p)
            for df in files:
                if depot_map.includes(df):
                    return prj
        return {}

    def project_flag_true(self, config, change, key):
        """Returns True if specified field (key) has a value of 'y' for a matching project.

        :param config: configuration as returned by load_config().
        :param change: change object passed on to get_project().
        :param key: name of the project field to test.
        :return: True only when a project matches and project[key] == 'y'.
        """
        if 'projects' not in config:
            return False
        prj = self.get_project(config, change)
        self.logger.debug("prj: %s" % prj)
        # Guard against a falsy result (no match) before looking up the key.
        return bool(prj) and key in prj and prj[key] == 'y'

    def formatReviewDescription(self, review_description, **kwargs):
        """Format using specified format options.

        Assumes review_description is an array of lines in config file.
        The lines are joined with newlines, literal backslash-n sequences
        are turned into real newlines, and $name placeholders are replaced
        from kwargs using string.Template.safe_substitute.

        :param review_description: iterable of description lines.
        :param kwargs: placeholder values.
        :return: the formatted description string.
        """
        desc = "\n".join(review_description)
        desc = desc.replace("\\n", "\n")
        t = Template(desc)
        result = t.safe_substitute(**kwargs)
        return result
# Change history (columns: #, Change, User, Description, Committed)
#
#   #4  27331  C. Thomas Tyler  Released SDP 2020.1.27325 (2021/01/29).
#              Copy Up using 'p4 copy -r -b perforce_software-sdp-dev'.
#   #3  26161  C. Thomas Tyler  Released SDP 2019.3.26159 (2019/11/06).
#              Copy Up using 'p4 copy -r -b perforce_software-sdp-dev'.
#   #2  25245  C. Thomas Tyler  Released SDP 2019.1.25238 (2019/03/02).
#              Copy Up using 'p4 copy -r -b perforce_software-sdp-dev'.
#   #1  23510  C. Thomas Tyler  Released SDP 2018.1.23504 (2018/01/19).
#              Copy Up using 'p4 copy -r -b perforce_software-sdp-dev',
#              with selective removal of work-in-progress files.
#
#   //guest/perforce_software/sdp/dev/Server/Unix/p4/common/bin/triggers/WorkflowTriggers.py
#   #1  23420  Robert Cowham    Refactored to use common WorkflowTriggers class