#!/usr/bin/env python3
"""
Perforce Server Log Analyzer (Python 3) — fast, streaming, no DB, single output file
Updates:
- ONLY outputs: <basename>.txt (default: logrpt.txt)
- Wider text columns (client) + safe clipping so alignment stays intact.
- Adds section: "LONGEST WRITE LOCK HOLD TIMES (BY COMMAND)".
- REMOVED section + calculation: "LONGEST LOCK HOLD TIMES (BY USER)" (per user aggregation).
- Fixes heap TypeError by adding a monotonic sequence tiebreaker (Python 3 heap compares tuples).
"""
from __future__ import annotations
import argparse
import datetime as dt
import gzip
import lzma
import re
import sys
import time
from dataclasses import dataclass, field
from heapq import heappush, heappushpop, nlargest
from itertools import count
from pathlib import Path
from typing import Dict, List, Optional, Tuple
TOPK = 25
UTC = dt.timezone.utc
# Wider report columns
W_USER = 24
W_CLIENT = 60
W_PID = 8
W_CMD = 24
W_DATE = 20
W_METRIC = 16 # right-aligned numeric
# Global monotonically increasing counter used as a heap tiebreaker
_HEAP_SEQ = count(1)
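# Without the seq field, two heap entries with equal (metric, tie) would fall through to
# comparing their ProcRowForTop payloads, which define no ordering, and heapq would raise
# TypeError; seq is unique per push, so tuple comparison always stops before the payload.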
# ----------------------------
# Time helpers (timezone-aware UTC)
# ----------------------------
def utc_iso(ts: int) -> str:
if not ts:
return ""
return dt.datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S")
# ----------------------------
# Text formatting helpers
# ----------------------------
def _clip_left(s: str, width: int) -> str:
"""Keep leftmost chars; add ellipsis if clipped."""
if width <= 0:
return ""
if len(s) <= width:
return s
if width <= 1:
return s[:width]
return s[: width - 1] + "…"
def _clip_right(s: str, width: int) -> str:
"""Keep rightmost chars; add ellipsis if clipped. Useful for long client names."""
if width <= 0:
return ""
if len(s) <= width:
return s
if width <= 1:
return s[-width:]
return "…" + s[-(width - 1):]
def fmt_row(user: str, client: str, pid: int, cmd: str, date_s: str, metric: float, metric_fmt: str) -> str:
u = _clip_left(str(user), W_USER)
c = _clip_right(str(client), W_CLIENT)
d = _clip_left(str(date_s), W_DATE)
cm = _clip_left(str(cmd), W_CMD)
return (
f"{u:>{W_USER}} "
f"{c:<{W_CLIENT}} "
f"{pid:>{W_PID}d} "
f"{cm:<{W_CMD}} "
f"{d:<{W_DATE}} "
f"{metric:{metric_fmt}}"
"\n"
)
# ----------------------------
# Regexes / parsing helpers
# ----------------------------
FIRST_LINE_RE = re.compile(r"^(?P<date>\S+)\s+(?P<time>\S+)\s+(?P<rest>.*)$")
USERCLIENT_RE = re.compile(r"(?P<user>[^@\s]+)@(?P<client>[^\s]+)")
PID_RE = re.compile(r"\bpid\b\s+(?P<pid>\d+)", re.IGNORECASE)
ANY_INT_RE = re.compile(r"\b(?P<int>\d+)\b")
# IPv4 dotted quads, or tokens containing at least one ':' (IPv6 / host:port), so plain
# integers such as the pid are not mistaken for the client address.
IP_RE = re.compile(r"\b(?:(?:\d{1,3}\.){3}\d{1,3}|[0-9a-fA-F]+(?::[0-9a-fA-F]*)+)\b")
APP_CMD_RE = re.compile(r"(?P<app>\[[^\]]*\])\s*'(?P<cmdline>.*)'\s*$")
DB_TABLE_RE = re.compile(r"^---\s+db\.(?P<table>.*)$", re.IGNORECASE)
PAGES_RE = re.compile(
r"^---\s+pages\s+in\s+(?P<in>\d+)\s+out\s+(?P<out>\d+)\s+cached\s+(?P<cached>\d+)",
re.IGNORECASE,
)
ROWLOCK_RE = re.compile(r"^---\s+locks\s+(?!wait)(?P<rest>.*)$", re.IGNORECASE)
LOCKWAIT_RE = re.compile(r"^---\s+(?:locks\s+wait|total\s+lock).*?$", re.IGNORECASE)
_DT_FMTS = ("%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M:%S")
def parse_epoch_utc(date_s: str, time_s: str) -> int:
s = f"{date_s} {time_s}"
for fmt in _DT_FMTS:
try:
return int(dt.datetime.strptime(s, fmt).replace(tzinfo=UTC).timestamp())
except ValueError:
continue
return 0
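# Example: parse_epoch_utc("2024/01/15", "12:00:00") == 1705320000 (2024-01-15 12:00:00 UTC);
# returns 0 when neither supported format matches.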
def open_text_maybe_compressed(path: str):
    """Open plain/.gz/.xz/.lzma text logs; '-' returns sys.stdin (a caller's `with` will close it)."""
if path == "-":
return sys.stdin
lower = path.lower()
if lower.endswith(".gz"):
return gzip.open(path, "rt", encoding="utf-8", errors="replace", newline="")
if lower.endswith(".xz") or lower.endswith(".lzma"):
return lzma.open(path, "rt", encoding="utf-8", errors="replace", newline="")
return open(path, "rt", encoding="utf-8", errors="replace", newline="")
def is_info_start(line: str) -> bool:
    return line.strip() == "Perforce server info:"
def is_info_end(line: str) -> bool:
return (not line.strip()) or is_info_start(line)
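# The scanner below treats each command as a block: a "Perforce server info:" marker line,
# the timestamp/pid/user@client/command header, then any "--- db.<table>" tracking sections
# (pages and lock detail), ended by a blank line or the next marker. process_block() gets
# the block with the header as its first line.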
def best_effort_parse_first_line(line: str) -> Optional[dict]:
m = FIRST_LINE_RE.match(line.strip())
if not m:
return None
date_s = m.group("date")
time_s = m.group("time")
rest = m.group("rest")
epoch = parse_epoch_utc(date_s, time_s)
pm = PID_RE.search(rest)
if pm:
pid = int(pm.group("pid"))
else:
im = ANY_INT_RE.search(rest)
pid = int(im.group("int")) if im else 0
ucm = USERCLIENT_RE.search(rest)
user, client = (ucm.group("user"), ucm.group("client")) if ucm else ("", "")
ipm = IP_RE.search(rest)
ip = ipm.group(0) if ipm else ""
app = ""
cmd = ""
args: Optional[str] = None
acm = APP_CMD_RE.search(rest)
if acm:
app = acm.group("app").strip()
if app.startswith("[") and app.endswith("]"):
app = app[1:-1]
cmdline = acm.group("cmdline").strip()
if cmdline:
parts = cmdline.split(None, 1)
cmd = parts[0]
if len(parts) > 1:
args = parts[1]
else:
q = re.search(r"'([^']+)'", rest)
if q:
cmdline = q.group(1).strip()
parts = cmdline.split(None, 1)
cmd = parts[0]
if len(parts) > 1:
args = parts[1]
if not cmd:
return None
return {
"epoch": epoch,
"pid": pid,
"user": user,
"client": client,
"ip": ip,
"app": app,
"cmd": cmd,
"args": args,
}
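# Illustrative header line (the exact layout varies by p4d version and configuration):
#   2024/01/15 12:00:00 pid 12345 alice@ws-main 10.0.0.5 [p4/2023.1/LINUX26X86_64/2501234] 'user-sync //depot/main/...'
# parses to epoch=1705320000, pid=12345, user="alice", client="ws-main",
# app="p4/2023.1/LINUX26X86_64/2501234", cmd="user-sync", args="//depot/main/...".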
def parse_lockwait_line_four_ints(line: str) -> Tuple[int, int, int, int]:
# Expected: readWait readHeld writeWait writeHeld (ms)
ints = [int(x) for x in re.findall(r"\d+", line)]
vals = (ints + [0, 0, 0, 0])[:4]
return vals[0], vals[1], vals[2], vals[3]
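# Illustration (line shape assumed from p4d lock-tracking output):
#   "--- total lock wait+held read/write 10ms+20ms/30ms+40ms" -> (10, 20, 30, 40)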
# ----------------------------
# Aggregation structures
# ----------------------------
@dataclass
class ProcRowForTop:
pid: int
user: str
client: str
cmd: str
time_iso: str
metric: float
@dataclass
class Aggregates:
start_epoch: Optional[int] = None
end_epoch: Optional[int] = None
proc_count: int = 0
sum_wait_ms: int = 0
count_table_rows: int = 0
sum_pages_in: int = 0
sum_pages_out: int = 0
hour_counts: Dict[int, int] = field(default_factory=dict)
# Heap item is (metric, tie, seq, row) — seq prevents comparing row objects
top_compute: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)
top_io: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)
top_write_held: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)
def push_topk(heap: List[Tuple[float, int, int, ProcRowForTop]], metric: float, tie: int, row: ProcRowForTop) -> None:
seq = next(_HEAP_SEQ)
item = (metric, tie, seq, row)
if len(heap) < TOPK:
heappush(heap, item)
else:
if metric > heap[0][0]:
heappushpop(heap, item)
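# push_topk keeps only the TOPK largest metrics seen so far (a bounded min-heap), so memory
# stays O(TOPK) per category and each push is O(log TOPK); nlargest() in write_report then
# returns the surviving rows in descending metric order.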
# ----------------------------
# Reporting helpers
# ----------------------------
def fmt_lapsed(seconds: float) -> Tuple[float, str]:
unit = "second(s)"
v = seconds
if v >= 60:
v /= 60
unit = "minute(s)"
if v >= 60:
v /= 60
unit = "hour(s)"
if v >= 24:
v /= 24
unit = "day(s)"
if v >= 7:
v /= 7
unit = "week(s)"
return v, unit
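# Examples: fmt_lapsed(90) -> (1.5, "minute(s)"); fmt_lapsed(3 * 86400) -> (3.0, "day(s)")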
def write_report(agg: Aggregates, txt_path: Path) -> None:
avg_wait = (agg.sum_wait_ms / agg.count_table_rows) if agg.count_table_rows else 0.0
denom_pages = agg.sum_pages_in + agg.sum_pages_out
read_pct = (agg.sum_pages_in / denom_pages * 100.0) if denom_pages else 0.0
write_pct = (agg.sum_pages_out / denom_pages * 100.0) if denom_pages else 0.0
    start_iso = utc_iso(agg.start_epoch) if agg.start_epoch else "N/A"
    end_iso = utc_iso(agg.end_epoch) if agg.end_epoch else "N/A"
lapsed = float((agg.end_epoch - agg.start_epoch)) if (agg.start_epoch and agg.end_epoch) else 0.0
lval, lunit = fmt_lapsed(lapsed)
top_compute = [t[3] for t in nlargest(TOPK, agg.top_compute)]
top_io = [t[3] for t in nlargest(TOPK, agg.top_io)]
top_write = [t[3] for t in nlargest(TOPK, agg.top_write_held)]
    # Top 20 busiest hour buckets, reported in chronological order
    top_hours = sorted(agg.hour_counts.items(), key=lambda kv: kv[1], reverse=True)[:20]
    peak_hours_sorted = sorted(top_hours)
with txt_path.open("wt", encoding="utf-8", errors="replace") as out:
out.write("------------------------------------------------------------\n")
out.write("Average wait (ms)\n")
out.write(f"{avg_wait:f}\n")
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("Read/write percentage\n")
out.write(f"{read_pct:f}\t{write_pct:f}\n")
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("Start/end time\n")
out.write(f"{start_iso}\t{end_iso}\n")
out.write(f"Lapsed:\t{lval:g} {lunit}\n")
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("LONGEST COMPUTE PHASES\n")
out.write(
f"{'user':>{W_USER}} "
f"{'client':<{W_CLIENT}} "
f"{'pid':>{W_PID}} "
f"{'command':<{W_CMD}} "
f"{'date':<{W_DATE}} "
f"{'compute (ms)':>{W_METRIC}}\n"
)
for r in top_compute:
out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.1f"))
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("MOST I/O\n")
out.write(
f"{'user':>{W_USER}} "
f"{'client':<{W_CLIENT}} "
f"{'pid':>{W_PID}} "
f"{'command':<{W_CMD}} "
f"{'date':<{W_DATE}} "
f"{'pages':>{W_METRIC}}\n"
)
for r in top_io:
out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.0f"))
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("LONGEST WRITE LOCK HOLD TIMES (BY COMMAND)\n")
out.write(
f"{'user':>{W_USER}} "
f"{'client':<{W_CLIENT}} "
f"{'pid':>{W_PID}} "
f"{'command':<{W_CMD}} "
f"{'date':<{W_DATE}} "
f"{'writeHeld (ms)':>{W_METRIC}}\n"
)
for r in top_write:
out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.1f"))
out.write("------------------------------------------------------------\n\n\n")
out.write("------------------------------------------------------------\n")
out.write("Top 20 peak hours\n")
for hb, cnt in peak_hours_sorted:
out.write(f"{utc_iso(hb)}\t{cnt}\n")
out.write("------------------------------------------------------------\n\n\n")
# ----------------------------
# Core analysis (single process, streaming)
# ----------------------------
def analyze_logs(logfiles: List[str], quiet: bool) -> Aggregates:
agg = Aggregates()
def process_block(lines: List[str]) -> None:
if not lines:
return
header = best_effort_parse_first_line(lines[0])
if not header:
return
epoch = int(header["epoch"])
pid = int(header["pid"])
user = header.get("user") or ""
client = header.get("client") or ""
cmd = header.get("cmd") or ""
agg.proc_count += 1
if epoch:
agg.start_epoch = epoch if agg.start_epoch is None else min(agg.start_epoch, epoch)
agg.end_epoch = epoch if agg.end_epoch is None else max(agg.end_epoch, epoch)
hb = (epoch // 3600) * 3600
agg.hour_counts[hb] = agg.hour_counts.get(hb, 0) + 1
current_table: Optional[str] = None
pages_in = pages_out = None
read_wait = read_held = write_wait = write_held = None
max_held = 0
max_wait = 0
io_pages = 0
max_write_held = 0
per_table_wait_rows = 0
per_table_wait_sum = 0
def finalize_table() -> None:
nonlocal current_table, pages_in, pages_out
nonlocal read_wait, read_held, write_wait, write_held
nonlocal max_held, max_wait, io_pages, max_write_held
nonlocal per_table_wait_rows, per_table_wait_sum
if current_table is None:
return
pi = int(pages_in) if pages_in is not None else 0
po = int(pages_out) if pages_out is not None else 0
io_pages += (pi + po)
agg.sum_pages_in += pi
agg.sum_pages_out += po
rw = int(read_wait) if read_wait is not None else 0
ww = int(write_wait) if write_wait is not None else 0
rh = int(read_held) if read_held is not None else 0
wh = int(write_held) if write_held is not None else 0
per_table_wait_rows += 1
per_table_wait_sum += (rw + ww)
held = rh + wh
wait = rw + ww
max_held = max(max_held, held)
max_wait = max(max_wait, wait)
max_write_held = max(max_write_held, wh)
current_table = None
pages_in = pages_out = None
read_wait = read_held = write_wait = write_held = None
for raw in lines[1:]:
s = raw.strip()
if not s:
continue
tm = DB_TABLE_RE.match(s)
if tm:
finalize_table()
current_table = tm.group("table").strip()
continue
if current_table is None:
continue
pm = PAGES_RE.match(s)
if pm:
pages_in = int(pm.group("in"))
pages_out = int(pm.group("out"))
continue
if LOCKWAIT_RE.match(s):
(read_wait, read_held, write_wait, write_held) = parse_lockwait_line_four_ints(s)
continue
if ROWLOCK_RE.match(s):
continue
finalize_table()
if per_table_wait_rows:
agg.sum_wait_ms += per_table_wait_sum
agg.count_table_rows += per_table_wait_rows
        # Approximate compute phase: lock-held time net of lock-wait time; if waits
        # exceed holds, fall back to the raw held time.
        compute = (max_held - max_wait) if max_held > max_wait else max_held
time_s = utc_iso(epoch)
push_topk(
agg.top_compute,
float(compute),
pid,
ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(compute)),
)
push_topk(
agg.top_io,
float(io_pages),
pid,
ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(io_pages)),
)
push_topk(
agg.top_write_held,
float(max_write_held),
pid,
ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(max_write_held)),
)
for logfile in logfiles:
if not quiet:
print(f"Processing {logfile}...", flush=True)
in_info = False
block_lines: List[str] = []
with open_text_maybe_compressed(logfile) as fh:
for line in fh:
line = line.replace("\0", "/")
if not in_info:
if is_info_start(line):
in_info = True
block_lines = []
else:
if is_info_end(line):
if block_lines:
process_block(block_lines)
if is_info_start(line):
in_info = True
block_lines = []
else:
in_info = False
block_lines = []
else:
block_lines.append(line.rstrip("\n"))
if in_info and block_lines:
process_block(block_lines)
return agg
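# Memory stays bounded regardless of log size: only the current info block is held at a time,
# and the aggregates amount to three TOPK-sized heaps plus one counter per distinct hour bucket.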
# ----------------------------
# CLI / main
# ----------------------------
def parse_args(argv: List[str]) -> argparse.Namespace:
p = argparse.ArgumentParser(
prog=Path(argv[0]).name if argv else "log_analyzer.py",
description="Perforce Server Log Analyzer (Python 3, streaming, single-output)",
)
p.add_argument("-q", action="store_true", help="be quiet")
p.add_argument("-f", action="store_true", help="overwrite existing output")
p.add_argument("-b", default="logrpt", help="output basename/prefix (default: logrpt)")
p.add_argument("-o", dest="rptfile", help="output report file (default: <basename>.txt)")
p.add_argument("logfiles", nargs="*", help="log files (.gz/.xz/plain) or '-' for stdin")
return p.parse_args(argv[1:])
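# Note: -o takes precedence over -b; -b only controls the default output name (<basename>.txt).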
def main(argv: List[str]) -> int:
ns = parse_args(argv)
basename = ns.b
rpt_path = Path(ns.rptfile) if ns.rptfile else Path(f"{basename}.txt")
logfiles = ns.logfiles if ns.logfiles else ["-"]
if rpt_path.exists():
if ns.f:
rpt_path.unlink()
else:
print(f"{rpt_path} exists! Use -f to overwrite or -o/-b to change output name.", file=sys.stderr)
return 2
t0 = time.time()
agg = analyze_logs(logfiles, quiet=ns.q)
write_report(agg, rpt_path)
if not ns.q:
print("Done.")
print(f" Processes parsed: {agg.proc_count}")
print(f" Output: {rpt_path}")
print(f" Elapsed: {time.time() - t0:.2f}s")
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv))