#! /usr/bin/env python2
#
# Perforce benchmarks using Locust.io framework
#
# Copyright (C) 2016, Robert Cowham, Perforce
#
from __future__ import print_function
import os
import sys
import string
import logging
import platform
import errno
from locust import Locust, events, task, TaskSet
from locust.exception import StopLocust
import subprocess
import P4
import yaml
import time
import random
import pprint
from locust.stats import RequestStats
def noop(*args, **kwargs):
    """Stand-in for RequestStats.reset_all that ignores the reset request.

    Locust normally resets its statistics partway through a run; patching the
    reset out keeps cumulative stats for the whole benchmark.
    """
    logger.info("Stats reset prevented by monkey patch!")


# Monkey-patch locust so accumulated stats survive any reset attempt.
RequestStats.reset_all = noop
python3 = sys.version_info[0] >= 3  # True when running under Python 3
logger = logging.getLogger("repo_benchmark")
startdir = os.getcwd()  # launch directory; config file and test data resolved from here
CONFIG_FILE = "config_p4_bench.yml" # Configuration parameters
# Determine the encoding used to decode subprocess output:
# prefer stdin's encoding, then the locale default, then Latin-1.
ENCODING = None
if hasattr(sys.stdin, 'encoding'):
    ENCODING = sys.stdin.encoding
if ENCODING is None:
    import locale
    locale_name, ENCODING = locale.getdefaultlocale()
if ENCODING is None:
    ENCODING = "ISO8859-1"  # final fallback: Latin-1 can decode any byte sequence
def popen(exe, cmd, decode=False, errors=True):
    """Run *exe* with argument list *cmd* and return its stdout.

    :param exe: executable name/path to run
    :param cmd: list of arguments (no longer mutated by this call)
    :param decode: when True, decode stdout from ENCODING to text
    :param errors: when True, raise on a non-zero exit status
    :raises Exception: with combined stderr+stdout text when the command fails
    """
    # Build a fresh argv instead of mutating the caller's list (the old
    # cmd.insert(0, exe) surprised callers who reused their argument list).
    argv = [exe] + list(cmd)
    pipe = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (stdout, stderr) = pipe.communicate()
    # "!= 0" also catches negative return codes (process killed by a signal),
    # which the previous "> 0" test silently ignored.
    if errors and pipe.returncode != 0:
        raise Exception((stderr + stdout).decode(ENCODING))
    return stdout if not decode else stdout.decode(ENCODING)
def readConfig(startdir):
    """Load the YAML benchmark configuration from *startdir*.

    :param startdir: directory containing CONFIG_FILE
    :return: dict of configuration values parsed from the YAML file
    """
    with open(os.path.join(startdir, CONFIG_FILE), "r") as f:
        # safe_load avoids arbitrary Python object construction from the YAML
        # file (yaml.load without an explicit Loader is unsafe and deprecated).
        config = yaml.safe_load(f)
    return config
def fmtsize(num):
    """Return *num* bytes as a human-readable string, e.g. '3.5 MB'.

    :param num: size in bytes (int or float)
    :return: formatted string with one decimal place and a unit suffix

    Sizes of 1024 TB and above now fall through to 'PB' instead of the
    previous behaviour of silently returning None.
    """
    for unit in ['bytes', 'KB', 'MB', 'GB', 'TB']:
        if num < 1024.0:
            return "%3.1f %s" % (num, unit)
        num /= 1024.0
    return "%3.1f %s" % (num, 'PB')
LINE_LENGTH = 80    # characters per generated text line (newline included)
BLOCK_SIZE = 65536  # bytes written per chunk when generating binary files
# Random generators
# Random data generators: prefer numpy (much faster); fall back to the
# standard-library random module when numpy is not installed.
try:
    import numpy as np

    def create_random(size):
        """Return *size* random bytes with values in 1..254 (no NUL, no 0xFF)."""
        # randint's upper bound is exclusive, so 1..254 inclusive — the same
        # range as the deprecated/removed np.random.random_integers(1, 254).
        values = np.random.randint(1, 255, size)
        if sys.version_info[0] >= 3:
            return bytes(int(x) for x in values)
        return bytearray(int(x) for x in values)

    def generator(size, eol="\n"):
        """Return a random alphanumeric line of *size* chars including *eol*."""
        alphabet = string.ascii_letters + string.digits
        return "".join(np.random.choice(list(alphabet), size - 1)) + eol
except ImportError:
    print("No numpy installed, falling back to standard Python random. Prepare to wait ...", file=sys.stderr)

    def create_random(size):
        """Return *size* random bytes with values in 1..254 (no NUL, no 0xFF)."""
        if sys.version_info[0] >= 3:
            return bytes(random.randint(1, 254) for x in range(size))
        return bytearray(random.randint(1, 254) for x in range(size))

    def generator(size, eol="\n"):
        """Return a random alphanumeric line of *size* chars including *eol*."""
        alphabet = string.ascii_letters + string.digits
        return "".join(random.choice(alphabet) for x in range(size - 1)) + eol
