#!/usr/bin/env python3
# -*- encoding: UTF8 -*-
"""##############################################################################
#
# Copyright (c) 2008,2016 Perforce Software, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# = Description
#
# Output SQL queries generated by parsing commands logged in a Perforce
# server log file.
# The output can be imported directly in any SQL database.
#
# log2sql populates the following tables:
#
# +----------------+----------+------+-----+
# | process |
# +----------------+----------+------+-----+
# | Field | Type | Null | Key |
# +----------------+----------+------+-----+
# | processKey | char(32) | NO | PRI |
# | lineNumber | int | NO | PRI |
# | startTime | date | NO | |
# | endTime | date | YES | |
# | computedLapse | float | YES | |
# | completedLapse | float | YES | |
# | pid | int | NO | |
# | user | text | NO | |
# | workspace | text | NO | |
# | ip | text | NO | |
# | app | text | NO | |
# | cmd | text | NO | |
# | args | text | YES | |
# | uCpu | int | YES | |
# | sCpu | int | YES | |
# | diskIn | int | YES | |
# | diskOut | int | YES | |
# | ipcIn | int | YES | |
# | ipcOut | int | YES | |
# | maxRss | int | YES | |
# | pageFaults | int | YES | |
# | rpcMsgsIn | int | YES | |
# | rpcMsgsOut | int | YES | |
# | rpcSizeIn | int | YES | |
# | rpcSizeOut | int | YES | |
# | rpcHimarkFwd | int | YES | |
# | rpcHimarkRev | int | YES | |
# | rpcSnd | float | YES | |
# | rpcRcv | float | YES | |
# | running | int | NO | |
# | error | text | YES | |
# +----------------+----------+------+-----+
#
# +----------------+--------------+------+-----+
# | tableUse |
# +----------------+--------------+------+-----+
# | Field | Type | Null | Key |
# +----------------+--------------+------+-----+
# | processKey | char(32) | NO | PRI |
# | lineNumber | int | NO | PRI |
# | tableName | varchar(255) | NO | PRI |
# | pagesIn | int | YES | |
# | pagesOut | int | YES | |
# | pagesCached | int | YES | |
# | readLocks | int | YES | |
# | writeLocks | int | YES | |
# | getRows | int | YES | |
# | posRows | int | YES | |
# | scanRows | int | YES | |
# | putRows | int | YES | |
# | delRows | int | YES | |
# | totalReadWait | int | YES | |
# | totalReadHeld | int | YES | |
# | totalWriteWait | int | YES | |
# | totalWriteHeld | int | YES | |
# | maxReadWait | int | YES | |
# | maxReadHeld | int | YES | |
# | maxWriteWait | int | YES | |
# | maxWriteHeld | int | YES | |
# | peekCount | int | YES | |
# | totalPeekWait | int | YES | |
# | totalPeekHeld | int | YES | |
# | maxPeekWait | int | YES | |
# | maxPeekHeld | int | YES | |
# +----------------+--------------+------+-----+
#
# = Usage
#
# See below
#
# = Requirements
#
##############################################################################
"""
from __future__ import print_function
import re
import sys
import os
import io
import math
import argparse
import datetime
import hashlib
import sqlite3
import logging
import time
from collections import defaultdict
# True when running under Python 3 (the script supports both 2 and 3).
python3 = sys.version_info[0] >= 3
RowsPerTransaction = 50000  # Commit every so many rows if in SQL mode
DEFAULT_VERBOSITY = 'INFO'  # Default logging level name
DEFAULT_LOG_FILE = 'log-log2sql.log'  # Output log file
LOGGER_NAME = 'LOG2SQL'  # Name of the shared logging.Logger
def escapeChar(str):
    "Backslash-escape backslashes and double quotes for SQL string output."
    # Backslashes must be doubled first so the quote escapes are not re-escaped.
    return str.replace("\\", "\\\\").replace("\"", "\\\"")
def dateAdd(str, seconds):
    "Add specified seconds to date in string format"
    fmt = "%Y-%m-%d %H:%M:%S"
    shifted = datetime.datetime.strptime(str, fmt) + datetime.timedelta(seconds=seconds)
    return shifted.strftime(fmt)
def nullValue(str):
    "For SQL inserts: map None to the literal NULL, pass anything else through."
    return "NULL" if str is None else str
def quotedNullValue(str):
    "For SQL inserts: map None to NULL, wrap anything else in double quotes."
    if str is None:
        return "NULL"
    return '"{}"'.format(str)
epochCache = {}  # Memoizes "YYYY-MM-DD HH:MM:SS" -> epoch-seconds conversions


def getEpoch(str):
    "Handles conversion of date time strings to epochs, with memoization."
    cached = epochCache.get(str)
    if cached is None:
        parsed = datetime.datetime.strptime(str, "%Y-%m-%d %H:%M:%S")
        cached = int(time.mktime(parsed.timetuple()))
        epochCache[str] = cached
    return cached
