p4_bench.py #19

  • //
  • guest/
  • robert_cowham/
  • p4benchmark/
  • main/
  • p4_bench.py
  • View
  • Commits
  • Open Download .zip Download (12 KB)
#! /usr/bin/env python2
#
# Perforce benchmarks using Locust.io framework
#
# Copyright (C) 2016, Robert Cowham, Perforce
#

from __future__ import print_function

import os
import sys
import string
import logging
import platform
import errno
from locust import Locust, events, task, TaskSet
from locust.exception import StopLocust
import subprocess
import P4
import yaml
import time
import random
import pprint

from locust.stats import RequestStats
def noop(*arg, **kwargs):
    """Do-nothing stand-in for ``RequestStats.reset_all``.

    Swallows whatever arguments the caller passes (including the instance
    when invoked as a method) and only logs that a reset was suppressed.
    """
    logger.info("Stats reset prevented by monkey patch!")


# Monkey patch: every RequestStats.reset_all() call becomes a no-op, so the
# statistics collected so far are never discarded.
RequestStats.reset_all = noop

# True when running under Python 3.x or later.
python3 = sys.version_info[0] >= 3

logger = logging.getLogger("repo_benchmark")

# Directory the benchmark was launched from; the config file and the
# P4CONFIG file are looked up relative to it.
startdir = os.getcwd()

CONFIG_FILE = "config_p4_bench.yml"   # Configuration parameters

# Encoding used to decode subprocess output: stdin's encoding when it has
# one, otherwise the default locale's, otherwise Latin-1 as a last resort.
ENCODING = getattr(sys.stdin, 'encoding', None)
if ENCODING is None:
    import locale
    locale_name, ENCODING = locale.getdefaultlocale()
if ENCODING is None:
    ENCODING = "ISO8859-1"

def popen(exe, cmd, decode=False, errors=True):
    """Run ``exe`` with the arguments in ``cmd`` and return its stdout.

    :param exe: program to execute (resolved by subprocess, e.g. via PATH).
    :param cmd: list of arguments passed after ``exe``; it is not modified.
    :param decode: when true, return stdout decoded with ENCODING; otherwise
        return the raw bytes.
    :param errors: when true, raise if the process exits with a non-zero status.
    :raises Exception: carrying the decoded stderr + stdout when ``errors`` is
        set and the exit status is non-zero.
    """
    # Build a fresh argv rather than cmd.insert(0, exe): mutating the caller's
    # list would prepend exe again every time that list is reused.
    argv = [exe] + list(cmd)
    pipe = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (stdout, stderr) = pipe.communicate()
    # Any non-zero status is a failure. A negative status (child killed by a
    # signal on POSIX) was previously treated as success by "> 0".
    if errors and pipe.returncode != 0:
        raise Exception((stderr + stdout).decode(ENCODING))
    return stdout.decode(ENCODING) if decode else stdout

def readConfig(startdir):
    """Load the benchmark configuration from CONFIG_FILE inside ``startdir``.

    :param startdir: directory containing the YAML configuration file.
    :returns: the parsed YAML document, or an empty dict if the file is empty.
    """
    with open(os.path.join(startdir, CONFIG_FILE), "r") as f:
        # safe_load: yaml.load() without an explicit Loader emits a warning on
        # PyYAML 5.1+ and is a TypeError on PyYAML 6+. The config is presumably
        # plain mappings/scalars with no Python-object tags - confirm if the
        # config format ever grows custom tags.
        config = yaml.safe_load(f)
    # An empty document parses to None; callers index straight into the result.
    return config if config is not None else {}

def fmtsize(num):
    """Format a byte count as a human-readable string such as ``"1.5 KB"``.

    :param num: size in bytes.
    :returns: ``num`` divided by 1024 until it is below 1024, labelled with the
        matching unit and shown with one decimal place. Values of 1024 PB or
        more stay in PB.
    """
    units = ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB']
    for unit in units[:-1]:
        if num < 1024.0:
            return "%3.1f %s" % (num, unit)
        num /= 1024.0
    # Previously the loop simply ended here and the function returned None for
    # anything of 1024 TB or more; report it in the largest unit instead.
    return "%3.1f %s" % (num, units[-1])

