#!/usr/bin/env python
#
#Copyright (c) 2009, Perforce Software, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#*******************************************************************************
#Author: Stephen Moon
#This program spawns processes using Python's multiprocessing module.
#You can't have more clients than the available commands because there is at least
#one client for each command. If there are more commands than the available
#clients, there will be more than one command running in each client.
#
#So far, I have tried to implement this for the commands worked on for lockless reads.
#But, the list may get expanded to be comprehensive.
#
#By manually modifying the code in main, you can increase the number of files to be
#added as well as number of runs for each command. Each command will be put in a loop
#so that it runs continuously. Each process will have the client number and
# the command that it is associated with as well as the instance number and run number.
#If any one instance of a command finishes the specified number of runs, it will kill
#all processes running concurrently.
#
#CAUTION: If you have more than one writer, you need to run them in a separate client
#for now. Otherwise, you will get "No files in default changelist" error message.
#If you have two writers, specify at least two clients and so on.

import os, sys, shutil, time, re
import multiprocessing, logging, create_client, create_file, create_user
from subprocess import PIPE, Popen

'''
p4 fstat (have and working tables peeked and processed early - all tables except hx/dx)
p4 filelog (lockless)
p4 diff lockless (some options are not)
p4 submit (rewritten into two-stage commit, also updates 'maxCommitChange' post change
p4 integ/merge/copy (copy uses buffered hx/dx)
p4 interchanges (streams/copy uses hx/dx)
p4 istat
p4 changes
p4 dirs (lockless)
p4 sync
p4 streams
p4 files/sizes/print (-a only)
p4 diff2
p4 cstat (partial)
p4 opened (partial)
p4 resolved
p4 have
p4 annotate
p4 depots
p4 fixes
p4 jobs (-e still read locks indexes)
p4 integed
'''


def _run(argv, capture_stderr=False):
    """Run *argv* to completion and return its ``(stdout, stderr)`` as text.

    stderr is piped only when *capture_stderr* is true; otherwise the child
    inherits it and the second element of the result is None.
    """
    # universal_newlines=True yields str (not bytes) on Python 3 and
    # normalises line endings, so callers can use splitlines() and str regexes.
    proc = Popen(argv, stdin=PIPE, stdout=PIPE,
                 stderr=PIPE if capture_stderr else None,
                 universal_newlines=True)
    return proc.communicate()