class Command:
    # One parsed Perforce command: a command-start log record plus any
    # compute/completed/track records later matched to it by pid.
    # The class-level values below act as defaults; note that some of them
    # (e.g. endTimeEpoch, hasTrackInfo) may be read by callers before the
    # corresponding setter has run, so the defaults are significant.
    processKey = None      # md5 hex digest of the raw command line
    pid = 0
    completed = False  # Set when a completed record is found
    hasTrackInfo = False
    duplicateKey = False   # True if same pid seen twice in the same second
    startTime = None       # "YYYY-MM-DD HH:MM:SS" string
    endTime = None
    startTimeEpoch = 0  # Unix epoch seconds
    endTimeEpoch = 0
    computedLapse = 0.0
    completedLapse = 0.0
    user = None
    workspace = None
    ip = None
    app = None
    name = None            # Command name, e.g. "user-sync"
    args = None
    uCpu = None
    sCpu = None
    diskIn = None
    diskOut = None
    ipcIn = None
    ipcOut = None
    maxRss = None
    pageFaults = None
    rpcMsgsIn = None
    rpcMsgsOut = None
    rpcSizeIn = None
    rpcSizeOut = None
    rpcHimarkFwd = None
    rpcHimarkRev = None
    rpcSnd = None
    rpcRcv = None
    running = 0            # Count of concurrently running commands
    lineNumber = 0         # Log line number where the command block started
    error = None

    def __init__(self, processKey, lineNumber, pid, startTime,
                 user, workspace, ip, app, name, args):
        """Record the fields of a command-start log line.

        user/workspace/args are escaped for SQL output; the optional
        usage/rpc fields are nulled so updateFrom() can fill them later.
        """
        self.processKey = processKey
        self.lineNumber = lineNumber
        self.pid = pid
        self.startTime = startTime
        self.startTimeEpoch = getEpoch(startTime)
        self.user = escapeChar(user)
        self.workspace = escapeChar(workspace)
        self.ip = ip
        self.app = app
        self.name = name
        if args:
            self.args = escapeChar(args)
        else:
            self.args = None
        # Explicitly null all optional fields (overriding the class defaults)
        self.endTime = self.computedLapse = self.completedLapse = self.uCpu = None
        self.sCpu = self.diskIn = self.diskOut = self.ipcIn = self.ipcOut = self.maxRss = None
        self.pageFaults = self.rpcMsgsIn = self.rpcMsgsOut = self.rpcSizeOut = None
        self.rpcSizeIn = self.rpcHimarkFwd = self.rpcHimarkRev = self.error = None
        self.rpcSnd = self.rpcRcv = None
        self.running = 0
        self.tables = {}  # Some special vtrack tables that can be updated multiple times

    def getKey(self):
        # Disambiguate the key when the same pid appeared twice within one
        # second (duplicateKey) by appending the line number.
        if self.duplicateKey:
            return "%s.%d" % (self.processKey, self.lineNumber)
        return self.processKey

    def setEndTime(self, endTime):
        # Store end time both as a string and as epoch seconds.
        self.endTime = endTime
        self.endTimeEpoch = getEpoch(endTime)

    def updateFrom(self, other):
        "Update any potentially null fields - see __init__"
        for f in ["endTime", "computedLapse", "completedLapse", "uCpu",
                  "sCpu", "diskIn", "diskOut", "ipcIn", "ipcOut", "maxRss",
                  "pageFaults", "rpcMsgsIn", "rpcMsgsOut", "rpcSizeOut",
                  "rpcSizeIn", "rpcHimarkFwd", "rpcHimarkRev", "error",
                  "rpcSnd", "rpcRcv"]:
            # NOTE: truthiness test - 0/"" values are treated as unset
            if getattr(other, f) and not getattr(self, f):
                setattr(self, f, getattr(other, f))

    def setUsage(self, uCpu, sCpu, diskIn, diskOut, ipcIn, ipcOut, maxRss, pageFaults):
        "Set fields parsed from a '--- usage' track line."
        self.uCpu = uCpu
        self.sCpu = sCpu
        self.diskIn = diskIn
        self.diskOut = diskOut
        self.ipcIn = ipcIn
        self.ipcOut = ipcOut
        self.maxRss = maxRss
        self.pageFaults = pageFaults

    def setRpc(self, rpcMsgsIn, rpcMsgsOut, rpcSizeIn,
               rpcSizeOut, rpcHimarkFwd, rpcHimarkRev, rpcSnd=None, rpcRcv=None):
        "Set fields parsed from a '--- rpc msgs/size' track line."
        self.rpcMsgsIn = rpcMsgsIn
        self.rpcMsgsOut = rpcMsgsOut
        self.rpcSizeIn = rpcSizeIn
        self.rpcSizeOut = rpcSizeOut
        self.rpcHimarkFwd = rpcHimarkFwd
        self.rpcHimarkRev = rpcHimarkRev
        self.rpcSnd = rpcSnd
        self.rpcRcv = rpcRcv
class Table:
    # Per-command usage statistics for one db table, parsed from the
    # '--- db.<name>' track output.  Class-level values are defaults.
    # NOTE(review): the attribute is named 'delRow' but the corresponding
    # DB column is 'delRows' -- renaming would touch the insert code in
    # Log2sql, so it is left as is.
    processKey = None
    lineNumber = 0
    tableName = None
    pagesIn = None
    pagesOut = None
    pagesCached = None
    readLocks = None
    writeLocks = None
    getRows = None
    posRows = None
    scanRows = None
    putRows = None
    delRow = None
    totalReadWait = None
    totalReadHeld = None
    totalWriteWait = None
    totalWriteHeld = None
    maxReadWait = None
    maxReadHeld = None
    maxWriteWait = None
    maxWriteHeld = None
    peekCount = None
    totalPeekWait = None
    totalPeekHeld = None
    maxPeekWait = None
    maxPeekHeld = None

    def __init__(self, cmd, tableName):
        """Bind the stats record to its owning Command and table name.

        All statistic fields start as None and are filled in by the
        set* methods as matching track lines are parsed.
        """
        self.cmd = cmd
        self.processKey = cmd.processKey
        self.lineNumber = cmd.lineNumber
        self.tableName = tableName
        self.pagesIn = self.pagesOut = self.pagesCached = None
        self.readLocks = self.writeLocks = self.getRows = self.posRows = self.scanRows = None
        self.putRows = self.delRow = self.totalReadWait = self.totalReadHeld = None
        self.totalWriteWait = self.totalWriteHeld = self.maxReadWait = None
        self.maxReadHeld = self.maxWriteWait = self.maxWriteHeld = None
        self.peekCount = None
        self.totalPeekWait = self.totalPeekHeld = self.maxPeekWait = self.maxPeekHeld = None

    def setPages(self, pagesIn, pagesOut, pagesCached):
        "Values from a '--- pages in+out+cached' track line."
        self.pagesIn = pagesIn
        self.pagesOut = pagesOut
        self.pagesCached = pagesCached

    def setLocksRows(self, readLocks, writeLocks, getRows, posRows,
                     scanRows, putRows, delRow):
        "Values from a '--- locks read/write ... rows ...' track line."
        self.readLocks = readLocks
        self.writeLocks = writeLocks
        self.getRows = getRows
        self.posRows = posRows
        self.scanRows = scanRows
        self.putRows = putRows
        self.delRow = delRow

    def setTotalLock(self, totalReadWait, totalReadHeld, totalWriteWait, totalWriteHeld):
        "Values from a '--- total lock wait+held' track line."
        self.totalReadWait = totalReadWait
        self.totalReadHeld = totalReadHeld
        self.totalWriteWait = totalWriteWait
        self.totalWriteHeld = totalWriteHeld

    def setMaxLock(self, maxReadWait, maxReadHeld, maxWriteWait, maxWriteHeld):
        "Values from a '--- max lock wait+held' track line."
        self.maxReadWait = maxReadWait
        self.maxReadHeld = maxReadHeld
        self.maxWriteWait = maxWriteWait
        self.maxWriteHeld = maxWriteHeld

    def setPeek(self, peekCount, totalPeekWait, totalPeekHeld, maxPeekWait, maxPeekHeld):
        "Values from a '--- peek count' track line."
        self.peekCount = peekCount
        self.totalPeekWait = totalPeekWait
        self.totalPeekHeld = totalPeekHeld
        self.maxPeekWait = maxPeekWait
        self.maxPeekHeld = maxPeekHeld
