#!/usr/bin/env python3
################################################################################
#
# Copyright (c) 2026, Perforce Software, Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL PERFORCE SOFTWARE, INC. BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# DATE
#
#   $Change: 32743 $
#   $Date: 2026/06/16 $
#
# SYNOPSIS
#
#   p4diag [-h] [-q] LOG [--start TIME --end TIME]
#   p4diag trim LOG --start TIME --end TIME
#   p4diag stats LOG
#   p4diag log2sql LOG
#   p4diag summary LOG
#   p4diag FILE.sql [LOG]
#   p4diag list
#   p4diag schema
#   p4diag plots LOG
#
# DESCRIPTION
#
#   This script aggregates common Perforce P4 server log analysis workflows.
#   It uses log2sql to build a SQLite database from trace data, runs canned and
#   summary SQL queries, writes grep-based log statistics, victim/culprit
#   write-wait analysis, and command-activity plots into text and HTML under
#   .p4diagnostics/ for viewing in a browser. Interactive mode provides a TTY
#   menu and starts a small local web server for output files.
#
# REQUIREMENTS
#
#   Python 3 with the built-in sqlite3 module (test: python3 -c "import sqlite3")
#   tabulate (pip install tabulate)
#   log2sql on PATH (or set LOG2SQL_BIN)
#   sqlite3 CLI on PATH — log2sql canned queries, ad hoc SELECT, pid probe
#     (3.31+ for @parameters)
#   gnuplot — optional, for plots and summary PNGs
#   grep, sed — log stats and trim
#
################################################################################
# $Date: 2026/06/16 $

import os
# NOTE(review): pickle usage is not visible in this section; pickle.load must
# only ever be applied to files p4diag itself wrote, never to untrusted input.
import pickle
import re
import sys
import shlex
import io
import csv
import json
import gzip
import signal
import threading
import argparse
import hashlib

# Optional line editing: prefer the stdlib readline module, fall back to the
# third-party gnureadline package, and finally leave ``readline`` as None so
# callers must check for it before use.
try:
    import readline
except ImportError:
    try:
        import gnureadline as readline  # type: ignore[no-redef]
    except ImportError:
        readline = None  # type: ignore[misc, assignment]

# Help text raised by the sqlite3 stand-in below when this interpreter was
# built without the _sqlite3 extension.
_PYTHON_SQLITE3_MISSING_MSG = """\
p4diag: this Python was built without sqlite3 support (No module named '_sqlite3').

p4diag needs Python's sqlite3 module for lock shortcuts, summary SQL,
victim/culprit analysis, and other core features (in addition to the sqlite3
CLI on PATH).

Fix options:
  1. Use a Python that includes sqlite3: python3 -c "import sqlite3"
     On RHEL/CentOS 7, try: yum install python3
     Then run: /usr/bin/python3 /path/to/p4diag ...
  2. Rebuild /usr/local Python with sqlite headers installed:
     yum install sqlite-devel
     then re-run Python's configure/make install.
  3. Change the shebang at the top of p4diag to a working python3.
"""

# Import sqlite3 lazily-failing: when the extension is missing, bind the name
# ``sqlite3`` to a stand-in object so the script still imports, and raise the
# explanatory ImportError only when a connection is actually attempted.
try:
    import sqlite3
except ImportError:
    class _Sqlite3Unavailable:
        # NOTE(review): because Error is Exception, an ``except sqlite3.Error``
        # in a caller also catches the ImportError raised by connect() below
        # (and any other Exception) whenever sqlite3 is unavailable.
        Error = Exception

        # Placeholder names, presumably so annotations or references to
        # sqlite3.Cursor / Row / Connection elsewhere still resolve.
        class Cursor:
            pass

        class Row:
            pass

        class Connection:
            pass

        @staticmethod
        def connect(*_args, **_kwargs):
            raise ImportError(_PYTHON_SQLITE3_MISSING_MSG)

    # An instance (not the class) is bound, so ``sqlite3.connect(...)`` calls
    # the staticmethod above.
    sqlite3 = _Sqlite3Unavailable()  # type: ignore[misc, assignment]

import subprocess as s
import tempfile
import shutil
import glob
import itertools
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from functools import partial

# Third-party (pip install tabulate). Imported unconditionally, so a missing
# package fails at startup with a plain ImportError.
from tabulate import tabulate

# Set in main(); used by log2sql summary / stats when run non-interactively.
QUIET = False
LOG_FILE = ""
# Absolute directory containing this script; default parent of the
# sql_queries/ and summary_queries/ directories.
_P4DIAG_INSTALL_DIR = os.path.dirname(os.path.abspath(__file__))

# Canned queries that are always long; paginate even if the file omits ``-- p4diag: pager``.
# ("vicitm" is the real, misspelled file name used as a key throughout.)
_SQL_FILES_AUTO_PAGER = frozenset({
    "locks_held_total.sql",
    "locks_all_duration.sql",
    "locks_table_summary.sql",
    "vicitm_culprit.sql",
    "bigLocksPerTable.sql",
    "table_BigLocksPerTable.sql",
})

# One-line purpose for each shipped SQL library report (shown in menu and help).
# Report file basename -> short blurb. Built from an ordered list of pairs so
# the dict's iteration order is exactly the order written below.
# ("vicitm_culprit.sql" is the real, misspelled file name — do not "fix" it.)
_SQL_LIBRARY_SUMMARIES: Dict[str, str] = dict(
    [
        ("locks_table_summary.sql", "Which tables are hot across the whole log"),
        ("locks_table_by_cmd.sql", "One hot table → which command types drove the pain"),
        ("locks_held_total.sql", "Timeline of big holds (snowball hunting)"),
        ("locks_all_duration.sql", "All lock wait/hold over a threshold you choose (ms)"),
        ("command_summary.sql", "Command volume and top users"),
        ("cpu_summary.sql", "CPU totals by command and user"),
        ("memory_summary.sql", "Peak memory by command and user"),
        ("vicitm_culprit.sql", "Pairing victims with specific culprits"),
        (
            "bigLocksPerTable.sql",
            "Big lock holds (>5s) on revdx, resolve, integed, and have — newest first",
        ),
        (
            "table_BigLocksPerTable.sql",
            "Big lock holds (>5s) grouped by table — alternate layout for hot tables",
        ),
        (
            "vc.sql",
            "Write-wait victims paired with likely lock-holding culprits"
            " (short name for vc report)",
        ),
    ]
)

# Sequence in which lock reports are listed in the ``h`` / help box table.
_LOCK_QUERY_HELP_ORDER: Tuple[str, ...] = (
    "locks_table_summary.sql",  # hot tables across the whole log
    "locks_held_total.sql",     # timeline of big holds
    "locks_all_duration.sql",   # everything over a chosen threshold
    "vicitm_culprit.sql",       # victims paired with culprits
    "locks_table_by_cmd.sql",   # one table, broken down by command
)

# (what the user types, what it shows) rows for the ``h`` / help shortcut table.
_LOCK_SHORTCUT_EXAMPLES: Tuple[Tuple[str, str], ...] = (
    ("ww",                             "Top write waiters, any table"),
    ("ww db.rev 100",                  "Write waiters on db.rev, limit 100"),
    ("wh db.storage",                  "Write holders on db.storage"),
    ("rw db.revdx",                    "Read waiters on db.revdx"),
    ("rh db.revhx START END",          "Read holders in time range"),
    ("rh db.revdx START END app,args", "Read holders + extra columns"),
    ("pid",                            "Probe one command (PID + startTime)"),
)