class P4_Process(object):
    """Builds ``p4 -Ztrack`` command lines and runs them in a loop.

    The class-level counters hold the current run number for each command
    family (one_param / two_param / two_param_submit); get_ztrack_output
    reads them to tag the lines it logs.
    """

    count = icount = scount = 0

    def __init__(self, port): #port string taken from the command line
        # Commands that take a single path argument.
        self.one_param = ('filelog','dirs','changes','fstat','sync','files',
                          'sizes','print','opened','have','annotate','depots','fixes',
                          'jobs','integed','integrated','cstat')
        # Commands that take a source and a destination.
        self.two_param = ('diff','interchanges','diff2','populate')
        # Commands run through create_file.integ_files (integrate + submit).
        self.two_param_submit = ('integ','copy','merge')
        self.P4 = 'p4'
        self.P4_Ztrack = '-Ztrack'
        self.P4PORT = port
        # Common prefix; the client name is appended right after '-c'.
        self.run_cmd = [self.P4, self.P4_Ztrack, '-p', self.P4PORT, '-c']

    def exit_message(self, process_name, cmd, count, p4debug):
        """Print and debug-log a final summary line for *process_name*.

        Commands that are neither 'submit' nor listed in one of the command
        tuples produce an empty message.
        """
        msg = ''
        if cmd == 'submit':
            msg = "Number of changes submitted: " + str(count) + " Name: " + process_name
        elif (cmd in self.one_param or cmd in self.two_param
              or cmd in self.two_param_submit):
            msg = "Last " + str(cmd) + ": Process " + str(count) + " Name: " + process_name

        print(msg.center(80,' '))
        p4debug.debug(msg)

    def p4process(self, e, process_name, fq_port, port, cmd, mcount, dirname, client_number, p4debug, p4error):
        """Run *cmd* up to *mcount* times, stopping early once event *e* is set.

        Parameters:
            e             -- event shared by all workers; set by whichever
                             worker finishes its runs first.
            process_name  -- label used in log lines and destination paths.
            fq_port       -- port string handed to the create_file helpers.
            port          -- numeric port string; client/user names are
                             't'/'u' followed by int(port) + client_number.
            cmd           -- p4 command name; commands not listed in any of
                             the three tuples are ignored.
            mcount        -- maximum number of runs.
            dirname       -- depot directory under //depot/ to operate on.
            client_number -- offset added to the port to pick client/user.
            p4debug, p4error -- loggers for trace output and exceptions.

        When *e* is found set after a run, the worker exits via sys.exit(1).
        """
        prefix = '//depot/'
        src_path = prefix + dirname #//depot/test
        dest_path = prefix + process_name #//depot/integ1_t11112
        client_used = 't' + str(int(port) + client_number)
        user_used = 'u' + str(int(port) + client_number)

        if cmd in self.one_param:
            self.run_cmd.extend([client_used, '-u', user_used, cmd])
            if cmd == 'dirs':
                self.run_cmd.append(prefix + '*') #//depot/*
            else:
                if cmd == 'have':
                    # Sync first so the client has files to report.
                    create_file.sync_files('sync', fq_port, client_used, user_used, dirname, p4debug, p4error)
                self.run_cmd.append(src_path + '/...') #//depot/test/...

            P4_Process.count = 0
            while P4_Process.count < mcount:
                try:
                    P4_Process.count += 1
                    out, _ = _run(self.run_cmd)
                    msg = cmd + " from p4process: " + process_name + "_PROCESS " + str(P4_Process.count)
                    p4debug.debug(msg)
                    print(msg.center(80,'.'))
                    self.get_ztrack_output(process_name, cmd, out, "", p4debug)
                    sys.stdout.flush()
                    time.sleep(2)
                    if e.is_set():
                        self.exit_message(process_name, cmd, P4_Process.count, p4debug)
                        sys.exit(1)
                # The exception must not be bound to 'e': that would replace
                # the shared event and break the e.is_set()/e.set() calls.
                except Exception as run_err:
                    p4error.exception(run_err)
            e.set()
            self.exit_message(process_name, cmd, P4_Process.count, p4debug)

        elif cmd in self.two_param:
            self.run_cmd.extend([client_used, '-u', user_used])
            if cmd == 'interchanges':
                self.run_cmd.extend([cmd, '-f'])
            elif cmd == 'populate':
                self.run_cmd.append(cmd)
            # NOTE(review): for 'diff' and 'diff2' neither the command name
            # nor any paths are appended, so p4 is invoked without a
            # command. Intended arguments are not visible here - confirm.

            P4_Process.icount = 0
            while P4_Process.icount < mcount:
                P4_Process.icount += 1
                # Per-run arguments go in a fresh list so the shared prefix
                # never needs to be cleaned up afterwards.
                if cmd == 'interchanges':
                    extra = [src_path + '/...',    #//depot/test/...
                             prefix + 'integ...']  #//depot/integ...
                elif cmd == 'populate':
                    extra = ['-d', 'submitting_' + process_name,
                             src_path + '/...',    #//depot/test/...
                             dest_path + str(P4_Process.icount) + 'p/...']
                else:
                    extra = []
                try:
                    out, _ = _run(self.run_cmd + extra)
                    msg = cmd + " from p4process: " + process_name + "_PROCESS " + str(P4_Process.icount)
                    p4debug.debug(msg)
                    self.get_ztrack_output(process_name, cmd, out, "", p4debug)
                    sys.stdout.flush()
                    time.sleep(2)
                    if e.is_set():
                        self.exit_message(process_name, cmd, P4_Process.icount, p4debug)
                        sys.exit(1)
                except Exception as run_err:
                    p4error.exception(run_err)
            e.set()
            self.exit_message(process_name, cmd, P4_Process.icount, p4debug)

        elif cmd in self.two_param_submit:
            create_file.sync_files('sync', fq_port, client_used, user_used, dirname, p4debug, p4error)
            p4debug.debug("CMD: " + cmd)
            while P4_Process.scount < mcount:
                P4_Process.scount += 1
                try:
                    (integ_array, submit_array) = create_file.integ_files(
                        fq_port, client_used, user_used, process_name, cmd,
                        P4_Process.scount, dirname, p4debug, p4error)
                    msg = cmd + " from p4process: " + process_name + "_PROCESS " + str(P4_Process.scount)
                    p4debug.debug(msg)
                    print(msg.center(80,'#'))
                    self.get_ztrack_output(process_name, cmd, integ_array, cmd, p4debug)
                    self.get_ztrack_output(process_name, cmd, submit_array, "submit", p4debug)
                    sys.stdout.flush()
                    if e.is_set():
                        self.exit_message(process_name, cmd, P4_Process.scount, p4debug)
                        sys.exit(1)
                except Exception as run_err:
                    p4error.exception(run_err)
            e.set()
            self.exit_message(process_name, cmd, P4_Process.scount, p4debug)

    def get_ztrack_output(self, process_name, cmd, out, out_type, p4debug):
        """Debug-log every line of *out* that starts with '---'.

        Each logged line is tagged with the current run counter of the
        command family; for the integ/copy/merge family the tag is
        *out_type* followed by the counter, and nothing is logged when
        *out_type* is not one of 'integ', 'copy', 'merge' or 'submit'.
        """
        ztrack = re.compile(r'^---')
        if cmd in self.one_param:
            tag = str(P4_Process.count)
        elif cmd in self.two_param:
            tag = str(P4_Process.icount)
        elif out_type in ("integ", "copy", "merge", "submit"):
            tag = out_type + str(P4_Process.scount)
        else:
            return

        # splitlines() copes with both '\n' and '\r\n' terminated output.
        for each_line in out.splitlines():
            if ztrack.match(each_line):
                p4debug.debug(process_name + " :" + tag + ":" + each_line)

    def p4info(self, port, p4debug, p4error):
        """Debug-log the output of 'p4 info' and 'p4 configure show'."""
        info_cmd = ['p4','-p',port,'info']
        configure_cmd = ['p4','-p',port,'configure','show']

        # Each command is attempted independently; a failure is logged and
        # does not prevent the other from running.
        for argv in (info_cmd, configure_cmd):
            try:
                out, _ = _run(argv, capture_stderr=True)
                p4debug.debug(out)
            except Exception as err:
                p4error.exception(err)

    def logAutomation(self,logName):
        """Configure logging and return the (p4debug, p4error) loggers.

        DEBUG and above go to '<logName>.log' (opened with mode 'w');
        INFO and above are also sent to a console StreamHandler.
        """
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
            datefmt='%m-%d %H:%M',
            filename= logName + ".log",
            filemode='w'
        )

        # define a Handler which writes INFO messages or higher to the sys.stderr
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)

        # set a format which is simpler for console use
        formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
        # tell the handler to use this format
        console.setFormatter(formatter)
        # add the handler to the root logger
        logging.getLogger('').addHandler(console)

        self.p4debug = logging.getLogger('p4debug')
        self.p4error = logging.getLogger('p4error')

        return (self.p4debug,self.p4error)