class Block:
    "A group of consecutive log lines belonging to one server record."

    def __init__(self):
        self.lines = []
        self.lineNo = 0  # Log line number of the first line in the block

    def addLine(self, line, lineNo):
        "Append a line, remembering the line number of the first one added."
        self.lines.append(line)
        if self.lineNo == 0:
            self.lineNo = lineNo
class Log2sql:
    """Parses a Perforce server log file and emits rows for the 'process'
    and 'tableUse' tables, either as SQL INSERT text (via --output), into
    a local SQLite database (via --sql), or both.

    Fixes applied in review:
    - blockEnd(): a missing comma fused "Perforce server error:" with
      "locks acquired by blocking after" into one dead terminator string,
      so neither was ever recognised as a block boundary.
    - output(): the UnicodeEncodeError fallback wrote the repr of a bytes
      object (b'...') instead of escaped text, and shadowed builtin 'str'.
    - Bare 'except:' clauses narrowed to sqlite3.Error so that
      KeyboardInterrupt/SystemExit are no longer swallowed.
    """
    logname = None
    dbname = None
    logfile = None
    ckpSize = None             # Size of the input (for progress reporting)
    readBytes = None
    reportingInterval = None
    cmds = None                # Active commands, indexed by pid
    lineNo = 0
    countCmds = 0
    countInserts = 0
    currStartTime = None  # String
    currStartTimeEpoch = None  # Epochtime

    def __init__(self, options, instream=None, outstream=None, errstream=None):
        """Set up streams, logging and database/SQL output.

        instream/outstream/errstream may be supplied for testing; otherwise
        they are derived from the parsed command line options.
        """
        self.dbname = options.dbname
        self.options = options
        self.outstream = outstream  # For testing
        self.errstream = errstream  # For testing
        self.init_logger()
        if outstream is None:
            if options.output:
                if options.output == "-":
                    self.outstream = sys.stdout
                else:
                    self.outstream = open(options.output, "w")
            else:
                self.outstream = None
        if instream is None:
            self.logfile = io.open(options.logfile[0], "r", encoding="latin1", errors="backslashreplace")
            self.ckpSize = os.stat(options.logfile[0]).st_size
            self.logger.info("Processing %s:" % self.options.logfile[0])
        else:
            self.logfile = instream
            self.ckpSize = 500
        if self.options.sql:
            self.conn = sqlite3.connect("%s.db" % self.dbname)
            self.conn.text_factory = str
            self.cursor = self.conn.cursor()
            self.cursor.execute("PRAGMA synchronous = OFF")
            self.cursor.execute("PRAGMA journal_mode = MEMORY")
        self.cmds = {}
        self.cmdsByEpoch = defaultdict(dict)  # Indexed by EpochTime
        self.tables = {}
        # Track which pids have been seen for current time stamp (per second)
        self.currSecond = None
        self.pidsSeenThisSecond = {}  # List of any new pids
        self.readBytes = 0.0
        self.reportingInterval = float(self.options.interval)
        self.running = 0
        self.rowCount = 0
        self.db_create_database()
        self.db_create_process()
        self.db_create_tableUse()
        query = "BEGIN TRANSACTION;"
        self.output(query)
        if self.options.sql:
            try:
                self.conn.execute(query)
            except sqlite3.Error:
                # sqlite may already have an implicit transaction open
                pass
        self.logger.info("...0%")

    def init_logger(self):
        "Configure the shared logger: stderr (or test stream) plus optional file."
        self.logger = logging.getLogger(LOGGER_NAME)
        self.logger.setLevel(self.options.verbosity)
        formatter = logging.Formatter('%(asctime)s:%(levelname)s %(message)s')
        if self.errstream:
            ch = logging.StreamHandler(self.errstream)
            ch.setLevel(self.options.verbosity)
        else:
            ch = logging.StreamHandler(sys.stderr)
            ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        self.logger.addHandler(ch)
        if self.options.verbosity != logging.INFO and self.options.outlog:
            fh = logging.FileHandler(self.options.outlog, mode='w')
            fh.setFormatter(formatter)
            self.logger.addHandler(fh)

    def outputRequired(self, file=sys.stdout):
        "True if SQL text output should be generated for the given stream."
        return self.outstream or file == sys.stderr

    def output(self, text, file=sys.stdout):
        "Write a line of SQL text to the output stream (or stderr)."
        if (file == sys.stderr):
            print(text, file=file)
        elif self.outstream:
            try:
                self.outstream.write("%s\n" % text)
            except UnicodeEncodeError:
                # Escape unencodable characters; round-trip through latin1
                # so we write a str, not the repr of a bytes object.
                safe = text.encode("latin1", errors="backslashreplace").decode("latin1")
                self.outstream.write("%s\n" % safe)

    def terminate(self):
        "Flush outstanding commands and finish off the database/SQL output."
        self.flush_output()
        self.db_commit(True)
        if self.options.sql:
            self.conn.commit()
            self.conn.close()

    def getLineNumber(self):
        "Return the current input line number."
        return self.lineNo

    def db_commit_updates(self):
        "Commit and restart the transaction every RowsPerTransaction rows."
        self.rowCount += 1
        if self.rowCount % RowsPerTransaction == 0:
            query = "COMMIT;"
            if self.outputRequired():
                self.output(query)
            if self.options.sql:
                self.conn.commit()
            query = "BEGIN TRANSACTION;"
            if self.outputRequired():
                self.output(query)
            if self.options.sql:
                self.conn.execute(query)

    def db_commit(self, state):
        "Commit (state truthy) or turn off autocommit in the SQL text output."
        if (state):
            query = "COMMIT;"
            if self.outputRequired():
                self.output(query)
            if self.options.sql:
                self.conn.commit()
            query = "BEGIN TRANSACTION;"
            if self.outputRequired():
                self.output(query)
            if self.options.sql:
                self.conn.execute(query)
        else:
            query = "SET autocommit=0;"
            if self.outputRequired():
                self.output(query)

    def db_create_database(self):
        # MySQL-style statements for the text output only; SQLite (the
        # --sql path) has no CREATE DATABASE/USE, so nothing is executed.
        query = "CREATE DATABASE IF NOT EXISTS " + self.dbname + ";"
        if self.outputRequired():
            self.output(query)
        query = "USE " + self.dbname + ";"
        if self.outputRequired():
            self.output(query)

    def db_create_process(self):
        "Create the 'process' table (one row per command invocation)."
        query = "DROP TABLE IF EXISTS process;"
        if self.outputRequired():
            self.output(query)
        # NOTE(review): unlike db_create_tableUse, the DROP above is never
        # executed against SQLite -- confirm whether that is intentional.
        query = "CREATE TABLE process (processkey CHAR(32) NOT NULL, lineNumber INT NOT NULL, pid INT NOT NULL, " \
                "startTime DATETIME NOT NULL,endTime DATETIME NULL, computedLapse FLOAT NULL,completedLapse FLOAT NULL, " \
                "user TEXT NOT NULL, workspace TEXT NOT NULL, ip TEXT NOT NULL, app TEXT NOT NULL, cmd TEXT NOT NULL, " \
                "args TEXT NULL, uCpu INT NULL, sCpu INT NULL, diskIn INT NULL, diskOut INT NULL, ipcIn INT NULL, " \
                "ipcOut INT NULL, maxRss INT NULL, pageFaults INT NULL, rpcMsgsIn INT NULL, rpcMsgsOut INT NULL, " \
                "rpcSizeIn INT NULL, rpcSizeOut INT NULL, rpcHimarkFwd INT NULL, rpcHimarkRev INT NULL, " \
                "rpcSnd FLOAT NULL, rpcRcv FLOAT NULL, running INT NULL, " \
                "error TEXT NULL, PRIMARY KEY (processkey, lineNumber));"
        if self.outputRequired():
            self.output(query)
        if self.options.sql:
            try:
                self.cursor.execute(query)
            except sqlite3.Error:
                # Best effort: table may already exist
                pass

    def db_create_tableUse(self):
        "Create the 'tableUse' table (per-command per-db-table statistics)."
        query = "DROP TABLE IF EXISTS tableUse;"
        if self.outputRequired():
            self.output(query)
        if self.options.sql:
            self.cursor.execute(query)
        query = "CREATE TABLE tableUse (processkey CHAR(32) NOT NULL, lineNumber INT NOT NULL, " \
                "tableName VARCHAR(255) NOT NULL, pagesIn INT NULL, pagesOut INT NULL, pagesCached INT NULL, " \
                "readLocks INT NULL, writeLocks INT NULL, getRows INT NULL, posRows INT NULL, scanRows INT NULL, " \
                "putRows int NULL, delRows INT NULL, totalReadWait INT NULL, totalReadHeld INT NULL, " \
                "totalWriteWait INT NULL, totalWriteHeld INT NULL, maxReadWait INT NULL, maxReadHeld INT NULL, " \
                "maxWriteWait INT NULL, maxWriteHeld INT NULL, peekCount INT NULL, " \
                "totalPeekWait INT NULL, totalPeekHeld INT NULL, maxPeekWait INT NULL, maxPeekHeld INT NULL, " \
                "PRIMARY KEY (processkey, lineNumber, tableName));"
        if self.outputRequired():
            self.output(query)
        if self.options.sql:
            try:
                self.cursor.execute(query)
            except sqlite3.Error:
                # Best effort: table may already exist
                pass

    def addCommand(self, cmd, hasTrackInfo):
        """Register a command-start (or track-only) record, folding it into
        an existing command with the same pid where appropriate."""
        cmd.running = self.running
        self.countCmds += 1
        if not self.currStartTime or (self.currStartTimeEpoch != cmd.startTimeEpoch and cmd.startTimeEpoch > self.currStartTimeEpoch):
            self.currStartTime = cmd.startTime
            self.currStartTimeEpoch = cmd.startTimeEpoch
            self.pidsSeenThisSecond = {}
        # If we are re-using pid then process previous command
        # Or process 'rmt-FileFetch' because they don't have 'completed' records
        if cmd.pid in self.cmds:
            if cmd.processKey != self.cmds[cmd.pid].processKey:
                self.sql_process_insert(self.cmds[cmd.pid])
                self.cmds[cmd.pid] = cmd  # replace
            elif cmd.name == 'rmt-FileFetch' and not hasTrackInfo:
                self.sql_process_insert(self.cmds[cmd.pid])
                self.cmds[cmd.pid] = cmd  # replace
            else:
                self.cmds[cmd.pid].updateFrom(cmd)
                if hasTrackInfo:
                    self.cmds[cmd.pid].hasTrackInfo = True
        else:
            self.cmds[cmd.pid] = cmd
        if cmd.pid in self.pidsSeenThisSecond:
            cmd.duplicateKey = True
        self.pidsSeenThisSecond[cmd.pid] = 1
        self.running += 1
        self.processCompletedCommands()

    def processCompletedCommands(self):
        """Remove any commands with completed state - should only be called by addCommand when
        we are adding a new command - at that point we know there are no new records to come for any previous
        ones as even track output will have been processed.
        In order to cope with track commands coming a little later then completed commands, we only process commands
        with a starttime of 3 or more seconds earlier than the current one."""
        for pid in list(self.cmds.keys()):
            cmd = self.cmds[pid]
            completed = False
            if cmd.completed and (cmd.hasTrackInfo or self.currStartTimeEpoch >= (cmd.endTimeEpoch + 3)):
                completed = True
            if not completed and (cmd.hasTrackInfo and self.currStartTimeEpoch >= (cmd.endTimeEpoch + 3)):
                completed = True
            if completed:
                self.sql_process_insert(cmd)
                del self.cmds[pid]
                self.running -= 1

    def updateComputedTime(self, pid, computedLapse):
        "Accumulate 'compute end' lapse values for the command with this pid."
        if pid in self.cmds:
            # sum all the compute values of a same command
            if self.cmds[pid].computedLapse is None:
                total = 0.0
            else:
                total = float(self.cmds[pid].computedLapse)
            total += float(computedLapse)
            self.cmds[pid].computedLapse = str(total)

    def updateCompletionTime(self, pid, endTime, completedLapse):
        "Mark the command with this pid completed, recording end time and lapse."
        if pid in self.cmds:
            self.cmds[pid].completed = True
            self.cmds[pid].setEndTime(endTime)
            self.cmds[pid].completedLapse = str(float(completedLapse))

    def processTrackRecords(self, cmd, lines):
        """Parse the '---' track lines following a command, filling in usage,
        rpc and per-table statistics, then register the command."""
        self.tables = {}
        tableName = None
        # Use line number from original cmd if appropriate
        if cmd.pid in self.cmds and cmd.processKey == self.cmds[cmd.pid].processKey:
            cmd.lineNumber = self.cmds[cmd.pid].lineNumber
        tablesTracked = []
        for line in lines:
            if not RE_TRACK.match(line):
                break
            gotMatch = False
            match = RE_TRACK_LAPSE.match(line)
            if match:
                gotMatch = True
                cmd.completedLapse = match.group(1)
            if not gotMatch:
                match = RE_TRACK_LAPSE2.match(line)
                if match:
                    gotMatch = True
                    cmd.completedLapse = "0." + match.group(1)
            if not gotMatch:
                match = RE_FAILED_AUTH.match(line)
                if match:
                    gotMatch = True
                    cmd.error = "\"" + match.group(1) + "\""
            if not gotMatch:
                match = RE_KILLED_BY.match(line)
                if match:
                    gotMatch = True
                    cmd.error = "\"" + match.group(1) + "\""
            if not gotMatch:
                match = RE_EXITED.match(line)
                if match:
                    gotMatch = True
                    cmd.error = "\"" + match.group(1) + "\""
            if not gotMatch:
                match = RE_TRACK_USAGE.match(line)
                if match:
                    gotMatch = True
                    cmd.setUsage(match.group(1), match.group(2), match.group(3), match.group(4),
                                 match.group(5), match.group(6), match.group(7), match.group(8))
            if not gotMatch:
                # RPC2 (with snd/rcv) must be tried before the shorter RPC form
                match = RE_TRACK_RPC2.match(line)
                if match:
                    gotMatch = True
                    cmd.setRpc(match.group(1), match.group(2), match.group(3),
                               match.group(4), match.group(5), match.group(6),
                               match.group(7), match.group(8))
            if not gotMatch:
                match = RE_TRACK_RPC.match(line)
                if match:
                    gotMatch = True
                    cmd.setRpc(match.group(1), match.group(2), match.group(3),
                               match.group(4), match.group(5), match.group(6))
            if not gotMatch:
                match = RE_TRACK_TABLE.match(line)
                if match:
                    gotMatch = True
                    tableName = match.group(1)
                    self.tables[tableName] = Table(cmd, tableName)
                    tablesTracked.append(tableName)
            if not gotMatch:
                match = RE_TRACK_CLIENT_LOCK.match(line)
                if match:
                    gotMatch = True
                    tableName = "clients_%s" % match.group(2)  # clients/<name>(W)
                    self.tables[tableName] = Table(cmd, tableName)
                    tablesTracked.append(tableName)
            if not gotMatch:
                match = RE_TRACK_CHANGE_LOCK.match(line)
                if match:
                    gotMatch = True
                    tableName = "changes_%s" % match.group(2)  # changes/<number>(W)
                    self.tables[tableName] = Table(cmd, tableName)
                    tablesTracked.append(tableName)
            if not gotMatch:
                match = RE_TRACK_META_LOCK.match(line)
                if match:
                    gotMatch = True
                    tableName = "meta_%s_%s" % (match.group(1), match.group(2))  # meta/db(W)
                    self.tables[tableName] = Table(cmd, tableName)
                    tablesTracked.append(tableName)
            if not gotMatch:
                match = RE_TRACK_PAGES.match(line)
                if match:
                    gotMatch = True
                    if tableName in self.tables:
                        self.tables[tableName].setPages(match.group(1), match.group(2), match.group(3))
            if not gotMatch:
                match = RE_TRACK_LOCKS_ROWS.match(line)
                if match:
                    gotMatch = True
                    if tableName in self.tables:
                        self.tables[tableName].setLocksRows(match.group(1), match.group(2),
                                                            match.group(3), match.group(4),
                                                            match.group(5), match.group(6),
                                                            match.group(7))
            if not gotMatch:
                match = RE_TRACK_TOTAL_LOCK.match(line)
                if match:
                    gotMatch = True
                    if tableName in self.tables:
                        self.tables[tableName].setTotalLock(match.group(1), match.group(2),
                                                            match.group(3), match.group(4))
                        # The max is only reported if there is more than one
                        # lock taken on the table
                        if (self.tables[tableName].readLocks is None or
                                int(self.tables[tableName].readLocks) < 2) and \
                                (self.tables[tableName].writeLocks is None or
                                 int(self.tables[tableName].writeLocks) < 2):
                            self.tables[tableName].setMaxLock(match.group(1), match.group(2),
                                                              match.group(3), match.group(4))
            if not gotMatch:
                match = RE_TRACK_MAX_LOCK.match(line)
                if match:
                    gotMatch = True
                    if tableName in self.tables:
                        self.tables[tableName].setMaxLock(match.group(1), match.group(2),
                                                          match.group(3), match.group(4))
            if not gotMatch:
                match = RE_TRACK_PEEK.match(line)
                if match:
                    gotMatch = True
                    if tableName in self.tables:
                        self.tables[tableName].setPeek(match.group(1), match.group(2),
                                                       match.group(3), match.group(4),
                                                       match.group(5))
            if not gotMatch:
                self.logger.debug("Unrecognised track: %d, %s" % (cmd.lineNumber, line[:-1]))
        if cmd.completedLapse is not None:
            cmd.setEndTime(dateAdd(cmd.startTime, float(cmd.completedLapse)))
        else:
            cmd.setEndTime(cmd.startTime)
        # Don't set tracked info if is one of the special commands which can occur multiple times and
        # which don't indicate the completion of the command
        hasTrackInfo = False
        for t in tablesTracked:
            if not t.startswith("meta_") and not t.startswith("changes_") and not t.startswith("clients_"):
                hasTrackInfo = True
        self.addCommand(cmd, hasTrackInfo=hasTrackInfo)
        if hasTrackInfo:
            self.cmd_tables_insert(cmd, self.tables)
        else:
            # Save special tables for processing when cmd is completed
            for t in self.tables.keys():
                self.cmds[cmd.pid].tables[t] = self.tables[t]

    def cmd_tables_insert(self, cmd, tables):
        "Insert the given tableUse rows, logging (not raising) on failure."
        try:
            self.sql_tableUse_insert_items(tables)
        except Exception as e:
            self.logger.warning("%s, pid %d, lineNo %d, cmd lineNo %d, %s, %s, %s" % (
                str(e), cmd.pid, self.lineNo, cmd.lineNumber, cmd.processKey, cmd.name, cmd.args))

    def flush_output(self):
        "Insert all commands still outstanding at end of input."
        if self.logger.isEnabledFor(logging.DEBUG):
            cmds = []
            self.logger.debug("outstanding commands:")
        if python3:
            for p, v in self.cmds.items():
                self.sql_process_insert(v)
                if self.logger.isEnabledFor(logging.DEBUG):
                    cmds.append("lineNo %d, pid %s, cmd %s" % (v.lineNumber, v.pid, v.name))
        else:
            for p, v in self.cmds.iteritems():
                self.sql_process_insert(v)
                if self.logger.isEnabledFor(logging.DEBUG):
                    cmds.append("lineNo %d, pid %s, cmd %s" % (v.lineNumber, v.pid, v.name))
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("\n\t".join(cmds))
        self.logger.info("")

    def sql_process_insert(self, cmd):
        "Insert one row into the 'process' table (text and/or SQLite)."
        self.cmd_tables_insert(cmd, cmd.tables)  # Cmd may have some special tables
        processKey = cmd.getKey()
        if self.outputRequired():
            query = 'INSERT IGNORE INTO process VALUES ("%s",%d,%d,"%s",%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%d,%s);' % \
                    (processKey, cmd.lineNumber, cmd.pid,
                     cmd.startTime, quotedNullValue(cmd.endTime), nullValue(cmd.computedLapse),
                     nullValue(cmd.completedLapse), quotedNullValue(cmd.user), quotedNullValue(cmd.workspace),
                     quotedNullValue(cmd.ip), quotedNullValue(cmd.app), quotedNullValue(cmd.name),
                     quotedNullValue(cmd.args),
                     nullValue(cmd.rpcMsgsIn), nullValue(cmd.rpcMsgsOut),
                     nullValue(cmd.rpcSizeIn), nullValue(cmd.rpcSizeOut),
                     nullValue(cmd.rpcHimarkFwd), nullValue(cmd.rpcHimarkRev),
                     nullValue(cmd.rpcSnd), nullValue(cmd.rpcRcv), nullValue(cmd.uCpu),
                     nullValue(cmd.sCpu), nullValue(cmd.diskIn), nullValue(cmd.diskOut),
                     nullValue(cmd.ipcIn), nullValue(cmd.ipcOut),
                     nullValue(cmd.maxRss), nullValue(cmd.pageFaults), nullValue(cmd.running),
                     nullValue(cmd.error))
            self.output(query)
        if self.options.sql:
            try:
                self.countInserts += 1
                self.cursor.execute('INSERT INTO process VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
                                    (processKey, cmd.lineNumber, cmd.pid,
                                     cmd.startTime, cmd.endTime, cmd.computedLapse,
                                     cmd.completedLapse, cmd.user, cmd.workspace,
                                     cmd.ip, cmd.app, cmd.name, cmd.args,
                                     cmd.rpcMsgsIn,
                                     cmd.rpcMsgsOut, cmd.rpcSizeIn, cmd.rpcSizeOut,
                                     cmd.rpcHimarkFwd, cmd.rpcHimarkRev,
                                     cmd.rpcSnd, cmd.rpcRcv, cmd.uCpu,
                                     cmd.sCpu, cmd.diskIn, cmd.diskOut,
                                     cmd.ipcIn, cmd.ipcOut,
                                     cmd.maxRss, cmd.pageFaults, cmd.running,
                                     cmd.error))
            except Exception as e:
                raise Exception("%s: %s, %d, %d, %s" % (str(e), cmd.processKey, cmd.lineNumber, cmd.pid, cmd.name))
        self.db_commit_updates()

    def outputRow(self, tab):
        "Write one tableUse INSERT statement to the SQL text output."
        processKey = self.cmds[tab.cmd.pid].getKey()  # Look up in case of dupliates
        query = 'INSERT IGNORE INTO tableUse VALUES ("%s",%d,"%s",%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'\
                '%s,%s,%s,%s,%s,%s,%s);' % \
                (processKey, tab.lineNumber, tab.tableName,
                 nullValue(tab.pagesIn), nullValue(tab.pagesOut), nullValue(tab.pagesCached),
                 nullValue(tab.readLocks), nullValue(tab.writeLocks), nullValue(tab.getRows), nullValue(tab.posRows),
                 nullValue(tab.scanRows), nullValue(tab.putRows), nullValue(tab.delRow),
                 nullValue(tab.totalReadWait), nullValue(tab.totalReadHeld),
                 nullValue(tab.totalWriteWait), nullValue(tab.totalWriteHeld),
                 nullValue(tab.maxReadWait), nullValue(tab.maxReadHeld),
                 nullValue(tab.maxWriteWait), nullValue(tab.maxWriteHeld),
                 nullValue(tab.peekCount),
                 nullValue(tab.totalPeekWait), nullValue(tab.totalPeekHeld),
                 nullValue(tab.maxPeekWait), nullValue(tab.maxPeekHeld))
        self.output(query)

    def sql_tableUse_insert_items(self, tables):
        "Insert the given tableUse rows (text and/or SQLite, batched)."
        if self.outputRequired():
            if python3:
                for k, v in tables.items():
                    self.outputRow(v)
            else:
                for table in tables.values():
                    self.outputRow(table)
        if self.options.sql:
            rows = []
            if python3:
                for k, tab in tables.items():
                    self.countInserts += 1
                    processKey = self.cmds[tab.cmd.pid].getKey()  # Look up in case of dupliates
                    rows.append( \
                        (processKey, tab.lineNumber, tab.tableName,
                         tab.pagesIn, tab.pagesOut, tab.pagesCached,
                         tab.readLocks, tab.writeLocks, tab.getRows, tab.posRows,
                         tab.scanRows, tab.putRows, tab.delRow,
                         tab.totalReadWait, tab.totalReadHeld,
                         tab.totalWriteWait, tab.totalWriteHeld,
                         tab.maxReadWait, tab.maxReadHeld,
                         tab.maxWriteWait, tab.maxWriteHeld,
                         tab.peekCount,
                         tab.totalPeekWait, tab.totalPeekHeld,
                         tab.maxPeekWait, tab.maxPeekHeld))
            else:
                for tab in tables.values():
                    processKey = self.cmds[tab.cmd.pid].getKey()  # Look up in case of dupliates
                    rows.append( \
                        (processKey, tab.lineNumber, tab.tableName,
                         tab.pagesIn, tab.pagesOut, tab.pagesCached,
                         tab.readLocks, tab.writeLocks, tab.getRows, tab.posRows,
                         tab.scanRows, tab.putRows, tab.delRow,
                         tab.totalReadWait, tab.totalReadHeld,
                         tab.totalWriteWait, tab.totalWriteHeld,
                         tab.maxReadWait, tab.maxReadHeld,
                         tab.maxWriteWait, tab.maxWriteHeld,
                         tab.peekCount,
                         tab.totalPeekWait, tab.totalPeekHeld,
                         tab.maxPeekWait, tab.maxPeekHeld))
            try:
                # Use executemany for performance
                self.cursor.executemany('INSERT INTO tableUse VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
                                        rows)
            except Exception as e:
                raise Exception("%s: %s, %d, %s" % (str(e), rows[0][0], rows[0][1], rows[0][2]))
            self.db_commit_updates()

    def processInfoBlock(self, block):
        "Parse one 'Perforce server info:' block (command/completed/compute/track lines)."
        cmd = None
        i = 0
        # First line of block is info line - process the rest
        for line in block.lines[1:]:
            i += 1
            # Check for track lines and once we have found one, process them all and finish
            if cmd:
                match = RE_TRACK.match(line)
                if match:
                    self.processTrackRecords(cmd, block.lines[i:])
                    break  # Block has been processed
            match = RE_CMD.match(line)
            if not match:
                match = RE_CMD_NOARG.match(line)
            if match:
                pid = int(match.group(2))
                startTime = match.group(1).replace("/", "-")
                user = match.group(3)
                workspace = match.group(4)
                ip = match.group(5)
                # following gsub required due to a 2009.2 P4V bug
                app = match.group(6).replace("\x00", "/")
                name = match.group(7)
                cmdArgs = None
                if len(match.groups()) == 8:
                    cmdArgs = match.group(8)
                    # Strip Swarm/Git Fusion commands with lots of json
                    smatch = re.search(RE_JSON_CMDARGS, cmdArgs)
                    if smatch:
                        cmdArgs = smatch.group(1)
                m = hashlib.md5()
                line = re.sub(r'[^\x00-\x7F]', '?', line)  # Avoid encoding errors
                if python3:
                    m.update(line.encode())
                else:
                    m.update(line)
                processKey = m.hexdigest()
                cmd = Command(processKey, block.lineNo, pid, startTime,
                              user, workspace, ip, app, name, cmdArgs)
                self.addCommand(cmd, False)
                continue
            # Pattern matching a completed line
            match = RE_COMPLETED.match(line)
            if match:
                pid = int(match.group(2))
                endTime = match.group(1).replace("/", "-")
                completedLapse = match.group(3)
                self.updateCompletionTime(pid, endTime, completedLapse)
                continue
            # Pattern matching a compute line
            match = RE_COMPUTE.match(line)
            if match:
                pid = int(match.group(2))
                computedLapse = float(match.group(3))
                self.updateComputedTime(pid, computedLapse)
                continue

    def blankLine(self, line):
        "True if the line is empty or just a line terminator."
        return line in ("", "\n", "\r\n")

    def blockEnd(self, line):
        "Blank line or one of terminators"
        # FIX: a comma was missing after "Perforce server error:", which
        # concatenated it with the next literal so neither terminator ever
        # matched a real log line.
        terminators = ["Perforce server info:",
                       "Perforce server error:",
                       "locks acquired by blocking after",
                       "Rpc himark:"]
        if self.blankLine(line):
            return True
        for t in terminators:
            if line[:len(t)] == t:
                return True
        return False

    def blockInfo(self, line):
        "True if the line starts a 'Perforce server info:' block."
        t = "Perforce server info:"
        if line[:len(t)] == t:
            return True
        return False

    def reportProgress(self):
        "Log percentage progress through the input at the configured interval."
        if math.floor(((self.readBytes / self.ckpSize) * 100.0) / self.reportingInterval) == 1.0:
            self.logger.info("...%d%%" % (self.reportingInterval))
            self.reportingInterval += self.options.interval
            self.logger.debug("Running: %d, cmds curr/processed: %d/%d" % (self.running, len(self.cmds), self.countCmds))
            self.logger.debug("Inserts: %d" % (self.countInserts))

    def processLog(self):
        "Main loop: split the input into blocks and process each one."
        block = Block()
        for line in self.logfile:
            self.lineNo += 1
            self.readBytes += len(line)
            self.reportProgress()
            if self.blockEnd(line):
                if block.lines:
                    if self.blockInfo(block.lines[0]):
                        self.processInfoBlock(block)
                    elif self.blankLine(block.lines[0]):
                        self.processCompletedCommands()
                    else:
                        if self.logger.isEnabledFor(logging.DEBUG):
                            self.logger.debug("Unrecognised block: %d, %s" % (block.lineNo, block.lines[0]))
                block = Block()
                block.addLine(line, self.lineNo)
            else:
                block.addLine(line, self.lineNo)
        # Process any block still pending at end of file
        if block.lines:
            if self.blockInfo(block.lines[0]):
                self.processInfoBlock(block)
        self.terminate()