# Length of each generated text line, counting its trailing end-of-line char.
LINE_LENGTH = 80
# Chunk size for writing random binary content: 64 KiB.
BLOCK_SIZE = 64 * 1024

# Random generators: numpy-backed when numpy is importable, otherwise a much
# slower pure-Python fallback with the same outputs.
try:
    import numpy as np

    def create_random(size):
        """Return ``size`` random byte values in the range 1..254.

        The result is ``bytes`` on Python 3 and ``bytearray`` on Python 2.
        """
        # randint's upper bound is exclusive, so (1, 255) draws 1..254, the
        # same range as the deprecated random_integers(1, 254). Drawing uint8
        # directly and using tobytes() avoids a per-byte Python-level loop.
        data = np.random.randint(1, 255, size=size, dtype=np.uint8).tobytes()
        return data if python3 else bytearray(data)

    def generator(size, eol="\n"):
        """Return ``size - 1`` random ASCII letters/digits followed by ``eol``."""
        s = string.ascii_letters + string.digits
        return "".join(np.random.choice(list(s), size - 1)) + eol

except ImportError:
    print("No numpy installed, falling back to standard Python random. Prepare to wait ...", file=sys.stderr)

    def create_random(size):
        """Return ``size`` random byte values in the range 1..254.

        The result is ``bytes`` on Python 3 and ``bytearray`` on Python 2.
        """
        values = (random.randint(1, 254) for _ in range(size))
        # Only the branch selected by the condition is evaluated, so bytes()
        # is never applied to a generator on Python 2 (where bytes is str).
        return bytes(values) if python3 else bytearray(values)

    def generator(size, eol="\n"):
        """Return ``size - 1`` random ASCII letters/digits followed by ``eol``."""
        s = string.ascii_letters + string.digits
        return "".join((random.choice(s) for x in range(size - 1))) + eol

class Timer(object):
    """Time one operation and report its outcome to locust as a request event.

    The clock starts when the Timer is constructed; each report_* call sends
    the whole milliseconds elapsed since then.
    """

    def __init__(self, request_type):
        self.request_type = request_type
        self.start_time = time.time()

    def _elapsed_ms(self):
        """Whole milliseconds (truncated) since this Timer was created."""
        return int(1000 * (time.time() - self.start_time))

    def report_failure(self, name, e):
        """Fire locust's request_failure event for ``name`` with exception ``e``."""
        elapsed = self._elapsed_ms()
        events.request_failure.fire(
            request_type=self.request_type,
            name=name,
            response_time=elapsed,
            exception=e,
        )

    def report_success(self, name, count):
        """Fire locust's request_success event for ``name``; ``count`` is sent
        as the response length."""
        elapsed = self._elapsed_ms()
        events.request_success.fire(
            request_type=self.request_type,
            name=name,
            response_time=elapsed,
            response_length=count,
        )

def create_file(fileSize, filename, binary=False):
    """Fill ``filename`` with ``fileSize`` units of random content.

    In binary mode the units are bytes, written in BLOCK_SIZE chunks from
    create_random(). In text mode they are characters, written as
    LINE_LENGTH-character lines from generator(). In both modes a final
    shorter chunk carries any remainder.
    """
    logger.debug("create_file '%s' binary: %s" % (filename, str(binary)))
    mode = "wb" if binary else "w"
    step, make_chunk = (BLOCK_SIZE, create_random) if binary else (LINE_LENGTH, generator)
    with open(filename, mode) as f:
        full_chunks = int(fileSize / step)
        for _ in range(full_chunks):
            f.write(make_chunk(step))
        remainder = fileSize % step
        if remainder:
            f.write(make_chunk(remainder))