class Timer(object):
    """Measure elapsed wall-clock time and report it to locust's event hooks."""

    def __init__(self, request_type):
        self.request_type = request_type
        self.start_time = time.time()

    def _elapsed_ms(self):
        # Milliseconds elapsed since this timer was created.
        return int((time.time() - self.start_time) * 1000)

    def report_failure(self, name, e):
        """Fire a locust request_failure event for *name* carrying exception *e*."""
        events.request_failure.fire(request_type=self.request_type, name=name,
                                    response_time=self._elapsed_ms(), exception=e)

    def report_success(self, name, count):
        """Fire a locust request_success event; *count* is reported as response_length."""
        events.request_success.fire(request_type=self.request_type, name=name,
                                    response_time=self._elapsed_ms(), response_length=count)
def create_file(fileSize, filename, binary=False):
    """Write *fileSize* bytes of random content to *filename*.

    Binary files are filled block-by-block with random bytes; text files are
    filled with random alphanumeric lines of LINE_LENGTH characters.
    """
    logger.debug("create_file '%s' binary: %s" % (filename, str(binary)))
    if binary:
        with open(filename, "wb") as out:
            whole_blocks = int(fileSize / BLOCK_SIZE)
            for _ in range(whole_blocks):
                out.write(create_random(BLOCK_SIZE))
            remainder = fileSize % BLOCK_SIZE
            if remainder:
                out.write(create_random(remainder))
    else:
        with open(filename, "w") as out:
            whole_lines = int(fileSize / LINE_LENGTH)
            for _ in range(whole_lines):
                out.write(generator(LINE_LENGTH))
            remainder = fileSize % LINE_LENGTH
            if remainder:
                out.write(generator(remainder))
class SyncOutput(P4.OutputHandler):
    """P4 output handler that logs and reports progress while a sync runs.

    Fires a locust success event and writes a log line each time roughly
    sync_progress_size_interval bytes have been synced (0 disables reporting).
    """

    def __init__(self, timer, event_name, sync_progress_size_interval=100000):
        P4.OutputHandler.__init__(self)
        self.timer = timer
        self.event_name = event_name
        self.sync_progress_size_interval = sync_progress_size_interval
        self.filesSynced = 0         # running count of files seen so far
        self.sizeSynced = 0          # running total of bytes seen so far
        self.previousSizeSynced = 0  # total at the time of the last report

    def reportFileSync(self, fileSize):
        """Record one synced file of *fileSize* bytes; report progress if due."""
        self.filesSynced += 1
        self.sizeSynced += fileSize
        if not self.sync_progress_size_interval:
            return
        report_due = self.sizeSynced > self.previousSizeSynced + self.sync_progress_size_interval
        if report_due:
            self.previousSizeSynced = self.sizeSynced
            self.timer.report_success(self.event_name, 1)
            logger.info("Synced %d files, size %s" % (self.filesSynced, fmtsize(self.sizeSynced)))

    def outputStat(self, stat):
        """Callback from p4python for each synced file; counts its size if present."""
        if 'fileSize' in stat:
            self.reportFileSync(int(stat['fileSize']))
        return P4.OutputHandler.HANDLED