def pick_changes_delete(port_num, chg):
    """Return (chg, text) for change *chg* if its description matches.

    Runs 'p4 describe -s' and looks for the first line matching the
    'submitting_change' pattern; returns (chg, 'no_submit') when no line
    matches or the command raises.
    """
    describe_cmd = ['p4','-p',port_num,'describe','-s',chg]
    submit = re.compile(r'^\s+submitting_change\s\w+\s(.+)$')

    try:
        describe_out, _ = _run(describe_cmd)
        for each_line in describe_out.splitlines():
            m = submit.match(each_line)
            if m is not None:
                return (chg, m.group(1))
    except Exception as e:
        print(e)

    return (chg,'no_submit')


def get_changes(port_num):
    """Return (changes, submits) for changes accepted by pick_changes_delete.

    Both lists are parallel: change numbers and the text captured from the
    matching description line.
    """
    changes = []
    submits = []
    port = str(port_num)
    chg_regex = re.compile(r'^Change (\d+) on')
    changes_cmd = ['p4','-p', port, 'changes']

    try:
        changes_out, _ = _run(changes_cmd)
        for each_line in changes_out.splitlines():
            m = chg_regex.match(each_line)
            if m is not None:
                (each_change, each_submit) = pick_changes_delete(port, m.group(1))
                if each_submit != 'no_submit':
                    changes.append(each_change)
                    submits.append(each_submit)
    except Exception as e:
        print(e)

    return (changes, submits)