class SyncOutput(P4.OutputHandler):
    """P4Python output handler that tallies and periodically reports sync progress.

    Each stat record carrying a ``fileSize`` counts as one synced file. Whenever
    the running byte total exceeds the total at the last report by more than
    ``sync_progress_size_interval``, a success event is fired through ``timer``
    and the totals are logged. A falsy interval turns periodic reports off.
    """

    def __init__(self, timer, event_name, sync_progress_size_interval=100000):
        P4.OutputHandler.__init__(self)
        self.timer = timer
        self.event_name = event_name
        self.sync_progress_size_interval = sync_progress_size_interval
        # Running totals, plus the byte total as of the most recent report.
        self.filesSynced = 0
        self.sizeSynced = 0
        self.previousSizeSynced = 0

    def reportFileSync(self, fileSize):
        """Account for one synced file of ``fileSize`` bytes; report if due."""
        self.filesSynced += 1
        self.sizeSynced += fileSize
        interval = self.sync_progress_size_interval
        if interval and self.sizeSynced > self.previousSizeSynced + interval:
            self.previousSizeSynced = self.sizeSynced
            self.timer.report_success(self.event_name, 1)
            logger.info("Synced %d files, size %s" % (self.filesSynced, fmtsize(self.sizeSynced)))

    def outputStat(self, stat):
        """Count ``stat`` as a synced file when it has a ``fileSize`` entry."""
        if 'fileSize' in stat:
            self.reportFileSync(int(stat['fileSize']))
        # NOTE(review): per the P4Python OutputHandler docs, HANDLED keeps this
        # record out of the command's returned result list.
        return P4.OutputHandler.HANDLED