class P4Benchmark(object):
    """Generic benchmarking class - must be subclassed.

    Connects to a Perforce server chosen from the config, and provides
    workspace creation and sync operations for subclasses to time.
    """
    def __init__(self, startdir, config, prog="p4_bench"):
        """Connect to Perforce and prepare local workspace paths.

        :param startdir: directory containing bench_p4config.txt and test data
        :param config: parsed YAML configuration (see readConfig)
        :param prog: program identifier recorded in the server log
        """
        self.id = id(self) # Object id - unique enough for now.
        # Workspace files go under workspace_root from config if given,
        # otherwise under <startdir>/testdata.
        if "workspace_root" in config["general"]:
            self.test_root = config["general"]["workspace_root"]
        else:
            self.test_root = os.path.join(startdir, "testdata")
        self.localfilelist = []
        # Point p4 at a known P4CONFIG file so connection settings are explicit.
        os.environ["P4CONFIG"] = os.path.join(startdir, "bench_p4config.txt")
        self.p4 = P4.P4()
        self.p4.prog = prog # Identifier for log analysis
        self.p4.logger = logger
        self.config = config
        p4config = config["perforce"]
        logger.info(pprint.pformat(p4config))
        # "port" may be a single P4PORT string or a list to choose from at
        # random (spreads load across replicas).
        # NOTE: 'unicode' is Python 2 only - this check needs changing for Python 3.
        if isinstance(p4config["port"], (str, unicode)):
            self.p4.port = p4config["port"]
        elif isinstance(p4config["port"], list):
            self.p4.port = random.choice(p4config["port"])
        else:
            raise Exception("Unknown port config")
        logger.info("Connecting to server: %s" % self.p4.port)
        # Include the chosen port in prog so server logs show which one was used.
        self.p4.prog = "%s-%s" % (prog, self.p4.port)
        if "charset" in p4config and p4config["charset"]:
            self.p4.charset = p4config["charset"]
        self.repoPath = p4config["repoPath"]
        self.p4.user = p4config["user"]
        self.sync_progress_size_interval = 1000 * 1000 * 1000 # 1GB
        if "sync_progress_size_interval" in p4config:
            # SECURITY NOTE: eval lets the config express sizes like "1000*1000",
            # but will execute any Python expression - config must be trusted.
            self.sync_progress_size_interval = int(eval(p4config["sync_progress_size_interval"]))
        result = self.p4.connect()
        if p4config["password"]:
            self.p4.password = p4config["password"]
            self.p4.run_login()
        # Workspace name is unique per user/object/host for parallel locusts.
        self.workspace_name = "{}.{}.{}".format(self.p4.user, self.id, platform.node())
        self.workspace_root = os.path.join(self.test_root, self.workspace_name)
        if not os.path.isdir(self.test_root):
            try:
                os.makedirs(self.test_root, 0o777)
            except OSError as ex:
                # Another locust may have created it between the check and here.
                if ex.errno != errno.EEXIST:
                    raise ex
                pass
    def getView(self):
        """Build the client view for this workspace.

        Randomly selects between depot dirs, or uses the one directed by the
        config file (repoSubDir "*" means enumerate and choose at random;
        repoSubDirNum > 1 samples that many subdirectories one level down).

        :return: list of view mapping lines
        :raises Exception: when no depot directories are found
        """
        p4config = self.config["perforce"]
        if p4config["repoSubDir"] == "*":
            dirs = [x['dir'] for x in self.p4.run_dirs("%s/%s" % (p4config["repoPath"], p4config["repoSubDir"]))]
        else:
            dirs = ["%s/%s" % (p4config["repoPath"], p4config["repoSubDir"])]
        if len(dirs) > 1:
            dir = random.choice(dirs)
            if "repoSubDirNum" in p4config and int(p4config["repoSubDirNum"]) > 1:
                subdirs = [x['dir'] for x in self.p4.run_dirs("%s/*" % dir)]
                subset = random.sample(subdirs, int(p4config["repoSubDirNum"]))
                # Map each depot dir to a client path with the "//" prefix stripped.
                return ["{}/... //{}/{}/...".format(x, self.workspace_name, x.replace("//", "")) for x in subset]
            else:
                return ["{}/... //{}/{}/...".format(dir, self.workspace_name, dir.replace("//", ""))]
        elif len(dirs) == 1:
            dir = dirs[0]
            return ["{}/... //{}/{}/...".format(dir, self.workspace_name, dir.replace("//", ""))]
        else:
            raise Exception("No dirs found!")
    def createWorkspace(self):
        """Create or update the client workspace spec on the server.

        :return: True when the fetched spec already had this root set
                 (presumably meaning the workspace pre-existed - TODO confirm)
        """
        p4config = self.config["perforce"]
        # Important to set client before attempting to create it in case we are talking to replica
        self.p4.client = self.workspace_name
        ws = self.p4.fetch_client(self.workspace_name)
        existed = (ws._root == self.workspace_root)
        ws._root = self.workspace_root
        ws._view = self.getView()
        # rmdir cleans up empty directories on sync; replaces the normdir default.
        ws._options = ws._options.replace("normdir", "rmdir")
        if "options" in p4config and p4config["options"]:
            logger.warn("Overwiting options")
            ws._options = p4config["options"]
        logger.info("Saving workspace view: %s" % ws._view[0])
        logger.info(pprint.pformat(ws))
        result = self.p4.save_client(ws)
        logger.info(pprint.pformat(result))
        return existed
    def syncWorkspace(self, timer):
        """Sync the workspace, either via an external p4 binary or p4python.

        :param timer: Timer used by SyncOutput to fire partial-sync events
        :return: number of result records from the sync (0 when sync_args used)
        """
        result = []
        p4config = self.config["perforce"]
        if "sync_args" in p4config and p4config["sync_args"]:
            # Shell out to the command-line client so extra sync flags
            # (e.g. --parallel) from the config can be passed through.
            cmd = "p4"
            args = [p4config["sync_args"], "-p", self.p4.port, "-u", self.p4.user, "-c", self.p4.client]
            args.extend(["sync", "//{}/...".format(self.p4.client)])
            logger.info("%s %s" % (cmd, " ".join(args)))
            result = popen(cmd, args)
            logger.info(result)
        else:
            # Sync via p4python, reporting progress through SyncOutput.
            if not os.path.isdir(self.workspace_root):
                os.makedirs(self.workspace_root, 0o777)
            os.chdir(self.workspace_root)
            event_name = "sync_partial_%s" % fmtsize(self.sync_progress_size_interval)
            syncCallback = SyncOutput(timer, event_name, self.sync_progress_size_interval)
            with self.p4.at_exception_level(P4.P4.RAISE_ERRORS):
                result = self.p4.run_sync("//{}/...".format(self.p4.client), handler=syncCallback)
            havelist = self.p4.run_have()
            self.localfilelist = [f["path"] for f in havelist]
        return len(result)