######################
# START OF MAIN SCRIPT
######################
# Regular expressions matching the various p4d log line formats.
# Converted to raw strings so the many backslash escapes (\d, \+, \[, ...)
# are not treated as (invalid) string escapes, which raises SyntaxWarning
# on modern Pythons; the compiled patterns are byte-for-byte identical.
RE_JSON_CMDARGS = re.compile(r'^(.*) \{.*\}$')
RE_CMD = re.compile(r'^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) ([^ @]*)@([^ ]*) ([^ ]*) \[([^\]]*)\] \'(\w+-\w+) (.*)\'.*')
RE_CMD_NOARG = re.compile(r'^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) ([^ @]*)@([^ ]*) ([^ ]*) \[([^\]]*)\] \'(\w+-\w+)\'.*')
RE_COMPUTE = re.compile(r'^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) compute end ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*')
RE_COMPLETED = re.compile(r'^\t(\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) pid (\d+) completed ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s.*')
RE_TRACK = re.compile(r'^---|^locks acquired by blocking after 3 non-blocking attempts')
RE_TRACK_LAPSE = re.compile(r'^--- lapse (\d+)')
RE_TRACK_LAPSE2 = re.compile(r'^--- lapse \.(\d+)')
RE_TRACK_RPC = re.compile(r'^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+)')
RE_TRACK_RPC2 = re.compile(r'^--- rpc msgs/size in\+out (\d+)\+(\d+)/(\d+)mb\+(\d+)mb himarks (\d+)/(\d+) snd/rcv ([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s/([0-9]+|[0-9]+\.[0-9]+|\.[0-9]+)s')
RE_TRACK_USAGE = re.compile(r'^--- usage (\d+)\+(\d+)us (\d+)\+(\d+)io (\d+)\+(\d+)net (\d+)k (\d+)pf')
RE_FAILED_AUTH = re.compile(r'^--- (failed authentication check)')
RE_KILLED_BY = re.compile(r'^--- (killed by .*)')
RE_EXITED = re.compile(r'^--- (exited on fatal server error)')
RE_TRACK_TABLE = re.compile(r'^--- db.([a-zA-Z]*)')
RE_TRACK_CLIENT_LOCK = re.compile(r'^--- clients/([^\(]+)\(([RW])\)')
RE_TRACK_CHANGE_LOCK = re.compile(r'^--- change/([^\(]+)\(([RW])\)')
RE_TRACK_META_LOCK = re.compile(r'^--- meta/([^\(]+)\(([RW])\)')
RE_TRACK_PAGES = re.compile(r'^--- pages in\+out\+cached (\d+)\+(\d+)\+(\d+)')
RE_TRACK_LOCKS_ROWS = re.compile(
    r'^--- locks read/write (\d+)/(\d+) rows get\+pos\+scan put\+del (\d+)\+(\d+)\+(\d+) (\d+)\+(\d+)')