class P4Benchmark(object):
    """Generic Perforce benchmarking class - must be subclassed.

    Connects to a server taken from the ``perforce`` section of the config.
    It can then create a uniquely named client workspace and sync it, either
    through P4Python or by running the ``p4`` command-line client.
    """

    def __init__(self, startdir, config, prog="p4_bench"):
        """Connect to the configured server, logging in if a password is set.

        :param startdir: directory holding ``bench_p4config.txt``; also the
            parent of the default ``testdata`` workspace root.
        :param config: parsed configuration with ``general`` and ``perforce``
            sections.
        :param prog: program name given to P4Python (the chosen port is
            appended to it).
        :raises Exception: if ``perforce.port`` is neither a string nor a list.
        """
        self.id = id(self)      # Object id - unique enough for now.
        if "workspace_root" in config["general"]:
            self.test_root = config["general"]["workspace_root"]
        else:
            self.test_root = os.path.join(startdir, "testdata")
        self.localfilelist = []
        # Process-wide environment change: affects every P4 connection made here.
        os.environ["P4CONFIG"] = os.path.join(startdir, "bench_p4config.txt")
        self.p4 = P4.P4()
        self.p4.prog = prog     # Identifier for log analysis
        self.p4.logger = logger
        self.config = config
        p4config = config["perforce"]
        logger.info(pprint.pformat(p4config))
        port = p4config["port"]
        # ``unicode`` exists only on Python 2; naming it unconditionally raised
        # NameError on Python 3 before the isinstance check could even run.
        string_types = (str,) if python3 else (str, unicode)  # noqa: F821
        if isinstance(port, string_types):
            self.p4.port = port
        elif isinstance(port, list):
            # Several servers configured: pick one at random.
            self.p4.port = random.choice(port)
        else:
            raise Exception("Unknown port config")
        logger.info("Connecting to server: %s" % self.p4.port)
        # Include the chosen port in the program name.
        self.p4.prog = "%s-%s" % (prog, self.p4.port)
        if p4config.get("charset"):
            self.p4.charset = p4config["charset"]
        self.repoPath = p4config["repoPath"]
        self.p4.user = p4config["user"]
        self.sync_progress_size_interval = self._parse_size(
            p4config.get("sync_progress_size_interval"),
            1000 * 1000 * 1000)   # default 1GB
        self.p4.connect()
        if p4config.get("password"):
            self.p4.password = p4config["password"]
            self.p4.run_login()
        self.workspace_name = "{}.{}.{}".format(self.p4.user, self.id, platform.node())
        self.workspace_root = os.path.join(self.test_root, self.workspace_name)
        if not os.path.isdir(self.test_root):
            try:
                os.makedirs(self.test_root, 0o777)
            except OSError as ex:
                # Tolerate the directory appearing between the isdir() check
                # and makedirs() (e.g. created by another benchmark instance).
                if ex.errno != errno.EEXIST:
                    raise

    @staticmethod
    def _parse_size(value, default):
        """Convert a configured byte count to an int, or ``default`` if unset.

        Accepts numbers, numeric strings, or arithmetic expression strings such
        as "1000 * 1000 * 1000". Previously a plain YAML number crashed here,
        because eval() only accepts strings.
        """
        if value is None:
            return default
        try:
            return int(value)
        except (TypeError, ValueError):
            # NOTE(review): eval() executes arbitrary code from the config
            # file. This is tolerable only while the config is trusted local input.
            return int(eval(value))

    def _viewLine(self, depotDir):
        """Map ``depotDir/...`` to the same relative path inside this workspace."""
        return "{}/... //{}/{}/...".format(depotDir, self.workspace_name, depotDir.replace("//", ""))

    def getView(self):
        """Return the client view lines for this benchmark's workspace.

        Maps ``repoPath/repoSubDir``. When repoSubDir is "*", one matching
        depot directory is chosen at random. If ``repoSubDirNum`` is greater
        than 1, a random sample of that many of its subdirectories is mapped
        instead (capped at the number available).

        :raises Exception: if no directories match.
        """
        p4config = self.config["perforce"]
        pattern = "%s/%s" % (p4config["repoPath"], p4config["repoSubDir"])
        if p4config["repoSubDir"] == "*":
            dirs = [x['dir'] for x in self.p4.run_dirs(pattern)]
        else:
            dirs = [pattern]
        if not dirs:
            raise Exception("No dirs found!")
        if len(dirs) == 1:
            return [self._viewLine(dirs[0])]
        chosen = random.choice(dirs)
        subDirNum = int(p4config.get("repoSubDirNum") or 0)
        if subDirNum > 1:
            subdirs = [x['dir'] for x in self.p4.run_dirs("%s/*" % chosen)]
            if subdirs:
                # random.sample raises ValueError when asked for more items
                # than exist, so never request more than there are.
                subset = random.sample(subdirs, min(subDirNum, len(subdirs)))
                return [self._viewLine(x) for x in subset]
        # No sampling requested, or no subdirectories: map the chosen dir itself.
        return [self._viewLine(chosen)]

    def createWorkspace(self):
        """Create or update this benchmark's client spec and save it.

        :returns: True if the fetched spec's root already equalled
            ``self.workspace_root`` before it was saved.
        """
        p4config = self.config["perforce"]
        # Important to set client before attempting to create it in case we are talking to replica
        self.p4.client = self.workspace_name
        ws = self.p4.fetch_client(self.workspace_name)
        existed = (ws._root == self.workspace_root)
        ws._root = self.workspace_root
        ws._view = self.getView()
        ws._options = ws._options.replace("normdir", "rmdir")
        if p4config.get("options"):
            # logger.warn is a deprecated alias of logger.warning.
            logger.warning("Overwiting options")
            ws._options = p4config["options"]
        logger.info("Saving workspace view: %s" % ws._view[0])
        logger.info(pprint.pformat(ws))
        result = self.p4.save_client(ws)
        logger.info(pprint.pformat(result))
        return existed

    def syncWorkspace(self, timer):
        """Sync the whole workspace and return a count for locust reporting.

        If ``perforce.sync_args`` is set (a single option string or a list of
        options), the ``p4`` command-line client is run with those options
        first. In that case the length of its output is returned. Otherwise the
        sync goes through P4Python: partial progress is reported via ``timer``,
        ``self.localfilelist`` is refreshed from ``p4 have``, and the number
        of files counted by the progress handler is returned.
        """
        p4config = self.config["perforce"]
        sync_args = p4config.get("sync_args")
        if sync_args:
            cmd = "p4"
            args = list(sync_args) if isinstance(sync_args, list) else [sync_args]
            args.extend(["-p", self.p4.port, "-u", self.p4.user, "-c", self.p4.client])
            args.extend(["sync", "//{}/...".format(self.p4.client)])
            logger.info("%s %s" % (cmd, " ".join(args)))
            result = popen(cmd, args)
            logger.info(result)
            return len(result)
        if not os.path.isdir(self.workspace_root):
            os.makedirs(self.workspace_root, 0o777)
        os.chdir(self.workspace_root)
        event_name = "sync_partial_%s" % fmtsize(self.sync_progress_size_interval)
        syncCallback = SyncOutput(timer, event_name, self.sync_progress_size_interval)
        with self.p4.at_exception_level(P4.P4.RAISE_ERRORS):
            self.p4.run_sync("//{}/...".format(self.p4.client), handler=syncCallback)
        havelist = self.p4.run_have()
        self.localfilelist = [f["path"] for f in havelist]
        # The handler returns HANDLED for every stat record, so (per the
        # P4Python OutputHandler docs) run_sync()'s result list is empty and
        # len() of it was always 0. Report the handler's file count instead.
        return syncCallback.filesSynced

