#!/usr/bin/env python3
"""
Perforce Server Log Analyzer (Python 3) — fast, streaming, no DB, single output file

Updates:
- ONLY outputs: <basename>.txt  (default: logrpt.txt)
- Wider text columns (client) + safe clipping so alignment stays intact.
- Adds section: "LONGEST WRITE LOCK HOLD TIMES (BY COMMAND)".
- REMOVED section + calculation: "LONGEST LOCK HOLD TIMES (BY USER)" (per user aggregation).
- Fixes a heapq TypeError by adding a monotonic sequence tiebreaker (heapq
  compares tuple items left to right, and ProcRowForTop defines no ordering).
"""

from __future__ import annotations

import argparse
import datetime as dt
import gzip
import lzma
import re
import sys
import time
from dataclasses import dataclass, field
from heapq import heappush, heappushpop, nlargest
from itertools import count
from pathlib import Path
from typing import Dict, List, Optional, Tuple


TOPK = 25
UTC = dt.timezone.utc

# Wider report columns
W_USER = 24
W_CLIENT = 60
W_PID = 8
W_CMD = 24
W_DATE = 20
W_METRIC = 16  # right-aligned numeric

# Global monotonically increasing counter used as a heap tiebreaker
_HEAP_SEQ = count(1)


# ----------------------------
# Time helpers (timezone-aware UTC)
# ----------------------------

def utc_iso(ts: int) -> str:
    if not ts:
        return ""
    return dt.datetime.fromtimestamp(ts, tz=UTC).strftime("%Y-%m-%d %H:%M:%S")


# ----------------------------
# Text formatting helpers
# ----------------------------

def _clip_left(s: str, width: int) -> str:
    """Keep leftmost chars; add ellipsis if clipped."""
    if width <= 0:
        return ""
    if len(s) <= width:
        return s
    if width <= 1:
        return s[:width]
    return s[: width - 1] + "…"


def _clip_right(s: str, width: int) -> str:
    """Keep rightmost chars; add ellipsis if clipped. Useful for long client names."""
    if width <= 0:
        return ""
    if len(s) <= width:
        return s
    if width <= 1:
        return s[-width:]
    return "…" + s[-(width - 1):]


def fmt_row(user: str, client: str, pid: int, cmd: str, date_s: str, metric: float, metric_fmt: str) -> str:
    u = _clip_left(str(user), W_USER)
    c = _clip_right(str(client), W_CLIENT)
    d = _clip_left(str(date_s), W_DATE)
    cm = _clip_left(str(cmd), W_CMD)
    return (
        f"{u:>{W_USER}} "
        f"{c:<{W_CLIENT}} "
        f"{pid:>{W_PID}d} "
        f"{cm:<{W_CMD}} "
        f"{d:<{W_DATE}} "
        f"{metric:{metric_fmt}}"
        "\n"
    )


# ----------------------------
# Regexes / parsing helpers
# ----------------------------

FIRST_LINE_RE = re.compile(r"^(?P<date>\S+)\s+(?P<time>\S+)\s+(?P<rest>.*)$")
USERCLIENT_RE = re.compile(r"(?P<user>[^@\s]+)@(?P<client>[^\s]+)")
PID_RE = re.compile(r"\bpid\b\s+(?P<pid>\d+)", re.IGNORECASE)
ANY_INT_RE = re.compile(r"\b(?P<int>\d+)\b")
IP_RE = re.compile(r"\b(?:(?:\d{1,3}\.){3}\d{1,3}|[0-9a-fA-F:]{2,})\b")
APP_CMD_RE = re.compile(r"(?P<app>\[[^\]]*\])\s*'(?P<cmdline>.*)'\s*$")

DB_TABLE_RE = re.compile(r"^---\s+db\.(?P<table>.*)$", re.IGNORECASE)

# p4d "track" output typically reads "---   pages in+out+cached 4+8+2"; the
# spelled-out "pages in N out N cached N" form is kept as a legacy fallback.
PAGES_RE = re.compile(
    r"^---\s+pages\s+in\+out\+cached\s+(?P<in>\d+)\+(?P<out>\d+)\+(?P<cached>\d+)",
    re.IGNORECASE,
)
PAGES_LEGACY_RE = re.compile(
    r"^---\s+pages\s+in\s+(?P<in>\d+)\s+out\s+(?P<out>\d+)\s+cached\s+(?P<cached>\d+)",
    re.IGNORECASE,
)

ROWLOCK_RE = re.compile(r"^---\s+locks\s+(?!wait)(?P<rest>.*)$", re.IGNORECASE)
LOCKWAIT_RE = re.compile(r"^---\s+(?:locks\s+wait|total\s+lock).*?$", re.IGNORECASE)

_DT_FMTS = ("%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M:%S")
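
# Illustrative input this parser targets -- a representative p4d "track" block
# (sample values, not from a real log):
#
#   Perforce server info:
#       2024/01/02 03:04:05 pid 12345 alice@alice-ws 10.0.0.1 [p4/2023.1] 'user-sync //depot/...'
#   --- db.have
#   ---   pages in+out+cached 4+8+2
#   ---   locks read/write 1/1 rows get+pos+scan put+del 0+0+0 0+0
#   ---   total lock wait+held read/write 23ms+1ms/0ms+0ms
#
# FIRST_LINE_RE splits the header; DB_TABLE_RE, PAGES_RE, ROWLOCK_RE, and
# LOCKWAIT_RE pick apart the per-table track lines that follow.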


def parse_epoch_utc(date_s: str, time_s: str) -> int:
    s = f"{date_s} {time_s}"
    for fmt in _DT_FMTS:
        try:
            return int(dt.datetime.strptime(s, fmt).replace(tzinfo=UTC).timestamp())
        except ValueError:
            continue
    return 0


def open_text_maybe_compressed(path: str):
    """Open plain/gz/xz text logs. '-' reads stdin as text."""
    if path == "-":
        return sys.stdin
    lower = path.lower()
    if lower.endswith(".gz"):
        return gzip.open(path, "rt", encoding="utf-8", errors="replace", newline="")
    if lower.endswith(".xz") or lower.endswith(".lzma"):
        return lzma.open(path, "rt", encoding="utf-8", errors="replace", newline="")
    return open(path, "rt", encoding="utf-8", errors="replace", newline="")


def is_info_start(line: str) -> bool:
    return line in ("Perforce server info:\n", "Perforce server info:\r\n")


def is_info_end(line: str) -> bool:
    """A blank line or the next info header terminates the current block."""
    return (not line.strip()) or is_info_start(line)


def best_effort_parse_first_line(line: str) -> Optional[dict]:
    """Parse a "date time pid user@client ip [app] 'cmd args'" header line."""
    m = FIRST_LINE_RE.match(line.strip())
    if not m:
        return None

    date_s = m.group("date")
    time_s = m.group("time")
    rest = m.group("rest")

    epoch = parse_epoch_utc(date_s, time_s)

    pm = PID_RE.search(rest)
    if pm:
        pid = int(pm.group("pid"))
    else:
        im = ANY_INT_RE.search(rest)
        pid = int(im.group("int")) if im else 0

    ucm = USERCLIENT_RE.search(rest)
    user, client = (ucm.group("user"), ucm.group("client")) if ucm else ("", "")

    ipm = IP_RE.search(rest)
    ip = ipm.group(0) if ipm else ""

    app = ""
    cmd = ""
    args: Optional[str] = None

    acm = APP_CMD_RE.search(rest)
    if acm:
        app = acm.group("app").strip()
        if app.startswith("[") and app.endswith("]"):
            app = app[1:-1]
        cmdline = acm.group("cmdline").strip()
        if cmdline:
            parts = cmdline.split(None, 1)
            cmd = parts[0]
            if len(parts) > 1:
                args = parts[1]
    else:
        q = re.search(r"'([^']+)'", rest)
        if q:
            cmdline = q.group(1).strip()
            parts = cmdline.split(None, 1)
            cmd = parts[0]
            if len(parts) > 1:
                args = parts[1]

    if not cmd:
        return None

    return {
        "epoch": epoch,
        "pid": pid,
        "user": user,
        "client": client,
        "ip": ip,
        "app": app,
        "cmd": cmd,
        "args": args,
    }


def parse_lockwait_line_four_ints(line: str) -> Tuple[int, int, int, int]:
    # Expected: readWait readHeld writeWait writeHeld (ms)
    ints = [int(x) for x in re.findall(r"\d+", line)]
    vals = (ints + [0, 0, 0, 0])[:4]
    return vals[0], vals[1], vals[2], vals[3]
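
# For example, a representative p4d track line parses as:
#   parse_lockwait_line_four_ints("--- total lock wait+held read/write 23ms+1ms/0ms+0ms")
#   -> (23, 1, 0, 0)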


# ----------------------------
# Aggregation structures
# ----------------------------

@dataclass
class ProcRowForTop:
    pid: int
    user: str
    client: str
    cmd: str
    time_iso: str
    metric: float


@dataclass
class Aggregates:
    start_epoch: Optional[int] = None
    end_epoch: Optional[int] = None
    proc_count: int = 0

    sum_wait_ms: int = 0
    count_table_rows: int = 0
    sum_pages_in: int = 0
    sum_pages_out: int = 0

    hour_counts: Dict[int, int] = field(default_factory=dict)

    # Heap item is (metric, tie, seq, row) — seq prevents comparing row objects
    top_compute: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)
    top_io: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)
    top_write_held: List[Tuple[float, int, int, ProcRowForTop]] = field(default_factory=list)


def push_topk(heap: List[Tuple[float, int, int, ProcRowForTop]], metric: float, tie: int, row: ProcRowForTop) -> None:
    seq = next(_HEAP_SEQ)
    item = (metric, tie, seq, row)
    if len(heap) < TOPK:
        heappush(heap, item)
    else:
        if metric > heap[0][0]:
            heappushpop(heap, item)
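
# The seq field is what fixes the heap TypeError noted in the module docstring:
# when two items share (metric, tie), heapq would otherwise fall through to
# comparing ProcRowForTop objects, which define no ordering. A minimal sketch:
#
#   h: list = []
#   r = ProcRowForTop(1, "u", "c", "sync", "", 5.0)
#   push_topk(h, 5.0, 1, r)
#   push_topk(h, 5.0, 1, r)  # identical metric and tie; seq breaks the tie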


# ----------------------------
# Reporting helpers
# ----------------------------

def fmt_lapsed(seconds: float) -> Tuple[float, str]:
    unit = "second(s)"
    v = seconds
    if v >= 60:
        v /= 60
        unit = "minute(s)"
        if v >= 60:
            v /= 60
            unit = "hour(s)"
            if v >= 24:
                v /= 24
                unit = "day(s)"
                if v >= 7:
                    v /= 7
                    unit = "week(s)"
    return v, unit
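
# e.g. fmt_lapsed(7200.0) -> (2.0, "hour(s)"); fmt_lapsed(45.0) -> (45.0, "second(s)")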


def write_report(agg: Aggregates, txt_path: Path) -> None:
    avg_wait = (agg.sum_wait_ms / agg.count_table_rows) if agg.count_table_rows else 0.0
    denom_pages = agg.sum_pages_in + agg.sum_pages_out
    read_pct = (agg.sum_pages_in / denom_pages * 100.0) if denom_pages else 0.0
    write_pct = (agg.sum_pages_out / denom_pages * 100.0) if denom_pages else 0.0

    start_iso = utc_iso(agg.start_epoch) if agg.start_epoch else "N/A"
    end_iso = utc_iso(agg.end_epoch) if agg.end_epoch else "N/A"
    lapsed = float(agg.end_epoch - agg.start_epoch) if (agg.start_epoch and agg.end_epoch) else 0.0
    lval, lunit = fmt_lapsed(lapsed)

    top_compute = [t[3] for t in nlargest(TOPK, agg.top_compute)]
    top_io = [t[3] for t in nlargest(TOPK, agg.top_io)]
    top_write = [t[3] for t in nlargest(TOPK, agg.top_write_held)]

    # Pick the 20 busiest hour buckets, then list them chronologically.
    top_hours = sorted(agg.hour_counts.items(), key=lambda kv: kv[1], reverse=True)[:20]
    peak_hours_sorted = sorted(top_hours)

    with txt_path.open("wt", encoding="utf-8", errors="replace") as out:
        out.write("------------------------------------------------------------\n")
        out.write("Average wait (ms)\n")
        out.write(f"{avg_wait:f}\n")
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("Read/write percentage\n")
        out.write(f"{read_pct:f}\t{write_pct:f}\n")
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("Start/end time\n")
        out.write(f"{start_iso}\t{end_iso}\n")
        out.write(f"Lapsed:\t{lval:g} {lunit}\n")
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("LONGEST COMPUTE PHASES\n")
        out.write(
            f"{'user':>{W_USER}} "
            f"{'client':<{W_CLIENT}} "
            f"{'pid':>{W_PID}} "
            f"{'command':<{W_CMD}} "
            f"{'date':<{W_DATE}} "
            f"{'compute (ms)':>{W_METRIC}}\n"
        )
        for r in top_compute:
            out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.1f"))
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("MOST I/O\n")
        out.write(
            f"{'user':>{W_USER}} "
            f"{'client':<{W_CLIENT}} "
            f"{'pid':>{W_PID}} "
            f"{'command':<{W_CMD}} "
            f"{'date':<{W_DATE}} "
            f"{'pages':>{W_METRIC}}\n"
        )
        for r in top_io:
            out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.0f"))
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("LONGEST WRITE LOCK HOLD TIMES (BY COMMAND)\n")
        out.write(
            f"{'user':>{W_USER}} "
            f"{'client':<{W_CLIENT}} "
            f"{'pid':>{W_PID}} "
            f"{'command':<{W_CMD}} "
            f"{'date':<{W_DATE}} "
            f"{'writeHeld (ms)':>{W_METRIC}}\n"
        )
        for r in top_write:
            out.write(fmt_row(r.user, r.client, r.pid, r.cmd, r.time_iso, r.metric, f">{W_METRIC}.1f"))
        out.write("------------------------------------------------------------\n\n\n")

        out.write("------------------------------------------------------------\n")
        out.write("Top 20 peak hours\n")
        for hb, cnt in peak_hours_sorted:
            out.write(f"{utc_iso(hb)}\t{cnt}\n")
        out.write("------------------------------------------------------------\n\n\n")


# ----------------------------
# Core analysis (single process, streaming)
# ----------------------------

def analyze_logs(logfiles: List[str], quiet: bool) -> Aggregates:
    agg = Aggregates()

    def process_block(lines: List[str]) -> None:
        if not lines:
            return

        header = best_effort_parse_first_line(lines[0])
        if not header:
            return

        epoch = int(header["epoch"])
        pid = int(header["pid"])
        user = header.get("user") or ""
        client = header.get("client") or ""
        cmd = header.get("cmd") or ""

        agg.proc_count += 1
        if epoch:
            agg.start_epoch = epoch if agg.start_epoch is None else min(agg.start_epoch, epoch)
            agg.end_epoch = epoch if agg.end_epoch is None else max(agg.end_epoch, epoch)
            hb = (epoch // 3600) * 3600
            agg.hour_counts[hb] = agg.hour_counts.get(hb, 0) + 1

        current_table: Optional[str] = None
        pages_in = pages_out = None
        read_wait = read_held = write_wait = write_held = None

        max_held = 0
        max_wait = 0
        io_pages = 0
        max_write_held = 0

        per_table_wait_rows = 0
        per_table_wait_sum = 0

        def finalize_table() -> None:
            nonlocal current_table, pages_in, pages_out
            nonlocal read_wait, read_held, write_wait, write_held
            nonlocal max_held, max_wait, io_pages, max_write_held
            nonlocal per_table_wait_rows, per_table_wait_sum

            if current_table is None:
                return

            pi = int(pages_in) if pages_in is not None else 0
            po = int(pages_out) if pages_out is not None else 0
            io_pages += (pi + po)
            agg.sum_pages_in += pi
            agg.sum_pages_out += po

            rw = int(read_wait) if read_wait is not None else 0
            ww = int(write_wait) if write_wait is not None else 0
            rh = int(read_held) if read_held is not None else 0
            wh = int(write_held) if write_held is not None else 0

            per_table_wait_rows += 1
            per_table_wait_sum += (rw + ww)

            held = rh + wh
            wait = rw + ww
            max_held = max(max_held, held)
            max_wait = max(max_wait, wait)
            max_write_held = max(max_write_held, wh)

            current_table = None
            pages_in = pages_out = None
            read_wait = read_held = write_wait = write_held = None

        for raw in lines[1:]:
            s = raw.strip()
            if not s:
                continue

            tm = DB_TABLE_RE.match(s)
            if tm:
                finalize_table()
                current_table = tm.group("table").strip()
                continue

            if current_table is None:
                continue

            pm = PAGES_RE.match(s) or PAGES_LEGACY_RE.match(s)
            if pm:
                pages_in = int(pm.group("in"))
                pages_out = int(pm.group("out"))
                continue

            if LOCKWAIT_RE.match(s):
                (read_wait, read_held, write_wait, write_held) = parse_lockwait_line_four_ints(s)
                continue

            if ROWLOCK_RE.match(s):
                continue

        finalize_table()

        if per_table_wait_rows:
            agg.sum_wait_ms += per_table_wait_sum
            agg.count_table_rows += per_table_wait_rows

        # Approximate the compute phase as lock-held time net of wait time.
        compute = (max_held - max_wait) if max_held > max_wait else max_held
        time_s = utc_iso(epoch)

        push_topk(
            agg.top_compute,
            float(compute),
            pid,
            ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(compute)),
        )

        push_topk(
            agg.top_io,
            float(io_pages),
            pid,
            ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(io_pages)),
        )

        push_topk(
            agg.top_write_held,
            float(max_write_held),
            pid,
            ProcRowForTop(pid=pid, user=user, client=client, cmd=cmd, time_iso=time_s, metric=float(max_write_held)),
        )

    for logfile in logfiles:
        if not quiet:
            print(f"Processing {logfile}...", flush=True)

        in_info = False
        block_lines: List[str] = []

        with open_text_maybe_compressed(logfile) as fh:
            for line in fh:
                # Defensive: replace NUL bytes so regexes and output stay sane.
                line = line.replace("\0", "/")

                if not in_info:
                    if is_info_start(line):
                        in_info = True
                        block_lines = []
                else:
                    if is_info_end(line):
                        if block_lines:
                            process_block(block_lines)
                        if is_info_start(line):
                            in_info = True
                            block_lines = []
                        else:
                            in_info = False
                            block_lines = []
                    else:
                        block_lines.append(line.rstrip("\n"))

            if in_info and block_lines:
                process_block(block_lines)

    return agg


# ----------------------------
# CLI / main
# ----------------------------

def parse_args(argv: List[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        prog=Path(argv[0]).name if argv else "log_analyzer.py",
        description="Perforce Server Log Analyzer (Python 3, streaming, single-output)",
    )
    p.add_argument("-q", action="store_true", help="be quiet")
    p.add_argument("-f", action="store_true", help="overwrite existing output")
    p.add_argument("-b", default="logrpt", help="output basename/prefix (default: logrpt)")
    p.add_argument("-o", dest="rptfile", help="output report file (default: <basename>.txt)")
    p.add_argument("logfiles", nargs="*", help="log files (.gz/.xz/plain) or '-' for stdin")
    return p.parse_args(argv[1:])
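
# Example invocations (paths and names are illustrative):
#   ./log_analyzer.py p4d.log                      # writes logrpt.txt
#   ./log_analyzer.py -f -o report.txt p4d.log.gz  # overwrite, custom name
#   zcat p4d.log.gz | ./log_analyzer.py -q -       # read stdin quietly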


def main(argv: List[str]) -> int:
    ns = parse_args(argv)

    basename = ns.b
    rpt_path = Path(ns.rptfile) if ns.rptfile else Path(f"{basename}.txt")
    logfiles = ns.logfiles if ns.logfiles else ["-"]

    if rpt_path.exists():
        if ns.f:
            rpt_path.unlink()
        else:
            print(f"{rpt_path} exists! Use -f to overwrite or -o/-b to change output name.", file=sys.stderr)
            return 2

    t0 = time.time()
    agg = analyze_logs(logfiles, quiet=ns.q)
    write_report(agg, rpt_path)

    if not ns.q:
        print("Done.")
        print(f"  Processes parsed: {agg.proc_count}")
        print(f"  Output: {rpt_path}")
        print(f"  Elapsed: {time.time() - t0:.2f}s")

    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv))