# Labels for interactive ww/wh/rw/rh shortcuts (match summary HTML Lock Contention section).
_LOCK_CONTENTION_SHORTCUT_LABELS: Dict[str, str] = { "ww": "Top 25 write waiters over 10 seconds - blocked by other read or write locks (victims)", "rw": "Top 25 read waiters over 10 seconds - blocked by other write locks (victims)", "rh": "Top 25 read holders over 10 seconds - blocking writers (culprits)", "wh": "Top 25 write holders over 10 seconds - blocking readers and writers (culprits)", } # Built-in SQL library (shipped in the script; disk ``sql_queries/`` overrides by basename). _BUILTIN_SQL_LIBRARY: Dict[str, str] = { 'locks_table_summary.sql': """\ .print .print 'DB CONTENTION - Average Locks Summary (total wait and held each > 10 seconds)' .print 'Does one table have high average or total wait (victims) or held (culprits)?' .print '' .print ' table table name' .print ' number lock events (tableUse rows for this table)' .print ' avgRL avg read-lock acquisitions (ms)' .print ' avgWL avg write-lock acquisitions (ms)' .print ' avgRW avg read wait (ms); victims blocked on read locks' .print ' avgRH avg read held (ms); culprits holding read locks' .print ' avgWW avg write wait (ms); victims blocked on write locks' .print ' avgWH avg write held (ms); culprits holding write locks' .print ' totalWait total read+write wait (ms)' .print ' totalHeld total read+write held (ms)' .print ' hot worst victim (highest totalWait) and culprit (highest totalHeld)' .print '' WITH stats AS ( SELECT tableName AS "table", COUNT(readLocks) AS number, CAST(ROUND(AVG(readLocks)) AS INTEGER) AS avgRL, CAST(ROUND(AVG(writeLocks)) AS INTEGER) AS avgWL, CAST(ROUND(AVG(totalReadWait)) AS INTEGER) AS avgRW, CAST(ROUND(AVG(totalReadHeld)) AS INTEGER) AS avgRH, CAST(ROUND(AVG(totalWriteWait)) AS INTEGER) AS avgWW, CAST(ROUND(AVG(totalWriteHeld)) AS INTEGER) AS avgWH, CAST(ROUND(SUM(totalReadWait) + SUM(totalWriteWait)) AS INTEGER) AS "totalWait", CAST(ROUND(SUM(totalReadHeld) + SUM(totalWriteHeld)) AS INTEGER) AS "totalHeld" FROM tableUse GROUP BY tableUse.tableName ), filtered AS ( SELECT 
* FROM stats WHERE "totalWait" > 10000 AND "totalHeld" > 10000 ), peaks AS ( SELECT MAX("totalWait") AS maxWait, MAX("totalHeld") AS maxHeld FROM filtered ) SELECT f."table", f.number, f.avgRL, f.avgWL, f.avgRW, f.avgRH, f.avgWW, f.avgWH, f."totalWait", f."totalHeld", TRIM( CASE WHEN f."totalWait" = p.maxWait THEN 'top wait' ELSE '' END || CASE WHEN f."totalWait" = p.maxWait AND f."totalHeld" = p.maxHeld THEN ', top held' WHEN f."totalHeld" = p.maxHeld THEN 'top held' ELSE '' END ) AS hot FROM filtered f CROSS JOIN peaks p ORDER BY f."totalWait" DESC; """, 'locks_held_total.sql': """\ -- p4diag: pager -- Lock holds (readHeld+writeHeld) > 5000ms, sorted by startTime. -- Optional table filter via @table (default % = all tables). -- -- Example: -- p4diag locks_held_total.sql LOG -- p4diag locks_held_total.sql LOG revhx .print "Find commands/tables where total lock hold time (totalReadHeld + totalWriteHeld) exceeded 5000ms." .print "Sorted by startTime (then held_ms desc) to help spot the first significant lock that could trigger a snowball." .print "Optional table filter: pass a table name on the command line, or omit for all tables." .print "" SELECT p.startTime, p.pid, p.user, p.cmd, tu.tableName, (tu.totalReadHeld + tu.totalWriteHeld) AS held_ms, tu.totalReadHeld AS readHeld_ms, tu.totalWriteHeld AS writeHeld_ms FROM tableUse tu JOIN process p ON p.processkey = tu.processkey AND p.lineNumber = tu.lineNumber WHERE (tu.totalReadHeld + tu.totalWriteHeld) > 5000 AND tu.tableName NOT IN ('storagemasterup_R', 'storageup_R') AND tu.tableName LIKE @table ORDER BY p.startTime ASC, held_ms DESC; """, 'locks_all_duration.sql': """\ -- p4diag: pager -- Lock wait or hold exceeding @duration ms, sorted by startTime. -- -- Requires sqlite3 parameter support (.parameter init). 
Set before .read: -- .parameter set @duration 5000 -- -- Example: -- p4diag locks_all_duration.sql LOG 5000 .print 'Lock events where readHeld, writeHeld, readWait, or writeWait exceeds the threshold (ms)' .print 'Sorted by startTime ASC.' .print '' SELECT p.startTime, p.pid, p.user, p.cmd, tu.tableName, tu.totalReadWait AS readWait_ms, tu.totalReadHeld AS readHeld_ms, tu.totalWriteWait AS writeWait_ms, tu.totalWriteHeld AS writeHeld_ms FROM tableUse tu JOIN process p ON p.processkey = tu.processkey AND p.lineNumber = tu.lineNumber WHERE ( tu.totalReadHeld > @duration OR tu.totalWriteHeld > @duration OR tu.totalReadWait > @duration OR tu.totalWriteWait > @duration ) AND tu.tableName NOT IN ('storagemasterup_R', 'storageup_R') ORDER BY p.startTime ASC; """, 'locks_table_by_cmd.sql': """\ -- p4diag: require-table -- Lock time on one table: rollup totals and breakdown by P4 command (cmd). -- -- Use when a table (e.g. revdx) is clearly the hotspot but victim/culprit -- pairing does not point to a single offender — see which command types -- contributed the most wait and hold time on that table. -- -- Requires sqlite3 parameter support (.parameter init). 
Set before .read: -- .parameter set @table revdx -- -- Example: -- p4diag locks_table_by_cmd.sql LOG revdx .print '' .print 'LOCK TIME ON ONE TABLE — totals and by command' .print '' .print ' totalLock_ms = readWait + writeWait + readHeld + writeHeld (all ms on this table)' .print '' WITH filtered AS ( SELECT tu.totalReadWait, tu.totalWriteWait, tu.totalReadHeld, tu.totalWriteHeld, p.cmd FROM tableUse tu JOIN process p ON p.processkey = tu.processkey AND p.lineNumber = tu.lineNumber WHERE tu.tableName LIKE @table ) SELECT @table AS "table", COUNT(*) AS events, CAST(SUM(totalReadWait) AS INTEGER) AS readWait_ms, CAST(SUM(totalWriteWait) AS INTEGER) AS writeWait_ms, CAST(SUM(totalReadHeld) AS INTEGER) AS readHeld_ms, CAST(SUM(totalWriteHeld) AS INTEGER) AS writeHeld_ms, CAST(SUM(totalReadWait + totalWriteWait) AS INTEGER) AS totalWait_ms, CAST(SUM(totalReadHeld + totalWriteHeld) AS INTEGER) AS totalHeld_ms, CAST( SUM(totalReadWait + totalWriteWait + totalReadHeld + totalWriteHeld) AS INTEGER ) AS totalLock_ms FROM filtered; .print '' .print 'By command (sorted by totalLock_ms desc)' .print '' WITH filtered AS ( SELECT tu.totalReadWait, tu.totalWriteWait, tu.totalReadHeld, tu.totalWriteHeld, p.cmd FROM tableUse tu JOIN process p ON p.processkey = tu.processkey AND p.lineNumber = tu.lineNumber WHERE tu.tableName LIKE @table ), totals AS ( SELECT SUM(totalReadWait + totalWriteWait + totalReadHeld + totalWriteHeld) AS grandLock_ms FROM filtered ) SELECT f.cmd AS command, COUNT(*) AS events, CAST(SUM(f.totalReadWait) AS INTEGER) AS readWait_ms, CAST(SUM(f.totalWriteWait) AS INTEGER) AS writeWait_ms, CAST(SUM(f.totalReadHeld) AS INTEGER) AS readHeld_ms, CAST(SUM(f.totalWriteHeld) AS INTEGER) AS writeHeld_ms, CAST(SUM(f.totalReadWait + f.totalWriteWait) AS INTEGER) AS totalWait_ms, CAST(SUM(f.totalReadHeld + f.totalWriteHeld) AS INTEGER) AS totalHeld_ms, CAST( SUM(f.totalReadWait + f.totalWriteWait + f.totalReadHeld + f.totalWriteHeld) AS INTEGER ) AS totalLock_ms, CAST( 
ROUND( 100.0 * SUM(f.totalReadWait + f.totalWriteWait + f.totalReadHeld + f.totalWriteHeld) / NULLIF(t.grandLock_ms, 0), 1 ) AS TEXT ) || '%' AS pct_of_table FROM filtered f CROSS JOIN totals t GROUP BY f.cmd, t.grandLock_ms ORDER BY totalLock_ms DESC; """, 'command_summary.sql': """\ .print '' .print 'COMMAND SUMMARY - command usage from the trace log' .print '' SELECT COUNT(*) AS total, COUNT(DISTINCT cmd) AS commands, COUNT(DISTINCT user) AS users, MIN(startTime) AS firstStart, MAX(endTime) AS lastEnd FROM process; .print '' .print 'Top 20 commands by count' .print '' .print ' command P4 command name' .print ' count number of invocations' .print ' pct share of all commands' .print ' users distinct users running this command' .print ' avgLapse avg completed lapse (s)' .print ' cpuMin total user+system CPU (minutes)' .print '' WITH totals AS ( SELECT COUNT(*) AS n FROM process ), by_cmd AS ( SELECT cmd AS command, COUNT(*) AS count, COUNT(DISTINCT user) AS users, CAST(ROUND(AVG(completedLapse)) AS INTEGER) AS avgLapse, CAST(ROUND(SUM(uCpu + sCpu) / 60000.0) AS INTEGER) AS cpuMin FROM process GROUP BY cmd ) SELECT command, count, CAST(ROUND(100.0 * count / totals.n, 1) AS TEXT) || '%' AS pct, users, avgLapse, cpuMin FROM by_cmd CROSS JOIN totals ORDER BY count DESC LIMIT 20; .print '' .print 'Top 20 command + user pairs by count' .print '' .print ' user P4 user' .print ' command P4 command name' .print ' count invocations for this user and command' .print '' SELECT user, cmd AS command, COUNT(*) AS count FROM process GROUP BY user, cmd ORDER BY count DESC LIMIT 20; """, 'cpu_summary.sql': """\ .print '' .print 'CPU SUMMARY - CPU usage from the trace log' .print '' .print 'Note: Windows P4 Server logs do not include "usage" lines, so uCpu/sCpu counters' .print 'are not populated. On Windows, use the substitute section at the end of this report' .print '(computed lapse and RPC times) instead of the CPU sections above.' 
.print '' .width 0 0 0 0 0 0 SELECT COUNT(*) AS total, SUM(CASE WHEN uCpu > 10000 THEN 1 ELSE 0 END) AS over10sUser, SUM(CASE WHEN sCpu > 10000 THEN 1 ELSE 0 END) AS over10sSys, CAST(SUM(uCpu) AS INTEGER) AS totalUserMs, CAST(SUM(sCpu) AS INTEGER) AS totalSysMs, CAST(ROUND(SUM(uCpu + sCpu) / 60000.0) AS INTEGER) AS totalCpuMin FROM process; .print '' .print 'Top 20 by total CPU (excluding sync and transmit)' .print '' .print ' user P4 user' .print ' command P4 command name' .print ' lapse completed lapse (s)' .print ' uCpuMs user CPU (ms)' .print ' sCpuMs system CPU (ms)' .print ' totalMs user + system CPU (ms)' .print ' startTime command start' .print ' endTime command end' .print '' .width 40 16 8 10 10 10 20 20 SELECT user, cmd AS command, CAST(completedLapse AS INTEGER) AS lapse, CAST(uCpu AS INTEGER) AS uCpuMs, CAST(sCpu AS INTEGER) AS sCpuMs, CAST(uCpu + sCpu AS INTEGER) AS totalMs, startTime, endTime FROM process WHERE cmd NOT LIKE '%sync%' AND cmd NOT LIKE '%transmit%' ORDER BY totalMs DESC LIMIT 20; .print '' .print 'Top 10 users by user CPU' .print '' .print ' user P4 user' .print ' uCpuMs total user CPU (ms)' .print ' uCpuMin total user CPU (minutes)' .print '' .width 40 12 10 SELECT user, CAST(SUM(uCpu) AS INTEGER) AS uCpuMs, CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin FROM process GROUP BY user ORDER BY uCpuMs DESC LIMIT 10; .print '' .print 'Top 10 users by system CPU' .print '' .print ' user P4 user' .print ' sCpuMs total system CPU (ms)' .print ' sCpuMin total system CPU (minutes)' .print '' .width 40 12 10 SELECT user, CAST(SUM(sCpu) AS INTEGER) AS sCpuMs, CAST(ROUND(SUM(sCpu) / 60000.0) AS INTEGER) AS sCpuMin FROM process GROUP BY user ORDER BY sCpuMs DESC LIMIT 10; .print '' .print 'Commands using more than 10 seconds of user CPU (by invocation count)' .print '' .print ' command P4 command name' .print ' count invocations over 10s user CPU' .print ' uCpuMin total user CPU (minutes)' .print '' .width 16 8 10 SELECT cmd AS command, 
COUNT(*) AS count, CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin FROM process WHERE uCpu > 10000 GROUP BY cmd ORDER BY count DESC; .print '' .print 'Commands using more than 10 seconds of system CPU (by invocation count)' .print '' .print ' command P4 command name' .print ' count invocations over 10s system CPU' .print ' sCpuMin total system CPU (minutes)' .print '' .width 16 8 10 SELECT cmd AS command, COUNT(*) AS count, CAST(ROUND(SUM(sCpu) / 60000.0) AS INTEGER) AS sCpuMin FROM process WHERE sCpu > 10000 GROUP BY cmd ORDER BY count DESC; .print '' .print 'Top 15 commands by total user CPU' .print '' .print ' command P4 command name' .print ' uCpuMin total user CPU (minutes)' .print '' .width 16 10 SELECT cmd AS command, CAST(ROUND(SUM(uCpu) / 60000.0) AS INTEGER) AS uCpuMin FROM process GROUP BY cmd ORDER BY uCpuMin DESC LIMIT 15; .print '' .print 'Windows substitute for CPU' .print '' .print ' pid process id' .print ' user P4 user' .print ' command P4 command name' .print ' lapse completed lapse (s)' .print ' rpcRcv RPC receive time' .print ' rpcSnd RPC send time' .print ' computed computed lapse' .print ' startTime command start' .print ' endTime command end' .print '' .width 8 40 16 8 10 10 10 20 20 SELECT pid, user, cmd AS command, CAST(ROUND(completedLapse) AS INTEGER) AS lapse, CAST(ROUND(rpcRcv) AS INTEGER) AS rpcRcv, CAST(ROUND(rpcSnd) AS INTEGER) AS rpcSnd, CAST(ROUND(computedLapse) AS INTEGER) AS computed, startTime, endTime FROM process ORDER BY computedLapse DESC LIMIT 50; """, 'memory_summary.sql': """\ .print '' .print 'MEMORY SUMMARY - memory usage from the trace log (excluding pull)' .print '' .print 'Overview (one row for the whole log):' .print ' total traced commands' .print ' withMem commands that reported memory data' .print ' peakMemMB highest cmd memory (MB) on any single command' .print ' peakMemPeakMB highest peak memory (MB) on any single command' .print ' avgMemMB average cmd memory (MB) across all commands' .print '' .width 0 
0 0 0 0 SELECT COUNT(*) AS total, SUM(CASE WHEN memMB > 0 THEN 1 ELSE 0 END) AS withMem, CAST(MAX(memMB) AS INTEGER) AS peakMemMB, CAST(MAX(memPeakMB) AS INTEGER) AS peakMemPeakMB, CAST(ROUND(AVG(memMB)) AS INTEGER) AS avgMemMB FROM process WHERE cmd != 'pull'; .print '' .print 'Top 25 memory consumers (excluding pull)' .print '' .print ' pid process id' .print ' user P4 user' .print ' command P4 command name (cmd only, not args)' .print ' lapse completed lapse (s)' .print ' uCpuMs user CPU (ms)' .print ' sCpuMs system CPU (ms)' .print ' memMB cmd memory when command finished (MB)' .print ' memPeakMB peak memory during command (MB)' .print ' startTime command start' .print ' endTime command end' .print '' .width 8 40 16 8 10 10 8 10 20 20 SELECT pid, user, cmd AS command, CAST(ROUND(completedLapse) AS INTEGER) AS lapse, CAST(uCpu AS INTEGER) AS uCpuMs, CAST(sCpu AS INTEGER) AS sCpuMs, CAST(memMB AS INTEGER) AS memMB, CAST(memPeakMB AS INTEGER) AS memPeakMB, startTime, endTime FROM process WHERE cmd != 'pull' ORDER BY memMB DESC LIMIT 25; .print '' .print 'Top 15 commands by peak memory (memPeakMB)' .print '' .print ' command P4 command name' .print ' count invocations' .print ' peakMemMB highest memMB seen' .print ' peakMaxMB highest memPeakMB seen' .print ' avgMemMB average memMB' .print '' .width 16 10 10 10 10 SELECT cmd AS command, COUNT(*) AS count, CAST(MAX(memMB) AS INTEGER) AS peakMemMB, CAST(MAX(memPeakMB) AS INTEGER) AS peakMaxMB, CAST(ROUND(AVG(memMB)) AS INTEGER) AS avgMemMB FROM process WHERE cmd != 'pull' GROUP BY cmd ORDER BY peakMaxMB DESC LIMIT 15; .print '' .print 'Top 10 users by peak memory (memPeakMB)' .print '' .print ' user P4 user' .print ' count invocations' .print ' peakMemMB highest memMB seen' .print ' peakMaxMB highest memPeakMB seen' .print '' .width 40 10 10 10 SELECT user, COUNT(*) AS count, CAST(MAX(memMB) AS INTEGER) AS peakMemMB, CAST(MAX(memPeakMB) AS INTEGER) AS peakMaxMB FROM process WHERE cmd != 'pull' GROUP BY user ORDER BY 
peakMaxMB DESC LIMIT 10; """, 'vicitm_culprit.sql': """\ -- Victim / culprit write-wait drill-down (matches p4diag victim-culprit analysis). -- -- Finds commands with high write-wait on a table (victims), then lock-holders on the -- same table in a time window (culprits). Direct culprits (read/write held > 95% of -- the victim's write wait) are preferred; when none match, overlap culprits (held > 2s -- before the victim start) are shown instead. -- -- Tunables (edit literals below): -- global pool 50, per-table 5, write-wait floor 5000 ms, overlap floor 2000 ms, -- hold lookback 4 hours, end margin 4 minutes. -- -- Each victim/culprit PID includes a resource hint (lock-wait, lock-hold, cpu, rpc, io, mixed). -- -- Usage: -- sqlite3 -header .p4diagnostics/mylog.db ".read /warp/sql_queries/vc.sql" -- p4diagnostics query vc.sql mylog .headers off .mode list WITH base AS ( SELECT tu.tableName, p.user, p.startTime, p.endTime, p.pid, p.cmd, p.args, CAST(p.completedLapse AS INTEGER) AS lapse, tu.totalWriteWait FROM tableUse tu JOIN process p USING (processKey) WHERE tu.tableName NOT IN ( 'clients', 'clientEntity', 'change', 'storageup_R', 'storagemasterup_R', 'pull' ) AND tu.totalWriteWait > 5000 AND p.completedLapse > 0 ), global_pool AS ( SELECT *, ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) AS rn_global FROM base ), top_global AS ( SELECT * FROM global_pool WHERE rn_global <= 50 ), per_table AS ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY tableName ORDER BY totalWriteWait DESC ) AS rn_table FROM top_global ), victims AS ( SELECT ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) - 1 AS victim_idx, tableName, user, startTime, endTime, pid, cmd, args, lapse, totalWriteWait FROM per_table WHERE rn_table <= 5 ), victim_windows AS ( SELECT v.*, ROUND(totalWriteWait * 0.95) AS wait95, strftime( '%Y/%m/%d %H:%M:%S', datetime(replace(v.startTime, '/', '-'), '-4 hours') ) AS hold_search_start, strftime( '%Y/%m/%d %H:%M:%S', datetime(replace(v.endTime, '/', '-'), '+4 minutes') 
) AS adj_end FROM victims v ), direct_culprits AS ( SELECT vw.victim_idx, 'direct' AS match_type, p.pid AS culprit_pid, p.user AS culprit_user, p.cmd AS culprit_cmd, tu.totalReadHeld AS culprit_read_held_ms, tu.totalWriteHeld AS culprit_write_held_ms, tu.tableName AS culprit_table, p.startTime AS culprit_start FROM victim_windows vw JOIN tableUse tu ON tu.tableName = vw.tableName JOIN process p ON p.processKey = tu.processKey WHERE p.startTime >= vw.hold_search_start AND p.startTime <= vw.adj_end AND ( tu.totalReadHeld > vw.wait95 OR tu.totalWriteHeld > vw.wait95 ) AND CAST(CAST(p.pid AS REAL) AS INTEGER) != CAST(CAST(vw.pid AS REAL) AS INTEGER) ), overlap_culprits AS ( SELECT vw.victim_idx, 'overlap' AS match_type, p.pid AS culprit_pid, p.user AS culprit_user, p.cmd AS culprit_cmd, tu.totalReadHeld AS culprit_read_held_ms, tu.totalWriteHeld AS culprit_write_held_ms, tu.tableName AS culprit_table, p.startTime AS culprit_start FROM victim_windows vw JOIN tableUse tu ON tu.tableName = vw.tableName JOIN process p ON p.processKey = tu.processKey WHERE NOT EXISTS ( SELECT 1 FROM direct_culprits dc WHERE dc.victim_idx = vw.victim_idx ) AND p.startTime >= vw.hold_search_start AND p.startTime <= vw.startTime AND ( tu.totalReadHeld > 2000 OR tu.totalWriteHeld > 2000 ) AND CAST(CAST(p.pid AS REAL) AS INTEGER) != CAST(CAST(vw.pid AS REAL) AS INTEGER) ), all_culprits AS ( SELECT * FROM direct_culprits UNION ALL SELECT * FROM overlap_culprits ), ranked_culprit_probes AS ( SELECT ac.culprit_pid AS pid, ac.culprit_start AS startTime, ROW_NUMBER() OVER ( PARTITION BY ac.victim_idx ORDER BY ac.culprit_read_held_ms + ac.culprit_write_held_ms DESC ) AS rn FROM all_culprits ac ), probe_targets AS ( SELECT pid, startTime FROM victims UNION SELECT pid, startTime FROM ranked_culprit_probes WHERE rn <= 5 ), table_agg AS ( SELECT tu.processKey, SUM(tu.pagesIn + tu.pagesOut) AS db_io, SUM(tu.scanRows) AS scans, SUM(tu.totalReadWait + tu.totalWriteWait) AS lock_wait_ms, SUM(tu.totalReadHeld 
+ tu.totalWriteHeld) AS lock_held_ms FROM tableUse tu JOIN process p ON p.processKey = tu.processKey JOIN probe_targets pt ON pt.pid = p.pid AND pt.startTime = p.startTime GROUP BY tu.processKey ), probe_data AS ( SELECT p.pid, p.startTime, CAST(p.completedLapse AS INTEGER) AS lapse_s, p.uCpu + COALESCE(p.sCpu, 0) AS cpu_us, COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) AS disk_io, COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) AS rpc_s, t.db_io, t.scans, t.lock_wait_ms, t.lock_held_ms, CASE WHEN t.lock_wait_ms > t.lock_held_ms AND t.lock_wait_ms > 5000 THEN 'lock-wait' WHEN t.lock_held_ms > 5000 THEN 'lock-hold' WHEN p.uCpu + COALESCE(p.sCpu, 0) > CAST(p.completedLapse * 1e6 AS INTEGER) / 2 THEN 'cpu' WHEN COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) > p.completedLapse / 4 THEN 'rpc' WHEN t.db_io > 5000 OR COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) > 100000 THEN 'io' ELSE 'mixed' END AS hint FROM probe_targets pt JOIN process p ON p.pid = pt.pid AND p.startTime = pt.startTime LEFT JOIN table_agg t ON t.processKey = p.processKey ), culprit_lines AS ( SELECT victim_idx, match_type, culprit_start, printf( ' %-8s %-22s %-15s %12s %12s %-8s %s', CAST(ac.culprit_pid AS TEXT), substr(COALESCE(ac.culprit_user, ''), 1, 22), substr(COALESCE(ac.culprit_cmd, ''), 1, 15), printf('%,.0f', ac.culprit_read_held_ms), printf('%,.0f', ac.culprit_write_held_ms), COALESCE(pd.hint, '?'), ac.culprit_start ) AS line FROM all_culprits ac LEFT JOIN probe_data pd ON pd.pid = ac.culprit_pid AND pd.startTime = ac.culprit_start ), culprit_blocks AS ( SELECT cl.victim_idx, MAX(cl.match_type) AS match_type, ' Culprits (' || MAX(cl.match_type) || ') — lock holders on this table (victim PID excluded):' || char(10) || ' pid user cmd read-held write-held hint start' || char(10) || ( SELECT GROUP_CONCAT(cl2.line, char(10)) FROM culprit_lines cl2 WHERE cl2.victim_idx = cl.victim_idx ORDER BY cl2.culprit_start ) AS block FROM culprit_lines cl GROUP BY cl.victim_idx ), victim_reports AS ( SELECT 
vw.victim_idx, vw.totalWriteWait, '------------------------------------------------------------------------' || char(10) || 'Victim: ' || COALESCE(vw.cmd, '(unknown)') || ' (' || printf('%,.0f', vw.totalWriteWait) || 'ms write wait on ' || vw.tableName || ')' || char(10) || char(10) || ' Victim process (blocked — waiting to write):' || char(10) || ' PID: ' || COALESCE(CAST(vw.pid AS TEXT), '') || char(10) || ' Start: ' || COALESCE(vw.startTime, '') || char(10) || ' End: ' || COALESCE(vw.endTime, '') || char(10) || ' Cmd: ' || COALESCE(vw.cmd, '(unknown)') || char(10) || ' Args: ' || COALESCE(NULLIF(trim(vw.args), ''), '(none)') || char(10) || char(10) || ' Victim profile: ' || COALESCE(vp.hint, '(no process row)') || ' lapse=' || COALESCE(CAST(vp.lapse_s AS TEXT), '?') || 's' || ' lock_wait=' || printf('%,.0f', COALESCE(vp.lock_wait_ms, 0)) || 'ms' || ' lock_held=' || printf('%,.0f', COALESCE(vp.lock_held_ms, 0)) || 'ms' || ' db_io=' || printf('%,.0f', COALESCE(vp.db_io, 0)) || ' scans=' || printf('%,.0f', COALESCE(vp.scans, 0)) || char(10) || char(10) || COALESCE( cb.block, ' Culprits: (none) — no other lock-holder rows matched the time window and thresholds.' ) || char(10) AS report FROM victim_windows vw LEFT JOIN culprit_blocks cb ON cb.victim_idx = vw.victim_idx LEFT JOIN probe_data vp ON vp.pid = vw.pid AND vp.startTime = vw.startTime ), report_parts AS ( SELECT 0 AS sort_key, -1 AS victim_idx, 0.0 AS totalWriteWait, 'VICTIM / CULPRIT REPORT (write-wait drill-down)' || char(10) || 'Victims: up to 5 rows per table among the top 50 global write-wait rows (>5s).' || char(10) || 'Each victim is a command with high write-wait; culprits hold read/write locks on that table.' || char(10) || 'Search window: 4h before victim start through shortly after victim end (direct),' || char(10) || 'or through victim start only (overlap fallback).' 
|| char(10) AS report UNION ALL SELECT 1 AS sort_key, -1 AS victim_idx, 0.0 AS totalWriteWait, 'No significant write waits (> 5s) found.' WHERE (SELECT COUNT(*) FROM victims) = 0 UNION ALL SELECT 1 AS sort_key, victim_idx, totalWriteWait, report FROM victim_reports WHERE (SELECT COUNT(*) FROM victims) > 0 ) SELECT report FROM report_parts ORDER BY sort_key, totalWriteWait DESC, victim_idx; """, } # Built-in PID probe SQL (``pid`` command only; not listed in canned-query menu). _BUILTIN_PID_PROBE_SQL: Dict[str, str] = { 'probe_pid.sql': """\ -- Probe one command instance by PID + startTime (log2sql process/tableUse). -- -- Requires sqlite3 parameter support (.parameter init). Set before .read: -- .parameter set @pid -- .parameter set @start 'YYYY/MM/DD HH:MM:SS' -- -- Example: -- sqlite3 -header -column DB.db \ -- -cmd ".parameter init" \ -- -cmd ".parameter set @pid 97736" \ -- -cmd ".parameter set @start '2025/07/10 12:00:01'" \ -- ".read /warp/sql_queries/probe_pid.sql" -- -- Or: probe_pid DB.db 97736 '2025/07/10 12:00:01' (see probe_pid.sh) .mode line .headers on .print '' .print '=== PID probe (summary) ===' .print '' WITH target AS ( SELECT * FROM process WHERE pid = @pid AND startTime = @start ), lock_agg AS ( SELECT COUNT(*) AS tables_touched, SUM(pagesIn + pagesOut) AS db_pages_io, SUM(scanRows) AS scan_rows, SUM(putRows + delRows) AS write_rows, SUM(totalReadWait + totalWriteWait) AS lock_wait_ms, SUM(totalReadHeld + totalWriteHeld) AS lock_held_ms, SUM(totalPeekWait) AS peek_wait_ms, SUM(readLocks + writeLocks) AS lock_ops_ms, MAX(totalWriteWait) AS max_write_wait_ms, MAX(totalReadWait) AS max_read_wait_ms, MAX(totalWriteHeld) AS max_write_held_ms, MAX(totalReadHeld) AS max_read_held_ms FROM tableUse JOIN target USING (processKey) ), m AS ( SELECT t.pid, t.startTime, t.endTime, t.user, t.workspace, t.ip, t.cmd, t.args, CAST(t.completedLapse AS INTEGER) AS lapse_s, CAST(t.computedLapse AS INTEGER) AS computed_s, COALESCE(t.paused, 0) AS paused_s, t.uCpu, 
t.sCpu, (COALESCE(t.uCpu, 0) + COALESCE(t.sCpu, 0)) AS cpu_us, CAST(t.completedLapse * 1000 AS INTEGER) AS lapse_ms, COALESCE(t.diskIn, 0) + COALESCE(t.diskOut, 0) AS proc_disk_io, COALESCE(t.ipcIn, 0) + COALESCE(t.ipcOut, 0) AS proc_net_io, COALESCE(t.rpcSnd, 0) + COALESCE(t.rpcRcv, 0) AS rpc_wait_s, t.rpcMsgsIn, t.rpcMsgsOut, ROUND(COALESCE(t.rpcSizeIn, 0) / 1048576.0, 1) AS rpc_in_mb, ROUND(COALESCE(t.rpcSizeOut, 0) / 1048576.0, 1) AS rpc_out_mb, t.fileTotalsSnd, t.fileTotalsRcv, t.fileTotalsSndMB, t.fileTotalsRcvMB, t.netSyncFilesAdded + t.netSyncFilesUpdated + t.netSyncFilesDeleted AS net_sync_files, t.memMB, t.memPeakMB, t.running, t.error, l.* FROM target t, lock_agg l ) SELECT pid, startTime, endTime, user, ip, cmd, substr(args, 1, 80) AS args_preview, lapse_s, computed_s, paused_s, ROUND(cpu_us / 1000.0, 0) AS cpu_ms, ROUND(100.0 * cpu_us / NULLIF(lapse_ms * 1000, 0), 1) AS pct_wall_cpu, proc_disk_io, proc_net_io, rpc_wait_s, ROUND(100.0 * rpc_wait_s / NULLIF(lapse_s, 0), 1) AS pct_wall_rpc, rpc_in_mb, rpc_out_mb, tables_touched, db_pages_io, scan_rows, write_rows, lock_wait_ms, lock_held_ms, peek_wait_ms, lock_ops_ms, max_write_wait_ms, max_read_wait_ms, max_write_held_ms, max_read_held_ms, CASE WHEN error IS NOT NULL AND error != '' AND error != '0' THEN 'ERROR: ' || error WHEN lock_wait_ms >= 5000 AND lock_wait_ms >= COALESCE(lock_held_ms, 0) AND lock_wait_ms > cpu_us / 1000 THEN 'LOCK-BOUND (victim — waited on DB locks)' WHEN lock_held_ms >= 5000 AND lock_held_ms > lock_wait_ms AND lock_held_ms > cpu_us / 1000 THEN 'LOCK-BOUND (culprit — held DB locks)' WHEN peek_wait_ms >= 1000 THEN 'LOCK-BOUND (peek contention)' WHEN rpc_wait_s >= 1.0 AND rpc_wait_s >= lapse_s * 0.25 THEN 'CLIENT/NETWORK (RPC snd/rcv dominates wall time)' WHEN cpu_us >= lapse_ms * 500 OR computed_s >= lapse_s * 0.5 THEN 'CPU-BOUND' WHEN db_pages_io >= 5000 OR proc_disk_io >= 100000 THEN 'IO-BOUND (DB pages or process disk I/O)' WHEN cmd LIKE '%sync%' OR cmd LIKE '%transmit%' OR 
net_sync_files > 0 THEN 'NETWORK/SYNC workload' ELSE 'MIXED / see per-table breakdown below' END AS bottleneck_hint FROM m; """, 'probe_pid_tables.sql': """\ -- Per-table breakdown for one PID + startTime. Same @pid / @start as probe_pid.sql. .mode column .headers on .print '' .print '=== PID probe (per-table) ===' .print '' SELECT tu.tableName, tu.pagesIn + tu.pagesOut AS pages_io, tu.scanRows, tu.getRows + tu.posRows AS keyed_rows, tu.putRows + tu.delRows AS write_rows, tu.readLocks + tu.writeLocks AS lock_ops_ms, tu.totalReadWait AS read_wait_ms, tu.totalWriteWait AS write_wait_ms, tu.totalReadHeld AS read_held_ms, tu.totalWriteHeld AS write_held_ms, tu.peekCount, tu.totalPeekWait AS peek_wait_ms, tu.triggerLapse FROM tableUse tu JOIN process p USING (processKey) WHERE p.pid = @pid AND p.startTime = @start ORDER BY (tu.totalReadWait + tu.totalWriteWait + tu.totalReadHeld + tu.totalWriteHeld) DESC, (tu.pagesIn + tu.pagesOut) DESC; """, 'probe_pid_min.sql': """\ -- log2sql: probe one command by PID + startTime -- Set parameters, then .read this file (sqlite3 3.31+): -- .parameter init -- .parameter set @pid 97736 -- .parameter set @start '2025/07/10 12:00:01' WITH p AS ( SELECT * FROM process WHERE pid = @pid AND startTime = @start ), t AS ( SELECT SUM(pagesIn + pagesOut) AS db_io, SUM(scanRows) AS scans, SUM(totalReadWait + totalWriteWait) AS lock_wait_ms, SUM(totalReadHeld + totalWriteHeld) AS lock_held_ms FROM tableUse JOIN p USING (processKey) ) SELECT p.pid, p.startTime, p.endTime, p.user, p.cmd, CAST(p.completedLapse AS INT) AS lapse_s, p.uCpu + COALESCE(p.sCpu, 0) AS cpu_us, COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) AS disk_io, COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) AS rpc_s, t.db_io, t.scans, t.lock_wait_ms, t.lock_held_ms, CASE WHEN t.lock_wait_ms > t.lock_held_ms AND t.lock_wait_ms > 5000 THEN 'lock-wait' WHEN t.lock_held_ms > 5000 THEN 'lock-hold' WHEN p.uCpu + COALESCE(p.sCpu, 0) > CAST(p.completedLapse * 1e6 AS INT) / 2 THEN 'cpu' WHEN 
COALESCE(p.rpcSnd, 0) + COALESCE(p.rpcRcv, 0) > p.completedLapse / 4 THEN 'rpc' WHEN t.db_io > 5000 OR COALESCE(p.diskIn, 0) + COALESCE(p.diskOut, 0) > 100000 THEN 'io' ELSE 'mixed' END AS hint FROM p, t; """, } def p4diag_sql_queries_dir() -> str: """Directory of ``.sql`` query files for CLI ``query`` and SQL Lab. Override order: ``P4DIAG_SQL_QUERIES``, then legacy ``P4SLA_SQL_DIR``. Default: ``/sql_queries``. """ for env in ("P4DIAG_SQL_QUERIES", "P4SLA_SQL_DIR"): override = (os.environ.get(env) or "").strip() if override: return os.path.abspath(override) return os.path.join(_P4DIAG_INSTALL_DIR, "sql_queries") def p4diag_summary_queries_dir() -> str: """Directory of summary-report ``.sql`` files (one subdir per section). Override: ``P4DIAG_SUMMARY_QUERIES``. Default: ``/summary_queries``. """ override = (os.environ.get("P4DIAG_SUMMARY_QUERIES") or "").strip() if override: return os.path.abspath(override) return os.path.join(_P4DIAG_INSTALL_DIR, "summary_queries") def _summary_section_display_name(dirname: str) -> str: """Strip optional ``NN-`` sort prefix from a section directory name.""" m = re.match(r"^\d+-(.+)$", dirname) return m.group(1) if m else dirname def _summary_query_display_name(filename_stem: str) -> str: """Strip optional ``NN-`` sort prefix from a summary ``.sql`` basename (label in reports).""" return _summary_section_display_name(filename_stem) SummarySectionQueries = List[Tuple[str, str]] def strip_sql_documentation_header(raw: str) -> str: """Strip leading ``/* ... 
*/`` or consecutive ``--`` comment lines from a ``.sql`` file.""" if not raw: return raw text = raw.lstrip("\ufeff") m = re.match(r"^\s*/\*([\s\S]*?)\*/", text) if m: return text[m.end() :].lstrip("\n\r\t ") lines = text.splitlines() i = 0 desc_lines_seen = False while i < len(lines): s = lines[i].strip() if s == "": if desc_lines_seen: i += 1 continue i += 1 continue if s.startswith("--"): desc_lines_seen = True i += 1 continue break while i < len(lines) and lines[i].strip() == "": i += 1 return "\n".join(lines[i:]) def sql_file_pager_from_header(raw: str) -> Optional[List[str]]: """If the SQL file header contains ``-- p4diag: pager``, return pager argv (default ``less -S``).""" if not raw: return None text = raw.lstrip("\ufeff") lines = text.splitlines() in_block_comment = False for line in lines: sline = line.strip() if not in_block_comment and sline.startswith("/*"): in_block_comment = "*/" not in sline continue if in_block_comment: if "*/" in sline: in_block_comment = False continue if sline == "": continue if not sline.startswith("--"): break m = re.match(r"--\s*p4diag:\s*pager(?:\s*=\s*(.+))?\s*$", sline, re.IGNORECASE) if m: extra = (m.group(1) or "").strip() return shlex.split(extra) if extra else ["less"] return None def normalize_sql_basename(sql_token: str) -> str: """Return ``NAME.sql`` for a library token or path.""" base = os.path.basename(sql_token.strip()) if not base.lower().endswith(".sql"): base = base + ".sql" return base def resolve_sql_query_file(sql_token: str) -> Optional[str]: """Resolve library basename, path, or absolute ``.sql`` to an existing file (or None).""" raw = sql_token.strip() if os.path.isabs(raw): path = os.path.abspath(raw) return path if os.path.isfile(path) else None base = normalize_sql_basename(raw) lib = os.path.join(p4diag_sql_queries_dir(), base) if os.path.isfile(lib): return lib if os.path.isfile(raw): return os.path.abspath(raw) stem, _ = os.path.splitext(raw) cwd_rel = stem + ".sql" if os.path.isfile(cwd_rel): 
return os.path.abspath(cwd_rel) return None def resolve_sql_query(sql_token: str) -> Tuple[Optional[str], Optional[str]]: """Resolve a library query to ``(disk_path, builtin_body)``; exactly one is set when found.""" path = resolve_sql_query_file(sql_token) if path: return path, None base = normalize_sql_basename(sql_token) body = _BUILTIN_SQL_LIBRARY.get(base) if body is None: body = _BUILTIN_PID_PROBE_SQL.get(base) if body is not None: return None, body return None, None def list_sql_library_names() -> List[str]: """Sorted ``*.sql`` basenames from disk library plus menu-visible built-in queries.""" names = set(_BUILTIN_SQL_LIBRARY.keys()) names.update(list_sql_library_files(p4diag_sql_queries_dir())) names.difference_update(_BUILTIN_PID_PROBE_SQL.keys()) return sorted(names) def sql_library_summary(basename: str) -> str: """Short description of what a library ``.sql`` report is useful for.""" key = normalize_sql_basename(basename) return _SQL_LIBRARY_SUMMARIES.get( key, "custom report — open the .sql file for details", ) def _effective_sql_pager(pager: Optional[List[str]]) -> Optional[List[str]]: if not pager: return None if not sys.stdout.isatty(): return None if shutil.which(pager[0]) is None: return None return pager def _sqlite3_rc_ok_after_pager(pager_rc: int, sql_rc: Optional[int]) -> int: """Normalize exit code when sqlite3 is piped to a pager. Closing the pager with ``q`` breaks the pipe; sqlite3 then exits with SIGPIPE even though the query succeeded. 
""" if pager_rc != 0: return pager_rc if sql_rc in (0, None): return 0 sigpipe = getattr(signal, "SIGPIPE", 13) if sql_rc in (-sigpipe, 128 + sigpipe): return 0 return sql_rc def run_sqlite3_with_optional_pager(argv: List[str], pager: Optional[List[str]]) -> int: """Run sqlite3; pipe stdout through a pager when requested and stdout is a TTY.""" use_pager = _effective_sql_pager(pager) if not use_pager: return s.call(argv) proc_sql = s.Popen(argv, stdout=s.PIPE) proc_page = s.Popen(use_pager, stdin=proc_sql.stdout) proc_sql.stdout.close() rc_page = proc_page.wait() rc_sql = proc_sql.wait() return _sqlite3_rc_ok_after_pager(rc_page, rc_sql) def sql_file_requires_pid_start(file_body: str) -> bool: """True when a script expects sqlite ``@pid`` and ``@start`` bind parameters.""" return "@pid" in file_body and "@start" in file_body def sql_file_uses_table_parameter(file_body: str) -> bool: """True when a script binds sqlite ``@table`` (optional unless ``-- p4diag: require-table``).""" return "@table" in file_body def sql_file_requires_table(file_body: str) -> bool: """True when ``@table`` must be supplied (``-- p4diag: require-table`` in header).""" return bool( re.search(r"^\s*--\s*p4diag:\s*require-table\s*$", file_body, re.MULTILINE | re.IGNORECASE) ) def sql_file_requires_duration(file_body: str) -> bool: """True when a script expects a sqlite ``@duration`` bind parameter.""" return "@duration" in file_body def table_parameter_bind_value(table: Optional[str]) -> str: """Value for ``@table`` bind: ``%`` matches all tables when ``table`` is omitted.""" if not table: return "%" return normalize_table_parameter(table) def normalize_table_parameter(table: str) -> str: """Normalize a user-supplied table name for ``@table`` (strip ``db.`` prefix).""" t = (table or "").strip() if t.startswith("db."): t = t[3:] if "." 
in t: t = t.split(".")[-1] return t def validate_table_parameter(table: str) -> Optional[str]: """Return an error message when ``table`` is not a safe ``@table`` bind value.""" if not table: return "table name is required" if not re.fullmatch(r"[A-Za-z0-9_]+", table): return f"invalid table name {table!r} (use letters, digits, underscore only)" return None def validate_duration_parameter(duration: str) -> Optional[str]: """Return an error message when ``duration`` is not a safe ``@duration`` bind value.""" if not duration: return "duration (milliseconds) is required" if not re.fullmatch(r"[0-9]+", duration): return f"invalid duration {duration!r} (use a positive integer, milliseconds)" if int(duration) <= 0: return "duration must be greater than 0" return None def sqlite3_pid_start_parameter_argv(pid: str, start: str) -> List[str]: """sqlite3 ``-cmd`` arguments to bind ``@pid`` and ``@start`` (sqlite 3.31+).""" return [ "-cmd", ".parameter init", "-cmd", f".parameter set @pid {pid}", "-cmd", f".parameter set @start '{start}'", ] def sqlite3_table_parameter_argv(table: str) -> List[str]: """sqlite3 ``-cmd`` arguments to bind ``@table`` (sqlite 3.31+).""" return [ "-cmd", ".parameter init", "-cmd", f".parameter set @table {table}", ] def sqlite3_duration_parameter_argv(duration: str) -> List[str]: """sqlite3 ``-cmd`` arguments to bind ``@duration`` (sqlite 3.31+).""" return [ "-cmd", ".parameter init", "-cmd", f".parameter set @duration {duration}", ] def sqlite3_cli_argv_for_read_script(file_body: str) -> List[str]: """sqlite3 argv for ``.read`` scripts; omit ``-column`` when the script sets ``.mode``.""" if re.search(r"^\s*\.mode\b", file_body, re.MULTILINE | re.IGNORECASE): return ["sqlite3"] return ["sqlite3", "-header", "-column"] def prepare_sql_read_script(raw: str) -> str: """Insert ``.width 0`` before each query in a multi-statement ``.read`` script. sqlite3 ``-column`` mode reuses column widths from the previous result set; a short overview row (e.g. 
``total``, ``withMem``) otherwise truncates ``user``/``command`` in later SELECTs. Width 0 resets auto-sizing for the next query. Inserts only at statement boundaries — not before a final ``SELECT`` that belongs to an open ``WITH`` block (which would split the statement and cause a syntax error). """ width_reset = ".width " + " ".join("0" for _ in range(32)) out: List[str] = [] prev_nonempty = "" for line in raw.splitlines(): if re.match(r"^WITH\b", line, re.IGNORECASE): out.append(width_reset) elif re.match(r"^SELECT\b", line, re.IGNORECASE): if ( not prev_nonempty or prev_nonempty.endswith(";") or prev_nonempty.startswith(".") ): out.append(width_reset) out.append(line) stripped = line.strip() if stripped and not stripped.startswith("--"): prev_nonempty = stripped text = "\n".join(out) if raw.endswith("\n"): text += "\n" return text def _parse_dot_print_line(line: str) -> str: """Return text from a sqlite ``.print '...'`` / ``.print "..."`` line.""" m = re.match(r"\.print\s*(.*)$", line.strip(), re.IGNORECASE) if not m: return "" rest = m.group(1).strip() if not rest: return "" if len(rest) >= 2 and rest[0] == rest[-1] and rest[0] in ("'", '"'): return rest[1:-1] return rest def iter_sql_read_segments(raw: str) -> Iterator[Tuple[str, str]]: """Yield ``('print', text)`` and ``('query', sql)`` from a sqlite ``.read`` script.""" query_lines: List[str] = [] def flush_query() -> Iterator[Tuple[str, str]]: nonlocal query_lines if not query_lines: return sql = "\n".join(query_lines).strip() query_lines = [] if sql: yield ("query", sql) for line in raw.splitlines(): stripped = line.strip() if not query_lines: if not stripped or stripped.startswith("--"): continue if stripped.startswith("#"): continue if re.match(r"\.print\b", stripped, re.IGNORECASE): yield from flush_query() yield ("print", _parse_dot_print_line(line)) continue if re.match(r"^\s*(WITH|SELECT)\b", line, re.IGNORECASE) or query_lines: query_lines.append(line) if stripped.endswith(";"): yield from 
flush_query() continue yield from flush_query() def load_summary_sections_from_disk(root: str) -> Optional[Dict[str, SummarySectionQueries]]: """Load ``section -> [(label, sql), ...]`` from ``/
/*.sql``. Returns ``None`` when ``root`` is missing or contains no usable queries. """ if not os.path.isdir(root): return None sections: Dict[str, SummarySectionQueries] = {} for entry in sorted(os.scandir(root), key=lambda e: e.name): if not entry.is_dir() or entry.name.startswith("."): continue section_name = _summary_section_display_name(entry.name) queries: SummarySectionQueries = [] for sql_path in sorted(glob.glob(os.path.join(entry.path, "*.sql"))): label = _summary_query_display_name( os.path.splitext(os.path.basename(sql_path))[0] ) try: with open(sql_path, encoding="utf-8") as fh: raw = fh.read() except OSError: continue sql = strip_sql_documentation_header(raw).strip() if not sql: continue if not sql.rstrip().endswith(";"): sql += ";" queries.append((label, sql)) if queries: sections[section_name] = queries return sections if sections else None _BUILTIN_SUMMARY_SECTIONS: Dict[str, SummarySectionQueries] = { 'Time Range': [ ('Log Range', "SELECT MIN(starttime) AS Start, MAX(starttime) AS End FROM process WHERE cmd NOT LIKE '%pull%' AND args LIKE '-i%';"), ('Peek Running', 'SELECT startTime, running FROM process ORDER BY running DESC LIMIT 1;'), ], 'Lock Contention': [ ('Top 25 write waiters over 10 seconds - blocked by other read or write locks (victims)', 'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalWriteWait AS "wait(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalWriteWait > 10000 AND completedLapse > 0 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY "wait(ms)" DESC LIMIT 25;'), ('Top 25 read waiters over 10 seconds - blocked by other write locks (victims)', 'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalReadWait AS "wait(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalReadWait > 10000 AND tableName NOT IN 
(\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalReadWait DESC LIMIT 25;'), ('Top 25 read holders over 10 seconds - blocking writers (culprits)', 'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalReadHeld AS "held(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalReadHeld > 10000 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalReadHeld DESC LIMIT 25;'), ('Top 25 write holders over 10 seconds - blocking readers and writers (culprits)', 'SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS "lapse(s)", totalWriteHeld AS "held(ms)", tableName AS "table", startTime, endTime FROM tableUse JOIN process USING (processKey) WHERE totalWriteHeld > 10000 AND completedLapse > 0 AND tableName NOT IN (\'clients\',\'clientEntity\',\'change\',\'storageup_R\',\'storagemasterup_R\',\'pull\') ORDER BY totalWriteHeld DESC LIMIT 25;'), ('Average Locks Summary with total locks > 10 seconds. 
Does one table have high average or total wait (victims) or held (culprits)', 'SELECT * FROM ( SELECT tableName AS "table", COUNT(readLocks) AS Number, CAST(AVG(readLocks) AS INTEGER) AS "avg-read(ms)", CAST(AVG(writeLocks) AS INTEGER) AS "avg-write(ms)", CAST(AVG(totalReadWait) AS INTEGER) AS "avg-total-rw(ms)", CAST(AVG(totalReadHeld) AS INTEGER) AS "avg-total-rh(ms)", CAST(AVG(totalWriteWait) AS INTEGER) AS "avg-total-ww(ms)", CAST(AVG(totalWriteHeld) AS INTEGER) AS "avg-total-wh(ms)", CAST(SUM(totalReadWait)+SUM(totalWriteWait) AS INTEGER) AS "total-wait(ms)", CAST(SUM(totalReadHeld)+SUM(totalWriteHeld) AS INTEGER) AS "total-held(ms)" FROM tableUse GROUP BY tableUse.tableName ) WHERE "total-wait(ms)" > 10000 AND "total-held(ms)" > 10000 ORDER BY "total-wait(ms)" DESC;'), ('Worst Lock Offenders - (maxreadHeld + maxwriteHeld)', 'SELECT user, SUM(maxreadHeld + maxwriteHeld) AS "held(ms)" FROM tableUse JOIN process USING (processKey) GROUP BY user ORDER BY "held(ms)" DESC LIMIT 10;'), ], 'CPU Usage': [], 'Memory Usage': [], 'Command Activity': [], } # Summary sections rendered from the same multi-part scripts as the LOG2SQL library. _SUMMARY_CANNED_QUERY_SECTIONS: Dict[str, str] = { "Command Activity": "command_summary.sql", "CPU Usage": "cpu_summary.sql", "Memory Usage": "memory_summary.sql", } # Omitted from built-in and disk-backed summary reports. 
_SUMMARY_SECTIONS_OMITTED = frozenset({"Other Resource Metrics"}) def _summary_queries_search_roots() -> List[str]: """Directories to scan for summary ``.sql`` libraries (first match wins).""" roots: List[str] = [] override = (os.environ.get("P4DIAG_SUMMARY_QUERIES") or "").strip() if override: roots.append(os.path.abspath(override)) install_root = os.path.join(_P4DIAG_INSTALL_DIR, "summary_queries") if install_root not in roots: roots.append(install_root) cwd_root = os.path.join(os.getcwd(), "summary_queries") if cwd_root not in roots: roots.append(cwd_root) return roots def get_summary_sections() -> Dict[str, SummarySectionQueries]: """Summary SQL sections from disk when present, else built-in queries.""" for root in _summary_queries_search_roots(): custom = load_summary_sections_from_disk(root) if custom is not None: return { name: queries for name, queries in custom.items() if name not in _SUMMARY_SECTIONS_OMITTED } return { name: queries for name, queries in _BUILTIN_SUMMARY_SECTIONS.items() if name not in _SUMMARY_SECTIONS_OMITTED } def resolve_sql_query_text(sql_token: str) -> Optional[str]: """Return the full ``.sql`` script body for a library query (disk or built-in).""" _path, body = resolve_sql_query(sql_token) if body is not None: return body if _path and os.path.isfile(_path): try: with open(_path, encoding="utf-8") as fh: return fh.read() except OSError: return None return None RecommendationRule = Dict[str, Any] RecommendationMatch = Dict[str, Any] def p4diag_recommendations_dir() -> str: """Directory of ``*.json`` recommendation rule files. Override: ``P4DIAG_RECOMMENDATIONS``. Default: ``/recommendations``. 
""" override = (os.environ.get("P4DIAG_RECOMMENDATIONS") or "").strip() if override: return os.path.abspath(override) return os.path.join(_P4DIAG_INSTALL_DIR, "recommendations") def _recommendations_search_roots() -> List[str]: """Directories to scan for recommendation ``.json`` rules (first match wins).""" roots: List[str] = [] override = (os.environ.get("P4DIAG_RECOMMENDATIONS") or "").strip() if override: roots.append(os.path.abspath(override)) install_root = os.path.join(_P4DIAG_INSTALL_DIR, "recommendations") if install_root not in roots: roots.append(install_root) cwd_root = os.path.join(os.getcwd(), "recommendations") if cwd_root not in roots: roots.append(cwd_root) return roots def _normalize_mitigation_items(raw: Dict[str, Any]) -> List[str]: """Return bullet strings from ``mitigation_items`` list or ``mitigation`` text.""" raw_items = raw.get("mitigation_items") if isinstance(raw_items, list): items = [str(item).strip() for item in raw_items if str(item).strip()] if items: return items mitigation = (raw.get("mitigation") or "").strip() if mitigation: return [line.strip() for line in mitigation.splitlines() if line.strip()] return [] def _normalize_recommendation_rule(raw: Dict[str, Any], source: str) -> Optional[RecommendationRule]: """Validate and normalize one recommendation rule dict.""" rule_id = (raw.get("id") or "").strip() title = (raw.get("title") or "").strip() checks = raw.get("checks") mitigation_items = _normalize_mitigation_items(raw) if not rule_id or not title or not isinstance(checks, list) or not checks or not mitigation_items: return None normalized_checks: List[Dict[str, Any]] = [] for check in checks: if not isinstance(check, dict): return None sql = strip_sql_documentation_header((check.get("sql") or "").strip()).strip() if not sql: return None if not sql.rstrip().endswith(";"): sql += ";" label = (check.get("label") or "check").strip() min_count = check.get("min_count", 1) try: min_count_int = max(1, int(min_count)) except 
(TypeError, ValueError): min_count_int = 1 normalized_checks.append( {"label": label, "sql": sql, "min_count": min_count_int} ) normalized_stats_checks: List[Dict[str, Any]] = [] for check in raw.get("stats_checks") or []: if not isinstance(check, dict): return None key = (check.get("key") or "").strip() label = (check.get("label") or key or "stats check").strip() if not key: return None try: min_count_int = max(1, int(check.get("min_count", 1))) except (TypeError, ValueError): min_count_int = 1 normalized_stats_checks.append( {"key": key, "label": label, "min_count": min_count_int} ) normalized_stats_evidence: List[Dict[str, str]] = [] for item in raw.get("stats_evidence") or []: if not isinstance(item, dict): return None key = (item.get("key") or "").strip() label = (item.get("label") or key or "stats evidence").strip() if not key: return None normalized_stats_evidence.append({"key": key, "label": label}) confidence = (raw.get("confidence") or "likely").strip().lower() if confidence not in ("likely", "possible"): confidence = "likely" try: priority = int(raw.get("priority", 0)) except (TypeError, ValueError): priority = 0 return { "id": rule_id, "title": title, "priority": priority, "confidence": confidence, "checks": normalized_checks, "stats_checks": normalized_stats_checks, "stats_evidence": normalized_stats_evidence, "mitigation_items": mitigation_items, "_source": source, } def load_recommendations_from_disk(root: str) -> Optional[List[RecommendationRule]]: """Load recommendation rules from ``/*.json``. 
Returns ``None`` if none found.""" if not os.path.isdir(root): return None rules: List[RecommendationRule] = [] for json_path in sorted(glob.glob(os.path.join(root, "*.json"))): try: with open(json_path, encoding="utf-8") as fh: raw = json.load(fh) except (OSError, json.JSONDecodeError, TypeError): continue if not isinstance(raw, dict): continue rule = _normalize_recommendation_rule(raw, source=json_path) if rule is not None: rules.append(rule) return rules if rules else None _BUILTIN_RECOMMENDATIONS: List[RecommendationRule] = [ { "id": "revhx-revdx-read-starves-writers", "title": "Read locks on db.revhx/db.revdx may starve writers", "priority": 10, "confidence": "likely", "checks": [ { "label": "read holders on revhx/revdx from fstat/sync/files (>10s)", "sql": ( "SELECT COUNT(*) FROM tableUse tu " "JOIN process p ON tu.processKey = p.processKey " "WHERE tu.tableName IN ('revhx', 'revdx') " "AND p.cmd IN ('user-fstat', 'user-sync', 'user-files') " "AND tu.totalReadHeld > 10000" ), "min_count": 3, }, { "label": "write waiters on revhx/revdx (>10s)", "sql": ( "SELECT COUNT(*) FROM tableUse tu " "JOIN process p ON tu.processKey = p.processKey " "WHERE tu.tableName IN ('revhx', 'revdx') " "AND tu.totalWriteWait > 10000" ), "min_count": 1, }, ], "stats_evidence": [ { "key": "blocking_mode_commands", "label": ( "blocking-mode lock acquisitions in log " "(locks acquired after non-blocking attempts)" ), }, ], "mitigation_items": [ ( "At db.peeking=2, p4 fstat, p4 sync, and p4 files can hold read locks " "on db.revhx and db.revdx, blocking writers (p4 submit, p4 populate, " "and commit/edge paths such as dm-CommitSubmit and rmt-SubmitShelf)." ), ( "Starved writers often retry and may switch to blocking-mode locking " "(log: \"locks acquired by blocking after N non-blocking attempts\"), " "holding other table locks while waiting and amplifying contention." 
), ( "Use the summary Lock Contention and Victim Culprit Report to " "identify which commands and users are read-locking db.revhx/db.revdx; " "reduce frequency if they are spamming the server (automation, IDE " "refresh, repeated fstat/files/sync, and similar)." ), ( "Consider db.peeking=3 (no server restart; new connections pick up " "the setting). fstat/sync/files then use peek locks on db.rev instead." ), "Review P4V.Performance.ServerRefresh to reduce P4V-driven fstat volume.", "See KB: Lockless Reads.", ], }, { "id": "istat-rev-starves-writers", "title": "p4 istat read locks on db.rev* may starve writers", "priority": 20, "confidence": "likely", "checks": [ { "label": "istat commands holding read locks on rev* tables (>10s)", "sql": ( "SELECT COUNT(*) FROM tableUse tu " "JOIN process p ON tu.processKey = p.processKey " "WHERE p.cmd = 'user-istat' " "AND tu.tableName LIKE 'rev%' " "AND tu.totalReadHeld > 10000" ), "min_count": 3, }, { "label": "istat commands using >10s user CPU", "sql": ( "SELECT COUNT(*) FROM process " "WHERE cmd = 'user-istat' AND uCpu > 10000" ), "min_count": 1, }, ], "mitigation_items": [ ( "After large stream updates, p4 istat can hold extended read locks on " "db.rev* tables and use high CPU while the stream cache refreshes. P4V " "stream graph views drive istat traffic." ), ( "Upgrade to a P4 Server build with lockless istat reads when possible " "(2025.2+, or supported patches on 2025.1/2024.2/2024.1)." ), ( "Update P4V to the latest release; consider " "P4V.Performance.AllowFullIstats to limit full-stream istat in the " "stream graph." 
), ], }, ] def get_recommendation_rules() -> List[RecommendationRule]: """Recommendation rules from disk when present, else built-in rules.""" for root in _recommendations_search_roots(): custom = load_recommendations_from_disk(root) if custom is not None: return custom normalized: List[RecommendationRule] = [] for raw in _BUILTIN_RECOMMENDATIONS: rule = _normalize_recommendation_rule(raw, source="builtin") if rule is not None: normalized.append(rule) return normalized def _recommendations_rules_signature() -> str: """Hash of active recommendation rules for summary cache invalidation.""" source_tag = "builtin" mtime_part = "" for root in _recommendations_search_roots(): if not os.path.isdir(root): continue json_files = sorted(glob.glob(os.path.join(root, "*.json"))) if json_files: source_tag = os.path.abspath(root) mtime_part = str(max(os.path.getmtime(p) for p in json_files)) break rules = get_recommendation_rules() payload = json.dumps( [ { "id": r["id"], "priority": r["priority"], "confidence": r["confidence"], "checks": r["checks"], "stats_checks": r.get("stats_checks", []), "stats_evidence": r.get("stats_evidence", []), "mitigation_items": r.get("mitigation_items", []), "title": r["title"], } for r in rules ], sort_keys=True, ) digest = hashlib.sha256(f"{source_tag}|{mtime_part}|{payload}".encode()).hexdigest() return digest[:16] def _recommendation_check_count(cursor: sqlite3.Cursor, sql: str) -> Optional[int]: """Run a single-row COUNT query; return the integer count or ``None`` on error.""" try: cursor.execute(sql) row = cursor.fetchone() except sqlite3.Error: return None if row is None: return 0 val = row[0] if val is None: return 0 try: return int(val) except (TypeError, ValueError): return None _LOG_STATS_METRIC_PATTERNS: Dict[str, re.Pattern] = { "blocking_mode_commands": re.compile( r"^\s*blocking mode commands:\s*(\d+)\s*$", re.MULTILINE ), "write_waiters_over_10s": re.compile( r"^\s*write waiters over 10 seconds:\s*(\d+)\s*$", re.MULTILINE ), } def 
parse_log_stats_metrics(stats_text: str) -> Dict[str, int]: """Parse numeric counters from ``p4diag stats`` / summary LOG STATISTICS text.""" metrics: Dict[str, int] = {} if not stats_text: return metrics for key, pattern in _LOG_STATS_METRIC_PATTERNS.items(): match = pattern.search(stats_text) if not match: continue try: metrics[key] = int(match.group(1)) except ValueError: continue return metrics def evaluate_recommendations( db_path: str, rules: Optional[List[RecommendationRule]] = None, stats_text: str = "", ) -> List[RecommendationMatch]: """Return recommendation rules whose checks all pass against ``db_path``.""" if rules is None: rules = get_recommendation_rules() if not rules or not os.path.isfile(db_path): return [] stats_metrics = parse_log_stats_metrics(stats_text) matches: List[RecommendationMatch] = [] conn = sqlite3.connect(db_path) cursor = conn.cursor() try: for rule in rules: evidence: List[Tuple[str, int]] = [] failed = False for check in rule["checks"]: count = _recommendation_check_count(cursor, check["sql"]) if count is None or count < check["min_count"]: failed = True break evidence.append((check["label"], count)) if failed: continue for check in rule.get("stats_checks") or []: count = stats_metrics.get(check["key"], 0) if count < check["min_count"]: failed = True break evidence.append((check["label"], count)) if failed: continue for item in rule.get("stats_evidence") or []: count = stats_metrics.get(item["key"], 0) if count > 0: evidence.append((item["label"], count)) matches.append( { "rule": rule, "evidence": evidence, } ) finally: conn.close() matches.sort( key=lambda m: (-int(m["rule"].get("priority", 0)), m["rule"]["title"]) ) return matches def format_recommendations_plain_text(matches: List[RecommendationMatch]) -> str: """Plain-text RECOMMENDATIONS section body.""" if not matches: return "(no known performance patterns detected in this trace)\n" lines: List[str] = [] for match in matches: rule = match["rule"] conf = 
rule.get("confidence", "likely").upper() lines.append(f"[{conf}] {rule['title']}") ev_parts = [ f"{count} {label}" for label, count in match["evidence"] ] lines.append(f" Evidence: {'; '.join(ev_parts)}.") lines.append(" Mitigation:") for item in rule.get("mitigation_items") or []: lines.append(f" - {item}") lines.append("") return "\n".join(lines).rstrip() + "\n" _MITIGATION_HTML_LINKS: Tuple[Tuple[str, str], ...] = ( ("Lock Contention", "summary-lock-contention"), ("Victim Culprit Report", "victim-culprit-report"), ) def _mitigation_item_to_html(text: str) -> str: """Escape mitigation text and link phrases to summary report anchors.""" out = html.escape(text) for phrase, anchor in _MITIGATION_HTML_LINKS: esc_phrase = html.escape(phrase) link = f'{esc_phrase}' out = out.replace(esc_phrase, link) return out def format_recommendations_html(matches: List[RecommendationMatch]) -> str: """HTML fragment for the recommendations section.""" if not matches: return '

No known performance patterns detected in this trace.

\n' parts = ['
\n'] for match in matches: rule = match["rule"] conf = html.escape(str(rule.get("confidence", "likely")).upper()) title = html.escape(rule["title"]) parts.append(f'
\n') parts.append(f'

{conf} {title}

\n') parts.append("
    \n") for label, count in match["evidence"]: parts.append( f"
  • {html.escape(str(count))} {html.escape(label)}
  • \n" ) parts.append("
\n

Mitigation:

\n") parts.append("
    \n") for item in rule.get("mitigation_items") or []: parts.append(f"
  • {_mitigation_item_to_html(item)}
  • \n") parts.append("
\n
\n") parts.append("
\n") return "".join(parts) def log2sql_executable() -> str: """``log2sql`` binary (override: ``LOG2SQL_BIN`` or ``LOG2SQL``; default: ``log2sql`` on PATH).""" return ( os.environ.get("LOG2SQL_BIN") or os.environ.get("LOG2SQL") or "log2sql" ) def _python_sqlite3_available() -> bool: """True when this interpreter was built with the ``_sqlite3`` extension.""" try: import importlib importlib.import_module("_sqlite3") return True except ImportError: return False def _require_python_sqlite3() -> None: if not _python_sqlite3_available(): print(_PYTHON_SQLITE3_MISSING_MSG, file=sys.stderr) sys.exit(2) def _p4diag_interactive_setup() -> None: """Configure readline and SIGINT once (CLI/menu only; safe for ``import p4diag``).""" if getattr(_p4diag_interactive_setup, "_done", False): return if readline is not None: readline.set_completer_delims(" \t\n=") readline.parse_and_bind("tab: complete") signal.signal(signal.SIGINT, handler) _p4diag_interactive_setup._done = True # type: ignore[attr-defined] def trim_p4_server_log_file(log_path: str, start_time: str, end_time: str) -> str: """Return path to a trimmed log segment (creates via ``sed`` if missing). Naming matches ``log.db`` / p4sla: ``.-`` with ``/ :`` → ``_`` in timestamps. 
""" log_path = os.path.abspath(log_path) if not os.path.isfile(log_path): raise FileNotFoundError(f"Log file not found: {log_path}") def _grep_line_number(timestamp: str, *, last: bool) -> Optional[str]: proc = s.run( ["grep", "-a", "-n", timestamp, log_path], capture_output=True, text=True, errors="replace", ) if proc.returncode != 0 or not proc.stdout.strip(): return None lines = [ln for ln in proc.stdout.splitlines() if ln.strip()] if not lines: return None return lines[-1 if last else 0].split(":", 1)[0] def _find_nearest_line_by_probe( timestamp_str: str, direction: str ) -> Tuple[Optional[str], Optional[str]]: def grep_line(ts_minute: str) -> Tuple[Optional[int], Optional[str]]: pattern = f"{ts_minute}:[0-9][0-9]" proc = s.run( ["grep", "-a", "-n", "-E", pattern, log_path], capture_output=True, text=True, errors="replace", ) if proc.returncode != 0 or not proc.stdout.strip(): return None, None parts = proc.stdout.splitlines()[0].split(":", 1) if len(parts) != 2: return None, None return int(parts[0]), parts[1].strip() ts = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S") for i in range(1, 121): probe = ts + timedelta(minutes=i) if direction == "forward" else ts - timedelta(minutes=i) line_num, line_text = grep_line(probe.strftime("%Y/%m/%d %H:%M")) if line_num: return str(line_num), line_text return None, None rep = {"/": "", ":": "", " ": "_"} st_s, et_s = start_time, end_time for k, v in rep.items(): st_s = st_s.replace(k, v) et_s = et_s.replace(k, v) log_trim = log_path + "." + st_s + "-" + et_s if os.path.exists(log_trim): return log_trim head = _grep_line_number(start_time, last=False) tail = _grep_line_number(end_time, last=True) adj_start, adj_end = start_time, end_time if head is None: head_line, head_text = _find_nearest_line_by_probe(start_time, "forward") if not head_line: raise ValueError( f"Start time {start_time!r} not found in log and no nearby timestamp within ±2 hours." 
) head = head_line adj_start = head_text.split()[0] + " " + head_text.split()[1] if tail is None: tail_line, tail_text = _find_nearest_line_by_probe(end_time, "backward") if not tail_line: raise ValueError( f"End time {end_time!r} not found in log and no nearby timestamp within ±2 hours." ) tail = tail_line adj_end = tail_text.split()[0] + " " + tail_text.split()[1] st_s, et_s = adj_start, adj_end for k, v in rep.items(): st_s = st_s.replace(k, v) et_s = et_s.replace(k, v) log_trim = log_path + "." + st_s + "-" + et_s if os.path.exists(log_trim): return log_trim quit_line = str(int(tail) + 1) sed_script = f"{head},{tail}p; {quit_line}q" try: with open(log_trim, "wb") as out_f: s.run( ["sed", "-n", sed_script, log_path], stdout=out_f, check=True, ) except s.CalledProcessError as e: if os.path.exists(log_trim): os.remove(log_trim) raise RuntimeError(f"sed failed while trimming log: {e}") from e return log_trim def _collect_pid_tracking_lines(lines: List[str], pid: str) -> List[str]: """Collect ``---`` tracking lines following a command record in *lines*.""" tracking: List[str] = [] completed_re = re.compile(rf"\tpid {re.escape(str(pid))} completed\b") cmd_re = re.compile(rf"\tpid {re.escape(str(pid))}\b.*'") for ln in lines[1:]: stripped = ln.strip() if not stripped: continue if stripped.startswith("---"): tracking.append(stripped) continue if completed_re.search(ln): if tracking: break tracking.append(stripped) continue if cmd_re.search(ln) and not tracking: continue if ( "Perforce server info:" in ln or "Perforce server error:" in ln ) and tracking: break if tracking and not stripped.startswith("Perforce server"): tracking.append(stripped) return tracking def _collect_pid_tracking_after_completed( lines: List[str], pid: str, start_time: str ) -> Tuple[Optional[str], List[str]]: """Collect tracking after a ``pid N completed`` line (*lines*[0]).""" if not lines: return None, [] header = lines[0].strip() cmd_line: Optional[str] = None tracking: List[str] = [header] if 
header else [] cmd_re = re.compile(rf"\tpid {re.escape(str(pid))}\b.*'") for ln in lines[1:]: stripped = ln.strip() if not stripped or "Perforce server info:" in ln: if tracking and len(tracking) > 1: break continue if cmd_re.search(ln) and start_time in ln: cmd_line = stripped continue if stripped.startswith("---"): tracking.append(stripped) continue if tracking: tracking.append(stripped) return cmd_line, tracking def _pid_tracking_block_score(block: List[str]) -> int: score = len(block) for ln in block: if ln.startswith("--- lapse"): score += 10000 break return score def _grep_log_line_numbers(pattern: str, log_path: str) -> List[Tuple[int, str]]: proc = s.run( ["grep", "-a", "-n", "-E", pattern, log_path], capture_output=True, text=True, errors="replace", ) hits: List[Tuple[int, str]] = [] if proc.returncode != 0: return hits for ln in proc.stdout.splitlines(): if not ln.strip(): continue num_s, _, text = ln.partition(":") try: hits.append((int(num_s), text)) except ValueError: continue return hits def _read_log_window(log_path: str, line_num: int, count: int = 400) -> List[str]: proc = s.run( ["sed", "-n", f"{line_num},{line_num + count}p", log_path], capture_output=True, text=True, errors="replace", ) return proc.stdout.splitlines() def extract_pid_tracking_from_log( log_path: str, pid: str, start_time: str, *, end_time: Optional[str] = None, ) -> Tuple[Optional[str], List[str]]: """Return ``(command_line, tracking_lines)`` from a P4 server log for one command. Prefers the tracking block after ``pid N completed`` (then ``--- lapse``). Falls back to ``--- lapse`` after a matching command start record. 
""" log_path = os.path.abspath(log_path) if not os.path.isfile(log_path): return None, [] pid_s = str(pid) completed_patterns: List[str] = [] if end_time: completed_patterns.append( rf"{re.escape(end_time)}.*pid {re.escape(pid_s)} completed\b" ) completed_patterns.append(rf"\tpid {re.escape(pid_s)} completed\b") best_cmd: Optional[str] = None best_block: List[str] = [] best_score = -1 for pattern in completed_patterns: for line_num, _completed_text in _grep_log_line_numbers(pattern, log_path): lines = _read_log_window(log_path, line_num) cmd_line, block = _collect_pid_tracking_after_completed( lines, pid_s, start_time ) score = _pid_tracking_block_score(block) if score > best_score: best_score = score best_cmd = cmd_line or _completed_text.strip() best_block = block if best_block and any(ln.startswith("--- lapse") for ln in best_block): return best_cmd, best_block cmd_pattern = rf"{re.escape(start_time)}.*pid {re.escape(pid_s)} .+'" for line_num, cmd_text in _grep_log_line_numbers(cmd_pattern, log_path): lines = _read_log_window(log_path, line_num) block = _collect_pid_tracking_lines(lines, pid_s) score = _pid_tracking_block_score(block) if score > best_score: best_score = score best_cmd = cmd_text.strip() best_block = block return best_cmd, best_block def _server_error_blocks_in_lines(lines: List[str], pid: str) -> List[List[str]]: """Return ``Perforce server error`` bodies that name *pid*.""" pid_re = re.compile(rf"^Pid\s+{re.escape(str(pid))}\b", re.IGNORECASE) blocks: List[List[str]] = [] i = 0 while i < len(lines): if "Perforce server error:" in lines[i]: body: List[str] = [] i += 1 while i < len(lines): stripped = lines[i].strip() if stripped.startswith("Perforce server"): break if stripped: body.append(stripped) i += 1 if body and any(pid_re.match(ln) for ln in body): blocks.append(body) continue i += 1 return blocks def extract_pid_server_error_from_log( log_path: str, pid: str, start_time: str, *, end_time: Optional[str] = None, ) -> List[str]: """Return 
log lines from the ``Perforce server error`` block for one command.""" log_path = os.path.abspath(log_path) if not os.path.isfile(log_path): return [] pid_s = str(pid) search_starts: List[int] = [] if end_time: for pattern in ( rf"{re.escape(end_time)}.*pid {re.escape(pid_s)} completed\b", rf"\tpid {re.escape(pid_s)} completed\b", ): hits = _grep_log_line_numbers(pattern, log_path) if hits: search_starts.append(hits[0][0]) break if not search_starts: cmd_hits = _grep_log_line_numbers( rf"{re.escape(start_time)}.*pid {re.escape(pid_s)} .+'", log_path, ) if cmd_hits: search_starts.append(cmd_hits[-1][0]) for line_num in search_starts: lines = _read_log_window(log_path, line_num, 800) blocks = _server_error_blocks_in_lines(lines, pid_s) if blocks: return blocks[-1] return [] def generate_log_summary_text(log_path: str) -> str: """Grep-based P4 server log statistics (text). Used by p4diag stats and p4sla sidecars.""" log = os.path.abspath(log_path) if not os.path.isfile(log): return f"(Log file not found: {log})" is_gz = log.endswith(".gz") cat_cmd = f"zcat {shlex.quote(log)}" if is_gz else f"cat {shlex.quote(log)}" grep_cmd = "zgrep -a" if is_gz else "grep -a" lq = shlex.quote(log) def run(cmd: str) -> str: return s.getoutput(cmd) commands = { "size": f"ls -lh {lq} | awk '{{print $5}}'", "begin_range": f"{cat_cmd} | head -n200 | {grep_cmd} '2.*pid.*user-*' | head -n1 | awk -v OFS=' ' '{{print $1, $2}}'", "end_range": f"{cat_cmd} | tail -n5000 | {grep_cmd} '2.*pid.*user-*' | tail -n1 | awk -v OFS=' ' '{{print $1, $2}}'", "completed_cmds": f"{grep_cmd} completed {lq} | wc -l", "high_active_threads": f"{grep_cmd} 'active threads' {lq} | sort -n -r -k10 | head -n1", "num_write_waiters": ( f"{grep_cmd} -n -B1 -E 'total.*/+[0-9]{{5,}}ms+' {lq} | " f"{grep_cmd} -A1 'locks read' | {grep_cmd} total | wc -l" ), "num_blocking_mode": f"{grep_cmd} 'after.*non-blocking.*attempts' {lq} | wc -l", "num_killed_ml": f"{grep_cmd} 'killed by MaxLockTime' {lq} | wc -l", "num_killed_msr": 
f"{grep_cmd} 'killed by MaxScanRows' {lq} | wc -l", "num_killed_mres": f"{grep_cmd} 'killed by MaxResults' {lq} | wc -l", "num_shutdown": f"{grep_cmd} -F {shlex.quote('Perforce Server shutdown')} {lq} | wc -l", "num_starting": f"{grep_cmd} -F {shlex.quote('Perforce Server starting')} {lq} | wc -l", "num_signal_exit": f"{grep_cmd} -F {shlex.quote('exited on a signal')} {lq} | wc -l", "num_journal_repaired": ( f"{grep_cmd} -F {shlex.quote('Journal repaired for child process')} {lq} | wc -l" ), } results: Dict[str, str] = {} with ThreadPoolExecutor(max_workers=10) as executor: futures = {k: executor.submit(run, cmd) for k, cmd in commands.items()} for k, future in futures.items(): results[k] = future.result() match = re.match( r"^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}).*?using (\d+) active threads", results["high_active_threads"], ) if match: results["high_active_threads"] = ( f"{match.group(2)} active threads at {match.group(1)}" ) else: results["high_active_threads"] = "N/A" def _int_count(key: str) -> int: try: return int(results[key].strip()) except (ValueError, KeyError): return 0 buf = io.StringIO() def _append_match_block(title: str, key: str, pattern: str) -> None: n = _int_count(key) if n <= 0: return print(f"\n {title} ({n}):", file=buf) raw = run(f"{grep_cmd} -F {shlex.quote(pattern)} {lq}").strip() for log_line in raw.splitlines(): print(f" {log_line.lstrip()}", file=buf) lines = [ f"\n log name: {log}", f" log size: {results['size']}", f" log range: {results['begin_range']} - {results['end_range']}", f" completed commands: {results['completed_cmds'].strip()}", f" peak threads: {results['high_active_threads']}", f" write waiters over 10 seconds: {results['num_write_waiters'].strip()}", f" blocking mode commands: {results['num_blocking_mode'].strip()}", f" killed by MaxLockTime: {results['num_killed_ml'].strip()}", f" killed by MaxScanRows: {results['num_killed_msr'].strip()}", f" killed by MaxResults: {results['num_killed_mres'].strip()}", f" Perforce Server 
shutdown: {results['num_shutdown'].strip()}", f" Perforce Server starting: {results['num_starting'].strip()}", f" exited on a signal: {results['num_signal_exit'].strip()}", f" journal repaired (child process): {results['num_journal_repaired'].strip()}", ] for line in lines: print(line, file=buf) _append_match_block( "Perforce Server shutdown lines", "num_shutdown", "Perforce Server shutdown" ) _append_match_block( "Perforce Server starting lines", "num_starting", "Perforce Server starting" ) _append_match_block( "exited on a signal (e.g. under Perforce server error)", "num_signal_exit", "exited on a signal", ) _append_match_block( "Journal repaired for child process", "num_journal_repaired", "Journal repaired for child process", ) return buf.getvalue().rstrip() def _resolve_log2sql_executable() -> str: """Return an executable path for ``log2sql``; exit 127 if not found.""" exe = log2sql_executable() if os.path.isfile(exe) and os.access(exe, os.X_OK): return exe found = shutil.which(exe) if found: return found print( f"p4diag: log2sql not found ({exe!r}). " "Install log2sql and set LOG2SQL_BIN to its path, or add it to PATH.", file=sys.stderr, ) sys.exit(127) # Embedded trace used only for ``schema`` (build once under .p4diagnostics with log2sql). _HELP_SCHEMA_SAMPLE_LOG_TEXT = """Perforce server info: 2026/05/08 09:07:49 pid 1070252 bruno@bruno_ws 127.0.0.1 [p4/2025.2/LINUX26X86_64/2907753] 'user-fstat //depot/...' 
--- ident cmd/group 2D1CDF574ABC5DD327116F55B2CAA810/none --- lapse 3.92s --- usage 394+290us 816+0io 0+0net 143748k 0pf --- memory cmd/proc 132mb/132mb --- rpc msgs/size in+out 0+158354/0mb+66mb himarks 97604/97604 snd/rcv 3.21s/.000s --- filetotals (svr) send/recv files+bytes 0+0mb/0+0mb --- db.user --- pages in+out+cached 3+0+2 --- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0 --- db.ticket --- pages in+out+cached 3+0+2 --- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0 --- db.group --- pages in+out+cached 4+0+2 --- locks read/write 1/0 rows get+pos+scan put+del 0+2+3 0+0 --- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms --- db.domain --- pages in+out+cached 4+0+3 --- locks read/write 1/0 rows get+pos+scan put+del 1+0+0 0+0 --- db.view --- pages in+out+cached 1+0+4 --- locks read/write 1/0 rows get+pos+scan put+del 0+1+2 0+0 --- db.have --- pages in+out+cached 6+0+5 --- locks read/write 0/0 rows get+pos+scan put+del 0+1+1 0+0 --- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms --- db.revdx --- pages in+out+cached 4+0+3 --- locks read/write 1/0 rows get+pos+scan put+del 0+1+19 0+0 --- total lock wait+held read/write 0ms+279ms/0ms+0ms --- db.revhx --- pages in+out+cached 6462+0+96 --- locks read/write 1/0 rows get+pos+scan put+del 0+1+158337 0+0 --- total lock wait+held read/write 0ms+279ms/0ms+0ms --- db.locks --- pages in+out+cached 3+0+2 --- locks read/write 0/0 rows get+pos+scan put+del 0+158354+158354 0+0 --- peek count 1 wait+held total/max 0ms+44ms/0ms+44ms --- db.working --- pages in+out+cached 3+0+2 --- locks read/write 0/0 rows get+pos+scan put+del 0+1+1 0+0 --- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms --- db.excl --- pages in+out+cached 3+0+2 --- locks read/write 0/0 rows get+pos+scan put+del 0+256+256 0+0 --- peek count 1 wait+held total/max 0ms+43ms/0ms+43ms --- db.trigger --- pages in+out+cached 3+0+2 --- locks read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0 --- db.repo --- pages in+out+cached 3+0+2 --- locks 
read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0 --- db.graphperm --- pages in+out+cached 4+0+2 --- locks read/write 1/0 rows get+pos+scan put+del 0+1+1 0+0 --- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms --- db.protect --- pages in+out+cached 5+0+2 --- locks read/write 0/0 rows get+pos+scan put+del 0+1+20 0+0 --- peek count 3 wait+held total/max 0ms+0ms/0ms+0ms --- db.monitor --- pages in+out+cached 2+4+1024 --- locks read/write 6/2 rows get+pos+scan put+del 6+0+0 2+0 """ # Short quiet-cli names (normalized to long forms before ``argparse``; see ``_normalize_quiet_cli_argv``). QUIET_CLI_SHORT_ALIASES = { "log2sql": "log2sql", "plots": "log2sql-plots", "schema": "log2sql-schema", } P4DIAG_DESCRIPTION = """\ p4diag aggregates common P4 Server log analysis workflows in one script: • Uses log2sql to build a SQLite database from P4 Server log trace data. • Runs canned and summary SQL queries to surface patterns and commands that often point to lock contention, CPU use, and other performance issues. • Writes grep-based log statistics, summary SQL results, victim/culprit write-wait analysis, and command-activity plots into text and HTML under .p4diagnostics/ for viewing in a browser (interactive mode starts a small local web server for those files). Run from the directory containing your P4 Server log. Use the commands below, or p4diag LOG for an interactive menu.""" P4DIAG_EPILOG = """ Requirements: Python 3 (built-in sqlite3 module required — test: python3 -c "import sqlite3") pip install tabulate log2sql on PATH (or set LOG2SQL_BIN) sqlite3 on PATH — LOG2SQL canned .sql reports, ad hoc SELECT, pid probe (sqlite 3.31+ required for pid / @pid @start @table @duration bind parameters) gnuplot — optional, for plots and summary PNGs grep, sed — log stats and trim Lock shortcuts (ww/wh/rw/rh) use Python's built-in sqlite3 module only; the sqlite3 CLI is still required for the numbered SQL library and pid. 
Install / distribution: Download: https://swarm.workshop.perforce.com/projects/perforce_software-admin-toolkit/download/main/guest/perforce_software/admin_toolkit/p4diag Browse: https://swarm.workshop.perforce.com/projects/perforce_software-admin-toolkit/files/main/guest/perforce_software/admin_toolkit/p4diag Copy the p4diag script anywhere on PATH (e.g. ~/bin/p4diag) and chmod +x p4diag. Run from the directory that contains your P4 server log; output goes in ./.p4diagnostics/ beside the log. The standard canned-query library (lock, CPU, memory, command, victim/culprit reports) is built into the script — no sql_queries/ directory required. Optional: place sql_queries/ next to p4diag to override or add custom .sql reports (or set P4DIAG_SQL_QUERIES). Optional summary_queries/ and recommendations/ directories override built-in summary SQL and performance-pattern rules. Override paths with P4DIAG_SQL_QUERIES, P4DIAG_SUMMARY_QUERIES, and P4DIAG_RECOMMENDATIONS. Environment (optional): LOG2SQL_BIN path to log2sql binary (default: log2sql on PATH) P4DIAG_SQL_QUERIES directory of library .sql files (default: /sql_queries) P4DIAG_SUMMARY_QUERIES summary report sections (default: /summary_queries) P4DIAG_RECOMMENDATIONS pattern rules for summary RECOMMENDATIONS (default: /recommendations) For --start and --end TIME, TIME="YYYY/MM/DD HH:MM:SS" Large logs — trim before log2sql / summary: Building a log2sql database and generating summary reports can take a long time on large P4 Server logs (multi-GB traces are common). Trim the log to the time window where the problem occurred before running log2sql or summary. One useful heuristic: find the peak "active threads" line in the log (see "peak threads" in p4diag stats output, or interactive menu command active), note its timestamp, then trim roughly one hour before and one hour after that peak. That window usually captures the surrounding contention without processing the full log. 
p4diag trim LOG --start TIME --end TIME p4diag LOG --start TIME --end TIME # trim, then interactive menu Interactive (TTY menu): p4diag LOG # interactive TTY menu + HTTP server for .p4diagnostics/ p4diag LOG --start TIME --end TIME # trim log, then interactive menu + HTTP server Interactive web server (HTTP): Serves ./.p4diagnostics/ from the directory where you launched p4diag. The printed URL uses this host's work-network IPv4 (default-route interface), not the short hostname — open that URL from your laptop when VPN'd in. That address is not 127.0.0.1: loopback only works if the browser runs on the same machine as p4diag. Hostname links often fail when the name is missing from DNS or /etc/hosts even though the IP works. SSH without VPN: ssh -L PORT:127.0.0.1:PORT. Port is 8000 + (your Unix uid % 1000) — fixed per user, not per log or case directory, and not overridable. Only one listener per user: if that port is already in use, a second interactive p4diag prints "Web server already running" and does not start another server. The process already bound to the port keeps serving the first session's .p4diagnostics/ and uses the first session's LOG for /. For two cases at once, use quiet subcommands (no web server), open .p4diagnostics/ HTML directly, or run interactive sessions as different Unix users. CLI (no TTY menu): p4diag trim LOG --start TIME --end TIME # trim LOG by start/end time p4diag stats LOG # log statistics to .p4diagnostics/LOG.stats.txt p4diag log2sql LOG # create log2sql SQLite DB to .p4diagnostics/LOG.db p4diag summary LOG # LOG.summary.txt/LOG.summary.HTML to .p4diagnostics p4diag FILE.sql LOG # run NAME.sql (basename → P4DIAG_SQL_QUERIES) p4diag list # list library queries from P4DIAG_SQL_QUERIES p4diag schema # list schema for log2sql database p4diag plots LOG # gnuplot ASCII + PNG under .p4diagnostics/ """ # Subcommands handled by ``run_quiet_subcommand`` (also invocable without ``-q``; see ``_argv_invokes_quiet_cli``). 
QUIET_CLI_SUBCOMMANDS = frozenset( ( "log2sql", "stats", "log2sql-schema", "log2sql-query-sql", "log2sql-query-sql-list", "log2sql-plots", "list", "summary", "trim", *QUIET_CLI_SHORT_ALIASES.keys(), ) ) def _argv_is_sql_library_query(argv: list) -> bool: """True for ``p4diag NAME.sql [LOG]`` (library or absolute ``.sql`` path).""" return bool(argv) and argv[0].lower().endswith(".sql") def _argv_is_sql_query_list(argv: list) -> bool: """True for ``p4diag list``.""" return len(argv) == 1 and argv[0] == "list" def _argv_invokes_quiet_cli(argv: list) -> bool: """True if ``argv`` (args after script name) should use quiet CLI (no menu / web server).""" if not argv: return False if _argv_is_sql_library_query(argv) or _argv_is_sql_query_list(argv): return True return argv[0] in QUIET_CLI_SUBCOMMANDS @contextmanager def cli_spinner(message: str) -> Iterator[None]: """Animate a spinner on stderr during long CLI steps (no-op if stderr is not a TTY).""" msg = message.rstrip() if not getattr(sys.stderr, "isatty", lambda: False)(): yield return stop = threading.Event() def _spin() -> None: wrote_frame = False for ch in itertools.cycle("|/-\\"): if stop.wait(0.1): break sys.stderr.write(f"\r{msg} {ch} ") sys.stderr.flush() wrote_frame = True if wrote_frame: sys.stderr.write("\n") sys.stderr.flush() t = threading.Thread(target=_spin, daemon=True) t.start() try: yield finally: stop.set() t.join(timeout=60) if t.is_alive() and getattr(sys.stderr, "isatty", lambda: False)(): sys.stderr.write("\n") sys.stderr.flush() @contextmanager def quiet_stderr_activity(message: str): """While ``QUIET``, show ``cli_spinner`` on stderr.""" if not QUIET: yield return with cli_spinner(message): yield import html import http.server import socketserver import socket from urllib.parse import unquote, urlparse # ANSI color codes CYAN = '\033[96m' BOLD = '\033[1m' RESET = '\033[0m' WHITE_BOLD = '\033[97;1m' RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' DIM = '\033[2m' def handler(signum, 
frame): return class log2sql(): def __init__(self, theFile): self.logFile = "unset" self.logFileDetails = "unset" self.databaseFile = "unset" self.summaryFile = "" self.maxActiveThreadsFile = "" self.activeThreadsSummaryFile = "" self.failuresFile = "" self.summaryQueriesFile = "" self.begin_range=0 self.end_range=0 self.min_hold_time = 2000 self.VALID_COLUMNS = { # process table columns "processkey", "lineNumber", "pid", "startTime", "endTime", "computedLapse", "completedLapse", "paused", "user", "workspace", "ip", "app", "cmd", "args", "uCpu", "sCpu", "diskIn", "diskOut", "ipcIn", "ipcOut", "maxRss", "pageFaults", "memMB", "memPeakMB", "rpcMsgsIn", "rpcMsgsOut", "rpcSizeIn", "rpcSizeOut", "rpcHimarkFwd", "rpcHimarkRev", "rpcSnd", "rpcRcv", "fileTotalsSnd", "fileTotalsRcv", "fileTotalsSndMB", "fileTotalsRcvMB", "running", "netSyncFilesAdded", "netSyncFilesUpdated", "netSyncFilesDeleted", "netSyncBytesAdded", "netSyncBytesUpdated", "lbrRcsOpens", "lbrRcsCloses", "lbrRcsCheckins", "lbrRcsExists", "lbrRcsReads", "lbrRcsReadBytes", "lbrRcsWrites", "lbrRcsWriteBytes", "lbrRcsDigests", "lbrRcsFileSizes", "lbrRcsModtimes", "lbrRcsCopies", "lbrBinaryOpens", "lbrBinaryCloses", "lbrBinaryCheckins", "lbrBinaryExists", "lbrBinaryReads", "lbrBinaryReadBytes", "lbrBinaryWrites", "lbrBinaryWriteBytes", "lbrBinaryDigests", "lbrBinaryFileSizes", "lbrBinaryModtimes", "lbrBinaryCopies", "lbrCompressOpens", "lbrCompressCloses", "lbrCompressCheckins", "lbrCompressExists", "lbrCompressReads", "lbrCompressReadBytes", "lbrCompressWrites", "lbrCompressWriteBytes", "lbrCompressDigests", "lbrCompressFileSizes", "lbrCompressModtimes", "lbrCompressCopies", "lbrUncompressOpens", "lbrUncompressCloses", "lbrUncompressCheckins", "lbrUncompressExists", "lbrUncompressReads", "lbrUncompressReadBytes", "lbrUncompressWrites", "lbrUncompressWriteBytes", "lbrUncompressDigests", "lbrUncompressFileSizes", "lbrUncompressModtimes", "lbrUncompressCopies", "error", # tableUse table columns "tableName", 
"pagesIn", "pagesOut", "pagesCached", "pagesSplitInternal", "pagesSplitLeaf", "readLocks", "writeLocks", "getRows", "posRows", "scanRows", "putRows", "delRows", "totalReadWait", "totalReadHeld", "totalWriteWait", "totalWriteHeld", "maxReadWait", "maxReadHeld", "maxWriteWait", "maxWriteHeld", "peekCount", "totalPeekWait", "totalPeekHeld", "maxPeekWait", "maxPeekHeld", "triggerLapse" } if theFile: self.logFile = theFile self.databaseFile = theFile def setLogFile(self,show_menu=True): currentDatabaseFile = self.databaseFile if self.databaseFile == "unset" or show_menu: while True: filelist=self.listFiles(".") database_file=input('\n\033[1mSelect log file name or number (b = back, x = exit) > \033[0m') if database_file.isnumeric(): index=int(database_file) - 1 if validateInput(index, len(filelist)+1): database_file=filelist[index] break else: break self.databaseFile=database_file.strip() if self.databaseFile == 'x' or self.databaseFile == 'exit': exitP4() if self.databaseFile == 'b' or self.databaseFile == 'back': exitP4() if self.databaseFile.endswith('.db'): self.logFile = self.databaseFile[:-3] db_name = os.path.basename(self.databaseFile) else: self.logFile = self.databaseFile db_name = os.path.basename(self.logFile) + '.db' log_abs = os.path.abspath(self.logFile) diag_dir = os.path.join(os.path.dirname(log_abs), '.p4diagnostics') os.makedirs(diag_dir, exist_ok=True) self.databaseFile = os.path.join(diag_dir, db_name) log_base = os.path.basename(self.logFile) self.summaryFile = os.path.join(diag_dir, log_base + ".summary" + ".txt") self.summaryFileHTML = os.path.join(diag_dir, log_base + ".summary" + ".html") self.summaryQueriesFile = os.path.join(diag_dir, "queries.sql") self.logFileDetails = os.path.join(diag_dir, log_base + ".stats.txt") self.errorsSummaryFile = os.path.join(diag_dir, log_base + ".errors_summary") self.maxActiveThreadsFile = os.path.join(diag_dir, log_base + ".active_threads") self.activeThreadsSummaryFile = os.path.join(diag_dir, log_base + 
".active_threads_summary") self.failuresFile = os.path.join(diag_dir, log_base + ".fails") if show_menu: self.menu() def createDatabase(self): if not os.path.exists(self.databaseFile): cmd = [_resolve_log2sql_executable(), '-d', self.databaseFile, '--no.metrics', self.logFile] if QUIET: with open(LOG_FILE, "a") as f: f.write("[INFO] Generating log2sql database ...\n") proc = s.Popen( cmd, stdout=f, stderr=f, text=True, bufsize=1, universal_newlines=True, ) with quiet_stderr_activity("Generating log2sql database..."): proc.wait() else: with cli_spinner("Generating log2sql database..."): proc = s.Popen( cmd, stdout=s.PIPE, stderr=s.STDOUT, text=True, bufsize=1, universal_newlines=True ) if proc.stdout is not None: for line in proc.stdout: print(line.rstrip()) proc.wait() if proc.returncode == 0: if QUIET: with open(LOG_FILE, "a") as f: f.write(f"[INFO] Generated log2sql database {self.databaseFile}\n") else: print( f"{GREEN}✓ Generated log2sql database{RESET} " f"{BOLD}{GREEN}{self.databaseFile}{RESET}" ) else: if QUIET: with open(LOG_FILE, "a") as f: f.write(f"[FAIL] log2sql failed with exit code {proc.returncode}\n") else: print(f"{RED}✗ log2sql failed with exit code {proc.returncode}{RESET}") def query_menu(self): if not os.path.exists(self.databaseFile): print("You need to generate a database first...") print("Run createDatabase from the menu.") self.menu() return commands = [ (".schema", "show schema for trace DB tables"), ("m or menu", "query menu"), ("b or back", "back to main menu"), ("x or exit", "exit p4diag"), ] print(f"\n {BOLD}command [table] [startDate endDate] [limit] [column1,columnN]{RESET}\n") print(" command = wh | ww | rh | rw | pid") print(" pid = probe one command (prompts for PID + startTime)") print(" table = table name") print(" startDate = begin range of date/time (format: YYYY/MM/DD HH:MM:SS)") print(" endDate = end range of date/time (format: YYYY/MM/DD HH:MM:SS)") print(" limit = LIMIT number for results (default 25)") print(" 
column1,columnN = additional columns to include in output\n") queries = list_sql_library_names() if queries: print_numbered_list(queries) print() else: print(f" {YELLOW}No SQL library queries available.{RESET}") print( " Optional: place sql_queries/ next to p4diag, " "or set P4DIAG_SQL_QUERIES.\n" ) print( f" {BOLD}h or help{RESET} describes what each numbered query is for " "and shortcut usage examples\n" ) for cmd, desc in commands: print(f" {BOLD}{cmd:<15}{RESET} {desc}") print( f"\nEnter {BOLD}command{RESET}, path to a .sql file, " f"or a {BOLD}numbered SQL query{RESET}.\n" ) self.queryPrompt() def queryPrompt(self): query = input( "\033[1mLOG2SQL (" + os.path.basename(self.databaseFile) + ") (m = menu): > \033[0m" ) if (query == 'b' or query == 'back'): self.menu() elif (query == 'q' or query == 'quit'): self.menu() elif (query == 'm' or query == 'menu'): self.query_menu() elif (query == 'x' or query == 'exit'): exitP4(); elif query.strip().lower() in ('h', 'help', '?'): self.print_query_shortcut_help() self.queryPrompt() elif query.strip().isnumeric(): queries = list_sql_library_names() if not queries: print(f"\n {YELLOW}No SQL library queries available.{RESET}") self.queryPrompt() return index = int(query.strip()) - 1 if validateInput(index, len(queries)): self.query(queries[index]) else: print(f"\n Enter a number between 1 and {len(queries)}.") self.queryPrompt() else: self.query(query) def print_query_shortcut_help(self) -> None: """Help for interactive LOG2SQL commands, lock shortcuts, and SQL library.""" sql_dir = p4diag_sql_queries_dir() print(f"\n {BOLD}LOG2SQL command help{RESET}\n") print(f" {BOLD}Lock shortcuts{RESET}\n") print(f" {BOLD}ww{RESET} write waiters (victims — blocked on write locks)") print(f" {BOLD}wh{RESET} write holders (culprits — holding write locks)") print(f" {BOLD}rw{RESET} read waiters (victims — blocked on read locks)") print(f" {BOLD}rh{RESET} read holders (culprits — holding read locks)") print(f" {BOLD}pid{RESET} probe one 
command (prompts for PID + startTime)\n") print( f" Syntax: {BOLD}command [table] [startDate startTime endDate endTime]" f" [limit] [col1,col2,...]{RESET}\n" ) print(" table db table name (optional; db. prefix is stripped)") print(" start/end both required if either is used (YYYY/MM/DD HH:MM:SS)") print(" limit max rows (default 25; ignored when a date range is set)") print(" col1,col2 extra columns from the trace DB (comma-separated)\n") print(f" {BOLD}Examples:{RESET}\n") print_help_query_table( list(_LOCK_SHORTCUT_EXAMPLES), col1="Command", col2="Shows", ) print( "\n START END = YYYY/MM/DD HH:MM:SS (both required if either is used)\n" ) print(f" {BOLD}SQL library (numbered list in menu){RESET}\n") print( " The numbered .sql files in the menu are canned reports shipped with " "p4diag or added locally." ) print(" Run one by entering its menu number, or type the basename") print(" (e.g. cpu_summary.sql) or a full path to any .sql file.\n") print(f" Library directory: {BOLD}{sql_dir}{RESET}\n") queries = list_sql_library_names() if queries: print_sql_library_help(queries) print(" Add your own queries:") print(" • copy a .sql file into that directory (any basename ending in .sql)") print(" • type m at the prompt to refresh the numbered list") print(" • override the directory with P4DIAG_SQL_QUERIES if needed") print( " Scripts may use sqlite dot-commands (.print, .width) or plain SQL." ) print( " pid prompts for PID + startTime (built into p4diag, not a numbered query).\n" " locks_all_duration.sql prompts for a duration in ms (or: locks_all_duration.sql 5000).\n" " locks_held_total.sql prompts for an optional table (or: locks_held_total.sql revhx).\n" " locks_table_by_cmd.sql prompts for a table name (or: locks_table_by_cmd.sql revdx).\n" ) print(f" {BOLD}Other inputs{RESET}\n") print(" .schema show trace DB table columns") print(" SELECT ... 
run ad hoc SQL against the trace database\n") def createLogSummary( self, filter_section=None, force_regenerate: bool = False, ) -> bool: """Write ``.summary.txt`` / ``.summary.html`` unless a valid cache exists. Returns True if rebuilt.""" MAX_COL_WIDTH = 50 # Max width before truncation summary_sections = get_summary_sections() meta_path = summary_meta_path(self.summaryFile) if force_regenerate: for path in (self.summaryFile, self.summaryFileHTML, meta_path): if os.path.isfile(path): try: os.remove(path) except OSError: pass if summary_cache_is_current( self.logFile, self.databaseFile, self.logFileDetails, self.summaryFile, self.summaryFileHTML, meta_path, ): if QUIET and LOG_FILE: try: with open(LOG_FILE, "a") as f: f.write( f"[INFO] Summary report unchanged, using {self.summaryFile}\n" ) except OSError: pass elif not QUIET: print( f"{DIM}Summary report up to date{RESET} " f"{BOLD}{self.summaryFile}{RESET}" ) return False def gen_summary_report(): conn = sqlite3.connect(self.databaseFile) conn.row_factory = sqlite3.Row cursor = conn.cursor() sections_to_run = ( summary_sections.items() if not filter_section else [(filter_section, summary_sections.get(filter_section, []))] ) vc_panels = get_victim_culprit_panels_cached(self.databaseFile) vc_html = format_victim_culprit_html(vc_panels) vc_text_embed = format_victim_culprit_ascii(vc_panels, include_banner=False) base_dir = os.getcwd() target_dir = os.path.dirname(self.databaseFile) diag_dir = target_dir log_path = os.path.join(base_dir, self.logFile) db_path = os.path.join(base_dir, self.databaseFile) write_p4_plot_pngs( log_path, db_path, target_dir, skip_existing=True, ) log_basename = os.path.basename(self.logFile) db_basename = os.path.basename(self.databaseFile) png_files = [ f"Active.{log_basename}.png", f"dbWaitTime.{db_basename}.png", f"Incoming.{db_basename}.png", f"Running.{db_basename}.png", ] stats_lines = [] if self.logFileDetails and os.path.isfile(self.logFileDetails): with open(self.logFileDetails, 
"r") as detailsFile: stats_lines = detailsFile.readlines() stats_text = "".join(stats_lines) plots_section_body = _build_summary_plots_section(diag_dir, png_files) recommendation_matches = evaluate_recommendations( self.databaseFile, stats_text=stats_text, ) recommendations_text = format_recommendations_plain_text(recommendation_matches) recommendations_html = format_recommendations_html(recommendation_matches) sql_buf = io.StringIO() queryFile = open(self.summaryQueriesFile, "w") queryFile.write(f".mode column\n") queryFile.write(f".headers on\n") for section_name, queries in sections_to_run: sql_buf.write(f"\n--- {section_name.upper()} ---\n") canned_basename = _SUMMARY_CANNED_QUERY_SECTIONS.get(section_name) if canned_basename: script_body = resolve_sql_query_text(canned_basename) if not script_body: sql_buf.write( f"(Canned query not found: {canned_basename})\n" ) continue queryFile.write(f"\n-- {section_name} ({canned_basename})\n") queryFile.write(script_body) if not script_body.endswith("\n"): queryFile.write("\n") try: section_text = self._run_sql_read_script_to_string(script_body) sql_buf.write(section_text) if section_text and not section_text.endswith("\n"): sql_buf.write("\n") except RuntimeError as e: sql_buf.write(f"Error: {e}\n") continue for label, query in queries: queryFile.write(f".print \'\'\n") queryFile.write(f"\n.print \'{label}\'\n") sql_buf.write(f"\n@@QUERY@@{label}@@\n") try: queryFile.write(f"{query}\n") cursor.execute(query) rows = cursor.fetchall() if rows: headers = rows[0].keys() columns = list(headers) data = [ [str(row[col]) if row[col] is not None else "" for col in columns] for row in rows ] truncated_data = [ [val if len(val) <= MAX_COL_WIDTH else val[:MAX_COL_WIDTH - 3] + "..." 
for val in row] for row in data ] widths = [max(len(col), max(len(row[i]) for row in truncated_data)) for i, col in enumerate(columns)] header_line = " ".join(col.ljust(widths[i]) for i, col in enumerate(columns)) sql_buf.write(header_line + "\n") underline = " ".join("-" * widths[i] for i in range(len(columns))) sql_buf.write(underline + "\n") for row in truncated_data: sql_buf.write(" ".join(row[i].ljust(widths[i]) for i in range(len(columns))) + "\n") else: sql_buf.write("(No results)\n") except sqlite3.Error as e: sql_buf.write(f"Error: {e}\n") queryFile.close() conn.close() sql_sections_text = sql_buf.getvalue() sep = "=" * 72 title_line = f"=== Summary Report for {self.logFile} ===\n" with open(self.summaryFile, "w") as summaryFile: summaryFile.write(title_line) summaryFile.write("\nLOG STATISTICS\n") summaryFile.write(sep + "\n") if stats_text.strip(): summaryFile.write(stats_text) if not stats_text.endswith("\n"): summaryFile.write("\n") else: summaryFile.write("(no log statistics file; run: p4diag stats )\n") summaryFile.write("\nRECOMMENDATIONS\n") summaryFile.write(sep + "\n") summaryFile.write(recommendations_text) summaryFile.write("\nPLOTS\n") summaryFile.write(sep + "\n") summaryFile.write(plots_section_body) summaryFile.write("\nDETAILED SQL REPORTS\n") summaryFile.write(sep + "\n") summaryFile.write(_summary_sql_sections_plain_text(sql_sections_text)) if not sql_sections_text.endswith("\n"): summaryFile.write("\n") summaryFile.write("\nVICTIM CULPRIT REPORT\n") summaryFile.write(sep + "\n") summaryFile.write(vc_text_embed) if not vc_text_embed.endswith("\n"): summaryFile.write("\n") html_file = self.summaryFileHTML stats_html_inner = ( stats_text if stats_text.strip() else "(no log statistics file; run: p4diag stats )" ) png_entries: List[Tuple[str, str]] = [] for fname in png_files: full_path = os.path.join(diag_dir, fname) if os.path.exists(full_path) and os.path.getsize(full_path) > 0: png_entries.append((fname, fname)) plots_empty = ( 
plots_section_body.strip() or "No PNG files yet — install gnuplot and run log2sql or plots." ) html_content = render_summary_html_report( log_file=self.logFile, db_file=self.databaseFile, stats_html_inner=stats_html_inner, recommendations_html=recommendations_html, png_entries=png_entries, plots_empty_note=plots_empty, sql_sections_html=_summary_sql_sections_to_html(sql_sections_text), vc_html=vc_html, ) with open(html_file, "w", encoding="utf-8", errors="replace") as f: f.write(html_content) sig = _summary_inputs_signature( self.logFile, self.databaseFile, self.logFileDetails ) if sig: payload = { "version": SUMMARY_CACHE_VERSION, "layout_version": SUMMARY_HTML_LAYOUT_VERSION, "recommendations_sig": _recommendations_rules_signature(), **sig, } _summary_meta_write(meta_path, payload) thread = threading.Thread(target=gen_summary_report) if QUIET: with open(LOG_FILE, "a") as f: f.write("[INFO] Generating summary report ...\n") thread.start() with quiet_stderr_activity("Generating summary report..."): thread.join() else: with cli_spinner("Generating summary report..."): thread.start() thread.join() if QUIET: with open(LOG_FILE, "a") as f: f.write(f"[INFO] Generated summary report {self.summaryFileHTML}\n") print(f"Wrote summary report: {self.summaryFile}", flush=True) print(f"Wrote summary HTML: {self.summaryFileHTML}", flush=True) else: print( f"{GREEN}✓ Generated summary report{RESET} " f"{BOLD}{self.summaryFileHTML}{RESET}" ) return True def printLogSummary(self): self.createLogSummary(force_regenerate=False) if os.path.isfile(self.summaryFile): s.run(["less", self.summaryFile]) def createPlots(self): base_dir = os.getcwd() target_dir = os.path.dirname(self.databaseFile) log_path = os.path.join(base_dir, self.logFile) db_path = os.path.join(base_dir, self.databaseFile) write_p4_plot_pngs(log_path, db_path, target_dir, skip_existing=True) def menu(self): menu_w = 12 print() for cmd, desc in ( ("1. query", "Query trace database"), ("2. 
summary", "Summary report for trace database"), ("3. stats", "Show log file statistics"), ("4. search", "Search log file for string"), ("5. pid", "Probe one command by PID + startTime"), ("6. errors", "Show log file errors messages"), ): print(f" {BOLD}{cmd:<{menu_w}}{RESET}{desc}") print(f"\n {BOLD}h or help{RESET}\tHelp for server log analysis") print(f" {BOLD}m or menu{RESET} - server log menu") print(f" {BOLD}b or back{RESET}- exit p4diag") print(f" {BOLD}x or exit{RESET} - exit p4diag") self.commandPrompt() def commandPrompt(self): command = input("\n\033[1mcommand (m = menu) > \033[0m") if (command == 'b' or command == 'back'): exitP4() elif (command == 'm' or command == 'menu'): self.menu() elif (command == 'x' or command == 'exit'): exitP4() else: self.runCommand(command) def runCommand(self,command): if (command == '1' or command == 'query'): self.query_menu() if (command == '2' or command == 'summary'): self.printLogSummary() self.commandPrompt() if (command == '3' or command == 'stats'): self.printLogStats() self.commandPrompt() elif (command == '4' or command == 'search'): self.search_log() self.commandPrompt() elif (command == '5' or command == 'pid'): self.show_pid() elif (command == '6' or command == 'errors'): self.show_errors() self.commandPrompt() elif (command == 'h' or command == 'help'): usage_log2sql() self.commandPrompt() elif (command == 'b' or command == 'back'): exitP4() elif (command == 'x' or command == 'exit'): exitP4() else: self.commandPrompt() def trim_log(self, start=None, end=None): def get_valid_time(prompt): while True: value = input(prompt) if value == 'b': exitP4() if value == 'x': exitP4() if re.match(r'^20\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}$', value.strip()): return value.strip() print("\tPlease enter a valid value for date/time") if self.databaseFile == "unset": while True: filelist = listFiles(".") log_to_trim = input('\n\033[1mLog to trim (b = back, x = exit) > \033[0m') if log_to_trim == 'b': exitP4() if log_to_trim == 
'x': exitP4() if log_to_trim.isnumeric(): index = int(log_to_trim) - 1 if validateInput(index, len(filelist)): log_to_trim = filelist[index] startTime = get_valid_time("\n\033[1mEnter start date/time (yyyy/mm/dd hh:mm:ss) > \033[0m") endTime = get_valid_time("\033[1mEnter end date/time (yyyy/mm/dd hh:mm:ss) > \033[0m") handle_trim(log_to_trim, startTime, endTime) else: log_to_trim = self.logFile if self.logFile not in ("unset", "") else self.databaseFile startTime = start endTime = end if startTime and endTime and log_to_trim not in ("unset", ""): return trim_p4_server_log_file(log_to_trim, startTime, endTime) def find_nearest_line_by_probe(log_file, timestamp_str, direction='forward'): def grep_line(ts_minute): # Match any timestamp within the same minute, e.g., 2025/10/22 08:56:XX pattern = f"{ts_minute}:[0-9][0-9]" cmd = f"grep -a -n -E '{pattern}' {log_file} | head -1" result = s.getoutput(cmd).strip() if result: parts = result.split(":", 1) if len(parts) == 2: line_num = int(parts[0]) line_text = parts[1] return line_num, line_text.strip() return None, None timestamp = datetime.strptime(timestamp_str, "%Y/%m/%d %H:%M:%S") for i in range(1, 121): # up to 2 hours probe_time = timestamp + timedelta(minutes=i) if direction == 'forward' else timestamp - timedelta(minutes=i) probe_time_str = probe_time.strftime("%Y/%m/%d %H:%M") line_num, line_text = grep_line(probe_time_str) if line_num: return line_num, line_text.strip() return None, None # Create filename rep = {"/": "", ":": "", " ": "_"} st = startTime et = endTime for i, j in rep.items(): st = st.replace(i, j) et = et.replace(i, j) logTrim = log_to_trim + "." + st + "-" + et if not os.path.exists(logTrim): head = s.getoutput(f"grep -a -n '{startTime}' {log_to_trim} | head -1 | cut -f 1 -d :") tail = s.getoutput(f"grep -a -n '{endTime}' {log_to_trim} | tail -1 | cut -f 1 -d :") if not head: print(f"Start time not found. 
Searching for nearest available time after: {startTime}") head_line, head_text = find_nearest_line_by_probe(log_to_trim, startTime, direction='forward') if head_line: head = head_line st = head_text.split()[0] + " " + head_text.split()[1] print(f"Found nearest time after: {st}") else: print(f"Start time {startTime} not found and no nearby timestamp found.") return None if not tail: print(f"End time not found. Searching for nearest available time before: {endTime}") tail_line, tail_text = find_nearest_line_by_probe(log_to_trim, endTime, direction='backward') if tail_line: tail = tail_line et = tail_text.split()[0] + " " + tail_text.split()[1] print(f"Found nearest time before: {et}") else: print(f"End time {endTime} not found and no nearby timestamp found.") return None # Create filename rep = {"/": "", ":": "", " ": "_"} for i, j in rep.items(): st = st.replace(i, j) et = et.replace(i, j) logTrim = log_to_trim + "." + st + "-" + et def run_trim(): quit_line = str(int(tail) + 1) s.getoutput(f"sed -n '{head},{tail}p; {quit_line}q' {log_to_trim} > {logTrim}") thread = threading.Thread(target=run_trim) if QUIET: thread.start() with quiet_stderr_activity("Trimming log..."): thread.join() print(f"Trimmed log: {logTrim}", flush=True) else: with cli_spinner("Trimming log file..."): thread.start() thread.join() print(f"{GREEN}✓ Trimmed log{RESET} {BOLD}{GREEN}{logTrim}{RESET}") return logTrim def createLogStats(self, force_regenerate: bool = False) -> bool: """Write ``.stats.txt`` from the P4 log unless a valid cache exists. 
Returns True if regenerated.""" meta_path = log_stats_meta_path(self.logFileDetails) log_path = self.logFile if force_regenerate: for p in (self.logFileDetails, meta_path): if os.path.isfile(p): try: os.remove(p) except OSError: pass if log_stats_cache_is_current(log_path, self.logFileDetails, meta_path): if QUIET and LOG_FILE: try: with open(LOG_FILE, "a") as f: f.write( f"[INFO] Log statistics unchanged (log mtime/size), using {self.logFileDetails}\n" ) except OSError: pass elif not QUIET: print(f"{DIM}Log statistics up to date{RESET} {BOLD}{self.logFileDetails}{RESET}") return False for p in (self.logFileDetails, meta_path): if os.path.isfile(p): try: os.remove(p) except OSError: pass def gen_log_stats(): text = generate_log_summary_text(self.logFile) with open(self.logFileDetails, "w", encoding="utf-8", errors="replace") as outfile: outfile.write(text) if text and not text.endswith("\n"): outfile.write("\n") sig = _log_source_signature(self.logFile) if sig: _log_stats_meta_write(meta_path, sig) thread = threading.Thread(target=gen_log_stats) if QUIET: with open(LOG_FILE, "a") as f: f.write("[INFO] Generating log statistics ...\n") thread.start() with quiet_stderr_activity("Generating log statistics..."): thread.join() else: with cli_spinner("Generating log statistics..."): thread.start() thread.join() if QUIET: with open(LOG_FILE, "a") as f: f.write(f"[INFO] Generated log statistics {self.logFileDetails}\n") else: print( f"{GREEN}✓ Generated log statistics{RESET} " f"{BOLD}{GREEN}{self.logFileDetails}{RESET}" ) return True def printLogStats(self): self.createLogStats() s.run(["cat", self.logFileDetails], check=True) def search_log(self): print( f"\n Search {BOLD}{os.path.basename(self.logFile)}{RESET} with grep " "(case-sensitive substring match)." ) print(" Quote the pattern if it contains spaces or shell characters.") context = input( "\n Show each match with surrounding context lines (±75)? 
(y/n) [n]: " ).strip().lower() search_string = input(" Enter search string: ").strip() if not search_string: print("\n No search string entered.") return if context in ("y", "yes"): result = s.run( [ "grep", "-a", "--color=always", "-C75", "--", search_string, self.logFile, ], capture_output=True, text=True, ) if result.returncode > 1: print(result.stderr or f"grep failed (exit {result.returncode})") return lines = result.stdout else: lines = s.getoutput( "grep -a " + shlex.quote(search_string) + " " + shlex.quote(self.logFile) ) if lines: print(lines, end="" if lines.endswith("\n") else "\n") else: print(" (no matches)") def show_active_threads(self): if not (os.path.isfile(self.activeThreadsSummaryFile)): pid_found = s.getoutput("grep -a -n 'active threads' " + self.logFile + " | sort -nr -k10 | head -1 | cut -f 5 -d ' ' | cut -f 1 -d ':'") if pid_found: threadsFile = open(self.activeThreadsSummaryFile, "a") threadsFile.write("\nMax active threads: \n\t") s.getoutput("grep -a -n 'active threads' " + self.logFile + " | grep -a " + pid_found + " > " + self.maxActiveThreadsFile) threadsFile.write(s.getoutput("grep -a -n 'active threads' " + self.logFile + " | grep -a " + pid_found + " | sort -nr -k10 | head -1")) threadsFile.close() print("\nSummary of active threads in log file " + self.logFile + "\n") else: print("\nNo active threads messages found in\033[1m " + self.logFile + "\033[0m") return s.run(["cat", self.activeThreadsSummaryFile]) show_all = input("\n\nShow all active threads from log (y/n)? 
") if show_all == "yes" or show_all == "y": s.run(["less", self.maxActiveThreadsFile]) self.commandPrompt() def show_errors(self): if not (os.path.isfile(self.errorsSummaryFile)): errorsFile = open(self.errorsSummaryFile, "a") # calculate and report details print("\nSummary of errors in log file " + self.logFile + "\n") errorsFile.write("\nPartner exited unexpectedly errors: ") errorsFile.write(s.getoutput("grep -a 'Partner exited unexpectedly' " + self.logFile + " | wc -l")) errorsFile.write("\nFail errors\n") failure = s.getoutput("grep -a -A5 'Perforce server error' " + self.logFile + " | grep -a failed | sort | uniq -c | sort -r") s.getoutput("grep -a -n -A5 'Perforce server error' " + self.logFile + " > " + self.failuresFile) errorsFile.write(s.getoutput("grep -a -A5 'Perforce server error' " + self.logFile + " | grep -a failed | sort | uniq -c | sort -r")) print("\nFail errors\n") print(failure) errorsFile.close() s.run(["cat", self.errorsSummaryFile]) show_all = input("\n\nShow all errors from log (y/n)? 
") if show_all == "yes" or show_all == "y": s.run(["less", self.failuresFile]) self.commandPrompt() def show_pid(self): self.probe_pid_interactive(return_to="command") def _prompt_probe_start_time(self) -> Optional[str]: while True: value = input( "\033[1mEnter startTime (YYYY/MM/DD HH:MM:SS, b = back): \033[0m" ).strip() if value in ("b", "back"): return None if value in ("x", "exit"): exitP4() if re.match(r"^20\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}$", value): return value print("\tPlease enter startTime as YYYY/MM/DD HH:MM:SS") def probe_pid_interactive(self, *, return_to: str = "query") -> None: """Prompt for PID + startTime; probe trace DB and log tracking lines.""" pid = input("\033[1mEnter process id (b = back): \033[0m").strip() if pid in ("b", "back"): if return_to == "query": self.query_menu() else: self.commandPrompt() return if pid in ("x", "exit"): exitP4() if not pid: print(f"\n {RED}PID is required.{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() return start = self._prompt_probe_start_time() if start is None: if return_to == "query": self.query_menu() else: self.commandPrompt() return try: conn = sqlite3.connect(self.databaseFile) try: row = conn.execute( "SELECT 1 FROM process WHERE pid = ? 
AND startTime = ?", (pid, start), ).fetchone() finally: conn.close() except sqlite3.Error as e: print(f"\n {RED}Database error: {e}{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() return if not row: print( f"\n {RED}No command found for pid {pid} at startTime {start!r} " f"in {os.path.basename(self.databaseFile)}{RESET}" ) if return_to == "query": self.queryPrompt() else: self.commandPrompt() return rc = self.run_probe_pid(pid, start) if rc != 0: print(f"\n {RED}PID probe failed (exit {rc}).{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() def locks_held_total_interactive(self, *, return_to: str = "query") -> None: """Prompt for optional table name; run ``locks_held_total.sql``.""" raw = input( "\033[1mEnter table name (optional, b = back) [all]: \033[0m" ).strip() if raw in ("b", "back"): if return_to == "query": self.query_menu() else: self.commandPrompt() return if raw in ("x", "exit"): exitP4() table = None if raw: table = normalize_table_parameter(raw) err = validate_table_parameter(table) if err: print(f"\n {RED}{err}.{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() return rc = self._run_sql_query_token("locks_held_total.sql", table=table) if rc != 0: print(f"\n {RED}locks_held_total failed (exit {rc}).{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() def sql_required_table_interactive( self, sql_token: str, *, return_to: str = "query" ) -> None: """Prompt for table name; run a ``-- p4diag: require-table`` script.""" raw = input("\033[1mEnter table name (b = back): \033[0m").strip() if raw in ("b", "back"): if return_to == "query": self.query_menu() else: self.commandPrompt() return if raw in ("x", "exit"): exitP4() table = normalize_table_parameter(raw) err = validate_table_parameter(table) if err: print(f"\n {RED}{err}.{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() return rc = 
self._run_sql_query_token(sql_token, table=table) if rc != 0: print( f"\n {RED}{normalize_sql_basename(sql_token)} failed (exit {rc}).{RESET}" ) if return_to == "query": self.queryPrompt() else: self.commandPrompt() def locks_all_duration_interactive(self, *, return_to: str = "query") -> None: """Prompt for duration threshold (ms); run ``locks_all_duration.sql``.""" raw = input( "\033[1mEnter duration threshold in ms (b = back) [5000]: \033[0m" ).strip() if raw in ("b", "back"): if return_to == "query": self.query_menu() else: self.commandPrompt() return if raw in ("x", "exit"): exitP4() duration = raw or "5000" err = validate_duration_parameter(duration) if err: print(f"\n {RED}{err}.{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() return rc = self._run_sql_query_token("locks_all_duration.sql", duration=duration) if rc != 0: print(f"\n {RED}locks_all_duration failed (exit {rc}).{RESET}") if return_to == "query": self.queryPrompt() else: self.commandPrompt() def run_probe_pid(self, pid: str, start: str) -> int: """Run ``probe_pid.sql``, per-table SQL, and log tracking excerpt.""" rc = self._run_sql_query_token("probe_pid.sql", pid=pid, start=start) if rc != 0: return rc end_time = self._probe_end_time(pid, start) self.print_probe_pid_server_error(pid, start, end_time=end_time) rc = self._run_sql_query_token( "probe_pid_tables.sql", pid=pid, start=start ) if rc != 0: return rc self.print_probe_pid_log_tracking(pid, start, end_time=end_time) return 0 def _probe_end_time(self, pid: str, start: str) -> Optional[str]: try: conn = sqlite3.connect(self.databaseFile) try: row = conn.execute( "SELECT endTime FROM process WHERE pid = ? 
AND startTime = ?", (str(pid), start), ).fetchone() return row[0] if row else None finally: conn.close() except sqlite3.Error: return None def print_probe_pid_server_error( self, pid: str, start: str, *, end_time: Optional[str] = None, ) -> None: """Print ``Perforce server error`` log lines for a probed command, if any.""" log_path = os.path.abspath(self.logFile) if not os.path.isfile(log_path): return if end_time is None: end_time = self._probe_end_time(pid, start) error_lines = extract_pid_server_error_from_log( log_path, pid, start, end_time=end_time ) if not error_lines: return print("", flush=True) print("=== Server error (from log) ===", flush=True) print("", flush=True) for ln in error_lines: print(ln, flush=True) def listFiles(self, path): process = s.Popen(['find', '.', '-maxdepth', '1', '-type', 'f', '-printf', '%f\n'], stdout=s.PIPE) files, err = process.communicate() #decode bytes to python string files = files.decode('utf-8') # convert to python list files = sorted(files.split('\n')) logs = [] counter = 0 for f in files: if f == "": continue f.lstrip('./') if os.path.isfile(f) and is_p4d_log(f): logs.append(f) for log in logs: if counter == 0: print(f"\n ({str(counter+1)}) {log}") else: print(f" ({str(counter+1)}) {log}") counter = counter + 1 return logs def get_column_widths(self, db_file, sql_query): rows = self._fetch_sqlite_csv_rows(db_file, sql_query) if not rows: return [] return self._column_widths_from_csv_rows(rows) def _fetch_sqlite_csv_rows(self, db_file: str, sql_query: str) -> List[List[str]]: if not sql_query.strip().endswith(";"): sql_query += ";" proc = s.run( ["sqlite3", db_file, "-header", "-csv", sql_query], stdout=s.PIPE, stderr=s.PIPE, text=True, ) if proc.returncode != 0: raise RuntimeError(f"SQLite error: {proc.stderr}") return list(csv.reader(io.StringIO(proc.stdout))) def _column_widths_from_csv_rows(self, rows: List[List[str]]) -> List[int]: if not rows: return [] ncols = max(len(row) for row in rows) widths = [0] * ncols for 
row in rows: for i, cell in enumerate(row): widths[i] = max(widths[i], len(str(cell))) mins = {"user": 16, "command": 48} for i, name in enumerate(rows[0]): if i < len(widths): widths[i] = max(widths[i], mins.get(str(name).lower(), 0)) return widths def run_sqlite_query_auto_width(self, db_file, sql_query, indent=False): if not sql_query.strip().endswith(";"): sql_query += ";" try: rows = self._fetch_sqlite_csv_rows(db_file, sql_query) except RuntimeError as e: print(str(e)) return if not rows: return widths = self._column_widths_from_csv_rows(rows) self._print_sqlite_csv_rows(rows, widths, sys.stdout, indent=indent) def _print_sqlite_csv_rows( self, rows: List[List[str]], widths: List[int], out, *, indent: bool = False, ) -> None: header_row = rows[0] padded_header = [ str(cell).ljust(widths[i]) for i, cell in enumerate(header_row) ] header_line = " ".join(padded_header) if indent: header_line = f"\t{header_line}" print(header_line, file=out) underline_line = " ".join("-" * widths[i] for i in range(len(widths))) if indent: underline_line = f"\t{underline_line}" print(underline_line, file=out) for row in rows[1:]: padded_row = [str(cell).ljust(widths[i]) for i, cell in enumerate(row)] line = " ".join(padded_row) if indent: line = f"\t{line}" print(line, file=out) def _run_sqlite_query_to_stream( self, db_file: str, sql_query: str, out, *, indent: bool = False, ) -> int: """Run one SQL query and write an auto-width table to ``out``. 
Returns exit code.""" if not sql_query.strip().endswith(";"): sql_query += ";" try: rows = self._fetch_sqlite_csv_rows(db_file, sql_query) except RuntimeError as e: print(str(e), file=sys.stderr) return 1 if rows: widths = self._column_widths_from_csv_rows(rows) self._print_sqlite_csv_rows(rows, widths, out, indent=indent) return 0 def _run_sql_read_script(self, file_body: str, pager: Optional[List[str]] = None) -> int: """Run a ``.print`` / SELECT script with auto-width columns (no sqlite3 truncation).""" use_pager = _effective_sql_pager(pager) buf = io.StringIO() if use_pager else None out = buf if buf is not None else sys.stdout for kind, payload in iter_sql_read_segments(file_body): if kind == "print": print(payload, file=out) else: rc = self._run_sqlite_query_to_stream( self.databaseFile, payload, out ) if rc != 0: return rc print(file=out) if use_pager and buf is not None: proc = s.Popen(use_pager, stdin=s.PIPE) proc.communicate(buf.getvalue().encode()) return proc.returncode or 0 return 0 def _run_sql_read_script_to_string(self, file_body: str) -> str: """Run a ``.print`` / SELECT script; return the same text the CLI would print.""" buf = io.StringIO() for kind, payload in iter_sql_read_segments(file_body): if kind == "print": print(payload, file=buf) else: rc = self._run_sqlite_query_to_stream( self.databaseFile, payload, buf ) if rc != 0: raise RuntimeError(f"Query failed in summary SQL script") print(file=buf) return buf.getvalue() def parse_query_parts(self, query: str, valid_columns: set) -> Dict: parts = query.strip().split() if not parts: print("Query is empty.") return query_type = parts[0].lower() if query_type not in {"ww", "wh", "rw", "rh"}: print (f"Unknown query type: {query_type}") return order_by_field_map = { "ww": "totalWriteWait AS \'wait(ms)\'", "wh": "totalWriteHeld AS \'held(ms)\'", "rw": "totalReadWait AS \'wait(ms)\'", "rh": "totalReadHeld AS \'held(ms)\'", } order_by_field = order_by_field_map[query_type] table_filter = "" date_filter 
= "" extra_columns = "" limit_value = 25 error_code="" idx = 1 table_name = None start_datetime = None end_datetime = None # 1. Optional table name if idx < len(parts) and not re.match(r'^\d{4}/\d{2}/\d{2}', parts[idx]) and not parts[idx].isdigit() and "," not in parts[idx]: table_name = parts[idx] # Strip 'db.' prefix if present if table_name not in 'rdb.lbr': if '.' in table_name: table_name = table_name.split('.')[-1] table_filter = f"AND tablename = '{table_name}'" idx += 1 # 2. Optional start and end datetime if idx + 3 < len(parts): if re.match(r'^\d{4}/\d{2}/\d{2}', parts[idx]) and re.match(r'^\d{2}:\d{2}:\d{2}', parts[idx+1]) and \ re.match(r'^\d{4}/\d{2}/\d{2}', parts[idx+2]) and re.match(r'^\d{2}:\d{2}:\d{2}', parts[idx+3]): start_datetime = f"{parts[idx]} {parts[idx+1]}" end_datetime = f"{parts[idx+2]} {parts[idx+3]}" date_filter = f"AND startTime >= '{start_datetime}' AND startTime <= '{end_datetime}'" idx += 4 elif re.match(r'^\d{4}/\d{2}/\d{2}', parts[idx]): error_code = f"Incomplete datetime provided. Both start and end datetime must be specified." # 3. Optional limit if idx < len(parts) and parts[idx].isdigit(): limit_value = int(parts[idx]) idx += 1 # 4. 
Remaining parts are extra columns if idx < len(parts): columns_str = " ".join(parts[idx:]).replace(" ", "") if columns_str: extra_columns_list = columns_str.split(',') invalid_columns = [col for col in extra_columns_list if col not in valid_columns] if invalid_columns: error_code = f"Invalid column(s): {', '.join(invalid_columns)}" extra_columns = ", " + ", ".join(extra_columns_list) # If date filter exists, ignore limit if date_filter: limit_value = None return { "cmd": query_type, "order_by_field": order_by_field, "table_filter": table_filter, "date_filter": date_filter, "extra_columns": extra_columns, "limit_value": limit_value, "error_code": error_code } def build_sql_query(self, parsed: dict) -> str: excluded_tables = [ 'clients', 'clientEntity', 'change', 'storageup_R', 'storagemasterup_R', 'pull' ] sql_query = ( f"SELECT pid, user, cmd, CAST(completedLapse AS INTEGER) AS \'lapse(s)\'" f"{parsed['extra_columns']}, " f"{parsed['order_by_field']} " f", tableName AS \'table\', startTime, endTime " f"FROM tableUse JOIN process USING (processKey) " f"WHERE 1=1 " f"{parsed['table_filter']} " f"{parsed['date_filter']} " ) # Exclude system tables unless user specifies a table if not parsed['table_filter']: excluded_list = ", ".join(repr(t) for t in excluded_tables) sql_query += f"AND tablename NOT IN ({excluded_list}) " # Add minimum hold/wait thresholds if table and date are filtered min_time_fields = { 'wh': 'totalWriteHeld', 'ww': 'totalWriteHeld', 'rh': 'totalReadHeld', 'rw': 'totalReadWait' } min_field = min_time_fields.get(parsed['cmd']) if parsed['table_filter'] and parsed['date_filter']: if min_field: sql_query += f"AND {min_field} > {self.min_hold_time} " # Order and limit if parsed['date_filter']: if min_field: sql_query += f"AND {min_field} > {self.min_hold_time} " sql_query += "ORDER BY startTime" else: order_field = parsed['order_by_field'].split(' AS ')[0].strip() sql_query += f" ORDER BY {order_field} DESC" if parsed['limit_value'] is not None: 
sql_query += f" LIMIT {parsed['limit_value']}" return sql_query def query(self, query: str): query_type = self.detect_query_type(query) handler = { 'select': self.handle_select_query, 'sql_file': self.handle_sql_file_query, 'shortcut': self.handle_shortcut_query }.get(query_type) if not query.strip(): print("No query provided. Please enter a valid query.") self.queryPrompt() return first_word = query.split()[0].lower() if query.strip() else "" if first_word == 'pid': self.probe_pid_interactive(return_to='query') return if query.strip() == '.schema': self.schema_tables_pretty() self.queryPrompt() return if query_type == 'shortcut': valid_queries = ['wh', 'ww', 'rh', 'rw', '.schema'] if first_word not in valid_queries: print(f"\n {RED}{query} not a valid command{RESET}") self.query_menu() self.queryPrompt() return if handler: handler(query) else: print("Unsupported query type.") self.queryPrompt() def detect_query_type(self, query: str) -> str: if re.search(r'\bselect\b', query, re.IGNORECASE): return 'select' if os.path.splitext(query)[1].lower() == '.sql': return 'sql_file' return 'shortcut' def handle_select_query(self, query: str): cleaned_query = query.rstrip(';') s.run(["sqlite3", self.databaseFile, "-header", "-column", cleaned_query]) self.queryPrompt() def handle_sql_file_query(self, query: str): parts = query.split(None, 1) sql_token = parts[0] extra = parts[1].strip() if len(parts) > 1 else None base = normalize_sql_basename(sql_token) if base in {"probe_pid.sql", "probe_pid_min.sql", "probe_pid_tables.sql"}: self.probe_pid_interactive(return_to="query") return if base == "locks_held_total.sql": if extra: table = normalize_table_parameter(extra.split()[0]) err = validate_table_parameter(table) if err: print(f"\n {RED}{err}.{RESET}") self.queryPrompt() return rc = self._run_sql_query_token(sql_token, table=table) else: self.locks_held_total_interactive(return_to="query") return if rc == 2: print(f"\n {RED}SQL file not found: {query}{RESET}") 
self.queryPrompt() return if base == "locks_all_duration.sql": if extra: duration = extra.split()[0] err = validate_duration_parameter(duration) if err: print(f"\n {RED}{err}.{RESET}") self.queryPrompt() return rc = self._run_sql_query_token(sql_token, duration=duration) else: self.locks_all_duration_interactive(return_to="query") return if rc == 2: print(f"\n {RED}SQL file not found: {query}{RESET}") self.queryPrompt() return path, body = resolve_sql_query(sql_token) file_body = "" if path is not None: try: with open(path, encoding="utf-8") as fh: file_body = fh.read() except OSError: pass elif body is not None: file_body = body if file_body and sql_file_requires_table(file_body): if extra: table = normalize_table_parameter(extra.split()[0]) err = validate_table_parameter(table) if err: print(f"\n {RED}{err}.{RESET}") self.queryPrompt() return rc = self._run_sql_query_token(sql_token, table=table) else: self.sql_required_table_interactive(sql_token, return_to="query") return if rc == 2: print(f"\n {RED}SQL file not found: {query}{RESET}") self.queryPrompt() return rc = self._run_sql_query_token(query) if rc == 2: print(f"\n {RED}SQL file not found: {query}{RESET}") self.queryPrompt() def _run_sql_query_token( self, query: str, *, pid: Optional[str] = None, start: Optional[str] = None, table: Optional[str] = None, duration: Optional[str] = None, ) -> int: """Run a library ``NAME.sql`` (disk or built-in). 
Returns exit code (0 = ok).""" path, body = resolve_sql_query(query) if path is not None: try: with open(path, encoding="utf-8") as fh: file_body = fh.read() except OSError as e: print(f"Could not read SQL file {path}: {e}", file=sys.stderr) return 2 return self._run_sql_body( file_body, display_name=os.path.basename(path), read_path=os.path.abspath(path), pid=pid, start=start, table=table, duration=duration, ) if body is not None: return self._run_sql_body( body, display_name=normalize_sql_basename(query), pid=pid, start=start, table=table, duration=duration, ) return 2 def _run_sql_body( self, file_body: str, *, display_name: str, read_path: Optional[str] = None, pid: Optional[str] = None, start: Optional[str] = None, table: Optional[str] = None, duration: Optional[str] = None, ) -> int: """Run SQL script text against ``self.databaseFile``. Returns exit code (0 = ok).""" if sql_file_requires_pid_start(file_body): if pid is None or start is None: print( "p4diag: " f"{display_name} requires PID and startTime:\n" f" p4diag {display_name} LOG PID 'YYYY/MM/DD HH:MM:SS'", file=sys.stderr, ) return 2 if sql_file_requires_table(file_body): if table is None: print( "p4diag: " f"{display_name} requires a table name:\n" f" p4diag {display_name} LOG TABLE", file=sys.stderr, ) return 2 err = validate_table_parameter(table) if err: print(f"p4diag: {err}", file=sys.stderr) return 2 elif sql_file_uses_table_parameter(file_body) and table is not None: err = validate_table_parameter(table) if err: print(f"p4diag: {err}", file=sys.stderr) return 2 if sql_file_requires_duration(file_body): if duration is None: print( "p4diag: " f"{display_name} requires a duration threshold in milliseconds:\n" f" p4diag {display_name} LOG MILLISECONDS", file=sys.stderr, ) return 2 err = validate_duration_parameter(duration) if err: print(f"p4diag: {err}", file=sys.stderr) return 2 pager = sql_file_pager_from_header(file_body) if pager is None and display_name in _SQL_FILES_AUTO_PAGER: pager = 
["less"] sql = strip_sql_documentation_header(file_body).strip() if not sql: print(f"SQL file is empty: {display_name}", file=sys.stderr) return 2 if re.search(r"^\s*\.", sql, re.MULTILINE): if ( re.search(r"^\s*\.width\b", file_body, re.MULTILINE | re.IGNORECASE) or sql_file_requires_pid_start(file_body) or sql_file_uses_table_parameter(file_body) or sql_file_requires_duration(file_body) ): prepared = file_body else: prepared = prepare_sql_read_script(file_body) tmp_path = None try: with tempfile.NamedTemporaryFile( mode="w", suffix=".sql", delete=False, encoding="utf-8", ) as tf: tf.write(prepared) tmp_path = tf.name argv = sqlite3_cli_argv_for_read_script(file_body) if sql_file_uses_table_parameter(file_body): argv.extend( sqlite3_table_parameter_argv( table_parameter_bind_value(table) ) ) elif duration is not None: argv.extend(sqlite3_duration_parameter_argv(duration)) elif pid is not None and start is not None: argv.extend(sqlite3_pid_start_parameter_argv(pid, start)) argv.extend([self.databaseFile, f".read {tmp_path}"]) return run_sqlite3_with_optional_pager(argv, pager) finally: if tmp_path: try: os.unlink(tmp_path) except OSError: pass if not sql.rstrip().endswith(";"): sql += ";" argv = ["sqlite3", "-header", "-column"] if sql_file_uses_table_parameter(file_body): argv.extend( sqlite3_table_parameter_argv(table_parameter_bind_value(table)) ) elif duration is not None: argv.extend(sqlite3_duration_parameter_argv(duration)) elif pid is not None and start is not None: argv.extend(sqlite3_pid_start_parameter_argv(pid, start)) argv.extend([self.databaseFile, sql]) if pager: return run_sqlite3_with_optional_pager(argv, pager) if ( (pid is not None and start is not None) or sql_file_uses_table_parameter(file_body) or duration is not None ): return s.call(argv) self.run_sqlite_query_auto_width(self.databaseFile, sql) return 0 def handle_shortcut_query(self, query: str): parsed = self.parse_query_parts(query, self.VALID_COLUMNS) if parsed['error_code']: print(f"\n 
{RED}{parsed['error_code']}{RESET}") self.query_menu() else: if parsed['cmd'] in {'ww', 'wh', 'rw', 'rh'}: self.print_shortcut_query_info(parsed) sql_query = self.build_sql_query(parsed) if parsed['cmd'] not in {'ww', 'wh', 'rw', 'rh'}: print(f"\n{sql_query}\n") self.run_sqlite_query_auto_width(self.databaseFile, sql_query) self.queryPrompt() def execute_sql_file_noninteractive( self, sql_path: str, *, pid: Optional[str] = None, start: Optional[str] = None, table: Optional[str] = None, ) -> None: """Run a ``.sql`` file against ``self.databaseFile``; results go to stdout. Absolute ``sql_path`` selects that file. A non-absolute name uses ``P4DIAG_SQL_QUERIES/`` (adds ``.sql`` when missing). ``probe_pid*.sql`` scripts require ``pid`` and ``start`` (``@pid`` / ``@start``). ``probe_pid.sql`` always includes the per-table breakdown and log tracking. ``locks_held_total.sql`` accepts an optional ``table`` (``@table``) as the third positional argument; omit it for all tables. ``locks_table_by_cmd.sql`` requires ``table`` (``@table``); pass it as the third positional argument. ``locks_all_duration.sql`` requires ``duration`` (``@duration`` ms); pass it as the third positional argument. 
""" base = normalize_sql_basename(sql_path) if base == "probe_pid.sql" and pid and start: rc = self.run_probe_pid(pid, start) if rc == 2: sys.exit(2) if rc != 0: sys.exit(rc) return bind_table = table bind_duration = None if bind_table is None and bind_duration is None and pid and start is None: path, body = resolve_sql_query(sql_path) file_body = "" if path is not None: try: with open(path, encoding="utf-8") as fh: file_body = fh.read() except OSError: pass elif body is not None: file_body = body if sql_file_requires_duration( file_body ) and not sql_file_requires_pid_start(file_body): bind_duration = pid pid = None elif sql_file_uses_table_parameter( file_body ) and not sql_file_requires_pid_start(file_body): bind_table = normalize_table_parameter(pid) pid = None rc = self._run_sql_query_token( sql_path, pid=pid, start=start, table=bind_table, duration=bind_duration, ) if rc == 2: raw = sql_path.strip() tried = ( os.path.abspath(raw) if os.path.isabs(raw) else os.path.join(p4diag_sql_queries_dir(), normalize_sql_basename(raw)) ) file_body = "" if os.path.isfile(tried): try: file_body = open(tried, encoding="utf-8").read() except OSError: pass needs_pid = sql_file_requires_pid_start(file_body) and ( pid is None or start is None ) needs_table = sql_file_requires_table(file_body) and bind_table is None needs_duration = ( sql_file_requires_duration(file_body) and bind_duration is None ) if not (needs_pid or needs_table or needs_duration): print(f"SQL file not found: {tried}", file=sys.stderr) sys.exit(2) if rc != 0: sys.exit(rc) if base == "probe_pid_min.sql" and pid and start: self.print_probe_pid_log_tracking(pid, start) def print_probe_pid_log_tracking( self, pid: str, start: str, *, end_time: Optional[str] = None, ) -> None: """Print raw ``---`` tracking lines from the server log for a probed command.""" log_path = os.path.abspath(self.logFile) if not os.path.isfile(log_path): print( f"\n(Log file not found for tracking excerpt: {log_path})", file=sys.stderr, ) 
return if end_time is None: end_time = self._probe_end_time(pid, start) if not QUIET and getattr(sys.stderr, "isatty", lambda: False)(): print( "Scanning server log for tracking data (large logs may take a moment)...", file=sys.stderr, flush=True, ) cmd_line, tracking = extract_pid_tracking_from_log( log_path, pid, start, end_time=end_time ) print("") print("=== Log tracking (from server log) ===") print("") if cmd_line and not any( ln == cmd_line or ln.endswith(cmd_line) for ln in tracking[:3] ): print(cmd_line) if not tracking: print( f"(no tracking lines found for pid {pid} at {start} in {os.path.basename(log_path)})" ) return for ln in tracking: print(ln) def schema_tables_pretty(self) -> None: """Print readable column lists (with log2sql DDL comments) for trace tables.""" conn = sqlite3.connect(self.databaseFile) try: master = conn.execute( "SELECT name, sql FROM sqlite_master " "WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" ).fetchall() if not master: print("(no tables in database)") return print( f"\n{BOLD}log2sql trace database:{RESET} " f"{os.path.basename(self.databaseFile)}\n" ) for tbl, create_sql in master: comments = _column_comments_from_create_sql(create_sql) rows = conn.execute(f'PRAGMA table_info("{tbl}")').fetchall() if not rows: print(f"-- {tbl}\n (table missing or empty)\n") continue display = [] for _cid, col, typ, notnull, _dflt, pk in rows: flags = [] if pk: flags.append("PK") if notnull: flags.append("NOT NULL") desc = comments.get(col, "") if len(desc) > 60: desc = desc[:57] + "..." 
display.append( (col, typ, ", ".join(flags) if flags else "-", desc) ) print(f"-- {tbl}") print( tabulate( display, headers=["column", "type", "flags", "description"], tablefmt="simple", ) ) print() finally: conn.close() def print_shortcut_query_info(self, parsed: dict): label = _LOCK_CONTENTION_SHORTCUT_LABELS.get(parsed["cmd"], "Lock contention") if not parsed['table_filter'] and not parsed['date_filter']: print(f"\n{label}") elif parsed['table_filter'] and parsed['date_filter']: table_name = re.search(r"tablename = '([^']+)'", parsed['table_filter']) table_name = table_name.group(1) if table_name else "Unknown" date_range = re.findall(r"\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}", parsed['date_filter']) if len(date_range) == 2: date_str = f"between {date_range[0]} and {date_range[1]}" else: date_str = "Unknown date range" print(f"\n{label} ({table_name}, {date_str})") def usage_log2sql(): print("") print("The \033[1mquery\033[0m option provides a list of canned queries to run. You can also enter custom") print("SQL queries, providing SQL directly at the query prompt or by passing a *.sql filename") print("containing SQL queries.") print("") print("The \033[1msummary\033[0m option runs a series of SQL queries against the current trace database.") def exitP4(): sys.exit(0) def validateInput(inputNum, listLen): if inputNum < 0 or inputNum >= listLen: print(f"\nPlease enter a number between 1 and {listLen}") return False return True def _column_comments_from_create_sql(create_sql: Optional[str]) -> Dict[str, str]: """Map column name → trailing ``--`` comment from log2sql ``CREATE TABLE`` DDL.""" if not create_sql: return {} comments: Dict[str, str] = {} for line in create_sql.splitlines(): line = line.strip().rstrip(",") if not line or line.upper().startswith("CREATE") or line.upper().startswith("PRIMARY"): continue m = re.match(r"(\w+)\s+.+?--\s*(.+)$", line) if m: comments[m.group(1)] = m.group(2).strip() return comments def list_sql_library_files(directory: str) -> 
List[str]: """Sorted ``*.sql`` basenames under ``directory`` (empty if missing or none).""" if not os.path.isdir(directory): return [] return sorted( name for name in os.listdir(directory) if name.endswith(".sql") and os.path.isfile(os.path.join(directory, name)) ) def print_help_query_table( rows: List[Tuple[str, str]], *, indent: str = " ", col1: str = "Query", col2: str = "Best for", ) -> None: """Print a two-column help table with box-drawing borders.""" if not rows: return query_w = max(len(col1), *(len(query) for query, _ in rows)) best_w = max(len(col2), *(len(best_for) for _, best_for in rows)) def border(left: str, mid: str, right: str) -> str: return ( indent + left + "─" * (query_w + 2) + mid + "─" * (best_w + 2) + right ) def cell(text: str, width: int) -> str: return f" {text}{' ' * (width - len(text))} " def data_row(query: str, best_for: str) -> str: return ( indent + "│" + cell(query, query_w) + "│" + cell(best_for, best_w) + "│" ) print(border("┌", "┬", "┐")) print(data_row(col1, col2)) print(border("├", "┼", "┤")) for query, best_for in rows: print(data_row(query, best_for)) print(border("└", "┴", "┘")) def print_sql_library_help(queries: List[str]) -> None: """Help tables for shipped SQL library reports (lock section + other reports).""" query_set = set(queries) lock_rows = [ (name, sql_library_summary(name)) for name in _LOCK_QUERY_HELP_ORDER if name in query_set ] if lock_rows: print(f" {BOLD}Lock contention queries:{RESET}\n") print_help_query_table(lock_rows) print() other_names = sorted( name for name in queries if name not in _LOCK_QUERY_HELP_ORDER ) if other_names: other_rows = [(name, sql_library_summary(name)) for name in other_names] print(f" {BOLD}Other reports:{RESET}\n") print_help_query_table(other_rows) print() def print_numbered_list(items: List[str]) -> None: for i, item in enumerate(items, 1): print(f" ({i}) {item}") def print_numbered_sql_library(queries: List[str]) -> None: """Numbered SQL library list with a one-line purpose 
per report.""" for i, name in enumerate(queries, 1): print(f" ({i}) {name}") print(f" {sql_library_summary(name)}") def listFiles(path): # List files at a provided path (trim log picker). Used by 'ls' and legacy callers. if not os.path.isdir(path): print(f" {YELLOW}Directory not found:{RESET} {path}") return [] names = sorted( name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name)) ) print_numbered_list(names) return names def is_p4d_log(logfile, max_lines=2000): pattern = re.compile(r"20.* pid ") def read_lines(file_obj): for i, line in enumerate(file_obj): if i >= max_lines: break if pattern.search(line): return True return False open_func = gzip.open if logfile.endswith('.gz') else open mode = 'rt' if logfile.endswith('.gz') else 'r' try: with open_func(logfile, mode, errors='ignore') as f: return read_lines(f) except Exception as e: return False def p4diag_summary_html_basename(log_token: str) -> str: """Basename of ``LOG.summary.html`` under ``.p4diagnostics`` for a log path or basename.""" base = os.path.basename((log_token or "").rstrip("/")) if base.lower().endswith(".db"): base = base[:-3] return base + ".summary.html" def _resolve_summary_html_index(diag_dir: str, log_token: Optional[str] = None) -> Optional[str]: """Basename of the summary HTML to serve at ``/`` (existing file under ``diag_dir``).""" if log_token: candidate = p4diag_summary_html_basename(log_token) if os.path.isfile(os.path.join(diag_dir, candidate)): return candidate newest_path: Optional[str] = None newest_mtime = -1.0 try: names = os.listdir(diag_dir) except OSError: return None for name in names: if not name.endswith(".summary.html"): continue path = os.path.join(diag_dir, name) if not os.path.isfile(path): continue mtime = os.path.getmtime(path) if mtime > newest_mtime: newest_mtime = mtime newest_path = name return newest_path class P4diagHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): """Serve ``.p4diagnostics``; ``/`` shows ``LOG.summary.html`` 
when available.""" log_token: Optional[str] = None def __init__(self, *args, directory=None, log_token=None, **kwargs): self.log_token = log_token super().__init__(*args, directory=directory, **kwargs) def log_message(self, format, *args): pass def _index_document(self) -> Optional[str]: return _resolve_summary_html_index(self.directory, self.log_token) def list_directory(self, path): self.send_error( 404, "No summary report yet. Run: p4diag summary LOG (or use the interactive log menu).", ) return None def do_GET(self): parsed = urlparse(self.path) req_path = unquote(parsed.path) if req_path in ("", "/"): index_doc = self._index_document() if index_doc: self.path = "/" + index_doc return super().do_GET() class ReusableTCPServer(socketserver.TCPServer): allow_reuse_address = True def is_port_in_use(port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: return sock.connect_ex(('0.0.0.0', port)) == 0 def _web_server_reachable_ipv4() -> Optional[str]: """IPv4 on the default-route interface (VPN/LAN), for browser on another host.""" try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.connect(("8.8.8.8", 80)) return sock.getsockname()[0] except OSError: return None def _web_server_display_url(port: int) -> str: """URL shown at startup; bind is still on all interfaces (``""``).""" host = _web_server_reachable_ipv4() or "127.0.0.1" return f"http://{host}:{port}/" def _print_web_server_urls(port: int, *, already_running: bool, detail: str = "") -> None: url = _web_server_display_url(port) prefix = "Web server already running" if already_running else "Starting web server on" line = f"{prefix} {url}" if detail: line += f" {detail}" print(line) lan_ip = _web_server_reachable_ipv4() if lan_ip: print(f" Browser on this host only: http://127.0.0.1:{port}/") def start_web_server(path, port, log_token: Optional[str] = None): if is_port_in_use(port): _print_web_server_urls(port, already_running=True) return def run_server(): handler = partial( 
P4diagHTTPRequestHandler, directory=path, log_token=log_token, ) index_document = _resolve_summary_html_index(path, log_token) if index_document: detail = f"({index_document})" else: detail = "(summary HTML appears here after log2sql summary)" _print_web_server_urls(port, already_running=False, detail=detail) with ReusableTCPServer(("", port), handler) as httpd: httpd.serve_forever() thread = threading.Thread(target=run_server, daemon=True) thread.start() def _preflight_log2sql_log_exists(file, start, end) -> None: """Exit with code 2 if the log path for ``log2sql`` is not usable. Does not run ``trim_log`` (avoids duplicate work). Call from ``main`` before starting the web UI so a missing log is reported without printing \"Web server already running\". """ if start is not None and end is not None: _p = os.path.abspath(file) if not os.path.isfile(_p): print( f"p4diag: log file not found: {file!r} (resolved: {_p!r})", file=sys.stderr, ) sys.exit(2) return l = log2sql(file) l.setLogFile(show_menu=False) _log_in = os.path.abspath(l.logFile) if not os.path.isfile(_log_in): print( f"p4diag: log file not found: {l.logFile!r} (resolved: {_log_in!r})", file=sys.stderr, ) sys.exit(2) def handle_log2sql(file, start=None, end=None): if (start is None) != (end is None): print( "p4diag: --start and --end must be provided together", file=sys.stderr, ) sys.exit(2) global l l = log2sql(file) if start != None and end != None: trimmed_log = l.trim_log(start,end) l.logFile = l.databaseFile = trimmed_log l.setLogFile(show_menu=False) _log_in = os.path.abspath(l.logFile) if not os.path.isfile(_log_in): print( f"p4diag: log file not found: {l.logFile!r} (resolved: {_log_in!r})", file=sys.stderr, ) sys.exit(2) l.createLogStats() l.createDatabase() l.createLogSummary() l.createPlots() if not QUIET: l.menu() def parse_datetime(dt_str): """Parse and validate date string in YYYY/MM/DD HH:MM:SS format.""" try: datetime.strptime(dt_str, "%Y/%m/%d %H:%M:%S") return dt_str except ValueError: 
raise argparse.ArgumentTypeError(f"Invalid date format: '{dt_str}'. Use YYYY/MM/DD HH:MM:SS") # --- Log statistics cache (keyed on source P4 log path + mtime + size) --- # Bump when log-stats grep logic changes so ``.stats.meta.json`` is invalidated. LOG_STATS_CACHE_VERSION = 2 SUMMARY_CACHE_VERSION = 3 def log_stats_meta_path(stats_txt_path: str) -> str: """Sidecar JSON next to ``*.stats.txt`` (e.g. ``log.stats.meta.json``).""" if stats_txt_path.endswith(".stats.txt"): return stats_txt_path[: -len(".stats.txt")] + ".stats.meta.json" return stats_txt_path + ".stats.meta.json" def _log_source_signature(log_path: str) -> Optional[Tuple[str, float, int]]: if not log_path or not os.path.isfile(log_path): return None ab = os.path.abspath(log_path) st = os.stat(ab) return (ab, st.st_mtime, st.st_size) def _log_stats_meta_read(meta_path: str) -> Optional[Dict[str, Any]]: try: with open(meta_path, "r", encoding="utf-8") as fh: return json.load(fh) except (OSError, json.JSONDecodeError, TypeError): return None def _log_stats_meta_write(meta_path: str, sig: Tuple[str, float, int]) -> None: payload = { "version": LOG_STATS_CACHE_VERSION, "log_path": sig[0], "log_mtime": sig[1], "log_size": sig[2], } try: with open(meta_path, "w", encoding="utf-8") as fh: json.dump(payload, fh, indent=2) except OSError: pass def log_stats_cache_is_current( log_path: str, stats_txt_path: str, meta_path: str ) -> bool: """True if ``stats_txt_path`` matches the current signature of ``log_path``.""" if not os.path.isfile(stats_txt_path) or not os.path.isfile(meta_path): return False cur = _log_source_signature(log_path) if cur is None: return False meta = _log_stats_meta_read(meta_path) if not meta or meta.get("version") != LOG_STATS_CACHE_VERSION: return False if meta.get("log_path") != cur[0]: return False if meta.get("log_mtime") != cur[1] or meta.get("log_size") != cur[2]: return False return True def summary_meta_path(summary_txt_path: str) -> str: """Sidecar JSON next to ``*.summary.txt`` 
(e.g. ``log.summary.meta.json``).""" if summary_txt_path.endswith(".summary.txt"): return summary_txt_path[: -len(".summary.txt")] + ".summary.meta.json" return summary_txt_path + ".summary.meta.json" def _summary_meta_read(meta_path: str) -> Optional[Dict[str, Any]]: try: with open(meta_path, "r", encoding="utf-8") as fh: return json.load(fh) except (OSError, json.JSONDecodeError, TypeError): return None def _summary_meta_write(meta_path: str, payload: Dict[str, Any]) -> None: try: with open(meta_path, "w", encoding="utf-8") as fh: json.dump(payload, fh, indent=2) except OSError: pass def _summary_inputs_signature( log_path: str, db_path: str, stats_path: str, ) -> Optional[Dict[str, Any]]: """JSON-serializable signatures for summary cache invalidation.""" log_sig = _log_source_signature(log_path) db_sig = _log_source_signature(db_path) stats_sig = _log_source_signature(stats_path) if log_sig is None or db_sig is None: return None out: Dict[str, Any] = { "log_path": log_sig[0], "log_mtime": log_sig[1], "log_size": log_sig[2], "db_path": db_sig[0], "db_mtime": db_sig[1], "db_size": db_sig[2], } if stats_sig is not None: out["stats_path"] = stats_sig[0] out["stats_mtime"] = stats_sig[1] out["stats_size"] = stats_sig[2] else: out["stats_path"] = None return out def summary_cache_is_current( log_path: str, db_path: str, stats_path: str, summary_txt_path: str, summary_html_path: str, meta_path: str, ) -> bool: """True if text/HTML summary reports match current log, DB, and stats inputs.""" if not ( os.path.isfile(summary_txt_path) and os.path.isfile(summary_html_path) and os.path.isfile(meta_path) ): return False if _summary_html_needs_regen(summary_html_path): return False cur = _summary_inputs_signature(log_path, db_path, stats_path) if cur is None: return False meta = _summary_meta_read(meta_path) if not meta or meta.get("version") != SUMMARY_CACHE_VERSION: return False if meta.get("layout_version") != SUMMARY_HTML_LAYOUT_VERSION: return False if 
meta.get("recommendations_sig") != _recommendations_rules_signature(): return False for key in ( "log_path", "log_mtime", "log_size", "db_path", "db_mtime", "db_size", "stats_path", "stats_mtime", "stats_size", ): if meta.get(key) != cur.get(key): return False return True def quiet_list_sql_queries() -> None: """Print sorted built-in + disk ``*.sql`` basenames, one per line.""" names = list_sql_library_names() if not names: d = p4diag_sql_queries_dir() print(f"No library queries (checked built-in and {d!r})", file=sys.stderr) sys.exit(2) for n in names: print(n) # --- Victim/culprit (aligned with p4sla.py write-wait analysis; sqlite3 only, no pandas) --- VC_HOLD_LOOKBACK = timedelta(hours=4) VC_TIME_MARGIN = timedelta(minutes=4) # Pool: top N global rows by totalWriteWait; then up to M rows per tableName within that pool. VC_VICTIM_GLOBAL_POOL = 50 VC_VICTIM_PER_TABLE = 5 VC_SQL = f""" WITH base AS ( SELECT tableName, user, startTime, endTime, pid, cmd, args, CAST(completedLapse AS INTEGER) AS lapse, totalWriteWait FROM tableUse JOIN process USING (processKey) WHERE tableName NOT IN ('clients', 'clientEntity', 'change', 'storageup_R', 'storagemasterup_R', 'pull') AND totalWriteWait > 5000 AND completedLapse > 0 ), global_pool AS ( SELECT *, ROW_NUMBER() OVER (ORDER BY totalWriteWait DESC) AS rn_global FROM base ), top_global AS ( SELECT * FROM global_pool WHERE rn_global <= {VC_VICTIM_GLOBAL_POOL} ), per_table AS ( SELECT *, ROW_NUMBER() OVER (PARTITION BY tableName ORDER BY totalWriteWait DESC) AS rn_table FROM top_global ) SELECT tableName, user, startTime, endTime, pid, cmd, args, lapse, totalWriteWait FROM per_table WHERE rn_table <= {VC_VICTIM_PER_TABLE} ORDER BY totalWriteWait DESC """ # Bump when VC_SQL / panel shape changes so old .victim_culprit.pkl files are ignored. 
VC_PANELS_CACHE_VERSION = 2 VC_PANELS_CACHE_SUFFIX = ".victim_culprit.pkl" def _victim_culprit_cache_path(database_file: str) -> str: ab = os.path.abspath(database_file) d, base = os.path.split(ab) stem = base[:-3] if base.lower().endswith(".db") else base return os.path.join(d, stem + VC_PANELS_CACHE_SUFFIX) def _victim_culprit_db_stat(db_path: str) -> Optional[Tuple[float, int]]: try: st = os.stat(db_path) return (st.st_mtime, st.st_size) except OSError: return None def _load_victim_culprit_panels_cache(database_file: str) -> Optional[list]: ab = os.path.abspath(database_file) stat = _victim_culprit_db_stat(ab) if stat is None: return None cache_path = _victim_culprit_cache_path(ab) if not os.path.isfile(cache_path): return None try: with open(cache_path, "rb") as fh: payload = pickle.load(fh) except (OSError, pickle.UnpicklingError, EOFError, TypeError, AttributeError): return None if not isinstance(payload, dict): return None if payload.get("version") != VC_PANELS_CACHE_VERSION: return None if payload.get("db_path") != ab: return None if payload.get("db_mtime") != stat[0] or payload.get("db_size") != stat[1]: return None panels = payload.get("panels") if not isinstance(panels, list): return None return panels def _save_victim_culprit_panels_cache(database_file: str, panels: list) -> None: ab = os.path.abspath(database_file) stat = _victim_culprit_db_stat(ab) if stat is None: return cache_path = _victim_culprit_cache_path(ab) payload = { "version": VC_PANELS_CACHE_VERSION, "db_path": ab, "db_mtime": stat[0], "db_size": stat[1], "panels": panels, } try: with open(cache_path, "wb") as fh: pickle.dump(payload, fh, protocol=pickle.HIGHEST_PROTOCOL) except OSError: pass def get_victim_culprit_panels_cached(database_file: str) -> list: """Victim/culprit panels; recomputes only when the log2sql DB mtime/size changes.""" loaded = _load_victim_culprit_panels_cache(database_file) if loaded is not None: return loaded panels = compute_victim_culprit_panels(database_file) 
_save_victim_culprit_panels_cache(database_file, panels) return panels def _vc_sql_quote_ident(s: str) -> str: return str(s).replace("'", "''") def _vc_format_ms(val: Any) -> str: try: return f"{int(round(float(val))):,}" except (TypeError, ValueError): return str(val) def _vc_row_to_dict(row: sqlite3.Row) -> Dict[str, Any]: return {k: row[k] for k in row.keys()} def _vc_fetchall_dicts(conn: sqlite3.Connection, sql: str) -> list: cur = conn.execute(sql) rows = cur.fetchall() if not rows: return [] return [_vc_row_to_dict(r) for r in rows] def _vc_exclude_victim_pid(culprits: list, victim: Dict[str, Any]) -> list: v_pid = victim.get("pid") if v_pid is None: return culprits try: v_n = int(float(v_pid)) except (TypeError, ValueError): return culprits out = [] for r in culprits: try: if int(float(r.get("pid"))) == v_n: continue except (TypeError, ValueError): pass out.append(r) return out def _vc_one_panel(conn: sqlite3.Connection, idx: int, row: Dict[str, Any]) -> Dict[str, Any]: t_name = row.get("tableName") or row.get("tablename") or "UnknownTable" v_wait = row.get("totalWriteWait", 0) cmd = str(row.get("cmd") or "").strip() or "(unknown)" args_s = str(row.get("args") or "").strip() or "(none)" label = f"Victim: {cmd} ({_vc_format_ms(v_wait)}ms write wait on {t_name})" code = ( f"PID: {row.get('pid', '')}\n" f"Start: {row.get('startTime', '')}\n" f"End: {row.get('endTime', '')}\n" f"Cmd: {cmd}\n" f"Args: {args_s}" ) ks = str(idx) tq = _vc_sql_quote_ident(t_name) try: start_dt = datetime.strptime(str(row["startTime"]).strip(), "%Y/%m/%d %H:%M:%S") end_dt = datetime.strptime(str(row["endTime"]).strip(), "%Y/%m/%d %H:%M:%S") hold_search_start = (start_dt - VC_HOLD_LOOKBACK).strftime("%Y/%m/%d %H:%M:%S") adj_end = (end_dt + VC_TIME_MARGIN).strftime("%Y/%m/%d %H:%M:%S") wait95 = round(float(v_wait) * 0.95) except (ValueError, KeyError, TypeError): return { "type": "panel", "label": label, "code": code, "tableName": t_name, "victim_cmd": cmd, "culprits": None, 
"key_suffix": ks, } direct_sql = ( f"SELECT pid, user, cmd, " f'totalReadHeld AS "totalReadHeld(ms)", totalWriteHeld AS "totalWriteHeld(ms)", ' f"tableName, startTime " f"FROM tableUse JOIN process USING (processKey) WHERE tableName = '{tq}' " f"AND startTime >= '{hold_search_start}' AND startTime <= '{adj_end}' " f"AND (totalReadHeld > {wait95} OR totalWriteHeld > {wait95}) ORDER BY startTime" ) culprits = _vc_fetchall_dicts(conn, direct_sql) culprits = _vc_exclude_victim_pid(culprits, row) if culprits: return { "type": "panel", "label": label, "code": code, "tableName": t_name, "victim_cmd": cmd, "culprits": culprits, "key_suffix": ks, } vst = _vc_sql_quote_ident(str(row["startTime"]).strip()) overlap_sql = ( f"SELECT pid, user, cmd, " f'totalReadHeld AS "totalReadHeld(ms)", totalWriteHeld AS "totalWriteHeld(ms)", ' f"tableName, startTime " f"FROM tableUse JOIN process USING (processKey) WHERE tableName = '{tq}' " f"AND startTime >= '{hold_search_start}' AND startTime <= '{vst}' " f"AND (totalReadHeld > 2000 OR totalWriteHeld > 2000) ORDER BY startTime" ) culprits2 = _vc_fetchall_dicts(conn, overlap_sql) culprits2 = _vc_exclude_victim_pid(culprits2, row) if culprits2: return { "type": "panel", "label": label, "code": code, "tableName": t_name, "victim_cmd": cmd, "culprits": culprits2, "key_suffix": ks, } return { "type": "panel", "label": label, "code": code, "tableName": t_name, "victim_cmd": cmd, "culprits": None, "key_suffix": ks, } def compute_victim_culprit_panels(database_file: str) -> list: """Return panel dicts (same roles as p4sla ``compute_write_waiter_panels``). Victims are selected with ``VC_SQL``: up to ``VC_VICTIM_PER_TABLE`` rows per ``tableName`` within the top ``VC_VICTIM_GLOBAL_POOL`` global write-wait rows. 
""" db_path = os.path.abspath(database_file) if not os.path.isfile(db_path): return [{"type": "error", "message": f"Database not found: {db_path}"}] conn = None try: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row victims = _vc_fetchall_dicts(conn, VC_SQL) if not victims: return [{"type": "warning", "message": "No significant write waits (> 5s) found."}] panels = [] for i, row in enumerate(victims): panels.append(_vc_one_panel(conn, i, row)) return panels except sqlite3.Error as e: return [{"type": "error", "message": str(e)}] finally: if conn: conn.close() def format_victim_culprit_ascii(panels: list, include_banner: bool = True) -> str: """Fixed-width friendly text for terminals. If ``include_banner`` is False, omit the top title lines (for embedding in the combined summary file where a section header is printed above this block). """ lines = [] if include_banner: lines.append("VICTIM / CULPRIT REPORT (write-wait drill-down)") lines.append( f"Victims: up to {VC_VICTIM_PER_TABLE} rows per table among the top " f"{VC_VICTIM_GLOBAL_POOL} global write-wait rows (>5s, excluded tables filtered)." ) lines.append("Each victim is a command with high write-wait on a table; culprits hold read/write") lines.append( f"locks on that table in a window from {int(VC_HOLD_LOOKBACK.total_seconds() // 3600)}h " "before the victim start through shortly after the victim end." 
) lines.append("") if not panels: lines.append("(no panels)") return "\n".join(lines) + "\n" first = panels[0] if first.get("type") == "warning": lines.append(first.get("message", "")) return "\n".join(lines) + "\n" if first.get("type") == "error": lines.append(first.get("message", "Error")) return "\n".join(lines) + "\n" for p in panels: if p.get("type") != "panel": continue lines.append("-" * 72) lines.append(p.get("label", "")) lines.append("") lines.append( " Victim process (blocked — waiting to write; PID/start/end/cmd/args below identify the victim):" ) for cl in str(p.get("code", "")).splitlines(): lines.append(" " + cl) cul = p.get("culprits") lines.append("") if cul: lines.append( " Culprits (other processes holding read/write locks on this table in the " "search window; victim PID excluded):" ) lines.append("") hdr = list(cul[0].keys()) tab = [[str(r.get(h, "")) for h in hdr] for r in cul] lines.append(tabulate(tab, headers=hdr, tablefmt="simple")) else: lines.append( " Culprits: (none) — no other lock-holder rows matched the time window " "and thresholds." ) lines.append("") return "\n".join(lines).rstrip() + "\n" def format_victim_culprit_html(panels: list) -> str: """HTML fragment for summary page (no outer ).""" parts = [ '
', "

Up to " f"{VC_VICTIM_PER_TABLE} rows per table among the top {VC_VICTIM_GLOBAL_POOL} global write-wait rows; " "culprits are processes holding read/write locks on the same table.

", ] if not panels: parts.append("

(no data)

") return "\n".join(parts) first = panels[0] if first.get("type") == "warning": parts.append(f"

{html.escape(first.get('message', ''))}

") return "\n".join(parts) if first.get("type") == "error": parts.append(f"

Error: {html.escape(first.get('message', ''))}

") return "\n".join(parts) for p in panels: if p.get("type") != "panel": continue parts.append("

" + html.escape(p.get("label", "")) + "

") parts.append( "

Victim process — blocked, waiting to write; " "PID/start/end/cmd/args below identify the victim.

" ) parts.append("
" + html.escape(str(p.get("code", ""))) + "
") cul = p.get("culprits") if cul: parts.append( "

Culprits — other processes holding read/write locks on " "this table in the search window (victim PID excluded).

" ) hdr = list(cul[0].keys()) tab = [[str(r.get(h, "")) for h in hdr] for r in cul] parts.append("
" + html.escape(tabulate(tab, headers=hdr, tablefmt="simple")) + "
") else: parts.append( "

Culprits: none — no other lock-holder rows matched the time window " "and thresholds.

" ) parts.append("") return "\n".join(parts) _SUMMARY_QUERY_MARKER_RE = re.compile(r"^@@QUERY@@(.+)@@\s*$") def _summary_sql_sections_plain_text(section_text: str) -> str: """Strip ``@@QUERY@@…@@`` markers for the plain-text summary file.""" out_lines: List[str] = [] for line in section_text.splitlines(keepends=True): m = _SUMMARY_QUERY_MARKER_RE.match(line.strip()) if m: out_lines.append(m.group(1).strip() + ("\n" if line.endswith("\n") else "")) else: out_lines.append(line) return "".join(out_lines) def _summary_html_section_id(section_title: str) -> str: """Stable fragment id for a detailed SQL section heading.""" slug = re.sub(r"[^a-z0-9-]+", "", section_title.strip().lower().replace(" ", "-")) return f"summary-{slug}" if slug else "summary-section" def _summary_sql_sections_to_html(section_text: str) -> str: """Turn section headers, query titles, and result tables into HTML.""" if not section_text.strip(): return "

(no detailed SQL sections)

" out: list = ['
'] pre_buf: list = [] def flush_pre() -> None: nonlocal pre_buf if not pre_buf: return text = "".join(pre_buf) if not text.strip(): pre_buf = [] return out.append("
" + html.escape(text) + "
") pre_buf = [] for line in section_text.splitlines(keepends=True): stripped = line.strip() qm = _SUMMARY_QUERY_MARKER_RE.match(stripped) if qm: flush_pre() out.append( "

" + html.escape(qm.group(1).strip()) + "

" ) continue sm = re.match(r"^--- (.+) ---\s*$", stripped) if sm: flush_pre() title = sm.group(1).strip() sec_id = _summary_html_section_id(title) out.append(f'

' + html.escape(title) + "

") else: pre_buf.append(line) flush_pre() out.append("
") return "\n".join(out) # Bump when summary HTML section order or styling changes (triggers HTML regen). SUMMARY_HTML_LAYOUT_VERSION = 9 _SUMMARY_HTML_STYLES = """ :root { color-scheme: light; --bg: #eef1f5; --card: #ffffff; --ink: #1e293b; --muted: #64748b; --accent: #1e4d8c; --accent-soft: #e8f0fa; --border: #d8dee9; --pre-bg: #f8fafc; --shadow: 0 1px 3px rgba(15, 23, 42, 0.08), 0 4px 14px rgba(15, 23, 42, 0.06); } * { box-sizing: border-box; } body { font-family: "Segoe UI", system-ui, -apple-system, sans-serif; background: var(--bg); color: var(--ink); margin: 0; padding: 16px 12px 40px; line-height: 1.45; } .report-wrap { max-width: min(96vw, 1680px); width: 100%; margin: 0 auto; } .report-header { background: linear-gradient(135deg, #1e4d8c 0%, #2d6bb3 100%); color: #fff; border-radius: 10px; padding: 22px 26px; margin-bottom: 24px; box-shadow: var(--shadow); } .report-header h1 { margin: 0 0 8px; font-size: 1.65rem; font-weight: 600; letter-spacing: -0.02em; } .report-meta { margin: 0; font-size: 0.9rem; opacity: 0.92; font-family: ui-monospace, "Cascadia Code", "Consolas", monospace; } .report-meta span { display: block; margin-top: 4px; } section.section-card { background: var(--card); border: 1px solid var(--border); border-radius: 10px; padding: 20px 22px 22px; margin-bottom: 22px; box-shadow: var(--shadow); } section.section-card > h2 { margin: 0 0 14px; font-size: 1.2rem; font-weight: 600; color: var(--accent); border-bottom: 2px solid var(--accent-soft); padding-bottom: 8px; } .section-lead { margin: 0 0 14px; color: var(--muted); font-size: 0.95rem; } .stats-block pre, .vc-report pre, .sql-sections pre { white-space: pre; overflow-x: auto; overflow-y: visible; max-width: 100%; font-family: ui-monospace, "Cascadia Code", "Consolas", monospace; font-size: 0.84rem; line-height: 1.4; background: var(--pre-bg); border: 1px solid var(--border); border-radius: 6px; padding: 14px 16px; margin: 0 0 12px; -webkit-overflow-scrolling: touch; } .gallery { 
display: flex; flex-direction: column; gap: 22px; } .plot { background: var(--pre-bg); border: 1px solid var(--border); border-radius: 8px; padding: 14px; text-align: center; } .plot a { display: block; line-height: 0; cursor: zoom-in; } .plot a:hover img { box-shadow: 0 4px 16px rgba(15, 23, 42, 0.12); } .plot a:focus-visible { outline: 2px solid var(--accent); outline-offset: 3px; border-radius: 4px; } .plot img { max-width: 100%; width: auto; height: auto; border-radius: 4px; vertical-align: middle; } .filename { margin-top: 10px; font-size: 0.8rem; color: var(--muted); word-break: break-all; } .filename a { color: var(--accent); text-decoration: none; cursor: pointer; } .filename a:hover { text-decoration: underline; } .sql-sections h3 { margin: 22px 0 10px; font-size: 1.05rem; font-weight: 700; color: var(--accent); } .sql-sections h3:first-child { margin-top: 0; } .sql-sections h4.query-title { margin: 16px 0 8px; font-size: 0.98rem; font-weight: 700; color: #1e293b; line-height: 1.35; } .vc-report h3 { margin: 20px 0 8px; font-size: 1rem; font-weight: 600; color: #334155; } .vc-report h3:first-of-type { margin-top: 0; } .vc-report p { margin: 0 0 10px; } .empty-note { color: var(--muted); margin: 0; } .plots-empty { color: var(--muted); font-size: 0.95rem; margin: 0; } .recommendations { display: flex; flex-direction: column; gap: 18px; } .rec-item { border: 1px solid var(--border); border-radius: 8px; padding: 14px 16px; background: #fafbfc; } .rec-item h3 { margin: 0 0 10px; font-size: 1rem; font-weight: 700; color: #1e293b; line-height: 1.35; } .rec-badge { display: inline-block; font-size: 0.72rem; font-weight: 700; letter-spacing: 0.04em; padding: 2px 8px; border-radius: 999px; margin-right: 8px; vertical-align: middle; } .rec-likely .rec-badge { background: #fef3c7; color: #92400e; } .rec-possible .rec-badge { background: #e0e7ff; color: #3730a3; } .rec-evidence { margin: 0 0 10px 1.1rem; padding: 0; color: #334155; font-size: 0.92rem; } 
.rec-mitigation-label { margin: 0 0 6px; font-size: 0.92rem; color: #334155; } .rec-mitigation { margin: 0 0 0 1.1rem; padding: 0; color: #334155; font-size: 0.92rem; line-height: 1.45; } .rec-mitigation li { margin-bottom: 6px; } .rec-mitigation li:last-child { margin-bottom: 0; } .rec-mitigation a { color: var(--accent); text-decoration: none; } .rec-mitigation a:hover { text-decoration: underline; } """ def _summary_html_needs_regen(html_path: str) -> bool: """True if ``html_path`` is missing or was produced by an older layout version.""" if not os.path.isfile(html_path): return True marker = f"p4diag-summary-layout:{SUMMARY_HTML_LAYOUT_VERSION}" try: with open(html_path, encoding="utf-8", errors="replace") as fh: return marker not in fh.read(512) except OSError: return True def render_summary_html_report( *, log_file: str, db_file: str, stats_html_inner: str, recommendations_html: str, png_entries: List[Tuple[str, str]], plots_empty_note: str, sql_sections_html: str, vc_html: str, ) -> str: """Full HTML document for ``.summary.html`` (stats, recommendations, plots, SQL, VC).""" log_esc = html.escape(os.path.basename(log_file)) db_esc = html.escape(os.path.basename(db_file)) parts = [ f"\n", "\n\n\n", "\n", "\n", "P4 Diagnostics Summary — ", log_esc, "\n\n\n\n
\n", "
\n", "

P4 Diagnostics Summary

\n", "

", f"Log: {html.escape(log_file)}", f"Database: {html.escape(db_file)}", "

\n
\n", "
\n

Log statistics

\n", "
",
        html.escape(stats_html_inner),
        "
\n
\n", "
\n

Recommendations

\n", recommendations_html, "
\n", "
\n

Plots

\n", ] if png_entries: parts.append("
\n") for fname, alt in png_entries: fn = html.escape(fname) parts.append( f"
" f"" f"\"{html.escape(alt)}\"" f"
\n" ) parts.append("
\n") else: parts.append(f"

{html.escape(plots_empty_note)}

\n") parts.extend( [ "
\n", "
\n

Detailed SQL reports

\n", sql_sections_html, "
\n", "
\n" "

Victim Culprit Report

\n", vc_html, "
\n", "
\n\n\n", ] ) return "".join(parts) def _normalize_quiet_cli_argv(av: list) -> None: """Map ``FILE.sql``, ``list``, legacy ``query``, and short aliases to internal subcommand names.""" if not av: return if _argv_is_sql_query_list(av): av[:] = ["log2sql-query-sql-list"] return if _argv_is_sql_library_query(av): av[:] = ["log2sql-query-sql"] + av return if av[0] == "query": if len(av) >= 2 and av[1] == "list": av[:] = ["log2sql-query-sql-list"] else: av[:] = ["log2sql-query-sql"] + av[1:] return dst = QUIET_CLI_SHORT_ALIASES.get(av[0]) if dst: av[0] = dst def _resolve_quiet_log_source_path(lt: str, log_base: str) -> Optional[str]: """Find an existing log file on disk for ``log2sql`` input (absolute path).""" candidates = [] if lt: candidates.append(lt) candidates.append(os.path.expanduser(lt)) if log_base: candidates.append(os.path.join(os.getcwd(), log_base)) candidates.append(log_base) if lt.lower().endswith(".db"): stem = lt[:-3] candidates.append(stem) candidates.append(os.path.expanduser(stem)) candidates.append(os.path.join(os.getcwd(), os.path.basename(stem))) seen = set() for c in candidates: if not c or c in seen: continue seen.add(c) if os.path.isfile(c): return os.path.abspath(c) return None def _configure_log2sql_quiet_paths( log_token: str, ) -> Tuple[str, str, str, str, str]: """Return ``(lt, db_path, diag_dir, log_base, db_name)`` for quiet log2sql layout.""" lt = log_token.strip() base = os.path.basename(lt.rstrip("/")) if base.lower().endswith(".db"): db_name = base log_base = base[:-3] else: db_name = base + ".db" log_base = base db_path = os.path.abspath(os.path.join(".p4diagnostics", db_name)) diag_dir = os.path.dirname(db_path) return lt, db_path, diag_dir, log_base, db_name def _populate_log2sql_quiet_sidecars( l: "log2sql", *, log_for_obj: str, db_path: str, diag_dir: str, sidecar_base: str, ) -> None: l.logFile = log_for_obj l.databaseFile = db_path l.summaryFile = os.path.join(diag_dir, sidecar_base + ".summary" + ".txt") l.summaryFileHTML = 
os.path.join(diag_dir, sidecar_base + ".summary" + ".html") l.summaryQueriesFile = os.path.join(diag_dir, "queries.sql") l.logFileDetails = os.path.join(diag_dir, sidecar_base + ".stats.txt") l.errorsSummaryFile = os.path.join(diag_dir, sidecar_base + ".errors_summary") l.maxActiveThreadsFile = os.path.join(diag_dir, sidecar_base + ".active_threads") l.activeThreadsSummaryFile = os.path.join(diag_dir, sidecar_base + ".active_threads_summary") l.failuresFile = os.path.join(diag_dir, sidecar_base + ".fails") def configure_log2sql_for_quiet_db(log_token: str) -> "log2sql": """Point log2sql at ``.p4diagnostics/.db`` for a log basename or path. Matches ``setLogFile``: the DB is ``basename(logFile) + '.db'``. Do not use ``os.path.splitext`` on the basename — logs like ``log.0710.20250710_...`` have multiple dots and splitext would wrongly treat part of the name as an extension. If the database file is missing, creates it (same as ``create``). """ lt, db_path, diag_dir, log_base, _ = _configure_log2sql_quiet_paths(log_token) if not os.path.isfile(db_path): print( "p4diag: log2sql database not found; creating it ... ", file=sys.stderr, flush=True, ) return quiet_log2sql_create_database(log_token) l = log2sql(log_base) _populate_log2sql_quiet_sidecars( l, log_for_obj=log_base, db_path=db_path, diag_dir=diag_dir, sidecar_base=log_base ) return l def configure_log2sql_for_quiet_log_stats_only(log_token: str) -> "log2sql": """Set sidecar paths for a P4 server log and ``.p4diagnostics/.stats.txt``. Does not read or create a log2sql SQLite database — only the log file is required. 
""" lt, db_path, diag_dir, log_base, _ = _configure_log2sql_quiet_paths(log_token) os.makedirs(diag_dir, exist_ok=True) log_src = _resolve_quiet_log_source_path(lt, log_base) if not log_src: print( f"Could not find log file for {log_token!r}.\n" "Expected a readable P4 server log (basename or path).", file=sys.stderr, ) sys.exit(2) l = log2sql(log_src) _populate_log2sql_quiet_sidecars( l, log_for_obj=log_src, db_path=db_path, diag_dir=diag_dir, sidecar_base=log_base, ) return l def quiet_log2sql_create_database(log_token: str) -> "log2sql": """Ensure ``.p4diagnostics/.db`` exists, creating it with ``log2sql`` if needed.""" lt, db_path, diag_dir, log_base, _ = _configure_log2sql_quiet_paths(log_token) os.makedirs(diag_dir, exist_ok=True) if os.path.isfile(db_path): l = log2sql(log_base) _populate_log2sql_quiet_sidecars( l, log_for_obj=log_base, db_path=db_path, diag_dir=diag_dir, sidecar_base=log_base ) print(f"log2sql database (already present): {db_path}", flush=True) return l log_src = _resolve_quiet_log_source_path(lt, log_base) if not log_src: print( f"log2sql database not found: {db_path}\n" f"Could not find log file {log_base!r} (or path {lt!r}) to build it.", file=sys.stderr, ) sys.exit(2) sidecar_base = os.path.basename(log_src) l = log2sql(log_src) _populate_log2sql_quiet_sidecars( l, log_for_obj=log_src, db_path=db_path, diag_dir=diag_dir, sidecar_base=sidecar_base, ) l.createDatabase() if not os.path.isfile(db_path): print( f"log2sql did not create database at {db_path} (see {LOG_FILE or 'log2sql output'}).", file=sys.stderr, ) sys.exit(1) print(f"log2sql database written: {db_path}", flush=True) return l def ensure_help_schema_sample_database() -> "log2sql": """Materialize ``_HELP_SCHEMA_SAMPLE_LOG_TEXT`` and an SQLite DB via ``log2sql`` for ``schema`` output.""" digest = hashlib.sha256(_HELP_SCHEMA_SAMPLE_LOG_TEXT.encode("utf-8")).hexdigest() diag_dir = os.path.abspath(".p4diagnostics") os.makedirs(diag_dir, exist_ok=True) log_path = 
os.path.join(diag_dir, "_schema_help_sample.log") db_path = os.path.join(diag_dir, "_schema_help_sample.db") stamp_path = os.path.join(diag_dir, "_schema_help_sample.sha256") try: with open(log_path, "w", encoding="utf-8", newline="\n") as fh: fh.write(_HELP_SCHEMA_SAMPLE_LOG_TEXT) except OSError as e: print(f"p4diag: schema: cannot write sample log {log_path}: {e}", file=sys.stderr) sys.exit(1) need_build = True try: if os.path.isfile(stamp_path) and os.path.isfile(db_path): with open(stamp_path, encoding="utf-8") as sf: prev = sf.read().strip() need_build = prev != digest except OSError: need_build = True if need_build: try: if os.path.isfile(db_path): os.remove(db_path) except OSError as e: print(f"p4diag: schema: cannot refresh sample database: {e}", file=sys.stderr) sys.exit(1) lo = log2sql(log_path) lo.logFile = os.path.abspath(log_path) lo.databaseFile = db_path lo.createDatabase() if not os.path.isfile(db_path): print( "p4diag: schema: log2sql failed to build sample database " f"(see {LOG_FILE or 'log2sql output'}): {db_path}", file=sys.stderr, ) sys.exit(1) try: with open(stamp_path, "w", encoding="utf-8") as sf: sf.write(digest + "\n") except OSError: pass lo = log2sql(log_path) lo.logFile = os.path.abspath(log_path) lo.databaseFile = db_path return lo # SQL for DB/incoming/running plots (minute-rounded times via substr(..., 1, 16) where applicable). 
_SQL_PLOT_DBWAIT = ( "SELECT substr(startTime, 1, 16) AS t, " "round((SUM(totalReadWait) + SUM(totalWriteWait)) / 1000) AS wait_s " "FROM tableUse JOIN process USING (processKey) " "WHERE cmd NOT IN ('user-sync', 'user-transmit') " "GROUP BY substr(startTime, 1, 16) ORDER BY t" ) _SQL_PLOT_INCOMING = ( "SELECT substr(startTime, 1, 16) AS minute, COUNT(*) AS n " "FROM process GROUP BY minute ORDER BY minute" ) _SQL_PLOT_RUNNING = "SELECT endTime, MIN(running) AS r FROM process GROUP BY endTime ORDER BY endTime" def _terminal_width_for_plots() -> int: try: return max(40, os.get_terminal_size().columns) except OSError: try: return max(40, int(os.environ.get("COLUMNS", "80"))) except ValueError: return 80 def _parse_plot_time(s: str) -> Optional[datetime]: s = (s or "").strip().strip('"') for fmt in ("%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M"): try: return datetime.strptime(s, fmt) except ValueError: continue return None def _xtics_step_seconds(start: datetime, end: datetime, target_ticks: int = 10) -> int: """Seconds between major x tics; ~``target_ticks`` across the span, rounded to a readable step.""" span = max(1, int((end - start).total_seconds())) raw = max(60, span // max(1, target_ticks)) # Round up to a human-friendly interval so date labels do not pile up (e.g. dbWaitTime). 
nices = ( 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 10800, 14400, 21600, 43200, 86400, 172800, 604800, 2592000, ) for n in nices: if n >= raw: return n days = (raw + 86399) // 86400 return max(86400, days * 86400) def _collect_active_threads_csv_rows(log_path: str, pid_token: str) -> List[Tuple[str, int]]: """Lines matching plot_p4_active_commands.sh: grep 'active threads' | grep PID; awk $1,$2,$10.""" rows: List[Tuple[str, int]] = [] tok = (pid_token or "active").strip() with open(log_path, "r", encoding="utf-8", errors="replace") as fh: for line in fh: if "active threads" not in line: continue if tok not in line: continue parts = line.split() if len(parts) < 10: continue dt = f"{parts[0]} {parts[1]}" try: n = int(parts[9]) except ValueError: continue rows.append((dt, n)) return rows def _sqlite_plot_rows(db_path: str, sql: str) -> List[Tuple]: conn = sqlite3.connect(db_path) try: cur = conn.execute(sql) return [tuple(r) for r in cur.fetchall()] finally: conn.close() def _rows_with_leading_date(rows: List[Tuple]) -> List[Tuple]: out: List[Tuple] = [] for r in rows: if not r or r[0] is None: continue if re.match(r"^\d{4}/", str(r[0]).strip()): out.append(r) return out def _write_csv_two_cols(path: str, rows: List[Tuple]) -> None: with open(path, "w", encoding="utf-8") as fh: for r in rows: if len(r) < 2: continue a, b = r[0], r[1] fh.write(f"{a},{b}\n") def _run_gnuplot_script(script: str) -> Tuple[int, str, str]: gnuplot = shutil.which("gnuplot") if not gnuplot: return 127, "", "gnuplot not found in PATH" with tempfile.NamedTemporaryFile(mode="w", suffix=".gp", delete=False, encoding="utf-8") as tf: tf.write(script) gp_path = tf.name try: proc = s.run([gnuplot, gp_path], capture_output=True, text=True, timeout=120) return proc.returncode, proc.stdout, proc.stderr or "" finally: try: os.unlink(gp_path) except OSError: pass def _gnuplot_active_threads_ascii( csv_path: str, term_width: int, warn: int, crit: int, start_t: str, end_t: str, xtics_step: int, ) -> 
str: """plot_p4_active_commands.sh-style data; colored thresholds like the user's dumb example.""" esc = csv_path.replace("\\", "/").replace("'", "''") st = start_t.replace("'", "''") en = end_t.replace("'", "''") rc, out, err = _run_gnuplot_script( "\n".join( [ 'set title "P4 Commands Active Threads"', "set xdata time", 'set timefmt "%Y/%m/%d %H:%M:%S"', 'set xrange ["{}":"{}"]'.format(st, en), 'set format x "%m/%d %H:%M"', "set xtics {}".format(xtics_step), "set term dumb ansi size {}, 30".format(term_width), "set grid xtics ytics", "set datafile separator ','", "plot '{}' using 1:($2 < {} ? $2 : 1/0) title 'Normal' with impulses lc 2, \\" .format(esc, warn), " '{}' using 1:($2 >= {} && $2 < {} ? $2 : 1/0) title 'Warning' with impulses lc 3, \\" .format(esc, warn, crit), " '{}' using 1:($2 >= {} ? $2 : 1/0) title 'Critical' with impulses lc 1".format( esc, crit ), ] ) ) if rc != 0: return "(gnuplot failed: {})\n".format((err or "").strip() or rc) return out def _gnuplot_simple_impulse_ascii( title: str, csv_path: str, term_width: int, timefmt: str, y_label: str, xtics_step: int, start_t: str, end_t: str, ) -> str: cpath = csv_path.replace("\\", "/").replace("'", "''") st = start_t.replace("'", "''") en = end_t.replace("'", "''") xfmt = "%m/%d" if xtics_step >= 86400 else "%m/%d %H:%M" rc, out, err = _run_gnuplot_script( "\n".join( [ 'set title "{}"'.format(title.replace('"', '\\"')), "set xdata time", 'set timefmt "{}"'.format(timefmt), 'set xrange ["{}":"{}"]'.format(st, en), 'set format x "{}"'.format(xfmt), "set xtics {}".format(xtics_step), "set term dumb ansi size {}, 30".format(term_width), "set grid xtics ytics", "set datafile separator ','", "plot '{}' using 1:2 title '{}' with impulses linewidth 1 lc rgb 'blue'".format( cpath, y_label.replace("'", "''") ), ] ) ) if rc != 0: return "(gnuplot failed: {})\n".format((err or "").strip() or rc) return out def _gnuplot_active_threads_png( csv_path: str, out_png: str, start_t: str, end_t: str, xtics_step: int, ) 
-> Optional[str]: """Same style as plot_p4_active_commands.sh: single blue impulse, 2048x420 PNG.""" esc = csv_path.replace("\\", "/").replace("'", "''") st = start_t.replace("'", "''") en = end_t.replace("'", "''") outp = out_png.replace("\\", "/").replace("'", "''") rc, _out, err = _run_gnuplot_script( "\n".join( [ 'set title "Commands Active"', "set xdata time", 'set timefmt "%Y/%m/%d %H:%M:%S"', 'set xrange ["{}":"{}"]'.format(st, en), 'set format x "%m/%d %H:%M"', "set xtics {}".format(xtics_step), "set grid xtics ytics", "set term png size 2048,420", 'set output "{}"'.format(outp), "set datafile separator ','", "plot '{}' using 1:2 title 'P4 Commands Active Threads' with impulses linewidth 1 lc rgb 'blue'".format( esc ), "unset output", ] ) ) if rc != 0: return (err or "").strip() or str(rc) return None def _gnuplot_simple_impulse_png( title: str, csv_path: str, out_png: str, timefmt: str, y_label: str, xtics_step: int, start_t: str, end_t: str, ) -> Optional[str]: """Same style as plot_p4_dbwaittime / incoming / running .sh: blue impulses, 2048x420 PNG.""" cpath = csv_path.replace("\\", "/").replace("'", "''") st = start_t.replace("'", "''") en = end_t.replace("'", "''") outp = out_png.replace("\\", "/").replace("'", "''") # Shorter labels when tics are sparse (days); rotate so long labels do not overlap. 
xfmt = "%m/%d" if xtics_step >= 86400 else "%m/%d %H:%M" rc, _out, err = _run_gnuplot_script( "\n".join( [ 'set title "{}"'.format(title.replace('"', '\\"')), "set xdata time", 'set timefmt "{}"'.format(timefmt), 'set xrange ["{}":"{}"]'.format(st, en), 'set format x "{}"'.format(xfmt), "set xtics {} rotate by -45".format(xtics_step), "set bmargin 6", "set grid xtics ytics", "set term png size 2048,480", 'set output "{}"'.format(outp), "set datafile separator ','", "plot '{}' using 1:2 title '{}' with impulses linewidth 1 lc rgb 'blue'".format( cpath, y_label.replace("'", "''") ), "unset output", ] ) ) if rc != 0: return (err or "").strip() or str(rc) return None def write_p4_plot_pngs( log_path: str, db_path: str, diag_dir: str, *, pid_filter: str = "active", warn: int = 20, crit: int = 50, skip_existing: bool = True, ) -> List[str]: """Write summary PNGs under ``diag_dir`` (embedded gnuplot; same output names as legacy plot_p4_*.sh).""" if not shutil.which("gnuplot"): return [] log_path = os.path.abspath(log_path) db_path = os.path.abspath(db_path) diag_dir = os.path.abspath(diag_dir) os.makedirs(diag_dir, exist_ok=True) db_bn = os.path.basename(db_path) log_bn = os.path.basename(log_path) png_written: List[str] = [] def _maybe_skip(name: str) -> bool: if not skip_existing: return False out = os.path.join(diag_dir, name) return os.path.isfile(out) and os.path.getsize(out) > 0 with tempfile.TemporaryDirectory(prefix="p4diag_plots_") as tmp: active_png_name = f"Active.{log_bn}.png" if os.path.isfile(log_path) and not _maybe_skip(active_png_name): active_rows = _collect_active_threads_csv_rows(log_path, pid_filter) if active_rows: active_csv = os.path.join(tmp, "Active.csv") _write_csv_two_cols(active_csv, active_rows) t0 = _parse_plot_time(active_rows[0][0]) t1 = _parse_plot_time(active_rows[-1][0]) xstep = ( max(1, int((t1 - t0).total_seconds() // 10)) if t0 and t1 else 60 ) active_png = os.path.join(diag_dir, active_png_name) pe = _gnuplot_active_threads_png( 
active_csv, active_png, active_rows[0][0], active_rows[-1][0], xstep, ) if pe: print( f"p4diag: could not write {active_png}: {pe}", file=sys.stderr, flush=True, ) elif os.path.isfile(active_png) and os.path.getsize(active_png) > 0: png_written.append(os.path.abspath(active_png)) def _sql_png( csv_name: str, sql: str, g_title: str, y_lab: str, minute_fmt: bool, png_basename: str, ) -> None: if _maybe_skip(png_basename): return try: raw = _sqlite_plot_rows(db_path, sql) except sqlite3.Error as e: print( f"p4diag: plot {png_basename}: SQL error: {e}", file=sys.stderr, flush=True, ) return rows = _rows_with_leading_date(raw) if not rows: return path = os.path.join(tmp, csv_name) _write_csv_two_cols(path, rows) t0 = _parse_plot_time(str(rows[0][0])) t1 = _parse_plot_time(str(rows[-1][0])) if not t0 or not t1: return tfmt = "%Y/%m/%d %H:%M" if minute_fmt else "%Y/%m/%d %H:%M:%S" step = _xtics_step_seconds(t0, t1) st = str(rows[0][0]).strip().strip('"') en = str(rows[-1][0]).strip().strip('"') out_png = os.path.join(diag_dir, png_basename) pe = _gnuplot_simple_impulse_png( g_title, path, out_png, tfmt, y_lab, step, st, en ) if pe: print( f"p4diag: could not write {out_png}: {pe}", file=sys.stderr, flush=True, ) elif os.path.isfile(out_png) and os.path.getsize(out_png) > 0: png_written.append(os.path.abspath(out_png)) if os.path.isfile(db_path): _sql_png( "dbWaitTime.csv", _SQL_PLOT_DBWAIT, "Commands dbWaitTime", "P4 Commands DB Wait Time in seconds", True, f"dbWaitTime.{db_bn}.png", ) _sql_png( "Incoming.csv", _SQL_PLOT_INCOMING, "Commands Incoming per minute", "P4 Commands Incoming", True, f"Incoming.{db_bn}.png", ) _sql_png( "Running.csv", _SQL_PLOT_RUNNING, "Commands Running", "P4 Commands Running", False, f"Running.{db_bn}.png", ) return png_written def format_log2sql_ascii_plots( log_path: str, db_path: str, *, pid_filter: str = "active", warn: int = 20, crit: int = 50, term_width: Optional[int] = None, ) -> str: """Return gnuplot ``dumb`` ASCII plot blocks for 
embedding in text reports.""" if not shutil.which("gnuplot"): return "" w = term_width if term_width is not None else _terminal_width_for_plots() log_path = os.path.abspath(log_path) db_path = os.path.abspath(db_path) chunks: List[str] = [] with tempfile.TemporaryDirectory(prefix="p4diag_plots_") as tmp: active_csv = os.path.join(tmp, "Active.csv") if os.path.isfile(log_path): active_rows = _collect_active_threads_csv_rows(log_path, pid_filter) if active_rows: _write_csv_two_cols(active_csv, active_rows) t0 = _parse_plot_time(active_rows[0][0]) t1 = _parse_plot_time(active_rows[-1][0]) xstep = ( max(1, int((t1 - t0).total_seconds() // 10)) if t0 and t1 else 60 ) chunks.append("=== Commands Active (from log) ===\n") chunks.append( _gnuplot_active_threads_ascii( active_csv, w, warn, crit, active_rows[0][0], active_rows[-1][0], xstep, ).rstrip() + "\n" ) else: chunks.append( "=== Commands Active (from log) ===\n(no lines matching 'active threads'" + (f" and {pid_filter!r}" if pid_filter else "") + ")\n" ) else: chunks.append( "=== Commands Active (from log) ===\n" f"(log file not readable: {log_path})\n" ) def _sql_block( section_title: str, csv_name: str, sql: str, g_title: str, y_lab: str, minute_fmt: bool, ) -> None: if not os.path.isfile(db_path): chunks.append(f"=== {section_title} ===\n(database not found: {db_path})\n") return try: raw = _sqlite_plot_rows(db_path, sql) except sqlite3.Error as e: chunks.append(f"=== {section_title} ===\n(SQL error: {e})\n") return rows = _rows_with_leading_date(raw) if not rows: chunks.append(f"=== {section_title} ===\n(no data)\n") return path = os.path.join(tmp, csv_name) _write_csv_two_cols(path, rows) t0 = _parse_plot_time(str(rows[0][0])) t1 = _parse_plot_time(str(rows[-1][0])) if not t0 or not t1: chunks.append(f"=== {section_title} ===\n(could not parse time column)\n") return tfmt = "%Y/%m/%d %H:%M" if minute_fmt else "%Y/%m/%d %H:%M:%S" step = _xtics_step_seconds(t0, t1) st = str(rows[0][0]).strip().strip('"') en = 
str(rows[-1][0]).strip().strip('"') chunks.append(f"=== {section_title} ===\n") chunks.append( _gnuplot_simple_impulse_ascii( g_title, path, w, tfmt, y_lab, step, st, en, ).rstrip() + "\n" ) _sql_block( "DB wait time", "dbWaitTime.csv", _SQL_PLOT_DBWAIT, "Commands dbWaitTime", "P4 Commands DB Wait Time in seconds", True, ) _sql_block( "Incoming commands", "Incoming.csv", _SQL_PLOT_INCOMING, "Commands Incoming per minute", "P4 Commands Incoming", True, ) _sql_block( "Running commands", "Running.csv", _SQL_PLOT_RUNNING, "Commands Running", "P4 Commands Running", False, ) return "".join(chunks) def _build_summary_plots_section(diag_dir: str, png_files: List[str]) -> str: """PNG file list for the plain-text summary (ASCII plots: ``p4diag plots``).""" plot_lines = [] for fname in png_files: full_path = os.path.join(diag_dir, fname) if os.path.isfile(full_path) and os.path.getsize(full_path) > 0: plot_lines.append(f" {fname}\n") if plot_lines: return f"Directory: {diag_dir}\n" + "".join(plot_lines) return " (no PNG files yet — install gnuplot and run log2sql or plots)\n" def run_log2sql_plots( *, log_token: str, lq: Any, pid_filter: str, warn: int, crit: int, term_width: Optional[int], ) -> Tuple[str, List[str]]: """Build ASCII plots (gnuplot dumb) and write PNGs under ``.p4diagnostics/`` (same names as createPlots).""" lt, _db_path, _diag, log_base, _ = _configure_log2sql_quiet_paths(log_token) log_src = _resolve_quiet_log_source_path(lt, log_base) if not log_src or not os.path.isfile(log_src): cand = os.path.abspath(lq.logFile) if os.path.isfile(cand): log_src = cand db_path = os.path.abspath(lq.databaseFile) diag_dir = os.path.dirname(db_path) os.makedirs(diag_dir, exist_ok=True) log_file = log_src or os.path.abspath(lq.logFile) ascii_text = format_log2sql_ascii_plots( log_file, db_path, pid_filter=pid_filter, warn=warn, crit=crit, term_width=term_width, ) png_written = write_p4_plot_pngs( log_file, db_path, diag_dir, pid_filter=pid_filter, warn=warn, crit=crit, 
skip_existing=False, ) return ascii_text, png_written def run_quiet_subcommand(argv: list) -> None: """Non-interactive CLI (``-q`` or bare subcommand; no prompts).""" global LOG_FILE av = list(argv) _normalize_quiet_cli_argv(av) p = argparse.ArgumentParser( prog="p4diag", description=( "Non-interactive mode (no menu, prompts, or web UI). " "Invoke as p4diag SUBCOMMAND ... (with or without -q). " "Subcommands: trim, stats, log2sql, summary, schema, plots; " "or p4diag FILE.sql [LOG], p4diag list. See: p4diag -h" ), ) sub = p.add_subparsers(dest="qcmd", required=True, metavar="SUBCOMMAND") p_trim = sub.add_parser( "trim", help="Trim a P4 server log to --start / --end (writes segment file).", ) p_trim.add_argument("logfile", help="Path to source log file") p_trim.add_argument("--start", required=True, type=parse_datetime, metavar="TIME") p_trim.add_argument("--end", required=True, type=parse_datetime, metavar="TIME") def _add_log2sql_db_arg(sp): sp.add_argument( "log", nargs="?", default="log", help="Basename for .p4diagnostics/.db (default: log)", ) def _add_log2sql_query_sql(sp_name: str, help_txt: str) -> None: sp = sub.add_parser( sp_name, help=help_txt, ) sp.add_argument( "sqlfile", help="SQL script: basename uses P4DIAG_SQL_QUERIES/NAME.sql; absolute path runs that file", ) sp.add_argument( "log", nargs="?", default="log", help="Basename for .p4diagnostics/.db (default: log)", ) sp.add_argument( "pid", nargs="?", help=( "For probe_pid*.sql: process PID (@pid). " "For locks_held_total.sql: optional table name (@table). " "For locks_table_by_cmd.sql: table name (@table). " "For locks_all_duration.sql: duration in ms (@duration)." ), ) sp.add_argument( "start", nargs="?", help="For probe_pid*.sql: startTime (@start), e.g. 
'2026/06/02 12:53:21'", ) _add_log2sql_query_sql( "log2sql-query-sql", "Run NAME.sql with sqlite3 .read on .p4diagnostics/.db.", ) sub.add_parser( "log2sql-query-sql-list", help=f"List .sql query files in {p4diag_sql_queries_dir()} (basename per line).", ) p_sch = sub.add_parser( "log2sql-schema", help="Print process/tableUse columns using a built-in sample trace + SQLite DB.", ) p_stats = sub.add_parser( "stats", help=( "Write log statistics only (.p4diagnostics/.stats.txt) from the P4 server log; " "no SQLite DB required. Run this before summary when generating a log summary." ), ) _add_log2sql_db_arg(p_stats) # Subcommand details: set ``formatter_class=argparse.RawDescriptionHelpFormatter`` and a # multi-line ``description=`` so ``p4diag -h`` preserves notes / blank lines. p_plots = sub.add_parser( "log2sql-plots", formatter_class=argparse.RawDescriptionHelpFormatter, help=( "ASCII + PNG command-activity plots (gnuplot); use -h for what each graph shows." ), description=( "\tGraph active threads based on a log\n" "\tGraph db WaitTime based on log.db\n" "\tGraph incoming commands based on log.db\n" "\tGraph running commands based on log.db\n" "\n" "Also prints ASCII plots to stdout (gnuplot dumb) and writes PNGs under .p4diagnostics/\n" "(Active..png, dbWaitTime..png, Incoming..png, Running..png).\n" "Creates log.db if missing. Requires gnuplot in PATH." ), ) _add_log2sql_db_arg(p_plots) p_plots.add_argument( "--pid", default="active", help=( "Active-threads line filter: substring (default: active). " "Pass a parent P4D PID to restrict to one server when logs are shared." 
), ) p_plots.add_argument( "--warn", type=int, default=20, metavar="N", help="Active plot: warn threshold in thread count (default: 20).", ) p_plots.add_argument( "--crit", type=int, default=50, metavar="N", help="Active plot: critical threshold in thread count (default: 50).", ) p_plots.add_argument( "--width", type=int, default=None, metavar="COLS", help="gnuplot dumb width in columns (default: terminal width or COLUMNS).", ) p_l2create = sub.add_parser( "log2sql", help="Build .p4diagnostics/.db from the source log if missing.", ) _add_log2sql_db_arg(p_l2create) p_sum_auto = sub.add_parser( "summary", formatter_class=argparse.RawDescriptionHelpFormatter, help="Write LOG.summary.txt and LOG.summary.html under .p4diagnostics/.", description=( "Write summary text/HTML under .p4diagnostics/. " "Creates the SQLite DB if missing (log2sql) and log statistics if missing (stats)." ), ) p_sum_auto.add_argument( "log", help="P4 server log path or basename for .p4diagnostics/.db", ) args = p.parse_args(av) if args.qcmd == "trim": lo = log2sql(args.logfile) lo.databaseFile = args.logfile lo.trim_log(args.start, args.end) return if args.qcmd == "summary": path_tok = args.log lq = configure_log2sql_for_quiet_db(path_tok) if not os.path.isfile(lq.logFileDetails): print( "p4diag: log statistics not found; building them " "(same as stats) …", file=sys.stderr, flush=True, ) lq.createLogStats(force_regenerate=False) if not os.path.isfile(lq.logFileDetails): print( f"p4diag: log statistics not present: {lq.logFileDetails}\n" "Could not build statistics — is the P4 server log readable?\n" f" Log path in use: {lq.logFile!r}\n" f"Try: p4diag stats {path_tok!r}", file=sys.stderr, ) sys.exit(2) lq.createLogSummary(force_regenerate=False) return if args.qcmd == "log2sql": quiet_log2sql_create_database(args.log) return if args.qcmd == "stats": lq = configure_log2sql_for_quiet_log_stats_only(args.log) rebuilt = lq.createLogStats(force_regenerate=False) if rebuilt: msg = f"Wrote log 
statistics: {lq.logFileDetails}" else: msg = f"Log statistics up to date (log unchanged): {lq.logFileDetails}" print(msg, file=sys.stderr, flush=True) with open(lq.logFileDetails, "r", encoding="utf-8", errors="replace") as fh: sys.stdout.write(fh.read()) return if args.qcmd == "log2sql-query-sql-list": quiet_list_sql_queries() return if args.qcmd == "log2sql-schema": lq = ensure_help_schema_sample_database() with quiet_stderr_activity("Reading schema..."): lq.schema_tables_pretty() return if args.qcmd == "log2sql-plots": if not shutil.which("gnuplot"): print( "p4diag: log2sql-plots requires gnuplot in PATH.", file=sys.stderr, ) sys.exit(2) lq = configure_log2sql_for_quiet_db(args.log) with quiet_stderr_activity("ASCII + PNG plots (gnuplot)..."): text, png_written = run_log2sql_plots( log_token=args.log, lq=lq, pid_filter=args.pid, warn=args.warn, crit=args.crit, term_width=args.width, ) sys.stdout.write(text) for p in png_written: print(f"p4diag: wrote PNG: {p}", file=sys.stderr, flush=True) return if args.qcmd == "log2sql-query-sql": lq = configure_log2sql_for_quiet_db(args.log) lq.execute_sql_file_noninteractive( args.sqlfile, pid=args.pid, start=args.start, ) return p.error(f"Unhandled quiet command: {args.qcmd}") def run_interactive_auto_detect( av: list, *, case_dir: str, export_dir: str, port: int, ) -> None: """Interactive TTY menu for a P4 server log.""" global QUIET QUIET = False parser = argparse.ArgumentParser( prog="p4diag", description="Interactive mode (TTY menu): PATH is a Perforce server log.", ) parser.add_argument( "path", help="Server log file (basename or path)", ) parser.add_argument( "--start", type=parse_datetime, metavar="TIME", default=None, help="Start time (YYYY/MM/DD HH:MM:SS); must be paired with --end", ) parser.add_argument( "--end", type=parse_datetime, metavar="TIME", default=None, help="End time (YYYY/MM/DD HH:MM:SS); must be paired with --start", ) args = parser.parse_args(av) if (args.start is None) != (args.end is None): 
parser.error("--start and --end must be provided together") _preflight_log2sql_log_exists(args.path, args.start, args.end) if not QUIET: start_web_server(export_dir, port, log_token=args.path) os.chdir(case_dir) handle_log2sql(args.path, args.start, args.end) def _root_help_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="p4diag", usage="p4diag [-h] [-q] PATH [--start TIME --end TIME] ...", description=P4DIAG_DESCRIPTION, epilog=P4DIAG_EPILOG, formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( "-q", "--quiet", action="store_true", help="Use quiet CLI (optional if you invoke a quiet subcommand directly, e.g. log2sql).", ) return parser def main(): global l, port, QUIET, LOG_FILE _p4diag_interactive_setup() case_dir = os.getcwd() s.run(["mkdir", "-p", ".p4diagnostics"]) port = 8000 + os.getuid() % 1000 export_dir = os.path.join(os.getcwd(), ".p4diagnostics") LOG_FILE = os.path.join(".p4diagnostics", "p4diag.log") av = sys.argv[1:] wants_help = "-h" in av or "--help" in av if wants_help and not (av and av[0] not in ("-h", "--help") and _argv_invokes_quiet_cli(av)): _root_help_parser().print_help() return _require_python_sqlite3() if av and _argv_invokes_quiet_cli(av): QUIET = not (len(av) == 1 and av[0] == "stats") os.chdir(case_dir) run_quiet_subcommand(av) return if av and av[0] in ("-q", "--quiet"): QUIET = True rest = av[1:] if not rest: print( "p4diag -q: missing subcommand. Try: log2sql, stats, summary, " "plots, schema, trim, list, or FILE.sql [LOG].", file=sys.stderr, ) sys.exit(2) os.chdir(case_dir) run_quiet_subcommand(rest) return if av and not av[0].startswith("-"): run_interactive_auto_detect(av, case_dir=case_dir, export_dir=export_dir, port=port) return print("p4diag: missing LOG path (Perforce server log).", file=sys.stderr) _root_help_parser().print_help() sys.exit(2) if __name__ == "__main__": main()