def get_clients(port_num):
    """Return the names of clients matching 't<digits>' from 'p4 clients'."""
    clients = []
    port = str(port_num)
    client = re.compile(r'^Client (t\d+) .+$')
    clients_cmd = ['p4','-p', port, 'clients']

    try:
        clients_out, _ = _run(clients_cmd)
        for each_line in clients_out.splitlines():
            m = client.match(each_line)
            if m is not None:
                clients.append(m.group(1))
    except Exception as e:
        print(e)

    return clients


def delete_clients(port):
    """Force-delete every 't<digits>' client and remove its local directory.

    The directory removed is the one named after the client, relative to
    the current working directory.
    """
    client_cmd = ['p4','-p', port, 'client', '-d', '-f']

    for client in get_clients(port):
        try:
            (client_out, client_err) = _run(client_cmd + [client])
            print("client_delete:",client_out)
            # stderr is not piped, so this is always None.
            print("client_delete_error:",client_err)
            shutil.rmtree(client)
        except Exception as e:
            print(e)


def get_users(port_num):
    """Return the names of users matching 'u<digits>' from 'p4 users'."""
    users = []
    port = str(port_num)
    user = re.compile(r'(u\d+) .+$')
    users_cmd = ['p4','-p', port, 'users']

    try:
        users_out, _ = _run(users_cmd)
        for each_line in users_out.splitlines():
            m = user.match(each_line)
            if m is not None:
                users.append(m.group(1))
    except Exception as e:
        print(e)

    return users


def delete_users(port):
    """Force-delete every 'u<digits>' user reported by get_users."""
    user_cmd = ['p4','-p', port, 'user', '-d', '-f']

    for user in get_users(port):
        try:
            user_out, _ = _run(user_cmd + [user])
            print("user_delete:",user_out)
        except Exception as e:
            print(e)


def delete_changes(port):
    """Force-delete the changes found by get_changes.

    Returns (deleted_changes, submits): the change numbers p4 reported as
    deleted, and the submit texts returned by get_changes.
    """
    deleted_changes = []
    chg_regex = re.compile(r'^Change (\d+) deleted.')
    (changes, submits) = get_changes(port)
    change_cmd = ['p4','-p', port, 'change', '-d', '-f']

    for each_change in changes:
        try:
            change_out, _ = _run(change_cmd + [each_change])
            m = chg_regex.match(change_out)
            if m is not None:
                deleted_changes.append(m.group(1))
        except Exception as e:
            print(e)

    print("Changes deleted: ",deleted_changes)
    return (deleted_changes, submits)


def obliterate(port, submits, dirname):
    """Obliterate '//depot/<submit>*/...' for each submit, then '//depot/<dirname>/...'."""
    oblit_cmd = ['p4','-p', port, 'obliterate', '-y']

    for each_submit in submits:
        branch = "//depot/" + each_submit + "*/..."
        try:
            _run(oblit_cmd + [branch])
            print("Obliterated: branch " + branch)
        except Exception as e:
            print(e)

    try:
        _run(oblit_cmd + ["//depot/" + dirname + "/..."])
        print("Obliterated: branch //depot/" + dirname + "/...")
    except Exception as e:
        print(e)


def p4_clean(port_num, dirname):
    """Remove the clients, changes, depot files and users this script created."""
    port = str(port_num)
    delete_clients(port)
    (changes, submits) = delete_changes(port)
    obliterate(port, submits, dirname)
    # NOTE(review): changes are deleted a second time after obliterate,
    # presumably for changes that could not be removed earlier - confirm.
    (changes, submits) = delete_changes(port)
    delete_users(port)