RE_TRACK_TOTAL_LOCK = re.compile(r'^--- total lock wait\+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms')
RE_TRACK_PEEK = re.compile(r'^--- peek count (\d+) wait\+held total/max (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms')
# NOTE(review): the second alternative below ('--- locks wait+held ...') has an
# unescaped '+' in 'wait+held' and its capture groups are numbered 5-8, while
# the caller only reads groups 1-4 -- it appears dead/broken; confirm the
# intended log format before changing the pattern.
RE_TRACK_MAX_LOCK = re.compile(
    r'^--- max lock wait\+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms|--- locks wait+held read/write (\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms')
def main():
    """Parse command line arguments, validate them and run the log parser."""
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument('logfile', nargs=1, help='log file to process')
    parser.add_argument('-d', '--dbname', help="Database name to use", default=None)
    parser.add_argument('-o', '--output', help="Name of file to print SQL to (if not specified then no output)", default=None)
    parser.add_argument('-i', '--interval', help="Percentage reporting interval (1-99), default 10", type=int, default=10)
    parser.add_argument('-v', '--verbosity', nargs='?', const="INFO", default=DEFAULT_VERBOSITY,
                        choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', 'FATAL'),
                        help="Output verbosity level. Default is: " + DEFAULT_VERBOSITY)
    parser.add_argument('-L', '--outlog', default=DEFAULT_LOG_FILE, help="Default: " + DEFAULT_LOG_FILE)
    parser.add_argument('-s', '--sql', help="Use local SQL Lite database", action='store_true', default=False)
    try:
        options = parser.parse_args()
    except SystemExit:
        # argparse already printed its usage/error message; show the full
        # help and exit with a failure status.  (Was a bare 'except:',
        # which also swallowed KeyboardInterrupt.)
        parser.print_help()
        sys.exit(1)
    if not options.output and not options.sql:
        print("Please specify either an output file or -s/--sql")
        parser.print_help()
        sys.exit(1)
    if options.interval < 1 or options.interval > 99:
        print("Please specify an interval between 1 and 99")
        parser.print_help()
        sys.exit(1)
    if not os.path.exists(options.logfile[0]):
        print("Specified logfile doesn't exist: '%s'" % options.logfile[0])
        # Was sys.exit(0): an error path should not report success
        sys.exit(1)
    log2sql = Log2sql(options)
    log2sql.processLog()


if __name__ == '__main__':
    main()