#!/usr/bin/env python3 # -*- encoding: UTF8 -*- """############################################################################## # # Copyright (c) 2008,2016 Perforce Software, Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # = Description # # Output SQL queries generated by parsing commands logged in a Perforce # server log file. # The output can be imported directly in any SQL database. # # log2sql populates the following tables: # # +----------------+----------+------+-----+ # | process | # +----------------+----------+------+-----+ # | Field | Type | Null | Key | # +----------------+----------+------+-----+ # | processKey | char(50) | NO | PRI | # | lineNumber | int | NO | PRI | # | startTime | date | NO | | # | endTime | date | YES | | # | computedLapse | float | YES | | # | completedLapse | float | YES | | # | pid | int | NO | | # | user | text | NO | | # | workspace | text | NO | | # | ip | text | NO | | # | app | text | NO | | # | cmd | text | NO | | # | args | text | YES | | # | uCpu | int | YES | | # | sCpu | int | YES | | # | diskIn | int | YES | | # | diskOut | int | YES | | # | ipcIn | int | YES | | # | ipcOut | int | YES | | # | maxRss | int | YES | | # | pageFaults | int | YES | | # | rpcMsgsIn | int | YES | | # | rpcMsgsOut | int | YES | | # | rpcSizeIn | int | YES | | # | rpcSizeOut | int | YES | | # | rpcHimarkFwd | int | YES | | # | rpcHimarkRev | int | YES | | # | rpcSnd | float | YES | | # | rpcRcv | float | YES | | # | running | int | NO | | # | error | text | YES | | # +----------------+----------+------+-----+ # # +----------------+--------------+------+-----+ # | tableUse | # +----------------+--------------+------+-----+ # | Field | Type | Null | Key | # +----------------+--------------+------+-----+ # | processKey | char(50) | NO | PRI | # | lineNumber | int | NO | PRI | # | tableName | varchar(255) | NO | PRI | # | pagesIn | int | YES | | # | pagesOut | int | YES | | # | pagesCached | int | YES | | # | readLocks | int | YES | | # | writeLocks | int | YES | | # | getRows | int | YES | | # | posRows | int | YES | | # | scanRows | int | YES | | # | putRows | int | YES | | # | delRows | int | YES | | # | totalReadWait | int | YES | | # | totalReadHeld | int | YES | | # | totalWriteWait | int | YES | | # | totalWriteHeld | int | YES | | # | maxReadWait | int | YES | | # | maxReadHeld | int | YES | | # | maxWriteWait | int | YES | | # | maxWriteHeld | int | YES | | # | peekCount | int | YES | | # | totalPeekWait | int | YES | | # | totalPeekHeld | int | YES | | # | maxPeekWait | int | YES | | # | maxPeekHeld | int | YES | | # | triggerLapse | float | YES | | # +----------------+--------------+------+-----+ # # = Usage # # See below # # = Requirements # # = Algorithm # # Parses the log into "blocks" of information which may be a start/middle/end # of a command. # # Perforce server info: # 2015/09/02 15:23:09 pid 1616 robert@robert-test 127.0.0.1 [p4/2016.1/NTX64/1396108] 'user-sync //...' # Perforce server info: # 2015/09/02 15:23:09 pid 1616 compute end .031s # Perforce server info: # 2015/09/02 15:23:09 pid 1616 completed .031s # Perforce server info: # 2015/09/02 15:23:09 pid 1616 robert@robert-test 127.0.0.1 [p4/2016.1/NTX64/1396108] 'user-sync //...' # --- lapse .875s # --- rpc msgs/size in+out 2+3/0mb+0mb himarks 523588/523588 snd/rcv .000s/.015s # --- db.user # --- pages in+out+cached 4+0+3 # --- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0 # # Note that a compute-end records occurs for certain commands such as sync, but not for many other # commands. # # Also there may be various "track" records written as shown above. They can be # matched to the original entry since they contain the same pid/datetime/cmd/args as the original # # Normally these track records are written immediately after a "completed" entry. # However, on a busy server, the track records can occur some time later in the log after # entries for other commands - makes life slightly more challenging as we have to wait before # outputting information for those until a little time has passed (currently 2 seconds). # # Other complexities include: # - Some commands never have a "completed" record # - If a pid is re-used for a different command (e.g. on the same connection by client) you # can consider the previous command as completed # - Sometimes a client can re-issue the same command with same parameters in the same second # (e.g. P4VS). Needs to be treated as seperate commands ideally. # # So we maintain a hash of commands indexed by pid for all currently "active" commands. When # a command is completed and has any track records added to it, it is written either to Sqlite # database (default) or output as SQL statement (or both). ############################################################################## """ from __future__ import print_function import re import sys import os import io import math import argparse import datetime import hashlib import sqlite3 import logging import time from collections import defaultdict from six import iteritems import zipfile import bz2 import gzip python3 = sys.version_info[0] >= 3 RowsPerTransaction = 50000 # Commit every so many rows if in SQL mode DEFAULT_VERBOSITY = 'INFO' DEFAULT_LOG_FILE = 'log-log2sql.log' # Output log file LOGGER_NAME = 'LOG2SQL' def escapeChar(str): str = str.replace("\\", "\\\\") str = str.replace("\"", "\\\"") return str def dateAdd(str, seconds): "Add specified seconds to date in string format" date = datetime.datetime.strptime(str, "%Y-%m-%d %H:%M:%S") date = date + datetime.timedelta(seconds=seconds) return date.strftime("%Y-%m-%d %H:%M:%S") def nullValue(str): "For SQL inserts" if str is None: return "NULL" return str def quotedNullValue(str): "For SQL inserts" if str is None: return "NULL" return '"%s"' % str epochCache = {} def getEpoch(str): "Handles conversion of date time strings to Unix epoch int seconds (since 1970) - using a cache for performance" try: return epochCache[str] except KeyError: dt = int(time.mktime(datetime.datetime.strptime(str, "%Y-%m-%d %H:%M:%S").timetuple())) epochCache[str] = dt return dt class CompressedFile(object): magic = None file_type = None proper_extension = None def __init__(self, filename): self.filename = filename @classmethod def is_magic(self, data): return data.startswith(self.magic) def fileSize(self): return os.stat(self.filename).st_size def open(self): return None class TextFile(CompressedFile): def fileSize(self): return os.stat(self.filename).st_size def open(self): return io.open(self.filename, "r", encoding="latin1", errors="backslashreplace") class ZIPFile(CompressedFile): magic = b'\x50\x4b\x03\x04' file_type = 'zip' def fileSize(self): return os.stat(self.filename).st_size * 20 # return zipfile.ZipFile.getinfo(self.filename).file_size def open(self): z = zipfile.ZipFile(self.filename, 'r') files = z.infolist() return io.TextIOWrapper(z.open(files[0], 'r')) class GZFile(CompressedFile): magic = b'\x1f\x8b\x08' file_type = 'gz' def fileSize(self): return os.stat(self.filename).st_size * 20 def open(self): if python3: return io.TextIOWrapper(gzip.open(self.filename, 'r')) else: gzip.GzipFile.read1 = gzip.GzipFile.read return io.TextIOWrapper(gzip.open(self.filename, 'r')) # factory function to create a suitable instance for accessing files def open_file(filename): with open(filename, 'rb') as f: start_of_file = f.read(1024) f.seek(0) for cls in (ZIPFile, GZFile): if cls.is_magic(start_of_file): return cls(filename) return TextFile(filename) class Command: processKey = None pid = 0 completed = False # Set when a completed record is found hasTrackInfo = False duplicateKey = False startTime = None endTime = None startTimeEpoch = 0 # Unix epoch seconds endTimeEpoch = 0 computedLapse = 0.0 completedLapse = 0.0 user = None workspace = None ip = None app = None name = None args = None uCpu = None sCpu = None diskIn = None diskOut = None ipcIn = None ipcOut = None maxRss = None pageFaults = None rpcMsgsIn = None rpcMsgsOut = None rpcSizeIn = None rpcSizeOut = None rpcHimarkFwd = None rpcHimarkRev = None rpcSnd = None rpcRcv = None running = 0 lineNumber = 0 error = None def __init__(self, processKey, lineNumber, pid, startTime, user, workspace, ip, app, name, args): self.processKey = processKey self.lineNumber = lineNumber self.pid = pid self.startTime = startTime self.startTimeEpoch = getEpoch(startTime) self.user = escapeChar(user) self.workspace = escapeChar(workspace) self.ip = ip self.app = app self.name = name if args: self.args = escapeChar(args) else: self.args = None self.endTime = self.computedLapse = self.completedLapse = self.uCpu = None self.sCpu = self.diskIn = self.diskOut = self.ipcIn = self.ipcOut = self.maxRss = None self.pageFaults = self.rpcMsgsIn = self.rpcMsgsOut = self.rpcSizeOut = None self.rpcSizeIn = self.rpcHimarkFwd = self.rpcHimarkRev = self.error = None self.rpcSnd = self.rpcRcv = None self.running = 0 self.tables = {} # Some special vtrack tables that can be updated multiple times def getKey(self): if self.duplicateKey: return "%s.%d" % (self.processKey, self.lineNumber) return self.processKey def setEndTime(self, endTime): self.endTime = endTime self.endTimeEpoch = getEpoch(endTime) def updateFrom(self, other): "Update any potentiall null fields - see __init__" for f in ["endTime", "computedLapse", "completedLapse", "uCpu", "sCpu", "diskIn", "diskOut", "ipcIn", "ipcOut", "maxRss", "pageFaults", "rpcMsgsIn", "rpcMsgsOut", "rpcSizeOut", "rpcSizeIn", "rpcHimarkFwd", "rpcHimarkRev", "error", "rpcSnd", "rpcRcv"]: if getattr(other, f) and not getattr(self, f): setattr(self, f, getattr(other, f)) def setUsage(self, uCpu, sCpu, diskIn, diskOut, ipcIn, ipcOut, maxRss, pageFaults): self.uCpu = uCpu self.sCpu = sCpu self.diskIn = diskIn self.diskOut = diskOut self.ipcIn = ipcIn self.ipcOut = ipcOut self.maxRss = maxRss self.pageFaults = pageFaults def setRpc(self, rpcMsgsIn, rpcMsgsOut, rpcSizeIn, rpcSizeOut, rpcHimarkFwd, rpcHimarkRev, rpcSnd=None, rpcRcv=None): self.rpcMsgsIn = rpcMsgsIn self.rpcMsgsOut = rpcMsgsOut self.rpcSizeIn = rpcSizeIn self.rpcSizeOut = rpcSizeOut self.rpcHimarkFwd = rpcHimarkFwd self.rpcHimarkRev = rpcHimarkRev self.rpcSnd = rpcSnd self.rpcRcv = rpcRcv class Table: processKey = None lineNumber = 0 tableName = None pagesIn = None pagesOut = None pagesCached = None readLocks = None writeLocks = None getRows = None posRows = None scanRows = None putRows = None delRow = None totalReadWait = None totalReadHeld = None totalWriteWait = None totalWriteHeld = None maxReadWait = None maxReadHeld = None maxWriteWait = None maxWriteHeld = None peekCount = None totalPeekWait = None totalPeekHeld = None maxPeekWait = None maxPeekHeld = None triggerLapse = None def __init__(self, cmd, tableName): self.cmd = cmd self.processKey = cmd.processKey self.lineNumber = cmd.lineNumber self.tableName = tableName self.pagesIn = self.pagesOut = self.pagesCached = None self.readLocks = self.writeLocks = self.getRows = self.posRows = self.scanRows = None self.putRows = self.delRow = self.totalReadWait = self.totalReadHeld = None self.totalWriteWait = self.totalWriteHeld = self.maxReadWait = None self.maxReadHeld = self.maxWriteWait = self.maxWriteHeld = None self.peekCount = None self.totalPeekWait = self.totalPeekHeld = self.maxPeekWait = self.maxPeekHeld = None self.triggerLapse = None def setPages(self, pagesIn, pagesOut, pagesCached): self.pagesIn = pagesIn self.pagesOut = pagesOut self.pagesCached = pagesCached def setLocksRows(self, readLocks, writeLocks, getRows, posRows, scanRows, putRows, delRow): self.readLocks = readLocks self.writeLocks = writeLocks self.getRows = getRows self.posRows = posRows self.scanRows = scanRows self.putRows = putRows self.delRow = delRow def setTotalLock(self, totalReadWait, totalReadHeld, totalWriteWait, totalWriteHeld): self.totalReadWait = totalReadWait self.totalReadHeld = totalReadHeld self.totalWriteWait = totalWriteWait self.totalWriteHeld = totalWriteHeld def setMaxLock(self, maxReadWait, maxReadHeld, maxWriteWait, maxWriteHeld): self.maxReadWait = maxReadWait self.maxReadHeld = maxReadHeld self.maxWriteWait = maxWriteWait self.maxWriteHeld = maxWriteHeld def setPeek(self, peekCount, totalPeekWait, totalPeekHeld, maxPeekWait, maxPeekHeld): self.peekCount = peekCount self.totalPeekWait = totalPeekWait self.totalPeekHeld = totalPeekHeld self.maxPeekWait = maxPeekWait self.maxPeekHeld = maxPeekHeld def setTriggerLapse(self, triggerLapse): self.triggerLapse = triggerLapse class Block: def __init__(self): self.lines = [] self.lineNo = 0 def addLine(self, line, lineNo): self.lines.append(line) # Only at start of block if not self.lineNo: self.lineNo = lineNo class Log2sql: logname = None dbname = None logfile = None ckpSize = None readBytes = None reportingInterval = None cmds = None lineNo = 0 countCmds = 0 countInserts = 0 currStartTime = None # String currStartTimeEpoch = None # Epochtime def __init__(self, options, instream=None, outstream=None, errstream=None): if not options.dbname: root, ext = os.path.splitext(options.logfile[0]) dname, fname = os.path.split(root) options.dbname = fname self.dbname = options.dbname self.dbfile = "%s.db" % options.dbname self.options = options self.options.sql = not self.options.no_sql self.instream = instream self.outstream = outstream # For testing self.errstream = errstream # For testing self.init_logger() self.reportingInterval = 10.0 if options.reset and os.path.exists(self.dbfile): self.logger.info("Cleaning database: %s" % self.dbfile) os.remove(self.dbfile) if outstream is None: if options.output: if options.output == "-": self.outstream = sys.stdout else: self.outstream = open(options.output, "w") else: self.outstream = None if self.options.sql: self.logger.info("Creating database: %s" % self.dbfile) self.conn = sqlite3.connect(self.dbfile) self.conn.text_factory = str self.cursor = self.conn.cursor() # Performance PRAGMAS - at some risk of security - but we can always be run again self.cursor.execute("PRAGMA synchronous = OFF") self.cursor.execute("PRAGMA journal_mode = OFF") self.cursor.execute("PRAGMA locking_mode = EXCLUSIVE") self.cmds = {} self.cmdsByEpoch = defaultdict(dict) # Indexed by EpochTime self.tables = {} # Track which pids have been seen for current time stamp (per second) self.currSecond = None self.pidsSeenThisSecond = {} # List of any new pids self.readBytes = 0.0 self.running = 0 self.rowCount = 0 self.db_create_database() self.db_create_process() self.db_create_tableUse() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: try: self.conn.execute(query) except: pass def openLog(self, logname): if self.instream is None: logfile = open_file(logname) self.logfile = logfile.open() self.ckpSize = logfile.fileSize() self.logger.info("Processing log: %s" % logname) # Autoset reporting interval based on size if self.options.interval is None: self.options.fileInterval = 10 if self.ckpSize > 10 * 1000 * 1000: self.options.fileInterval = 5 elif self.ckpSize > 100 * 1000 * 1000: self.options.fileInterval = 2 elif self.ckpSize > 500 * 1000 * 1000: self.options.fileInterval = 1 self.reportingInterval = float(self.options.fileInterval) else: self.logfile = self.instream self.ckpSize = 500 def init_logger(self): self.logger = logging.getLogger(LOGGER_NAME) self.logger.setLevel(self.options.verbosity) formatter = logging.Formatter('%(asctime)s:%(levelname)s %(message)s') if self.errstream: ch = logging.StreamHandler(self.errstream) ch.setLevel(self.options.verbosity) else: ch = logging.StreamHandler(sys.stderr) ch.setLevel(logging.INFO) ch.setFormatter(formatter) self.logger.addHandler(ch) if self.options.verbosity != logging.INFO and self.options.outlog: fh = logging.FileHandler(self.options.outlog, mode='w') fh.setFormatter(formatter) self.logger.addHandler(fh) def outputRequired(self): return self.outstream is not None def output(self, text): if self.outstream: try: self.outstream.write("%s\n" % text) except UnicodeEncodeError: str = text.encode(encoding="latin1", errors="backslashreplace") self.outstream.write("%s\n" % str) def terminate(self): self.flush_output() self.db_commit(True) if self.options.sql: self.conn.commit() self.conn.close() def getLineNumber(self): return self.lineNo def db_commit_updates(self): self.rowCount += 1 if self.rowCount % RowsPerTransaction == 0: self.output("COMMIT;") if self.options.sql: self.conn.commit() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: self.conn.execute(query) def db_commit(self, state): if (state): self.output("COMMIT;") if self.options.sql: self.conn.commit() query = "BEGIN TRANSACTION;" self.output(query) if self.options.sql: self.conn.execute(query) else: self.output("SET autocommit=0;") def db_create_database(self): self.output("CREATE DATABASE IF NOT EXISTS " + self.dbname + ";") self.output("USE " + self.dbname + ";") def db_create_process(self): self.output("DROP TABLE IF EXISTS process;") query = "CREATE TABLE process (processkey CHAR(50) NOT NULL, lineNumber INT NOT NULL, pid INT NOT NULL, " \ "startTime DATETIME NOT NULL,endTime DATETIME NULL, computedLapse FLOAT NULL,completedLapse FLOAT NULL, " \ "user TEXT NOT NULL, workspace TEXT NOT NULL, ip TEXT NOT NULL, app TEXT NOT NULL, cmd TEXT NOT NULL, " \ "args TEXT NULL, uCpu INT NULL, sCpu INT NULL, diskIn INT NULL, diskOut INT NULL, ipcIn INT NULL, " \ "ipcOut INT NULL, maxRss INT NULL, pageFaults INT NULL, rpcMsgsIn INT NULL, rpcMsgsOut INT NULL, " \ "rpcSizeIn INT NULL, rpcSizeOut INT NULL, rpcHimarkFwd INT NULL, rpcHimarkRev INT NULL, " \ "rpcSnd FLOAT NULL, rpcRcv FLOAT NULL, running INT NULL, " \ "error TEXT NULL, " \ "PRIMARY KEY (processkey, lineNumber));" self.output(query) if self.options.sql: try: self.cursor.execute(query) except: pass def db_create_tableUse(self): query = "DROP TABLE IF EXISTS tableUse;" self.output(query) query = "CREATE TABLE tableUse (processkey CHAR(50) NOT NULL, lineNumber INT NOT NULL, " \ "tableName VARCHAR(255) NOT NULL, pagesIn INT NULL, pagesOut INT NULL, pagesCached INT NULL, " \ "readLocks INT NULL, writeLocks INT NULL, getRows INT NULL, posRows INT NULL, scanRows INT NULL, " \ "putRows int NULL, delRows INT NULL, totalReadWait INT NULL, totalReadHeld INT NULL, " \ "totalWriteWait INT NULL, totalWriteHeld INT NULL, maxReadWait INT NULL, maxReadHeld INT NULL, " \ "maxWriteWait INT NULL, maxWriteHeld INT NULL, peekCount INT NULL, " \ "totalPeekWait INT NULL, totalPeekHeld INT NULL, maxPeekWait INT NULL, maxPeekHeld INT NULL, " \ "triggerLapse FLOAT NULL, " \ "PRIMARY KEY (processkey, lineNumber, tableName));" self.output(query) if self.options.sql: try: self.cursor.execute(query) except: pass def addCommand(self, cmd, hasTrackInfo): cmd.running = self.running self.countCmds += 1 if not self.currStartTime or (self.currStartTimeEpoch != cmd.startTimeEpoch and cmd.startTimeEpoch > self.currStartTimeEpoch): self.currStartTime = cmd.startTime self.currStartTimeEpoch = cmd.startTimeEpoch self.pidsSeenThisSecond = {} # If we are re-using pid then process previous command # Or process 'rmt-FileFetch/rmt-Journal/pull' because they don't have 'completed' records if cmd.pid in self.cmds: if cmd.processKey != self.cmds[cmd.pid].processKey: self.sql_process_insert(self.cmds[cmd.pid]) self.cmds[cmd.pid] = cmd # replace elif cmd.name in ['rmt-FileFetch', 'rmt-Journal', 'pull']: if hasTrackInfo: self.cmds[cmd.pid].updateFrom(cmd) else: self.sql_process_insert(self.cmds[cmd.pid]) cmd.duplicateKey = True self.cmds[cmd.pid] = cmd # replace else: self.cmds[cmd.pid].updateFrom(cmd) if hasTrackInfo: self.cmds[cmd.pid].hasTrackInfo = True else: self.cmds[cmd.pid] = cmd if cmd.pid in self.pidsSeenThisSecond: cmd.duplicateKey = True self.pidsSeenThisSecond[cmd.pid] = 1 self.running += 1 self.processCompletedCommands() def processCompletedCommands(self): """Remove any commands with completed state - should only be called by addCommand when we are adding a new command - at that point we know there are no new records to come for any previous ones as even track output will have been processed. In order to cope with track commands coming a little later then completed commands, we only process commands with a starttime of 3 or more seconds earlier than the current one.""" for pid in list(self.cmds.keys()): cmd = self.cmds[pid] completed = False if cmd.completed and (cmd.hasTrackInfo or self.currStartTimeEpoch >= (cmd.endTimeEpoch + 3)): completed = True if not completed and (cmd.hasTrackInfo and self.currStartTimeEpoch >= (cmd.endTimeEpoch + 3)): completed = True if completed: self.sql_process_insert(cmd) del self.cmds[pid] self.running -= 1 def updateComputedTime(self, pid, computedLapse): if pid in self.cmds: # sum all the compute values of a same command if self.cmds[pid].computedLapse is None: sum = 0.0 else: sum = float(self.cmds[pid].computedLapse) sum += float(computedLapse) self.cmds[pid].computedLapse = str(sum) def updateCompletionTime(self, pid, endTime, completedLapse): if pid in self.cmds: self.cmds[pid].completed = True self.cmds[pid].setEndTime(endTime) self.cmds[pid].completedLapse = str(float(completedLapse)) def processTrackRecords(self, cmd, lines): self.tables = {} tableName = None # Use line number from original cmd if appropriate if cmd.pid in self.cmds and cmd.processKey == self.cmds[cmd.pid].processKey: cmd.lineNumber = self.cmds[cmd.pid].lineNumber tablesTracked = [] for line in lines: if not RE_TRACK.match(line): break gotMatch = False match = RE_TRACK_LAPSE.match(line) if match: gotMatch = True cmd.completedLapse = match.group(1) if not gotMatch: match = RE_TRACK_LAPSE2.match(line) if match: gotMatch = True cmd.completedLapse = "0." + match.group(1) if not gotMatch: match = RE_FAILED_AUTH.match(line) if match: gotMatch = True cmd.error = "\"" + match.group(1) + "\"" if not gotMatch: match = RE_KILLED_BY.match(line) if match: gotMatch = True cmd.error = "\"" + match.group(1) + "\"" if not gotMatch: match = RE_EXITED.match(line) if match: gotMatch = True cmd.error = "\"" + match.group(1) + "\"" if not gotMatch: match = RE_TRACK_USAGE.match(line) if match: gotMatch = True cmd.setUsage(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7), match.group(8)) if not gotMatch: match = RE_TRACK_RPC2.match(line) if match: gotMatch = True cmd.setRpc(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7), match.group(8)) if not gotMatch: match = RE_TRACK_RPC.match(line) if match: gotMatch = True cmd.setRpc(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6)) if not gotMatch: match = RE_TRACK_TABLE.match(line) if not match: match = RE_TRACK_TABLE_RDB.match(line) if match: gotMatch = True tableName = match.group(1) self.tables[tableName] = Table(cmd, tableName) tablesTracked.append(tableName) if not gotMatch: match = RE_TRACK_CLIENT_LOCK.match(line) if match: gotMatch = True tableName = "clients_%s" % match.group(2) # clients/(W) self.tables[tableName] = Table(cmd, tableName) tablesTracked.append(tableName) if not gotMatch: match = RE_TRACK_CHANGE_LOCK.match(line) if match: gotMatch = True tableName = "changes_%s" % match.group(2) # changes/(W) self.tables[tableName] = Table(cmd, tableName) tablesTracked.append(tableName) if not gotMatch: match = RE_TRACK_META_LOCK.match(line) if match: gotMatch = True tableName = "meta_%s_%s" % (match.group(1), match.group(2)) # meta/db(W) self.tables[tableName] = Table(cmd, tableName) tablesTracked.append(tableName) if not gotMatch: match = RE_TRACK_REPLICA_LOCK.match(line) if match: gotMatch = True tableName = "replica_%s_%s" % (match.group(1), match.group(2)) # replica/pull(W) self.tables[tableName] = Table(cmd, tableName) tablesTracked.append(tableName) if not gotMatch: match = RE_TRACK_PAGES.match(line) if match: gotMatch = True if tableName in self.tables: self.tables[tableName].setPages(match.group(1), match.group(2), match.group(3)) if not gotMatch: match = RE_TRACK_LOCKS_ROWS.match(line) if match: gotMatch = True if tableName in self.tables: self.tables[tableName].setLocksRows(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7)) if not gotMatch: match = RE_TRACK_TOTAL_LOCK.match(line) if match: gotMatch = True if tableName in self.tables: self.tables[tableName].setTotalLock(match.group(1), match.group(2), match.group(3), match.group(4)) # The max is only reported if there is more than one # lock taken on the table if (self.tables[tableName].readLocks is None or int(self.tables[tableName].readLocks) < 2) and \ (self.tables[tableName].writeLocks is None or int(self.tables[tableName].writeLocks) < 2): self.tables[tableName].setMaxLock(match.group(1), match.group(2), match.group(3), match.group(4)) if not gotMatch: match = RE_TRACK_MAX_LOCK.match(line) if match: gotMatch = True if tableName in self.tables: self.tables[tableName].setMaxLock(match.group(1), match.group(2), match.group(3), match.group(4)) if not gotMatch: match = RE_TRACK_PEEK.match(line) if match: gotMatch = True if tableName in self.tables: self.tables[tableName].setPeek(match.group(1), match.group(2), match.group(3), match.group(4), match.group(5)) if not gotMatch: self.logger.debug("Unrecognised track: %d, %s" % (cmd.lineNumber, line[:-1])) if cmd.completedLapse is not None: cmd.setEndTime(dateAdd(cmd.startTime, float(cmd.completedLapse))) else: cmd.setEndTime(cmd.startTime) # Don't set tracked info if is one of the special commands which can occur multiple times and # which don't indicate the completion of the command hasTrackInfo = False for t in tablesTracked: if not t.startswith("meta_") and not t.startswith("changes_") and not t.startswith("clients_"): hasTrackInfo = True self.addCommand(cmd, hasTrackInfo=hasTrackInfo) if hasTrackInfo: self.cmd_tables_insert(cmd, self.tables) else: # Save special tables for processing when cmd is completed for t in self.tables.keys(): self.cmds[cmd.pid].tables[t] = self.tables[t] def processTriggerLapse(self, cmd, trigger, line): "Expects a single line with a lapse statement on it" tablesTracked = [] gotMatch = False match = RE_TRIGGER_LAPSE.match(line) if match: gotMatch = True triggerLapse = float(match.group(1)) if not gotMatch: match = RE_TRIGGER_LAPSE2.match(line) if match: gotMatch = True triggerLapse = float("0." + match.group(1)) if not gotMatch: return tableName = "trigger_%s" % trigger tables = {} tables[tableName] = Table(cmd, tableName) tables[tableName].setTriggerLapse(triggerLapse) self.cmd_tables_insert(cmd, tables) def cmd_tables_insert(self, cmd, tables): try: self.sql_tableUse_insert_items(tables) except Exception as e: self.logger.warning("%s, pid %d, lineNo %d, cmd lineNo %d, %s, %s, %s" % ( str(e), cmd.pid, self.lineNo, cmd.lineNumber, cmd.processKey, cmd.name, cmd.args)) def flush_output(self): if self.logger.isEnabledFor(logging.DEBUG): cmds = [] self.logger.debug("outstanding commands:") for p, v in iteritems(self.cmds): self.sql_process_insert(v) if self.logger.isEnabledFor(logging.DEBUG): cmds.append("lineNo %d, pid %s, cmd %s" % (v.lineNumber, v.pid, v.name)) if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("\n\t".join(cmds)) self.logger.info("") def sql_process_insert(self, cmd): self.cmd_tables_insert(cmd, cmd.tables) # Cmd may have some special tables processKey = cmd.getKey() if self.outputRequired(): query = 'INSERT IGNORE INTO process VALUES ("%s",%d,%d,"%s",%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%d,%s);' % \ (processKey, cmd.lineNumber, cmd.pid, cmd.startTime, quotedNullValue(cmd.endTime), nullValue(cmd.computedLapse), nullValue(cmd.completedLapse), quotedNullValue(cmd.user), quotedNullValue(cmd.workspace), quotedNullValue(cmd.ip), quotedNullValue(cmd.app), quotedNullValue(cmd.name), quotedNullValue(cmd.args), nullValue(cmd.rpcMsgsIn), nullValue(cmd.rpcMsgsOut), nullValue(cmd.rpcSizeIn), nullValue(cmd.rpcSizeOut), nullValue(cmd.rpcHimarkFwd), nullValue(cmd.rpcHimarkRev), nullValue(cmd.rpcSnd), nullValue(cmd.rpcRcv), nullValue(cmd.uCpu), nullValue(cmd.sCpu), nullValue(cmd.diskIn), nullValue(cmd.diskOut), nullValue(cmd.ipcIn), nullValue(cmd.ipcOut), nullValue(cmd.maxRss), nullValue(cmd.pageFaults), nullValue(cmd.running), nullValue(cmd.error)) self.output(query) if self.options.sql: try: self.countInserts += 1 self.cursor.execute('INSERT INTO process VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', (processKey, cmd.lineNumber, cmd.pid, cmd.startTime, cmd.endTime, cmd.computedLapse, cmd.completedLapse, cmd.user, cmd.workspace, cmd.ip, cmd.app, cmd.name, cmd.args, cmd.rpcMsgsIn, cmd.rpcMsgsOut, cmd.rpcSizeIn, cmd.rpcSizeOut, cmd.rpcHimarkFwd, cmd.rpcHimarkRev, cmd.rpcSnd, cmd.rpcRcv, cmd.uCpu, cmd.sCpu, cmd.diskIn, cmd.diskOut, cmd.ipcIn, cmd.ipcOut, cmd.maxRss, cmd.pageFaults, cmd.running, cmd.error)) except Exception as e: self.logger.warning("%s: %s, %d, %d, %s" % (str(e), cmd.processKey, cmd.lineNumber, cmd.pid, cmd.name)) self.db_commit_updates() def outputRow(self, tab): processKey = self.cmds[tab.cmd.pid].getKey() # Look up in case of dupliates query = 'INSERT IGNORE INTO tableUse VALUES ("%s",%d,"%s",%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'\ '%s,%s,%s,%s,%s,%s,%s,%s);' % \ (processKey, tab.lineNumber, tab.tableName, nullValue(tab.pagesIn), nullValue(tab.pagesOut), nullValue(tab.pagesCached), nullValue(tab.readLocks), nullValue(tab.writeLocks), nullValue(tab.getRows), nullValue(tab.posRows), nullValue(tab.scanRows), nullValue(tab.putRows), nullValue(tab.delRow), nullValue(tab.totalReadWait), nullValue(tab.totalReadHeld), nullValue(tab.totalWriteWait), nullValue(tab.totalWriteHeld), nullValue(tab.maxReadWait), nullValue(tab.maxReadHeld), nullValue(tab.maxWriteWait), nullValue(tab.maxWriteHeld), nullValue(tab.peekCount), nullValue(tab.totalPeekWait), nullValue(tab.totalPeekHeld), nullValue(tab.maxPeekWait), nullValue(tab.maxPeekHeld), nullValue(tab.triggerLapse)) self.output(query) def sql_tableUse_insert_items(self, tables): if self.outputRequired(): for k, v in iteritems(tables): self.outputRow(v) if self.options.sql: for k, tab in iteritems(tables): self.countInserts += 1 processKey = self.cmds[tab.cmd.pid].getKey() # Look up in case of duplicates try: self.cursor.execute('INSERT INTO tableUse VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', (processKey, tab.lineNumber, tab.tableName, tab.pagesIn, tab.pagesOut, tab.pagesCached, tab.readLocks, tab.writeLocks, tab.getRows, tab.posRows, tab.scanRows, tab.putRows, tab.delRow, tab.totalReadWait, tab.totalReadHeld, tab.totalWriteWait, tab.totalWriteHeld, tab.maxReadWait, tab.maxReadHeld, tab.maxWriteWait, tab.maxWriteHeld, tab.peekCount, tab.totalPeekWait, tab.totalPeekHeld, tab.maxPeekWait, tab.maxPeekHeld, tab.triggerLapse)) except Exception as e: self.logger.warning("%s: %s, %d, %s" % (str(e), processKey, tab.lineNumber, tab.tableName)) self.db_commit_updates() def processInfoBlock(self, block): cmd = None i = 0 # First line of block is info line - process the rest for line in block.lines[1:]: i += 1 # Check for track lines and once we have found one, process them all and finish if cmd: match = RE_TRACK.match(line) if match: self.processTrackRecords(cmd, block.lines[i:]) break # Block has been processed match = RE_CMD.match(line) if not match: match = RE_CMD_NOARG.match(line) if match: pid = int(match.group(2)) startTime = match.group(1).replace("/", "-") user = match.group(3) workspace = match.group(4) ip = match.group(5) # following gsub required due to a 2009.2 P4V bug app = match.group(6).replace("\x00", "/") name = match.group(7) cmdArgs = None if len(match.groups()) == 8: cmdArgs = match.group(8) # Strip Swarm/Git Fusion commands with lots of json smatch = re.search(RE_JSON_CMDARGS, cmdArgs) if smatch: cmdArgs = smatch.group(1) # Search for trailing trigger entries trigger = None tmatch = RE_CMD_TRIGGER.search(line) if tmatch: trigger = tmatch.group(1).rstrip() line = line.replace(" trigger %s" % trigger, "") m = hashlib.md5() line = re.sub(RE_NON_ASCII, '?', line) # Avoid encoding errors if python3: m.update(line.encode()) else: m.update(line) processKey = m.hexdigest() cmd = Command(processKey, block.lineNo, pid, startTime, user, workspace, ip, app, name, cmdArgs) self.addCommand(cmd, False) if trigger: self.processTriggerLapse(cmd, trigger, block.lines[-1]) continue # Pattern matching a completed line match = RE_COMPLETED.match(line) if match: pid = int(match.group(2)) endTime = match.group(1).replace("/", "-") completedLapse = match.group(3) self.updateCompletionTime(pid, endTime, completedLapse) continue # Pattern matching a compute line match = RE_COMPUTE.match(line) if match: pid = int(match.group(2)) computedLapse = float(match.group(3)) self.updateComputedTime(pid, computedLapse) continue def blankLine(self, line): if line == "" or line == "\n" or line == "\r\n": return True def blockEnd(self, line): "Blank line or one of terminators" terminators = ["Perforce server info:", "Perforce server error:" "locks acquired by blocking after", "Rpc himark:"] if self.blankLine(line): return True for t in terminators: if line[:len(t)] == t: return True return False def blockInfo(self, line): t = "Perforce server info:" if line[:len(t)] == t: return True return False def reportProgress(self): if math.floor(((self.readBytes / self.ckpSize) * 100.0) / self.reportingInterval) == 1.0: self.logger.info("...%d%%" % (self.reportingInterval)) self.reportingInterval += self.options.fileInterval self.logger.debug("Runnning: %d, cmds curr/processed: %d/%d" % (self.running, len(self.cmds), self.countCmds)) self.logger.debug("Inserts: %d" % (self.countInserts)) def processLog(self): block = Block() for line in self.logfile: self.lineNo += 1 self.readBytes += len(line) self.reportProgress() line = line.rstrip() if self.blockEnd(line): if block.lines: if self.blockInfo(block.lines[0]): self.processInfoBlock(block) elif self.blankLine(block.lines[0]): self.processCompletedCommands() else: if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Unrecognised block: %d, %s" % (block.lineNo, block.lines[0])) block = Block() block.addLine(line, self.lineNo) else: block.addLine(line, self.lineNo) if block.lines: if self.blockInfo(block.lines[0]): self.processInfoBlock(block) def processLogs(self): "Process all specified logs" for f in self.options.logfile: self.openLog(f) self.processLog() self.terminate() ###################### # START OF MAIN SCRIPT ###################### RE_JSON_CMDARGS = re.compile('^(.*) \{.*\}$') RE_CMD = re.compile('^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) ([^ @]*)@([^ ]*) ([^ ]*) \[([^\]]*)\] \'([\w-]+) (.*)\'.*') RE_CMD_NOARG = re.compile('^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) ([^ @]*)@([^ ]*) ([^ ]*) \[([^\]]*)\] \'([\w-]+)\'.*') RE_CMD_TRIGGER = re.compile(' trigger ([^ ]+)$') RE_COMPUTE = re.compile('^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) compute end ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*') RE_COMPLETED = re.compile('^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) completed ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*') RE_TRACK = re.compile('^---|^locks acquired by blocking after 3 non-blocking attempts') RE_TRACK_LAPSE = re.compile('^--- lapse (\d+)') RE_TRACK_LAPSE2 = re.compile('^--- lapse \.(\d+)') RE_TRIGGER_LAPSE = re.compile('^lapse (\d+)s') RE_TRIGGER_LAPSE2 = re.compile('^lapse \.(\d+)s') RE_TRACK_RPC = re.compile('^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+)') RE_TRACK_RPC2 = re.compile('^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+) snd/rcv ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s/([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s') RE_TRACK_USAGE = re.compile('^--- usage (\d+)\+(\d+)us (\d+)\+(\d+)io (\d+)\+(\d+)net (\d+)k (\d+)pf') RE_FAILED_AUTH = re.compile('^--- (failed authentication check)') RE_KILLED_BY = re.compile('^--- (killed by .*)') RE_EXITED = re.compile('^--- (exited on fatal server error)') RE_TRACK_TABLE = re.compile('^--- db.([a-zA-Z]*)') RE_TRACK_TABLE_RDB = re.compile('^--- (rdb\.[a-zA-Z]*)') RE_TRACK_CLIENT_LOCK = re.compile('^--- clients/([^\(]+)\(([RW])\)') RE_TRACK_CHANGE_LOCK = re.compile('^--- change/([^\(]+)\(([RW])\)') RE_TRACK_META_LOCK = re.compile('^--- meta/([^\(]+)\(([RW])\)') RE_TRACK_REPLICA_LOCK = re.compile('^--- replica/([^\(]+)\(([RW])\)') RE_TRACK_PAGES = re.compile('^--- pages in\+out\+cached (\d+)\+(\d+)\+(\d+)') RE_TRACK_LOCKS_ROWS = re.compile( '^--- locks read/write (\d+)/(\d+) rows get\+pos\+scan put\+del (\d+)\+(\d+)\+(\d+) (\d+)\+(\d+)') RE_TRACK_TOTAL_LOCK = re.compile('^--- total lock wait\+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms') RE_TRACK_PEEK = re.compile('^--- peek count (\d+) wait\+held total/max (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms') RE_TRACK_MAX_LOCK = re.compile( '^--- max lock wait\+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms|--- locks wait+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms') RE_NON_ASCII = re.compile(r'[^\x00-\x7F]') def main(): parser = argparse.ArgumentParser(add_help=True) parser.add_argument('logfile', nargs='+', help='log file(s) to process') parser.add_argument('-d', '--dbname', help="Database name to use", default=None) parser.add_argument('-r', '--reset', help="Remove database before starting", action='store_true', default=False) parser.add_argument('-o', '--output', help="Name of file to print SQL to (if not specified then no output)", default=None) parser.add_argument('-i', '--interval', help="Percentage reporting interval (1-99), default automatic", default=None) parser.add_argument('-v', '--verbosity', nargs='?', const="INFO", default=DEFAULT_VERBOSITY, choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'), help="Output verbosity level. Default is: " + DEFAULT_VERBOSITY) parser.add_argument('-L', '--outlog', default=DEFAULT_LOG_FILE, help="Default: " + DEFAULT_LOG_FILE) parser.add_argument('-n', '--no-sql', help="Don't use local SQLite database - otherwise will be created", action='store_true', default=False) try: options = parser.parse_args() except Exception as e: parser.print_help() sys.exit(1) if not options.output and options.no_sql: print("Please specify an output file if you specify -n/--no-sql") parser.print_help() sys.exit(1) if options.interval is not None: validInterval = True try: interval = int(options.interval) except: validInterval = False if options.interval < 1 or options.interval > 99: validInterval = False if not validInterval: print("Please specify an interval between 1 and 99") parser.print_help() sys.exit(1) for f in options.logfile: if not os.path.exists(f): print("Specified logfile doesn't exist: '%s'" % f) sys.exit(0) log2sql = Log2sql(options) log2sql.processLogs() if __name__ == '__main__': main()