def build_cmd_list(writer_names, reader_names):
    """Expand the {command: count} dicts into a flat list of command names.

    Writers come first, then readers; each name is repeated 'count' times.
    """
    cmds_list = []

    # items() is available on both Python 2 and 3 (iteritems() is not).
    for name, nums in writer_names.items():
        cmds_list.extend([name] * nums)
        print("WRITER: " + name + " Counts: " + str(nums))

    for name, nums in reader_names.items():
        cmds_list.extend([name] * nums)
        print("READER: " + name + " Counts: " + str(nums))

    return cmds_list


def main():
    """Entry point: set up users/clients/files, then run all commands in parallel.

    Command line: <port> <num_clients> [clean]. When exactly one extra
    argument is given, the script only cleans up the server and exits.
    """
    if len(sys.argv) < 3 or len(sys.argv) > 5:
        print("Usage: {0} <port> <num_clients> [clean]".format(sys.argv[0]))
        sys.exit(1)

    port = sys.argv[1]
    num_client = sys.argv[2]

    # Command name -> number of processes to start for that command.
    #writer_names = {'copy':20,'integ':20}
    writer_names = {'copy':1}
    #reader_names = {'filelog':40,'fstat':40,'interchanges':40,'diff':40,'diff2':40,'integed':40,'changes':40,'sync':40,'dirs':40}
    #reader_names = {'filelog':1,'fstat':1,'interchanges':1,'diff':1,'diff2':1,'integed':1,'changes':1,'sync':2,'dirs':1}
    reader_names = {'filelog':2,'fstat':1,'dirs':1,'interchanges':1,'sync':1,'integed':1}
    cmds_list = build_cmd_list(writer_names, reader_names)
    print(cmds_list)
    print("Number of commands: ",len(cmds_list))

    client_total = int(num_client)
    if client_total < 1 or len(cmds_list) < client_total:
        if not client_total:
            print("No client specified")
        else:
            print("Number of clients greater than the available commands")
        sys.exit(1)

    dir_or_fname = 'qa_test'
    if len(sys.argv) == 4:
        p4_clean(port,dir_or_fname)
        sys.exit(1)

    p_obj = P4_Process(port)
    (p4debug, p4error) = p_obj.logAutomation(os.path.basename(sys.argv[0]).split('.')[0] + '_' + port +'_' + num_client)
    sys.stdout.flush()
    p_obj.p4info(port,p4debug,p4error)

    for each_item in range(client_total):
        user_name = create_user.create_user(port, each_item, p4debug, p4error)
        create_client.create_client(port, each_item, user_name, p4debug, p4error)

    #file_num = 20000
    integ_repeated = 11
    #cmd, port, client, path, dir/file, file_num, p4debug, p4error
    fq_port = '' #fully qualified port
    if port.isdigit():
        fq_port = 'localhost:' + str(port)
    else:
        fq_port = port #p4prod.perforce.com:1666
        # Take the last ':'-separated field so 'ssl:host:1666' also yields 1666.
        port = port.rsplit(':', 1)[-1] #1666

    create_file.fast_open_files('add', fq_port, 't' + str(port), 'u' + str(port), 't' + str(port) , dir_or_fname, integ_repeated, p4debug, p4error)
    file_num = 500 * (2 ** integ_repeated)
    msg = " Done adding " + str(file_num) + " files. "
    print(msg.center(80,'#'))

    jobs = []
    count = 30  # maximum runs per process
    e = multiprocessing.Event()
    seq = 0
    start_time = time.time()

    for cmd in cmds_list:
        seq += 1
        # Spread the processes round-robin over the available clients.
        client_number = seq % client_total
        process_name = cmd + str(seq) + '_t' + str(int(port) + client_number)
        p = multiprocessing.Process(name=process_name, target=p_obj.p4process,
                                    args=(e, process_name, fq_port, port, cmd, count,
                                          dir_or_fname, client_number, p4debug, p4error))
        p.start()
        jobs.append(p)

    for each_p in jobs:
        print('process_name:',each_p)
        each_p.join()

    elapsed_time = time.time() - start_time
    print("Elapsed time: ", elapsed_time)
    p4debug.debug("Elapsed_time: " + str(elapsed_time))


if __name__ == '__main__':
    main()