class P4BuildFarmBenchmark(P4Benchmark):
    """Build-farm flavour of P4Benchmark.

    It behaves exactly like the base class, but connects with the program name
    "p4buildfarm".
    """

    def __init__(self, startdir, config):
        super(P4BuildFarmBenchmark, self).__init__(startdir, config, prog="p4buildfarm")

def buildFarmActions(bench, request_type):
    """Run one build-farm iteration: create the workspace, then sync it.

    Each step gets its own Timer and is reported to locust under the name
    "create" or "sync". A failing step is logged and reported as a failure,
    and the next step still runs.
    """
    def timed(name, action):
        # A fresh Timer per step, started just before the step runs.
        timer = Timer(request_type)
        try:
            timer.report_success(name, action(timer))
        except Exception as e:
            logger.exception(e)
            timer.report_failure(name, e)

    def create(_timer):
        bench.createWorkspace()
        return 1

    timed("create", create)
    timed("sync", bench.syncWorkspace)

class AllTasks(TaskSet):
    """Entry point for locust: each simulated user runs the build-farm task once."""

    # Class-level wait defaults; each instance overrides them from
    # config["general"] when those keys are present.
    min_wait = 1000
    max_wait = 10000
    request_type = "p4"

    def __init__(self, *args, **kwargs):
        super(AllTasks, self).__init__(*args, **kwargs)
        self.config = readConfig(startdir)
        general = self.config["general"]
        # Fall back to the class defaults instead of raising KeyError when the
        # config omits these settings.
        self.min_wait = general.get("min_wait", self.min_wait)
        self.max_wait = general.get("max_wait", self.max_wait)

    def on_start(self):
        pass

    @task(10)
    def buildFarmActions(self):
        """Run one build-farm iteration, then stop this locust user."""
        task_name = "p4buildfarm"
        self.bench = P4BuildFarmBenchmark(startdir, self.config)
        # Inside a method the bare name resolves to the module-level function,
        # not to this method (class scope is not searched).
        buildFarmActions(self.bench, task_name)
        logger.info("Finished %s" % task_name)
        # Run once only and die. Pass the task name itself; this previously
        # passed the literal string "task_name".
        raise StopLocust(task_name)

class P4RepoTestLocust(Locust):
    """Locust user class imported and run by locust; executes the AllTasks task set."""

    task_set = AllTasks
# Change User Description Committed
#21 24711 Robert Cowham Restructure and tidy up
#20 24686 Robert Cowham Getting it working - especially with Python3
#19 22003 Robert Cowham Latest state - with p4python and no syncing
#18 21899 Robert Cowham Randomly choose hostname instead of attempting to evenly split
#17 21885 Robert Cowham Add sync_args
#16 21820 Robert Cowham Remove unused code. Make it a little more configurable via the config file
#15 21800 Robert Cowham Sync one level down - hard coded to select 50 for now
#14 21799 Robert Cowham Avoid mkdirs error. Remove user task
#13 21793 Robert Cowham Use hostname to choose which server to connect to
#12 21769 Robert Cowham Allow multiple ports to be specified - for random choice
#11 21767 Robert Cowham add latest stuff with analyse
#10 21761 Robert Cowham Avoid stats reset. Stop each task after it has run once
#9 21760 Robert Cowham Log config and ws options
#8 21759 Robert Cowham Allow options to be configurable
#7 21744 Robert Cowham Output sync progress via logger too
#6 21743 Robert Cowham Make it configurable
#5 21742 Robert Cowham Added partial sync reporting
#4 21737 Robert Cowham Fix dirs command
#3 21735 Robert Cowham Put all benchmark into one file
#2 21718 Robert Cowham Refactored a little - into one file
#1 21717 Robert Cowham Trying nested tasks