class P4BuildFarmBenchmark(P4Benchmark):
    """Performs basic benchmark test - Perforce specific subclass"""
    def __init__(self, startdir, config):
        # Identify as "p4buildfarm" in the server log for later analysis.
        super(P4BuildFarmBenchmark, self).__init__(startdir, config, "p4buildfarm")
def buildFarmActions(bench, request_type):
    """Build farm basically just does a sync.

    Times workspace creation and then the sync, reporting each as a
    separate locust event ("create" / "sync").
    """
    timer = Timer(request_type)
    try:
        bench.createWorkspace()
        timer.report_success("create", 1)
    except Exception as exc:
        logger.exception(exc)
        timer.report_failure("create", exc)
    timer = Timer(request_type)
    try:
        synced = bench.syncWorkspace(timer)
        timer.report_success("sync", synced)
    except Exception as exc:
        logger.exception(exc)
        timer.report_failure("sync", exc)
class AllTasks(TaskSet):
    """Entry point for locust - runs the build farm benchmark once per locust."""
    min_wait = 1000
    max_wait = 10000
    request_type = "p4"

    def __init__(self, *args, **kwargs):
        super(AllTasks, self).__init__(*args, **kwargs)
        # Wait times come from the config file, overriding the class defaults.
        self.config = readConfig(startdir)
        self.min_wait = self.config["general"]["min_wait"]
        self.max_wait = self.config["general"]["max_wait"]

    def on_start(self):
        pass

    @task(10)
    def buildFarmActions(self):
        """Create a workspace, sync it, then stop this locust."""
        task_name = "p4buildfarm"
        self.bench = P4BuildFarmBenchmark(startdir, self.config)
        buildFarmActions(self.bench, task_name)
        logger.info("Finished %s" % task_name)
        # Bug fix: pass the task_name variable, not the literal string "task_name".
        raise StopLocust(task_name) # Run once only and die
class P4RepoTestLocust(Locust):
    """Will be imported and then run by locust"""
    # Locust discovers this class and drives the tasks defined in AllTasks.
    task_set = AllTasks
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #21 | 24711 | Robert Cowham | Restructure and tidy up | ||
| #20 | 24686 | Robert Cowham | Getting it working - especially with Python3 | ||
| #19 | 22003 | Robert Cowham | Latest state - with p4python and no syncing | ||
| #18 | 21899 | Robert Cowham | Randomly choose hostname instead of attempting to evenly split | ||
| #17 | 21885 | Robert Cowham | Add sync_args | ||
| #16 | 21820 | Robert Cowham | Remove unused code. Make a little more configurable via config file | ||
| #15 | 21800 | Robert Cowham | Sync one level down - hard coded to select 50 for now | ||
| #14 | 21799 | Robert Cowham | Avoid mkdirs error. Remove user task | ||
| #13 | 21793 | Robert Cowham | Use hostname to choose which server to connect to | ||
| #12 | 21769 | Robert Cowham | Allow multiple ports to be specified - for random choice | ||
| #11 | 21767 | Robert Cowham | add latest stuff with analyse | ||
| #10 | 21761 | Robert Cowham | Avoid stats reset. Stop each task after it has run once | ||
| #9 | 21760 | Robert Cowham | Log config and ws options | ||
| #8 | 21759 | Robert Cowham | Allow options to be configurable | ||
| #7 | 21744 | Robert Cowham | Output sync progress via logger too | ||
| #6 | 21743 | Robert Cowham | Make it configurable | ||
| #5 | 21742 | Robert Cowham | Added partial sync reporting | ||
| #4 | 21737 | Robert Cowham | Fix dirs command | ||
| #3 | 21735 | Robert Cowham | Put all benchmark into one file | ||
| #2 | 21718 | Robert Cowham | Refactored a little - into one file | ||
| #1 | 21717 | Robert Cowham | Trying nested tasks | ||