#!/usr/bin/env python3 # -*- encoding: UTF8 -*- # Test harness for p4storageanalyzer.py from __future__ import print_function import sys import os import logging import unittest import P4 import time import string import six from faker import Faker import random from p4testutils import TestCase, P4Server, localDirectory, create_file, append_to_file parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, parent_dir) from p4storageanalyzer import P4StorageAnalyzer, LOGGER_NAME, DEFAULT_LOG_FILE, ArchiveLocator, FileInfo python3 = sys.version_info[0] >= 3 if python3: from io import StringIO else: from StringIO import StringIO HEADER_ROW = "change,user,workspace,submitDateTime,fileCount,lazyCount,path,clientSize,archiveSizeRCS,archiveSizeBinary\n" python3 = sys.version_info[0] >= 3 LINE_LENGTH = 80 BLOCK_SIZE = 4096 fake = Faker() # Random generators try: import numpy as np def create_random(size): if python3: return bytes((int(x) for x in np.random.randint(1, 254, size))) else: return bytearray((int(x) for x in np.random.randint(1, 254, size))) def generator(size, eol="\n"): s = string.ascii_letters + string.digits return "".join(np.random.choice(list(s), size - 1)) + eol except ImportError: print("No numpy installed, falling back to standard Python random. Prepare to wait ...", file=sys.stderr) def create_random(size): if python3: return bytes((random.randint(1, 254) for x in range(size))) else: return bytearray((random.randint(1, 254) for x in range(size))) def generator(size, eol="\n"): s = string.ascii_letters + string.digits return "".join((random.choice(s) for x in range(size - 1))) + eol def create_random_file(fileSize, filename, binary=False): "Approximation for data generation" # logger.debug("create_file '%s' binary: %s" % (filename, str(binary))) mode = "wb" if binary else "w" with open(filename, mode) as f: if binary: blocks = int(fileSize / BLOCK_SIZE) for unused in range(blocks): b = create_random(BLOCK_SIZE) f.write(b) else: lines = int(fileSize / LINE_LENGTH) for unused in range(lines): f.write(fake.text()) def getDateTime(timestr): return time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(int(timestr))) class StorageAnalyzerOptions(): def __init__(self, port=None, user=None, path=None, root=None, depth=0, append=False, depot_root=None, summary=None, verbosity=logging.DEBUG): self.port = port self.user = user self.path = path self.root = root self.depth = depth self.append = append self.depot_root = depot_root self.summary = "summary.csv" self.verbosity = verbosity self.verbose_rcs = False class TestP4StorageAnalyzer(TestCase): def __init__(self, methodName='runTest'): super(TestP4StorageAnalyzer, self).__init__(logger_name=LOGGER_NAME, log_file=DEFAULT_LOG_FILE, methodName=methodName) def setUp(self): self.server = P4Server() p4 = self.server.p4 self.p4 = p4 p4.logger = self.logger # p4.ignore_file = ".p4ignore-tests" os.environ["P4CONFIG"] = ".p4config-tests" def tearDown(self): pass def testDepotRootBasic(self): p4 = self.p4 options = StorageAnalyzerOptions() server_root = self.server.server_root a = ArchiveLocator(options, p4, self.logger) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'text', '1.1')) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'text', '1.2')) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'symlink', '1.2')) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'unicode', '1.2')) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'utf8', '1.2')) self.assertEqual((True, "file1,v"), a.getLbrExt('file1', 'utf16', '1.2')) self.assertEqual((False, os.path.join("file1,d", "1.1.gz")), a.getLbrExt('file1', 'binary', '1.1')) self.assertEqual((False, os.path.join("file1,d", "1.1")), a.getLbrExt('file1', 'binary+F', '1.1')) self.assertEqual((False, os.path.join("file1,d", "1.1234.gz")), a.getLbrExt('file1', 'text+C', '1.1234')) fstats = [{'depotFile': '//depot/inside/file1', 'headAction': 'add', 'headType': 'text', 'headTime': '1530692642', 'headRev': '1', 'headChange': '1', 'headModTime': '1530692642', 'fileSize': '12', 'digest': 'B53227DA4280F0E18270F21DD77C91D0', 'lbrFile': '//depot/inside/file1', 'lbrRev': '1.1', 'lbrType': 'text', 'lbrIsLazy': '0'}] self.assertEqual(os.path.join(server_root, "depot"), a.getDepotRoot("//depot/inside/file1")) fileInfo = FileInfo(fstats[0]) a.setLbrPath(fileInfo, fstats[0]) self.assertEqual(os.path.join(server_root, "depot", "inside", "file1,v"), fileInfo.lbrPath) a = ArchiveLocator(StorageAnalyzerOptions(depot_root="/tmp/depots"), p4, self.logger) self.assertEqual("/tmp/depots/depot", a.getDepotRoot("//depot/inside/file1")) depot = p4.fetch_depot("abs1") depot['map'] = "/tmp/abs1/..." p4.save_depot(depot) p4.disconnect() p4.connect() a = ArchiveLocator(options, p4, self.logger) self.assertEqual("/tmp/abs1", a.getDepotRoot("//abs1/inside/file1")) depot = p4.fetch_depot("abs2") depot['map'] = "d:/p4/abs2/..." p4.save_depot(depot) p4.disconnect() p4.connect() a = ArchiveLocator(options, p4, self.logger) self.assertEqual("d:/p4/abs2", a.getDepotRoot("//abs2/inside/file1")) p4.run("configure", "set", "server.depot.root=/p4/1/depots") p4.disconnect() p4.connect() a = ArchiveLocator(options, p4, self.logger) self.assertEqual("/p4/1/depots/depot", a.getDepotRoot("//depot/inside/file1")) def run_test(self, p4, expected, depth=0, path=None, verbose_rcs=None): if not path: path = ["//..."] options = StorageAnalyzerOptions(path=path, port=p4.port, user=p4.user, depth=depth) if verbose_rcs: options.verbose_rcs = True self.saved_output = StringIO() obj = P4StorageAnalyzer(options, outstream=self.saved_output) obj.run() actual = self.saved_output.getvalue() self.assertMultiLineEqual(HEADER_ROW, actual[:len(HEADER_ROW)]) if not verbose_rcs: self.assertMultiLineEqual(expected, actual[len(HEADER_ROW):]) else: explines = [x for x in expected.split("\n")] actlines = [x for x in actual[len(HEADER_ROW):].split("\n")] self.assertEqual(len(explines), len(actlines)) for i in range(len(explines) - 1): eparts = explines[i].split(",") aparts = actlines[i].split(",") self.assertEqual(eparts[:-2], aparts[:-2]) self.assertTrue(abs(int(aparts[-2]) - int(eparts[-2])) < 100, "Sizes different: %s vs %s" % (explines[i], actlines[i])) def testBasic(self): """Basic functions""" p4 = self.p4 self.maxDiff = None expected = "" self.run_test(p4, expected=expected) inside = localDirectory(self.server.client_root, "inside") # outside = localDirectory(self.server.client_root, "outside") file1 = os.path.join(inside, "file1") # # file2 = os.path.join(inside, "file2") create_file(file1, "Some content") # # create_file(file2, "Some content2") p4.run_add(file1) p4.run_submit("-d", "Initial checkin") desc = p4.run_describe("-s", "1") submit_time1 = getDateTime(desc[0]['time']) expected = "1,testuser,test_ws,%s,1,0,//depot/inside/file1,12,184,0\n" % submit_time1 self.run_test(p4, expected=expected) file2 = os.path.join(inside, "file2") create_random_file(2048, file2) file3 = os.path.join(inside, "file3.bin") create_random_file(5000, file3, binary=True) p4.run_add(file2) p4.run_add("-t", "binary", file3) p4.run_submit("-d", "More files") desc = p4.run_describe("-s", "2") submit_time2 = getDateTime(desc[0]['time']) file1_clientSize = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file1')) file2_clientSize = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file2')) file3_clientSize = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file3.bin')) file1_lbrSize = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file1,v')) file2_lbrSize = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file2,v')) file3_lbrSize = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file3.bin,d', '1.2.gz')) expected = """2,testuser,test_ws,{submit_time2},1,0,//depot/inside/file2,{file2_clientSize},{file2_lbrSize},0 2,testuser,test_ws,{submit_time2},1,0,//depot/inside/file3.bin,{file3_clientSize},0,{file3_lbrSize} 1,testuser,test_ws,{submit_time1},1,0,//depot/inside/file1,{file1_clientSize},{file1_lbrSize},0 """.format( submit_time1=submit_time1, submit_time2=submit_time2, file1_clientSize=file1_clientSize, file1_lbrSize=file1_lbrSize, file2_clientSize=file2_clientSize, file2_lbrSize=file2_lbrSize, file3_clientSize=file3_clientSize, file3_lbrSize=file3_lbrSize) self.run_test(p4, expected=expected) # Test depth expected = """2,testuser,test_ws,{submit_time2},2,0,//depot/inside/,{sum_clientSize},{sum_lbrRCSSize},{sum_lbrBinarySize} 1,testuser,test_ws,{submit_time1},1,0,//depot/inside/,{file1_clientSize},{file1_lbrSize},0 """.format( submit_time1=submit_time1, submit_time2=submit_time2, file1_clientSize=file1_clientSize, file1_lbrSize=file1_lbrSize, sum_clientSize=file2_clientSize + file3_clientSize, sum_lbrRCSSize=file2_lbrSize, sum_lbrBinarySize=file3_lbrSize) self.run_test(p4, depth=2, expected=expected) file4 = os.path.join(inside, "file4.bin") create_random_file(6000, file4, binary=True) p4.run_add("-t", "binary+F", file4) p4.run_submit("-d", "More files") desc = p4.run_describe("-s", "3") submit_time3 = getDateTime(desc[0]['time']) file4_clientSize = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file4.bin')) file4_lbrSize = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file4.bin,d', '1.3')) # Test depth expected = """3,testuser,test_ws,{submit_time3},1,0,//depot/inside/,{file4_clientSize},0,{file4_lbrBinarySize} 2,testuser,test_ws,{submit_time2},2,0,//depot/inside/,{sum_clientSize},{sum_lbrRCSSize},{sum_lbrBinarySize} 1,testuser,test_ws,{submit_time1},1,0,//depot/inside/,{file1_clientSize},{file1_lbrSize},0 """.format( submit_time1=submit_time1, submit_time2=submit_time2, submit_time3=submit_time3, file1_clientSize=file1_clientSize, file1_lbrSize=file1_lbrSize, sum_clientSize=file2_clientSize + file3_clientSize, sum_lbrRCSSize=file2_lbrSize, sum_lbrBinarySize=file3_lbrSize, file4_lbrBinarySize=file4_lbrSize, file4_clientSize=file4_clientSize) self.run_test(p4, depth=2, expected=expected) # Branch files - shouldn't be counted - well only clientSize the other 2 values should be 0 p4.run_integ("//depot/inside/...", "//depot/outside/...") p4.run_submit("-d", "Branched files") desc = p4.run_describe("-s", "4") submit_time4 = getDateTime(desc[0]['time']) # Test depth expected = """4,testuser,test_ws,{submit_time4},4,4,//depot/outside/,{chg4_clientSize},0,0 3,testuser,test_ws,{submit_time3},1,0,//depot/inside/,{file4_clientSize},0,{file4_lbrBinarySize} 2,testuser,test_ws,{submit_time2},2,0,//depot/inside/,{sum_clientSize},{sum_lbrRCSSize},{sum_lbrBinarySize} 1,testuser,test_ws,{submit_time1},1,0,//depot/inside/,{file1_clientSize},{file1_lbrSize},0 """.format( submit_time1=submit_time1, submit_time2=submit_time2, submit_time3=submit_time3, submit_time4=submit_time4, file1_clientSize=file1_clientSize, file1_lbrSize=file1_lbrSize, sum_clientSize=file2_clientSize + file3_clientSize, sum_lbrRCSSize=file2_lbrSize, sum_lbrBinarySize=file3_lbrSize, file4_lbrBinarySize=file4_lbrSize, file4_clientSize=file4_clientSize, chg4_clientSize=file1_clientSize + file2_clientSize + file3_clientSize + file4_clientSize) self.run_test(p4, depth=2, expected=expected) def testVerboseRcs(self): """Test rewriting""" p4 = self.p4 self.maxDiff = None inside = localDirectory(self.server.client_root, "inside") # outside = localDirectory(self.server.client_root, "outside") file1 = os.path.join(inside, "file1") # # file2 = os.path.join(inside, "file2") create_file(file1, "Some content") # # create_file(file2, "Some content2") p4.run_add(file1) p4.run_submit("-d", "Initial checkin") desc = p4.run_describe("-s", "1") submit_time1 = getDateTime(desc[0]['time']) # Record size after first version created file1_clientSize1 = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file1')) file1_lbrSize1 = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file1,v')) p4.run_edit(file1) append_to_file(file1, "More content") p4.run_submit("-d", "More edits") desc = p4.run_describe("-s", "2") submit_time2 = getDateTime(desc[0]['time']) file1_clientSize2 = os.path.getsize(os.path.join(self.server.client_root, 'inside', 'file1')) file1_lbrSize2 = os.path.getsize(os.path.join(self.server.server_root, 'depot', 'inside', 'file1,v')) expected = """2,testuser,test_ws,{submit_time2},1,0,//depot/inside/file1,{file1_clientSize2},{file1_lbrSize2},0 1,testuser,test_ws,{submit_time1},1,0,//depot/inside/file1,{file1_clientSize1},{file1_lbrSize1},0 """.format( submit_time1=submit_time1, submit_time2=submit_time2, file1_clientSize1=file1_clientSize1, file1_lbrSize1=file1_lbrSize1, file1_clientSize2=file1_clientSize2, file1_lbrSize2=file1_lbrSize2 - file1_lbrSize1) self.run_test(p4, expected=expected, verbose_rcs=True) if __name__ == '__main__': unittest.main()