#!/usr/bin/env python3 """ Perforce P4LOG Analyzer A comprehensive Python script for analyzing Perforce server log files (P4LOG). Based on Perforce documentation for interpreting server logs. Features: - Parse command entries with timestamps, users, PIDs, IPs, and commands - Extract performance metrics: lapse time, CPU usage, I/O, network, locks - Identify slow commands and performance bottlenecks - Generate summary reports and statistics - Export results to CSV for further analysis Usage: python p4log_analyzer.py [options] Options: --top N Show top N slowest commands (default: 20) --user USER Filter by specific user --command CMD Filter by command pattern --min-lapse SEC Only show commands taking longer than SEC seconds --export CSV Export results to CSV file --summary Show summary statistics only --locks Analyze lock wait times --verbose Show detailed parsing information """ import re import argparse import csv import sys from collections import defaultdict from datetime import datetime from dataclasses import dataclass, field from typing import Optional, List, Dict, Any @dataclass class CommandEntry: """Represents a single command entry from the P4 log.""" timestamp: Optional[datetime] = None pid: Optional[int] = None user: Optional[str] = None client: Optional[str] = None ip_address: Optional[str] = None client_info: Optional[str] = None # e.g., [p4/2018.2/LINUX26X86_64/165458] command: Optional[str] = None args: Optional[str] = None ident: Optional[str] = None # command identifier # Performance metrics lapse_seconds: float = 0.0 compute_seconds: float = 0.0 completed_seconds: float = 0.0 user_time_ms: float = 0.0 system_time_ms: float = 0.0 io_in: int = 0 io_out: int = 0 net_in: int = 0 net_out: int = 0 max_memory_kb: int = 0 page_faults: int = 0 # Memory metrics memory_cmd_mb: float = 0.0 memory_proc_mb: float = 0.0 # RPC metrics rpc_msgs_in: int = 0 rpc_msgs_out: int = 0 rpc_size_in_mb: float = 0.0 rpc_size_out_mb: float = 0.0 rpc_himarks_send: int = 0 rpc_himarks_recv: int = 0 rpc_snd_time: float = 0.0 rpc_rcv_time: float = 0.0 # Database metrics db_stats: Dict[str, Dict[str, Any]] = field(default_factory=dict) # Lock metrics (totals - accumulated across all tables) total_read_lock_wait_ms: float = 0.0 total_read_lock_held_ms: float = 0.0 total_write_lock_wait_ms: float = 0.0 total_write_lock_held_ms: float = 0.0 # Max lock metrics (single worst lock - for identifying culprits/victims) max_read_lock_wait_ms: float = 0.0 max_read_lock_held_ms: float = 0.0 max_write_lock_wait_ms: float = 0.0 max_write_lock_held_ms: float = 0.0 max_write_lock_held_table: str = "" # Which table had the longest write hold max_write_lock_wait_table: str = "" # Which table had the longest write wait # Meta lock metrics meta_read_lock_wait_ms: float = 0.0 meta_read_lock_held_ms: float = 0.0 meta_write_lock_wait_ms: float = 0.0 meta_write_lock_held_ms: float = 0.0 raw_lines: List[str] = field(default_factory=list) class P4LogParser: """Parser for Perforce P4LOG files.""" # Regex pattern for indented header lines (handles complex client info with nested brackets) # Format: 2026/01/30 18:34:43 pid 1174682 user@client IP [client info] 'command args' # Note: Client info can contain nested brackets like [script [VERSION]/v95] HEADER_PATTERN = re.compile( r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+' r'pid\s+(\d+)\s+' r'(\S+)@(\S+)\s+' r'(\d+\.\d+\.\d+\.\d+)\s+' r'\[(.+)\]\s*' r"'(.+)'$" ) # Simpler pattern without client info brackets HEADER_PATTERN_SIMPLE = re.compile( r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+' r'pid\s+(\d+)\s+' r'(\S+)@(\S+)\s+' r'(\d+\.\d+\.\d+\.\d+)\s*' r"'(.+)'$" ) # Compute/completed line pattern: pid 1174682 compute end .010s 2+0us 0+0io 0+0net 10856k 0pf COMPUTE_PATTERN = re.compile( r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+' r'pid\s+(\d+)\s+' r'(compute\s+end|completed)\s+' r'(\d+(?:\.\d+)?)(s|ms)\s+' r'(\d+)\+(\d+)us\s+' r'(\d+)\+(\d+)io\s+' r'(\d+)\+(\d+)net\s+' r'(\d+)k\s+' r'(\d+)pf' ) # Ident pattern: --- ident cmd/group HASH/none IDENT_PATTERN = re.compile(r'^---\s+ident\s+cmd/group\s+(\S+)/(\S+)') # Lapse time pattern: --- lapse .010s or --- lapse 109s LAPSE_PATTERN = re.compile(r'^---\s+lapse\s+(\d*\.?\d+)(s|ms)') # Usage pattern: --- usage 2+0us 0+8io 0+0net 11368k 0pf USAGE_PATTERN = re.compile( r'^---\s+usage\s+' r'(\d+)\+(\d+)us\s+' r'(\d+)\+(\d+)io\s+' r'(\d+)\+(\d+)net\s+' r'(\d+)k\s+' r'(\d+)pf' ) # Memory pattern: --- memory cmd/proc 32mb/32mb MEMORY_PATTERN = re.compile( r'^---\s+memory\s+cmd/proc\s+' r'(\d+(?:\.\d+)?)mb/(\d+(?:\.\d+)?)mb' ) # RPC pattern: --- rpc msgs/size in+out 2+3/0mb+0mb himarks 64835/130372 snd/rcv .000s/.007s RPC_PATTERN = re.compile( r'^---\s+rpc\s+msgs/size\s+in\+out\s+' r'(\d+)\+(\d+)/(\d+(?:\.\d+)?)mb\+(\d+(?:\.\d+)?)mb' r'(?:\s+himarks\s+(\d+)/(\d+))?' r'(?:\s+snd/rcv\s+(\d*\.?\d+)s/(\d*\.?\d+)s)?' ) # Meta/db lock pattern: --- meta/db(R) followed by total lock line META_DB_PATTERN = re.compile(r'^---\s+meta/db\(([RW])\)') # Database table pattern: --- db.tablename DB_TABLE_PATTERN = re.compile(r'^---\s+(db\.\w+)\s*$') # Pages pattern: --- pages in+out+cached 182412+0+64 PAGES_PATTERN = re.compile( r'^---\s+pages\s+in\+out\+cached\s+' r'(\d+)\+(\d+)\+(\d+)' ) # Locks/rows pattern: --- locks read/write 1/0 rows get+pos+scan put+del 0+1+26635 0+0 LOCKS_ROWS_PATTERN = re.compile( r'^---\s+locks\s+read/write\s+' r'(\d+)/(\d+)\s+' r'rows\s+get\+pos\+scan\s+' r'(\d+)\+(\d+)\+(\d+)\s+' r'put\+del\s+' r'(\d+)\+(\d+)' ) # Total lock pattern: --- total lock wait+held read/write 0ms+252ms/0ms+0ms TOTAL_LOCK_PATTERN = re.compile( r'^---\s+total\s+lock\s+wait\+held\s+read/write\s+' r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms' ) # Max lock pattern: --- max lock wait+held read/write 0ms+1ms/0ms+0ms MAX_LOCK_PATTERN = re.compile( r'^---\s+max\s+lock\s+wait\+held\s+read/write\s+' r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms' ) # Peek count pattern: --- peek count 1 wait+held total/max 0ms+0ms/0ms+0ms PEEK_PATTERN = re.compile( r'^---\s+peek\s+count\s+(\d+)\s+' r'wait\+held\s+total/max\s+' r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms' ) # File totals pattern: --- filetotals (svr) send/recv files+bytes 0+0mb/0+0mb FILETOTALS_PATTERN = re.compile( r'^---\s+filetotals\s+\((\w+)\)\s+' r'send/recv\s+files\+bytes\s+' r'(\d+)\+(\d+(?:\.\d+)?)mb/(\d+)\+(\d+(?:\.\d+)?)mb' ) def __init__(self, verbose: bool = False): self.verbose = verbose self.entries: List[CommandEntry] = [] self.parse_errors: List[str] = [] def parse_file(self, filepath: str) -> List[CommandEntry]: """Parse a P4LOG file and return list of command entries.""" self.entries = [] self.parse_errors = [] # Dictionary to track commands by PID - we'll update them as we see more data commands_by_pid: Dict[int, CommandEntry] = {} current_db_table: Optional[str] = None current_pid: Optional[int] = None in_meta_section = False try: with open(filepath, 'r', encoding='utf-8', errors='replace') as f: for line_num, line in enumerate(f, 1): line = line.rstrip('\n\r') # Skip empty lines if not line.strip(): continue # Skip marker lines if line.strip() in ('Perforce server info:', 'Perforce server error:'): continue # Skip server info lines that don't have command data if line.strip().startswith('Date ') or line.strip().startswith('Operation:'): continue if line.strip().startswith('Ident:') or line.strip().startswith('Server network estimates:'): continue if 'server to client' in line or 'file(s) up-to-date' in line: continue # Strip leading whitespace for pattern matching stripped = line.lstrip() # Try to match a command header with full client info header_match = self.HEADER_PATTERN.match(stripped) if header_match: entry = self._create_entry_from_header(header_match, has_client_info=True) current_pid = entry.pid current_db_table = None in_meta_section = False # Only store if this is a new command or has lapse info if entry.pid not in commands_by_pid: commands_by_pid[entry.pid] = entry else: # Update existing entry with this header info (keeps tracking data) existing = commands_by_pid[entry.pid] if not existing.timestamp: existing.timestamp = entry.timestamp if not existing.user: existing.user = entry.user if not existing.client: existing.client = entry.client if not existing.command: existing.command = entry.command existing.args = entry.args continue # Try simpler header pattern (without brackets) header_simple = self.HEADER_PATTERN_SIMPLE.match(stripped) if header_simple: entry = self._create_entry_from_header_simple(header_simple) current_pid = entry.pid current_db_table = None in_meta_section = False if entry.pid not in commands_by_pid: commands_by_pid[entry.pid] = entry continue # Try compute/completed pattern compute_match = self.COMPUTE_PATTERN.match(stripped) if compute_match: pid = int(compute_match.group(2)) current_pid = pid if pid not in commands_by_pid: commands_by_pid[pid] = CommandEntry(pid=pid) entry = commands_by_pid[pid] time_value = float(compute_match.group(4)) time_unit = compute_match.group(5) seconds = time_value if time_unit == 's' else time_value / 1000.0 if 'compute' in compute_match.group(3): entry.compute_seconds = seconds else: entry.completed_seconds = seconds entry.user_time_ms = float(compute_match.group(6)) entry.system_time_ms = float(compute_match.group(7)) entry.io_in = int(compute_match.group(8)) entry.io_out = int(compute_match.group(9)) entry.net_in = int(compute_match.group(10)) entry.net_out = int(compute_match.group(11)) entry.max_memory_kb = int(compute_match.group(12)) entry.page_faults = int(compute_match.group(13)) continue # If we have a current PID, try to parse tracking lines if current_pid and current_pid in commands_by_pid: entry = commands_by_pid[current_pid] # Ident ident_match = self.IDENT_PATTERN.match(stripped) if ident_match: entry.ident = ident_match.group(1) continue # Meta/db section meta_match = self.META_DB_PATTERN.match(stripped) if meta_match: in_meta_section = True current_db_table = None continue # Lapse time lapse_match = self.LAPSE_PATTERN.match(stripped) if lapse_match: value = float(lapse_match.group(1)) unit = lapse_match.group(2) entry.lapse_seconds = value if unit == 's' else value / 1000.0 continue # Usage usage_match = self.USAGE_PATTERN.match(stripped) if usage_match: entry.user_time_ms = float(usage_match.group(1)) entry.system_time_ms = float(usage_match.group(2)) entry.io_in = int(usage_match.group(3)) entry.io_out = int(usage_match.group(4)) entry.net_in = int(usage_match.group(5)) entry.net_out = int(usage_match.group(6)) entry.max_memory_kb = int(usage_match.group(7)) entry.page_faults = int(usage_match.group(8)) continue # Memory memory_match = self.MEMORY_PATTERN.match(stripped) if memory_match: entry.memory_cmd_mb = float(memory_match.group(1)) entry.memory_proc_mb = float(memory_match.group(2)) continue # RPC rpc_match = self.RPC_PATTERN.match(stripped) if rpc_match: entry.rpc_msgs_in = int(rpc_match.group(1)) entry.rpc_msgs_out = int(rpc_match.group(2)) entry.rpc_size_in_mb = float(rpc_match.group(3)) entry.rpc_size_out_mb = float(rpc_match.group(4)) if rpc_match.group(5): entry.rpc_himarks_send = int(rpc_match.group(5)) entry.rpc_himarks_recv = int(rpc_match.group(6)) if rpc_match.group(7): entry.rpc_snd_time = float(rpc_match.group(7)) entry.rpc_rcv_time = float(rpc_match.group(8)) continue # Database table db_match = self.DB_TABLE_PATTERN.match(stripped) if db_match: current_db_table = db_match.group(1) in_meta_section = False if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} continue # Pages (associated with current db table) pages_match = self.PAGES_PATTERN.match(stripped) if pages_match and current_db_table: if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} entry.db_stats[current_db_table]['pages_in'] = int(pages_match.group(1)) entry.db_stats[current_db_table]['pages_out'] = int(pages_match.group(2)) entry.db_stats[current_db_table]['pages_cached'] = int(pages_match.group(3)) continue # Locks and rows locks_match = self.LOCKS_ROWS_PATTERN.match(stripped) if locks_match and current_db_table: if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} entry.db_stats[current_db_table]['read_locks'] = int(locks_match.group(1)) entry.db_stats[current_db_table]['write_locks'] = int(locks_match.group(2)) entry.db_stats[current_db_table]['rows_get'] = int(locks_match.group(3)) entry.db_stats[current_db_table]['rows_pos'] = int(locks_match.group(4)) entry.db_stats[current_db_table]['rows_scan'] = int(locks_match.group(5)) entry.db_stats[current_db_table]['rows_put'] = int(locks_match.group(6)) entry.db_stats[current_db_table]['rows_del'] = int(locks_match.group(7)) continue # Total lock times total_lock_match = self.TOTAL_LOCK_PATTERN.match(stripped) if total_lock_match: read_wait = float(total_lock_match.group(1)) read_held = float(total_lock_match.group(2)) write_wait = float(total_lock_match.group(3)) write_held = float(total_lock_match.group(4)) if in_meta_section: entry.meta_read_lock_wait_ms = read_wait entry.meta_read_lock_held_ms = read_held entry.meta_write_lock_wait_ms = write_wait entry.meta_write_lock_held_ms = write_held elif current_db_table: # Ensure the table exists in db_stats if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} entry.db_stats[current_db_table]['total_read_wait_ms'] = read_wait entry.db_stats[current_db_table]['total_read_held_ms'] = read_held entry.db_stats[current_db_table]['total_write_wait_ms'] = write_wait entry.db_stats[current_db_table]['total_write_held_ms'] = write_held # Accumulate totals for the command entry.total_read_lock_wait_ms += read_wait entry.total_read_lock_held_ms += read_held entry.total_write_lock_wait_ms += write_wait entry.total_write_lock_held_ms += write_held continue # Peek count peek_match = self.PEEK_PATTERN.match(stripped) if peek_match and current_db_table: if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} entry.db_stats[current_db_table]['peek_count'] = int(peek_match.group(1)) entry.db_stats[current_db_table]['peek_wait_total_ms'] = int(peek_match.group(2)) entry.db_stats[current_db_table]['peek_held_total_ms'] = int(peek_match.group(3)) entry.db_stats[current_db_table]['peek_wait_max_ms'] = int(peek_match.group(4)) entry.db_stats[current_db_table]['peek_held_max_ms'] = int(peek_match.group(5)) continue # Max lock times (individual worst lock per table) max_lock_match = self.MAX_LOCK_PATTERN.match(stripped) if max_lock_match and current_db_table: if current_db_table not in entry.db_stats: entry.db_stats[current_db_table] = {} max_read_wait = float(max_lock_match.group(1)) max_read_held = float(max_lock_match.group(2)) max_write_wait = float(max_lock_match.group(3)) max_write_held = float(max_lock_match.group(4)) entry.db_stats[current_db_table]['max_read_wait_ms'] = max_read_wait entry.db_stats[current_db_table]['max_read_held_ms'] = max_read_held entry.db_stats[current_db_table]['max_write_wait_ms'] = max_write_wait entry.db_stats[current_db_table]['max_write_held_ms'] = max_write_held # Track the worst individual locks across all tables if max_read_wait > entry.max_read_lock_wait_ms: entry.max_read_lock_wait_ms = max_read_wait if max_read_held > entry.max_read_lock_held_ms: entry.max_read_lock_held_ms = max_read_held if max_write_wait > entry.max_write_lock_wait_ms: entry.max_write_lock_wait_ms = max_write_wait entry.max_write_lock_wait_table = current_db_table if max_write_held > entry.max_write_lock_held_ms: entry.max_write_lock_held_ms = max_write_held entry.max_write_lock_held_table = current_db_table continue # Collect all entries that have useful data (lapse time indicates a completed command) for pid, entry in commands_by_pid.items(): if entry.lapse_seconds > 0 or entry.completed_seconds > 0: # Use completed_seconds if lapse not set if entry.lapse_seconds == 0 and entry.completed_seconds > 0: entry.lapse_seconds = entry.completed_seconds if entry.command: # Only include if we have a command self.entries.append(entry) except IOError as e: self.parse_errors.append(f"Error reading file: {e}") if self.verbose: print(f"Parsed {len(self.entries)} command entries") if self.parse_errors: print(f"Encountered {len(self.parse_errors)} errors") return self.entries def _create_entry_from_header(self, match, has_client_info: bool = True) -> CommandEntry: """Create a CommandEntry from a regex match.""" entry = CommandEntry() entry.timestamp = datetime.strptime(match.group(1), '%Y/%m/%d %H:%M:%S') entry.pid = int(match.group(2)) entry.user = match.group(3) entry.client = match.group(4) entry.ip_address = match.group(5) if has_client_info: entry.client_info = match.group(6) full_command = match.group(7) else: full_command = match.group(6) parts = full_command.split(None, 1) entry.command = parts[0] entry.args = full_command return entry def _create_entry_from_header_simple(self, match) -> CommandEntry: """Create a CommandEntry from the simple header pattern.""" return self._create_entry_from_header(match, has_client_info=False) class P4LogAnalyzer: """Analyzer for parsed P4LOG entries.""" def __init__(self, entries: List[CommandEntry]): self.entries = entries def filter_entries( self, user: Optional[str] = None, command: Optional[str] = None, min_lapse: Optional[float] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None ) -> List[CommandEntry]: """Filter entries based on criteria.""" filtered = self.entries if user: filtered = [e for e in filtered if e.user and user.lower() in e.user.lower()] if command: filtered = [e for e in filtered if e.command and command.lower() in e.command.lower()] if min_lapse is not None: filtered = [e for e in filtered if e.lapse_seconds >= min_lapse] if start_time: filtered = [e for e in filtered if e.timestamp and e.timestamp >= start_time] if end_time: filtered = [e for e in filtered if e.timestamp and e.timestamp <= end_time] return filtered def get_slowest_commands(self, n: int = 20) -> List[CommandEntry]: """Get the N slowest commands by lapse time.""" return sorted(self.entries, key=lambda e: e.lapse_seconds, reverse=True)[:n] def get_commands_with_long_lock_waits( self, min_wait_ms: float = 10000 ) -> List[CommandEntry]: """Get commands that waited a long time for locks.""" return [ e for e in self.entries if (e.total_read_lock_wait_ms + e.total_write_lock_wait_ms) >= min_wait_ms ] def get_commands_with_high_io(self, min_io_blocks: int = 10000) -> List[CommandEntry]: """Get commands with high I/O.""" return [e for e in self.entries if (e.io_in + e.io_out) >= min_io_blocks] def get_write_lock_culprits(self, min_held_ms: float = 1000) -> List[CommandEntry]: """ Get commands that held write locks for a long time (potential culprits). These are commands that may have blocked other processes. Only uses MAX lock times, and validates that lock time doesn't exceed lapse time. """ culprits = [] for e in self.entries: # Sanity check: lock held time cannot reasonably exceed command lapse time # If it does, the data is likely from 'total lock' not 'max lock' max_reasonable_lock_ms = e.lapse_seconds * 1000 if e.lapse_seconds > 0 else float('inf') if e.max_write_lock_held_ms >= min_held_ms and e.max_write_lock_held_ms <= max_reasonable_lock_ms: culprits.append(e) return sorted(culprits, key=lambda e: e.max_write_lock_held_ms, reverse=True) def get_write_lock_victims(self, min_wait_ms: float = 1000) -> List[CommandEntry]: """ Get commands that waited a long time for write locks (victims). These commands were blocked by other processes holding locks. Only uses MAX lock times, and validates that lock time doesn't exceed lapse time. """ victims = [] for e in self.entries: # Sanity check: lock wait time cannot reasonably exceed command lapse time max_reasonable_lock_ms = e.lapse_seconds * 1000 if e.lapse_seconds > 0 else float('inf') if e.max_write_lock_wait_ms >= min_wait_ms and e.max_write_lock_wait_ms <= max_reasonable_lock_ms: victims.append(e) return sorted(victims, key=lambda e: e.max_write_lock_wait_ms, reverse=True) def find_culprit_for_victim( self, victim: CommandEntry, time_window_seconds: float = 60 ) -> List[CommandEntry]: """ Given a victim (command that waited for a lock), find potential culprits. Culprits are commands that: 1. Held a write lock on the same table the victim was waiting for 2. Were running at or before the victim's timestamp Only uses MAX lock times (single lock acquisition), not TOTAL times. """ if not victim.timestamp or not victim.max_write_lock_wait_table: return [] table = victim.max_write_lock_wait_table culprits = [] for e in self.entries: if e.pid == victim.pid: # Skip the victim itself continue if not e.timestamp: continue # Check if this command was running before or during the victim time_diff = (victim.timestamp - e.timestamp).total_seconds() if time_diff < -time_window_seconds or time_diff > time_window_seconds: continue # Outside time window # Check if this command held a write lock on the same table # ONLY use max_write_held_ms - do not fall back to total if table in e.db_stats: held_ms = e.db_stats[table].get('max_write_held_ms', 0) if held_ms > 0: culprits.append((e, held_ms, time_diff)) # Sort by held time (most likely culprit first) culprits.sort(key=lambda x: x[1], reverse=True) return culprits def analyze_lock_contention(self, min_wait_ms: float = 1000) -> Dict[str, Any]: """ Comprehensive lock contention analysis. Returns victims, culprits, and table-level statistics. IMPORTANT: Only uses MAX lock times (single worst lock), not TOTAL lock times. Total lock times are accumulated across multiple lock acquisitions and would give misleading results (e.g., a read-only command showing high "held" times). """ analysis = { 'victims': [], 'culprits': [], 'tables_with_contention': defaultdict(lambda: { 'total_wait_ms': 0, 'total_held_ms': 0, 'max_wait_ms': 0, 'max_held_ms': 0, 'victim_count': 0, 'culprit_count': 0 }) } # Find all victims and culprits using ONLY max lock values for e in self.entries: # Check each table for this command for table, stats in e.db_stats.items(): # ONLY use max values - these represent single lock acquisitions # Do NOT fall back to total values as they accumulate across multiple locks max_write_wait = stats.get('max_write_wait_ms', 0) max_write_held = stats.get('max_write_held_ms', 0) if max_write_wait >= min_wait_ms: analysis['tables_with_contention'][table]['total_wait_ms'] += max_write_wait analysis['tables_with_contention'][table]['victim_count'] += 1 if max_write_wait > analysis['tables_with_contention'][table]['max_wait_ms']: analysis['tables_with_contention'][table]['max_wait_ms'] = max_write_wait if max_write_held >= min_wait_ms: analysis['tables_with_contention'][table]['total_held_ms'] += max_write_held analysis['tables_with_contention'][table]['culprit_count'] += 1 if max_write_held > analysis['tables_with_contention'][table]['max_held_ms']: analysis['tables_with_contention'][table]['max_held_ms'] = max_write_held analysis['victims'] = self.get_write_lock_victims(min_wait_ms) analysis['culprits'] = self.get_write_lock_culprits(min_wait_ms) return analysis def get_summary_statistics(self) -> Dict[str, Any]: """Generate summary statistics for all entries.""" if not self.entries: return {} stats = { 'total_commands': len(self.entries), 'unique_users': len(set(e.user for e in self.entries if e.user)), 'unique_clients': len(set(e.client for e in self.entries if e.client)), 'unique_ips': len(set(e.ip_address for e in self.entries if e.ip_address)), } # Time range timestamps = [e.timestamp for e in self.entries if e.timestamp] if timestamps: stats['start_time'] = min(timestamps) stats['end_time'] = max(timestamps) stats['duration'] = stats['end_time'] - stats['start_time'] # Lapse time statistics lapse_times = [e.lapse_seconds for e in self.entries] if lapse_times: stats['total_lapse_seconds'] = sum(lapse_times) stats['avg_lapse_seconds'] = stats['total_lapse_seconds'] / len(lapse_times) stats['max_lapse_seconds'] = max(lapse_times) stats['min_lapse_seconds'] = min(lapse_times) # Command breakdown command_counts = defaultdict(int) command_lapse = defaultdict(float) for e in self.entries: if e.command: cmd = e.command.replace('user-', '') command_counts[cmd] += 1 command_lapse[cmd] += e.lapse_seconds stats['command_counts'] = dict(command_counts) stats['command_total_lapse'] = dict(command_lapse) # User breakdown user_counts = defaultdict(int) user_lapse = defaultdict(float) for e in self.entries: if e.user: user_counts[e.user] += 1 user_lapse[e.user] += e.lapse_seconds stats['user_counts'] = dict(user_counts) stats['user_total_lapse'] = dict(user_lapse) # Network statistics stats['total_rpc_in_mb'] = sum(e.rpc_size_in_mb for e in self.entries) stats['total_rpc_out_mb'] = sum(e.rpc_size_out_mb for e in self.entries) return stats def get_user_statistics(self) -> List[Dict[str, Any]]: """Get per-user statistics.""" user_data = defaultdict(lambda: { 'count': 0, 'total_lapse': 0.0, 'commands': defaultdict(int) }) for e in self.entries: if e.user: user_data[e.user]['count'] += 1 user_data[e.user]['total_lapse'] += e.lapse_seconds if e.command: user_data[e.user]['commands'][e.command] += 1 result = [] for user, data in user_data.items(): result.append({ 'user': user, 'command_count': data['count'], 'total_lapse_seconds': data['total_lapse'], 'avg_lapse_seconds': data['total_lapse'] / data['count'] if data['count'] > 0 else 0, 'top_commands': dict(sorted( data['commands'].items(), key=lambda x: x[1], reverse=True )[:5]) }) return sorted(result, key=lambda x: x['total_lapse_seconds'], reverse=True) def get_hourly_distribution(self) -> Dict[int, Dict[str, Any]]: """Get command distribution by hour.""" hourly = defaultdict(lambda: {'count': 0, 'total_lapse': 0.0}) for e in self.entries: if e.timestamp: hour = e.timestamp.hour hourly[hour]['count'] += 1 hourly[hour]['total_lapse'] += e.lapse_seconds return dict(sorted(hourly.items())) class ReportGenerator: """Generate reports from analysis results.""" def __init__(self, analyzer: P4LogAnalyzer): self.analyzer = analyzer def print_summary(self): """Print a summary report to stdout.""" stats = self.analyzer.get_summary_statistics() if not stats: print("No data to analyze.") return print("\n" + "=" * 70) print("PERFORCE LOG ANALYSIS SUMMARY") print("=" * 70) print(f"\nTotal Commands: {stats['total_commands']:,}") print(f"Unique Users: {stats['unique_users']:,}") print(f"Unique Clients: {stats['unique_clients']:,}") print(f"Unique IPs: {stats['unique_ips']:,}") if 'start_time' in stats: print(f"\nTime Range: {stats['start_time']} to {stats['end_time']}") print(f"Duration: {stats['duration']}") print(f"\nLapse Time Statistics:") print(f" Total: {stats.get('total_lapse_seconds', 0):,.2f} seconds") print(f" Average: {stats.get('avg_lapse_seconds', 0):.3f} seconds") print(f" Maximum: {stats.get('max_lapse_seconds', 0):,.2f} seconds") print(f" Minimum: {stats.get('min_lapse_seconds', 0):.3f} seconds") print(f"\nNetwork Transfer:") print(f" Total Received: {stats.get('total_rpc_in_mb', 0):,.2f} MB") print(f" Total Sent: {stats.get('total_rpc_out_mb', 0):,.2f} MB") # Top commands by count if 'command_counts' in stats: print("\n" + "-" * 50) print("TOP 10 COMMANDS BY COUNT") print("-" * 50) sorted_cmds = sorted( stats['command_counts'].items(), key=lambda x: x[1], reverse=True )[:10] for cmd, count in sorted_cmds: lapse = stats['command_total_lapse'].get(cmd, 0) print(f" {cmd:30s} {count:8,d} calls {lapse:10,.2f}s total") # Top users by lapse time if 'user_total_lapse' in stats: print("\n" + "-" * 50) print("TOP 10 USERS BY TOTAL LAPSE TIME") print("-" * 50) sorted_users = sorted( stats['user_total_lapse'].items(), key=lambda x: x[1], reverse=True )[:10] for user, lapse in sorted_users: count = stats['user_counts'].get(user, 0) print(f" {user:30s} {lapse:10,.2f}s total ({count:,d} commands)") def print_slowest_commands(self, n: int = 20): """Print the slowest commands.""" slowest = self.analyzer.get_slowest_commands(n) print("\n" + "=" * 70) print(f"TOP {n} SLOWEST COMMANDS") print("=" * 70) for i, entry in enumerate(slowest, 1): print(f"\n{i}. [{entry.lapse_seconds:,.2f}s] {entry.command}") print(f" Time: {entry.timestamp}") print(f" User: {entry.user}@{entry.client}") print(f" PID: {entry.pid}, IP: {entry.ip_address}") print(f" Args: {entry.args[:80]}..." if len(entry.args or '') > 80 else f" Args: {entry.args}") # Show max lock info (individual locks, not totals) lock_info = [] if entry.max_read_lock_wait_ms > 0: lock_info.append(f"ReadWait={entry.max_read_lock_wait_ms:.0f}ms") if entry.max_write_lock_wait_ms > 0: lock_info.append(f"WriteWait={entry.max_write_lock_wait_ms:.0f}ms on {entry.max_write_lock_wait_table}") if entry.max_write_lock_held_ms > 0: lock_info.append(f"WriteHeld={entry.max_write_lock_held_ms:.0f}ms on {entry.max_write_lock_held_table}") if lock_info: print(f" Locks: {', '.join(lock_info)}") def print_lock_analysis(self, min_wait_ms: float = 1000): """Print comprehensive lock contention analysis identifying culprits and victims.""" analysis = self.analyzer.analyze_lock_contention(min_wait_ms) print("\n" + "=" * 80) print("WRITE LOCK CONTENTION ANALYSIS") print("=" * 80) print(f"\nThreshold: {min_wait_ms}ms (using MAX lock times only, not totals)") print("\nCULPRIT = Command holding a write lock (blocking others)") print("VICTIM = Command waiting for a write lock (blocked by others)") print("\nTo find who blocked a victim, look for culprits on the SAME TABLE") print("that were running AT OR BEFORE the victim's timestamp.") # Tables with contention if analysis['tables_with_contention']: print("\n" + "-" * 80) print("TABLES WITH WRITE LOCK CONTENTION") print("-" * 80) print(f"{'Table':<25} {'Max Wait':<12} {'Max Held':<12} {'Victims':<10} {'Culprits':<10}") print("-" * 80) sorted_tables = sorted( analysis['tables_with_contention'].items(), key=lambda x: x[1]['max_wait_ms'] + x[1]['max_held_ms'], reverse=True ) for table, stats in sorted_tables[:20]: if stats['max_wait_ms'] > 0 or stats['max_held_ms'] > 0: print(f"{table:<25} {stats['max_wait_ms']:>8.0f}ms {stats['max_held_ms']:>8.0f}ms " f"{stats['victim_count']:>6} {stats['culprit_count']:>6}") # Culprits (commands holding write locks - blocking others) print("\n" + "-" * 80) print("CULPRITS - Commands HOLDING write locks (blocking other processes)") print("-" * 80) if not analysis['culprits']: print(f"\nNo commands found holding write locks >= {min_wait_ms}ms") else: for i, entry in enumerate(analysis['culprits'][:30], 1): print(f"\n{i}. HELD {entry.max_write_lock_held_ms:,.0f}ms on {entry.max_write_lock_held_table}") print(f" Time: {entry.timestamp} PID: {entry.pid}") print(f" User: {entry.user}@{entry.client}") print(f" Command: {entry.command}") print(f" Args: {entry.args[:100]}{'...' if len(entry.args or '') > 100 else ''}") print(f" Lapse: {entry.lapse_seconds:.3f}s") # Show all tables where this command held write locks (using MAX values only) tables_held = [] for table, stats in entry.db_stats.items(): held = stats.get('max_write_held_ms', 0) if held > 0: tables_held.append((table, held)) if tables_held: tables_held.sort(key=lambda x: x[1], reverse=True) print(f" Write locks held: ", end="") print(", ".join(f"{t}={h:.0f}ms" for t, h in tables_held[:5])) # Victims (commands waiting for write locks - blocked by others) print("\n" + "-" * 80) print("VICTIMS - Commands WAITING for write locks (blocked by other processes)") print("-" * 80) if not analysis['victims']: print(f"\nNo commands found waiting for write locks >= {min_wait_ms}ms") else: for i, entry in enumerate(analysis['victims'][:30], 1): print(f"\n{i}. WAITED {entry.max_write_lock_wait_ms:,.0f}ms for {entry.max_write_lock_wait_table}") print(f" Time: {entry.timestamp} PID: {entry.pid}") print(f" User: {entry.user}@{entry.client}") print(f" Command: {entry.command}") print(f" Args: {entry.args[:100]}{'...' if len(entry.args or '') > 100 else ''}") print(f" Lapse: {entry.lapse_seconds:.3f}s") # Try to find potential culprits potential_culprits = self.analyzer.find_culprit_for_victim(entry) if potential_culprits: print(f" >>> POTENTIAL CULPRITS (held write lock on {entry.max_write_lock_wait_table}):") for culprit, held_ms, time_diff in potential_culprits[:3]: direction = "before" if time_diff > 0 else "after" print(f" - PID {culprit.pid} ({culprit.user}) held {held_ms:.0f}ms, " f"{abs(time_diff):.1f}s {direction}") print(f" Command: {culprit.command} {(culprit.args or '').split()[1] if culprit.args and len(culprit.args.split()) > 1 else ''}") # Summary print("\n" + "-" * 80) print("SUMMARY") print("-" * 80) print(f"Total culprits (holding write locks >= {min_wait_ms}ms): {len(analysis['culprits'])}") print(f"Total victims (waiting for write locks >= {min_wait_ms}ms): {len(analysis['victims'])}") if analysis['culprits']: worst_culprit = analysis['culprits'][0] print(f"\nWorst culprit: PID {worst_culprit.pid} ({worst_culprit.user})") print(f" Held write lock for {worst_culprit.max_write_lock_held_ms:,.0f}ms on {worst_culprit.max_write_lock_held_table}") if analysis['victims']: worst_victim = analysis['victims'][0] print(f"\nWorst victim: PID {worst_victim.pid} ({worst_victim.user})") print(f" Waited {worst_victim.max_write_lock_wait_ms:,.0f}ms for {worst_victim.max_write_lock_wait_table}") def print_hourly_distribution(self): """Print hourly command distribution.""" hourly = self.analyzer.get_hourly_distribution() print("\n" + "=" * 70) print("HOURLY COMMAND DISTRIBUTION") print("=" * 70) max_count = max(h['count'] for h in hourly.values()) if hourly else 0 for hour in range(24): data = hourly.get(hour, {'count': 0, 'total_lapse': 0}) bar_len = int(40 * data['count'] / max_count) if max_count > 0 else 0 bar = '#' * bar_len print(f" {hour:02d}:00 {data['count']:6,d} {bar}") def export_to_csv(self, filepath: str): """Export analysis results to CSV.""" with open(filepath, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Write header writer.writerow([ 'timestamp', 'pid', 'user', 'client', 'ip_address', 'command', 'args', 'lapse_seconds', 'user_time_ms', 'system_time_ms', 'io_in', 'io_out', 'net_in', 'net_out', 'max_memory_kb', 'page_faults', 'memory_cmd_mb', 'memory_proc_mb', 'rpc_msgs_in', 'rpc_msgs_out', 'rpc_size_in_mb', 'rpc_size_out_mb', 'rpc_snd_time', 'rpc_rcv_time', 'total_read_lock_wait_ms', 'total_read_lock_held_ms', 'total_write_lock_wait_ms', 'total_write_lock_held_ms', 'meta_read_lock_wait_ms', 'meta_read_lock_held_ms', 'meta_write_lock_wait_ms', 'meta_write_lock_held_ms' ]) # Write data for entry in self.analyzer.entries: writer.writerow([ entry.timestamp.isoformat() if entry.timestamp else '', entry.pid or '', entry.user or '', entry.client or '', entry.ip_address or '', entry.command or '', entry.args or '', entry.lapse_seconds, entry.user_time_ms, entry.system_time_ms, entry.io_in, entry.io_out, entry.net_in, entry.net_out, entry.max_memory_kb, entry.page_faults, entry.memory_cmd_mb, entry.memory_proc_mb, entry.rpc_msgs_in, entry.rpc_msgs_out, entry.rpc_size_in_mb, entry.rpc_size_out_mb, entry.rpc_snd_time, entry.rpc_rcv_time, entry.total_read_lock_wait_ms, entry.total_read_lock_held_ms, entry.total_write_lock_wait_ms, entry.total_write_lock_held_ms, entry.meta_read_lock_wait_ms, entry.meta_read_lock_held_ms, entry.meta_write_lock_wait_ms, entry.meta_write_lock_held_ms ]) print(f"\nExported {len(self.analyzer.entries)} entries to {filepath}") def main(): parser = argparse.ArgumentParser( description='Analyze Perforce P4LOG files', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s server.log # Basic analysis %(prog)s server.log --top 50 # Show top 50 slowest commands %(prog)s server.log --user john # Filter by user %(prog)s server.log --min-lapse 10 # Only commands > 10 seconds %(prog)s server.log --export results.csv # Export to CSV %(prog)s server.log --locks # Show lock analysis """ ) parser.add_argument('logfile', help='Path to P4LOG file to analyze') parser.add_argument('--top', type=int, default=20, help='Show top N slowest commands (default: 20)') parser.add_argument('--user', help='Filter by user name (partial match)') parser.add_argument('--command', help='Filter by command name (partial match)') parser.add_argument('--min-lapse', type=float, help='Only show commands taking longer than this many seconds') parser.add_argument('--export', metavar='CSV', help='Export results to CSV file') parser.add_argument('--summary', action='store_true', help='Show summary statistics only') parser.add_argument('--hourly', action='store_true', help='Show hourly distribution') parser.add_argument('--verbose', '-v', action='store_true', help='Show verbose parsing information') parser.add_argument('--lock-threshold', type=float, default=1000, help='Lock wait/held threshold in ms for culprit/victim analysis (default: 1000)') args = parser.parse_args() # Parse the log file print(f"Parsing {args.logfile}...") parser_obj = P4LogParser(verbose=args.verbose) entries = parser_obj.parse_file(args.logfile) if not entries: print("No command entries found in log file.") if parser_obj.parse_errors: print("\nParse errors:") for err in parser_obj.parse_errors[:10]: print(f" - {err}") sys.exit(1) print(f"Found {len(entries)} command entries.") # Create analyzer analyzer = P4LogAnalyzer(entries) # Apply filters if args.user or args.command or args.min_lapse: filtered = analyzer.filter_entries( user=args.user, command=args.command, min_lapse=args.min_lapse ) print(f"After filtering: {len(filtered)} entries") analyzer = P4LogAnalyzer(filtered) # Generate reports report = ReportGenerator(analyzer) # Summary is always shown unless only exporting if not args.export or args.summary: report.print_summary() # Show slowest commands unless summary-only if not args.summary: report.print_slowest_commands(args.top) # Lock analysis always runs report.print_lock_analysis(args.lock_threshold) # Hourly distribution if args.hourly: report.print_hourly_distribution() # Export to CSV if args.export: report.export_to_csv(args.export) if __name__ == '__main__': main()