#!/usr/bin/env python3 # -*- encoding: UTF8 -*- """############################################################################## # # Copyright (c) 2008,2016 Perforce Software, Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # = Description # # Output SQL queries generated by parsing commands logged in a Perforce # Proxy log file - ideally one created with -vtrack=1 # The output can be imported directly in any SQL database. # # pxlog2sql populates the following tables: # # +----------------+----------+------+-----+ # | syncs | # +----------------+----------+------+-----+ # | Field | Type | Null | Key | # +----------------+----------+------+-----+ # | lineNumber | int | NO | PRI | # | endTime | date | YES | | # | completedLapse | float | YES | | # | pid | int | NO | | # | ip | text | NO | | # | uCpu | int | YES | | # | sCpu | int | YES | | # | diskIn | int | YES | | # | diskOut | int | YES | | # | ipcIn | int | YES | | # | ipcOut | int | YES | | # | maxRss | int | YES | | # | pageFaults | int | YES | | # | rpcMsgsIn | int | YES | | # | rpcMsgsOut | int | YES | | # | rpcSizeIn | int | YES | | # | rpcSizeOut | int | YES | | # | rpcHimarkFwd | int | YES | | # | rpcHimarkRev | int | YES | | # | rpcSnd | float | YES | | # | rpcRcv | float | YES | | # | filesServer | int | YES | | # | filesCache | int | YES | | # | MBServer | int | YES | | # | MBCache | int | YES | | # +----------------+----------+------+-----+ # # = Usage # # See below # # = Requirements # # = Algorithm # # Parses the log into "blocks" of information which are output at the end of a sync command. # # Note that the proxy logs this information if run with parameter "-vtrack=1". The key # proxytotals line is only output for 16.2+ versions of p4p (latest patch versions). # # Perforce proxy info: # 2018/04/06 13:46:19 pid 51449 completed .246s # --- lapse .246s # --- usage 122+92us 0+0io 6+10143net 1101824k 0pf # --- rpc msgs/size in+out 5+10010/0mb+39mb himarks 2000/2000 snd/rcv .005s/.009s # --- proxy faults 0 MB 0 other 0 flushes 2 cached 2 # --- proxytotals files/size svr+cache 0+2/0mb+40mb # ############################################################################## """ from __future__ import print_function import re import sys import os import io import math import argparse import datetime import hashlib import sqlite3 import logging import time from collections import defaultdict from six import iteritems import zipfile import gzip python3 = sys.version_info[0] >= 3 RowsPerTransaction = 50000 # Commit every so many rows if in SQL mode DEFAULT_VERBOSITY = 'INFO' DEFAULT_LOG_FILE = 'log-pxlog2sql.log' # Output log file LOGGER_NAME = 'LOG2SQL' def escapeChar(str): str = str.replace("\\", "\\\\") str = str.replace("\"", "\\\"") return str def dateAdd(str, seconds): "Add specified seconds to date in string format" date = datetime.datetime.strptime(str, "%Y-%m-%d %H:%M:%S") date = date + datetime.timedelta(seconds=seconds) return date.strftime("%Y-%m-%d %H:%M:%S") def nullValue(str): "For SQL inserts" if str is None: return "NULL" return str def quotedNullValue(str): "For SQL inserts" if str is None: return "NULL" return '"%s"' % str epochCache = {} def getEpoch(str): "Handles conversion of date time strings to Unix epoch int seconds (since 1970) - using a cache for performance" try: return epochCache[str] except KeyError: dt = int(time.mktime(datetime.datetime.strptime(str, "%Y-%m-%d %H:%M:%S").timetuple())) epochCache[str] = dt return dt class CompressedFile(object): magic = None file_type = None proper_extension = None def __init__(self, filename): self.filename = filename @classmethod def is_magic(self, data): return data.startswith(self.magic) def fileSize(self): return os.stat(self.filename).st_size def open(self): return None class TextFile(CompressedFile): def fileSize(self): return os.stat(self.filename).st_size def open(self): return io.open(self.filename, "r", encoding="latin1", errors="backslashreplace") class ZIPFile(CompressedFile): magic = b'\x50\x4b\x03\x04' file_type = 'zip' def fileSize(self): return os.stat(self.filename).st_size * 20 # return zipfile.ZipFile.getinfo(self.filename).file_size def open(self): z = zipfile.ZipFile(self.filename, 'r') files = z.infolist() return io.TextIOWrapper(z.open(files[0], 'r')) class GZFile(CompressedFile): magic = b'\x1f\x8b\x08' file_type = 'gz' def fileSize(self): return os.stat(self.filename).st_size * 20 def open(self): if python3: return io.TextIOWrapper(gzip.open(self.filename, 'r')) else: gzip.GzipFile.read1 = gzip.GzipFile.read return io.TextIOWrapper(gzip.open(self.filename, 'r')) # factory function to create a suitable instance for accessing files def open_file(filename): with open(filename, 'rb') as f: start_of_file = f.read(1024) f.seek(0) for cls in (ZIPFile, GZFile): if cls.is_magic(start_of_file): return cls(filename) return TextFile(filename) class Command: processKey = None pid = 0 completed = False # Set when a completed record is found hasTrackInfo = False endTime = None endTimeEpoch = 0 completedLapse = 0.0 uCpu = sCpu = diskIn = diskOut = None ipcIn = ipcOut = maxRss = pageFaults = None rpcMsgsIn = rpcMsgsOut = rpcSizeIn = rpcSizeOut = None rpcHimarkFwd = rpcHimarkRev = rpcSnd = rpcRcv = None lineNumber = 0 filesServer = filesCache = MBServer = MBCache = None def __init__(self, lineNumber, pid, endTime, completedLapse): self.lineNumber = lineNumber self.pid = pid self.endTime = endTime self.endTimeEpoch = getEpoch(endTime) self.completedLapse = completedLapse self.uCpu = None self.sCpu = self.diskIn = self.diskOut = self.ipcIn = self.ipcOut = self.maxRss = None self.pageFaults = self.rpcMsgsIn = self.rpcMsgsOut = self.rpcSizeOut = None self.rpcSizeIn = self.rpcHimarkFwd = self.rpcHimarkRev = self.error = None self.rpcSnd = self.rpcRcv = None def setUsage(self, uCpu, sCpu, diskIn, diskOut, ipcIn, ipcOut, maxRss, pageFaults): self.uCpu = uCpu self.sCpu = sCpu self.diskIn = diskIn self.diskOut = diskOut self.ipcIn = ipcIn self.ipcOut = ipcOut self.maxRss = maxRss self.pageFaults = pageFaults def setRpc(self, rpcMsgsIn, rpcMsgsOut, rpcSizeIn, rpcSizeOut, rpcHimarkFwd, rpcHimarkRev, rpcSnd=None, rpcRcv=None): self.rpcMsgsIn = rpcMsgsIn self.rpcMsgsOut = rpcMsgsOut self.rpcSizeIn = rpcSizeIn self.rpcSizeOut = rpcSizeOut self.rpcHimarkFwd = rpcHimarkFwd self.rpcHimarkRev = rpcHimarkRev self.rpcSnd = rpcSnd self.rpcRcv = rpcRcv def setProxyTotals(self, filesServer, MBServer, filesCache, MBCache): self.filesServer = int(filesServer) self.filesCache = int(filesCache) self.MBServer = int(MBServer) self.MBCache = int(MBCache) class Block: def __init__(self): self.lines = [] self.lineNo = 0 def addLine(self, line, lineNo): self.lines.append(line) # Only at start of block if not self.lineNo: self.lineNo = lineNo class PxLog2sql: logname = None dbname = None logfile = None ckpSize = None readBytes = None reportingInterval = None cmds = None lineNo = 0 countSyncs = 0 def __init__(self, options, inStream=None, outStream=None, errStream=None, csvStream=None): if not options.dbname: root, ext = os.path.splitext(options.logfile[0]) dname, fname = os.path.split(root) options.dbname = fname self.dbname = options.dbname self.dbfile = "%s.db" % options.dbname self.options = options self.options.sql = not self.options.no_sql self.inStream = inStream self.outStream = outStream # For testing self.errStream = errStream # For testing self.csvStream = csvStream self.init_logger() self.reportingInterval = 10.0 if options.reset and os.path.exists(self.dbfile): self.logger.info("Cleaning database: %s" % self.dbfile) os.remove(self.dbfile) if outStream is None: if options.output: if options.output == "-": self.outStream = sys.stdout else: self.outStream = open(options.output, "w") else: self.outStream = None if csvStream is None: if options.csv: self.csvStream = open(options.csv, "w") else: self.csvStream = csvStream if self.options.sql: self.logger.info("Creating database: %s" % self.dbfile) self.conn = sqlite3.connect(self.dbfile) self.conn.text_factory = str self.cursor = self.conn.cursor() # Performance PRAGMAS - at some risk of security - but we can always be run again self.cursor.execute("PRAGMA synchronous = OFF") self.cursor.execute("PRAGMA journal_mode = OFF") self.cursor.execute("PRAGMA locking_mode = EXCLUSIVE") self.readBytes = 0.0 self.running = 0 self.rowCount = 0 self.db_create_database() self.db_create_syncs_table() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: try: self.conn.execute(query) except: pass def openLog(self, logname): if self.inStream is None: logfile = open_file(logname) self.logfile = logfile.open() self.ckpSize = logfile.fileSize() self.logger.info("Processing log: %s" % logname) # Autoset reporting interval based on size if self.options.interval is None: self.options.fileInterval = 10 if self.ckpSize > 10 * 1000 * 1000: self.options.fileInterval = 5 elif self.ckpSize > 100 * 1000 * 1000: self.options.fileInterval = 2 elif self.ckpSize > 500 * 1000 * 1000: self.options.fileInterval = 1 self.reportingInterval = float(self.options.fileInterval) else: self.logfile = self.inStream self.ckpSize = 500 self.options.fileInterval = 10 def init_logger(self): self.logger = logging.getLogger(LOGGER_NAME) self.logger.setLevel(self.options.verbosity) formatter = logging.Formatter('%(asctime)s:%(levelname)s %(message)s') if self.errStream: ch = logging.StreamHandler(self.errStream) ch.setLevel(self.options.verbosity) else: ch = logging.StreamHandler(sys.stderr) ch.setLevel(logging.INFO) ch.setFormatter(formatter) self.logger.addHandler(ch) if self.options.verbosity != logging.INFO and self.options.outlog: fh = logging.FileHandler(self.options.outlog, mode='w') fh.setFormatter(formatter) self.logger.addHandler(fh) def outputRequired(self): return self.outStream is not None def output(self, text): if self.outStream: try: self.outStream.write("%s\n" % text) except UnicodeEncodeError: str = text.encode(encoding="latin1", errors="backslashreplace") self.outStream.write("%s\n" % str) def outputCsvRequired(self): return self.csvStream is not None def outputCsv(self, text): if self.csvStream: try: self.csvStream.write("%s\n" % text) except UnicodeEncodeError: str = text.encode(encoding="latin1", errors="backslashreplace") self.csvStream.write("%s\n" % str) def terminate(self): self.db_commit(True) if self.options.sql: self.conn.commit() self.conn.close() def getLineNumber(self): return self.lineNo def db_commit_updates(self): self.rowCount += 1 if self.rowCount % RowsPerTransaction == 0: query = "COMMIT;" self.output(query) if self.options.sql: self.conn.commit() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: self.conn.execute(query) def db_commit(self, state): if (state): query = "COMMIT;" self.output(query) if self.options.sql: self.conn.commit() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: self.conn.execute(query) else: query = "SET autocommit=0;" self.output(query) def db_create_database(self): query = "CREATE DATABASE IF NOT EXISTS " + self.dbname + ";" self.output(query) query = "USE " + self.dbname + ";" self.output(query) def db_create_syncs_table(self): query = "DROP TABLE IF EXISTS syncs;" self.output(query) query = "CREATE TABLE syncs (lineNumber INT NOT NULL, pid INT NOT NULL, " \ "endTime DATETIME NULL, completedLapse FLOAT NULL, " \ "uCpu INT NULL, sCpu INT NULL, diskIn INT NULL, diskOut INT NULL, ipcIn INT NULL, " \ "ipcOut INT NULL, maxRss INT NULL, pageFaults INT NULL, rpcMsgsIn INT NULL, rpcMsgsOut INT NULL, " \ "rpcSizeIn INT NULL, rpcSizeOut INT NULL, rpcHimarkFwd INT NULL, rpcHimarkRev INT NULL, " \ "rpcSnd FLOAT NULL, rpcRcv FLOAT NULL, " \ "filesServer int NULL, MBServer int NULL, filesCache int NULL, MBCache int NULL, " \ "PRIMARY KEY (lineNumber));" self.output(query) self.outputCsv("lineNo,pid,endTime,completeLapse,filesServer,MBServer,filesCache,MBCache") if self.options.sql: try: self.cursor.execute(query) except: pass def processTrackRecords(self, cmd, lines): for line in lines: if not RE_TRACK.match(line): break gotMatch = False match = RE_TRACK_LAPSE.match(line) if match: gotMatch = True cmd.completedLapse = match.group(1) if not gotMatch: match = RE_TRACK_LAPSE2.match(line) if match: gotMatch = True cmd.completedLapse = "0." + match.group(1) if not gotMatch: match = RE_TRACK_USAGE.match(line) if match: gotMatch = True cmd.setUsage(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7), match.group(8)) if not gotMatch: match = RE_TRACK_RPC2.match(line) if match: gotMatch = True cmd.setRpc(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7), match.group(8)) if not gotMatch: match = RE_TRACK_RPC.match(line) if match: gotMatch = True cmd.setRpc(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6)) if not gotMatch: match = RE_TRACK_PROXY.match(line) if match: gotMatch = True # Ignore line if not gotMatch: match = RE_TRACK_TOTALS.match(line) if match: gotMatch = True cmd.setProxyTotals(filesServer=match.group(1), filesCache=match.group(2), MBServer=match.group(3), MBCache=match.group(4)) if not gotMatch: self.logger.debug("Unrecognised track: %d, %s" % (cmd.lineNumber, line[:-1])) self.sql_syncs_insert(cmd) def sql_syncs_insert(self, cmd): if self.outputRequired(): query = 'INSERT IGNORE INTO syncs VALUES (%d,%d,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);' % \ (cmd.lineNumber, cmd.pid, quotedNullValue(cmd.endTime), nullValue(cmd.completedLapse), nullValue(cmd.rpcMsgsIn), nullValue(cmd.rpcMsgsOut), nullValue(cmd.rpcSizeIn), nullValue(cmd.rpcSizeOut), nullValue(cmd.rpcHimarkFwd), nullValue(cmd.rpcHimarkRev), nullValue(cmd.rpcSnd), nullValue(cmd.rpcRcv), nullValue(cmd.uCpu), nullValue(cmd.sCpu), nullValue(cmd.diskIn), nullValue(cmd.diskOut), nullValue(cmd.ipcIn), nullValue(cmd.ipcOut), nullValue(cmd.maxRss), nullValue(cmd.pageFaults), nullValue(cmd.filesServer), nullValue(cmd.MBServer), nullValue(cmd.filesCache), nullValue(cmd.MBCache)) self.output(query) if self.outputCsvRequired(): text = '%d,%d,"%s",%s,%d,%d,%d,%d' % \ (cmd.lineNumber, cmd.pid, cmd.endTime, cmd.completedLapse, cmd.filesServer, cmd.MBServer, cmd.filesCache, cmd.MBCache) self.outputCsv(text) if self.options.sql: try: self.countSyncs += 1 self.cursor.execute('INSERT INTO syncs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', (cmd.lineNumber, cmd.pid, cmd.endTime, cmd.completedLapse, cmd.rpcMsgsIn, cmd.rpcMsgsOut, cmd.rpcSizeIn, cmd.rpcSizeOut, cmd.rpcHimarkFwd, cmd.rpcHimarkRev, cmd.rpcSnd, cmd.rpcRcv, cmd.uCpu, cmd.sCpu, cmd.diskIn, cmd.diskOut, cmd.ipcIn, cmd.ipcOut, cmd.maxRss, cmd.pageFaults, cmd.filesServer, cmd.filesCache, cmd.MBServer, cmd.MBCache)) except Exception as e: self.logger.warning("%s: %d, %d" % (str(e), cmd.lineNumber, cmd.pid)) self.db_commit_updates() def processSyncBlock(self, block): cmd = None i = 0 # First line of block is info line - process the rest for line in block.lines[1:]: i += 1 # Check for track lines and once we have found one, process them all and finish if cmd: match = RE_TRACK.match(line) if match: self.processTrackRecords(cmd, block.lines[i:]) break # Block has been processed # Pattern matching a completed line match = RE_COMPLETED.match(line) if not match: match = RE_COMPLETED2.match(line) if match: pid = int(match.group(2)) endTime = match.group(1).replace("/", "-") completedLapse = match.group(3) cmd = Command(block.lineNo, pid, endTime, completedLapse) def blankLine(self, line): if line == "" or line == "\n" or line == "\r\n": return True def blockEnd(self, line): "Blank line or one of terminators" terminators = ["Perforce proxy info:", "Perforce proxy error:" "locks acquired by blocking after", "Rpc himark:"] if self.blankLine(line): return True for t in terminators: if line[:len(t)] == t: return True return False def blockSync(self, line): t = "Perforce proxy info:" if line[:len(t)] == t: return True return False def reportProgress(self): if math.floor(((self.readBytes / self.ckpSize) * 100.0) / self.reportingInterval) == 1.0: self.logger.info("...%d%%" % (self.reportingInterval)) self.reportingInterval += self.options.fileInterval self.logger.debug("Inserts: %d" % (self.countSyncs)) def processLog(self): block = Block() for line in self.logfile: self.lineNo += 1 self.readBytes += len(line) self.reportProgress() line = line.rstrip() if self.blockEnd(line): if block.lines: if self.blockSync(block.lines[0]): self.processSyncBlock(block) else: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Unrecognised block: %d, %s" % (block.lineNo, block.lines[0])) block = Block() block.addLine(line, self.lineNo) else: block.addLine(line, self.lineNo) if block.lines: if self.blockSync(block.lines[0]): self.processSyncBlock(block) def processLogs(self): "Process all specified logs" for f in self.options.logfile: self.openLog(f) self.processLog() self.terminate() ###################### # START OF MAIN SCRIPT ###################### RE_COMPLETED = re.compile('^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) completed ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*') RE_COMPLETED2 = re.compile('^ (\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) completed ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*') RE_TRACK = re.compile('^---|^locks acquired by blocking after 3 non-blocking attempts') RE_TRACK_LAPSE = re.compile('^--- lapse (\d+)') RE_TRACK_LAPSE2 = re.compile('^--- lapse \.(\d+)') RE_TRACK_RPC = re.compile('^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+)') RE_TRACK_RPC2 = re.compile('^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+) snd/rcv ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s/([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s') RE_TRACK_USAGE = re.compile('^--- usage (\d+)\+(\d+)us (\d+)\+(\d+)io (\d+)\+(\d+)net (\d+)k (\d+)pf') RE_TRACK_PROXY = re.compile('^--- proxy faults (\d+) MB (\d+) other (\d+) flushes (\d+) cached (\d+)') RE_TRACK_TOTALS = re.compile('^--- proxytotals files/size svr\+cache (\d+)\+(\d+)/(\d+)mb\+(\d+)mb') RE_NON_ASCII = re.compile(r'[^\x00-\x7F]') def main(): parser = argparse.ArgumentParser(add_help=True) parser.add_argument('logfile', nargs='+', help='log file(s) to process') parser.add_argument('-d', '--dbname', help="Database name to use", default=None) parser.add_argument('-r', '--reset', help="Remove database before starting", action='store_true', default=False) parser.add_argument('-o', '--output', help="Name of file to print SQL to (if not specified then no output)", default=None) parser.add_argument('--csv', help="Name of file to print CSV to (if not specified then no output)", default=None) parser.add_argument('-i', '--interval', help="Percentage reporting interval (1-99), default automatic", default=None) parser.add_argument('-v', '--verbosity', nargs='?', const="INFO", default=DEFAULT_VERBOSITY, choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'), help="Output verbosity level. Default is: " + DEFAULT_VERBOSITY) parser.add_argument('-L', '--outlog', default=DEFAULT_LOG_FILE, help="Default: " + DEFAULT_LOG_FILE) parser.add_argument('-n', '--no-sql', help="Don't use local SQLite database - otherwise will be created", action='store_true', default=False) try: options = parser.parse_args() except Exception as e: parser.print_help() sys.exit(1) if not options.output and not options.csv and options.no_sql: print("Please specify a csv file or an output file if you specify -n/--no-sql") parser.print_help() sys.exit(1) if options.interval is not None: validInterval = True try: interval = int(options.interval) except: validInterval = False if options.interval < 1 or options.interval > 99: validInterval = False if not validInterval: print("Please specify an interval between 1 and 99") parser.print_help() sys.exit(1) for f in options.logfile: if not os.path.exists(f): print("Specified logfile doesn't exist: '%s'" % f) sys.exit(0) parser = PxLog2sql(options) parser.processLogs() if __name__ == '__main__': main()