#!/usr/bin/env python3
"""
Perforce Lock Blocker Finder
Given a Perforce server log (P4LOG with server=3 tracking) and a slow
command, this script identifies what other command(s) held the locks
that caused the slow command to wait.
It traces the full blocking chain: if the blocker was itself blocked,
it continues up the chain to find the root cause.
Usage:
python find_blocker.py <logfile> --pid <PID> --timestamp <YYYY/MM/DD HH:MM:SS>
python find_blocker.py <logfile> --completed "2026/04/01 11:44:28 pid 7432 completed 454s"
Examples:
python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28"
python find_blocker.py server.log --pid 7432 --start-time "2026/04/01 11:36:53"
"""
import argparse
import re
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# Regex patterns for P4LOG parsing
# ---------------------------------------------------------------------------
# Command header line, e.g.:
#   2026/04/01 11:36:53 pid 7432 alice@ws 10.1.2.3 [p4/2023.1] 'user-sync ...'
# Groups (as consumed below): 1 timestamp, 2 pid, 3 user, 4 client,
# 5 ip (optionally "proxy/real"), 6 app/version, 7 quoted command+args,
# 8 trailing text after the closing quote (e.g. trigger info).
RE_HEADER = re.compile(
    r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+'
    r'pid\s+(\d+)\s+'
    r'(\S+?)@(\S+)\s+'
    r'([\d.:/a-fA-F]+(?:/[\d.:/a-fA-F]+)?)\s+'
    r'\[(.+?)\]\s*'
    r"'(.+)'(.*)$"
)
# Completion line: "<ts> pid NNNN completed 454s" (unit may be s or ms).
RE_COMPLETED = re.compile(
    r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+'
    r'pid\s+(\d+)\s+completed\s+(\d+(?:\.\d+)?)(s|ms)'
)
# "--- lapse 454s" — wall-clock duration reported by a tracking block.
RE_LAPSE = re.compile(r'^---\s+lapse\s+(\d*\.?\d+)(s|ms)')
# "--- memory cmd/proc Xmb/Ymb"
RE_MEMORY = re.compile(
    r'^---\s+memory\s+cmd/proc\s+(\d+(?:\.\d+)?)mb/(\d+(?:\.\d+)?)mb'
)
# "--- rpc msgs/size in+out N+N/Xmb+Ymb [himarks a/b] [snd/rcv Xs/Ys]"
# Groups 5/6 (snd/rcv seconds) are optional.
RE_RPC = re.compile(
    r'^---\s+rpc\s+msgs/size\s+in\+out\s+'
    r'(\d+)\+(\d+)/(\d+(?:\.\d+)?)mb\+(\d+(?:\.\d+)?)mb'
    r'(?:\s+himarks\s+\d+/\d+)?'
    r'(?:\s+snd/rcv\s+(\d*\.?\d+)s/(\d*\.?\d+)s)?'
)
# "--- db.rev" — opens a per-table stats section; following lines apply to it.
RE_DB_TABLE = re.compile(r'^---\s+(db\.\w+)\s*$')
# "--- pages in+out+cached N+N+N"
RE_PAGES = re.compile(
    r'^---\s+pages\s+in\+out\+cached\s+(\d+)\+(\d+)\+(\d+)'
)
# "--- locks read/write R/W rows get+pos+scan put+del G+P+S P+D"
RE_LOCKS_ROWS = re.compile(
    r'^---\s+locks\s+read/write\s+(\d+)/(\d+)\s+'
    r'rows\s+get\+pos\+scan\s+put\+del\s+'
    r'(\d+)\+(\d+)\+(\d+)\s+(\d+)\+(\d+)'
)
# "--- total lock wait+held read/write Wms+Hms/Wms+Hms"
RE_TOTAL_LOCK = re.compile(
    r'^---\s+total\s+lock\s+wait\+held\s+read/write\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# "--- max lock wait+held read/write Wms+Hms/Wms+Hms"
RE_MAX_LOCK = re.compile(
    r'^---\s+max\s+lock\s+wait\+held\s+read/write\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# "--- peek count N wait+held total/max Wms+Hms/Wms+Hms"
RE_PEEK = re.compile(
    r'^---\s+peek\s+count\s+\d+\s+'
    r'wait\+held\s+total/max\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# Virtual lock entries: --- meta/commit(W), --- clients/...(W), etc.
RE_VIRTUAL_LOCK = re.compile(
    r'^---\s+(\S+)\(([RW])\)\s*$'
)
# "--- filetotals ..." lines (matched so they can be recognised; not
# consumed by the parsers visible in this module).
RE_FILETOTALS = re.compile(r'^---\s+filetotals')
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class TableLockInfo:
    """Lock details for a single database table within a command.

    Aggregated from the per-table '---' tracking lines that follow a
    command header in a P4LOG written with server=3 tracking.
    """
    table: str = ""        # table name, e.g. 'db.rev'
    read_locks: int = 0    # read locks taken ("locks read/write R/W")
    write_locks: int = 0   # write locks taken
    rows_get: int = 0      # rows fetched ("rows get+pos+scan put+del")
    rows_pos: int = 0      # rows positioned
    rows_scan: int = 0     # rows scanned
    rows_put: int = 0      # rows written
    rows_del: int = 0      # rows deleted
    pages_in: int = 0      # pages read in ("pages in+out+cached")
    # Total lock times from "total lock" lines
    total_read_wait_ms: float = 0.0
    total_read_held_ms: float = 0.0
    total_write_wait_ms: float = 0.0
    total_write_held_ms: float = 0.0
    # Max lock times from "max lock" lines
    max_read_wait_ms: float = 0.0
    max_read_held_ms: float = 0.0
    max_write_wait_ms: float = 0.0
    max_write_held_ms: float = 0.0
    # Peek times
    peek_count: int = 0
    peek_held_total_ms: float = 0.0
    peek_held_max_ms: float = 0.0
@dataclass
class CommandInfo:
    """Full details for a parsed command."""
    line_number: int = 0
    timestamp: Optional[datetime] = None
    end_timestamp: Optional[datetime] = None
    pid: int = 0
    user: str = ""
    client: str = ""
    ip_address: str = ""
    app: str = ""
    command: str = ""
    args: str = ""
    trigger_info: str = ""
    lapse_seconds: float = 0.0
    memory_cmd_mb: float = 0.0
    memory_proc_mb: float = 0.0
    rpc_msgs_in: int = 0
    rpc_msgs_out: int = 0
    rpc_size_in_mb: float = 0.0
    rpc_size_out_mb: float = 0.0
    rpc_snd_seconds: float = 0.0
    rpc_rcv_seconds: float = 0.0
    # Per-table lock details, keyed by table name
    table_locks: Dict[str, TableLockInfo] = field(default_factory=dict)
    # Virtual lock entries (meta/commit, clients/..., etc.)
    # as (name, 'R'/'W', held_ms) tuples
    virtual_locks: List[Tuple[str, str, float]] = field(default_factory=list)

    @property
    def total_scan_rows(self) -> int:
        """Total rows fetched/positioned/scanned across all tables."""
        rows = 0
        for info in self.table_locks.values():
            rows += info.rows_get + info.rows_pos + info.rows_scan
        return rows

    def get_lock_waits(self) -> List[Tuple[str, str, float]]:
        """Return (table, 'read'|'write', wait_ms) tuples, longest wait first."""
        waits = [
            (name, kind, ms)
            for name, info in self.table_locks.items()
            for kind, ms in (('read', info.total_read_wait_ms),
                             ('write', info.total_write_wait_ms))
            if ms > 0
        ]
        waits.sort(key=lambda entry: -entry[2])
        return waits

    def get_lock_helds(self) -> List[Tuple[str, str, float]]:
        """Return (table, 'read'|'write', held_ms) tuples over 100ms, longest first."""
        helds = [
            (name, kind, ms)
            for name, info in self.table_locks.items()
            for kind, ms in (('read', info.total_read_held_ms),
                             ('write', info.total_write_held_ms))
            if ms > 100
        ]
        helds.sort(key=lambda entry: -entry[2])
        return helds
# ---------------------------------------------------------------------------
# Parsing functions
# ---------------------------------------------------------------------------
def parse_timestamp(s: str) -> Optional[datetime]:
    """Parse a P4LOG timestamp; return None for malformed input."""
    text = s.strip()
    try:
        parsed = datetime.strptime(text, '%Y/%m/%d %H:%M:%S')
    except ValueError:
        return None
    return parsed
def find_command_block(filepath: str, target_pid: int,
                       target_time: Optional[datetime] = None,
                       start_time: Optional[datetime] = None,
                       quiet: bool = False) -> Optional[CommandInfo]:
    """Find and parse the full command block for a specific PID and time.

    This uses a two-pass approach to handle PID reuse:

    Pass 1: Find the exact "completed" or header line that anchors
    the target command. For completion time matching, we look for the
    "completed" line at the exact timestamp. For start time matching,
    we look for a header at the exact timestamp.

    Pass 2: Re-scan the file collecting ALL tracking lines for the
    target PID within the command's time range (start to completion).

    Args:
        filepath: path to the P4LOG file.
        target_pid: PID of the slow command.
        target_time: completion timestamp to anchor on (exact second,
            with a +/-2s fallback).
        start_time: header timestamp to anchor on instead (same fallback).
        quiet: suppress progress output on stderr.

    Returns the fully parsed CommandInfo with all table lock details,
    or None if no matching entry (or no command text) was found.
    """
    if not quiet:
        print(f" Searching for pid {target_pid} ...", file=sys.stderr)
    t0 = time.time()
    line_count = 0
    # --- Pass 1: Find the anchor point ---
    # Determine the command's start_time and end_time precisely.
    cmd_start: Optional[datetime] = None
    cmd_end: Optional[datetime] = None
    cmd_lapse: float = 0.0
    cmd_args: Optional[str] = None
    cmd_user: str = ""
    cmd_client: str = ""
    cmd_ip: str = ""
    cmd_app: str = ""
    anchor_line: int = 0
    # Collect all header timestamps and completed timestamps for this PID
    headers: List[Tuple[int, datetime, str, str, str, str, str]] = []
    completions: List[Tuple[int, datetime, float]] = []
    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            # Skip the "Perforce server info:" banner preceding each entry.
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue
            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                if pid == target_pid:
                    ts = parse_timestamp(m.group(1))
                    if ts:
                        headers.append((
                            line_count, ts, m.group(3), m.group(4),
                            m.group(5), m.group(6), m.group(7)
                        ))
                continue
            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                if pid == target_pid:
                    ts = parse_timestamp(m.group(1))
                    val = float(m.group(3))
                    unit = m.group(4)
                    # Normalise lapse to seconds ('s' or 'ms' in the log).
                    lapse = val if unit == 's' else val / 1000.0
                    if ts:
                        completions.append((line_count, ts, lapse))
                continue
    elapsed_pass1 = time.time() - t0
    if not quiet:
        print(f" Pass 1: Scanned {line_count:,} lines in {elapsed_pass1:.1f}s, "
              f"found {len(headers)} headers and {len(completions)} "
              f"completions for pid {target_pid}", file=sys.stderr)
    # Find the matching completion or header
    if target_time:
        # Match on completion timestamp (exact second)
        for ln, ts, lapse in completions:
            if ts == target_time:
                cmd_end = ts
                cmd_lapse = lapse
                anchor_line = ln
                # Start is back-computed from the reported lapse.
                cmd_start = ts - timedelta(seconds=lapse)
                break
        if not cmd_end:
            # Try close match (within 2 seconds)
            for ln, ts, lapse in completions:
                if abs((ts - target_time).total_seconds()) <= 2:
                    cmd_end = ts
                    cmd_lapse = lapse
                    anchor_line = ln
                    cmd_start = ts - timedelta(seconds=lapse)
                    break
    elif start_time:
        # Match on header timestamp
        for ln, ts, user, client, ip, app, args in headers:
            if ts == start_time:
                cmd_start = ts
                anchor_line = ln
                break
        if not cmd_start:
            # Fallback: close match (within 2 seconds)
            for ln, ts, user, client, ip, app, args in headers:
                if abs((ts - start_time).total_seconds()) <= 2:
                    cmd_start = ts
                    anchor_line = ln
                    break
    if not cmd_end and not cmd_start:
        if not quiet:
            print(f" Could not find matching entry for pid {target_pid}",
                  file=sys.stderr)
        return None
    # Find the first header for this PID at or near the command's start time
    if cmd_start:
        for ln, ts, user, client, ip, app, args in headers:
            if abs((ts - cmd_start).total_seconds()) <= 2:
                cmd_user = user
                cmd_client = client
                cmd_ip = ip
                cmd_app = app
                cmd_args = args
                if anchor_line == 0:
                    anchor_line = ln
                break
    # If we still don't have start/end, compute from what we have
    if cmd_start and not cmd_end and cmd_lapse > 0:
        cmd_end = cmd_start + timedelta(seconds=cmd_lapse)
    if cmd_end and not cmd_start and cmd_lapse > 0:
        cmd_start = cmd_end - timedelta(seconds=cmd_lapse)
    if not quiet:
        print(f" Found command: {cmd_user}@{cmd_client} "
              f"'{(cmd_args or '')[:60]}...'", file=sys.stderr)
        print(f" Start: {cmd_start}, End: {cmd_end}, "
              f"Lapse: {cmd_lapse:.1f}s", file=sys.stderr)
    # --- Pass 2: Collect all tracking data for this PID ---
    # Define the time window for collecting tracking lines.  A one-minute
    # margin tolerates clock/lapse rounding; 2 hours bounds the open side
    # when only one endpoint is known.
    if cmd_start and cmd_end:
        collect_start = cmd_start - timedelta(minutes=1)
        collect_end = cmd_end + timedelta(minutes=1)
    elif cmd_start:
        collect_start = cmd_start - timedelta(minutes=1)
        collect_end = cmd_start + timedelta(hours=2)
    elif cmd_end:
        collect_start = cmd_end - timedelta(hours=2)
        collect_end = cmd_end + timedelta(minutes=1)
    else:
        return None
    cmd = CommandInfo(
        pid=target_pid, line_number=anchor_line,
        timestamp=cmd_start, end_timestamp=cmd_end,
        lapse_seconds=cmd_lapse,
        user=cmd_user, client=cmd_client, ip_address=cmd_ip,
        app=cmd_app, args=cmd_args or "", command=""
    )
    if cmd_args:
        # First word of the quoted text is the command name (e.g. 'user-sync').
        parts = cmd_args.split(None, 1)
        cmd.command = parts[0] if parts else cmd_args
    current_db_table: Optional[str] = None
    current_pid: Optional[int] = None  # NOTE(review): assigned below, never read
    in_target_block = False
    line_count = 0
    t1 = time.time()  # NOTE(review): unused; pass-2 timing derives from t0 below
    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue
            # Header: (re)establish whether following '---' lines belong to
            # the target command (same PID, inside the collection window).
            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                if pid == target_pid and ts:
                    if collect_start <= ts <= collect_end:
                        in_target_block = True
                        current_db_table = None
                    else:
                        in_target_block = False
                else:
                    in_target_block = False
                continue
            # Completed
            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                if pid == target_pid and ts:
                    if collect_start <= ts <= collect_end:
                        in_target_block = True
                        val = float(m.group(3))
                        unit = m.group(4)
                        cmd.lapse_seconds = val if unit == 's' else val / 1000.0
                        cmd.end_timestamp = ts
                    else:
                        in_target_block = False
                else:
                    in_target_block = False
                continue
            # Tracking lines
            if not stripped.startswith('---'):
                continue
            if not in_target_block:
                continue
            # Lapse - only update if larger (the final stats block has
            # the overall lapse)
            m = RE_LAPSE.match(stripped)
            if m:
                val = float(m.group(1))
                unit = m.group(2)
                lapse = val if unit == 's' else val / 1000.0
                if lapse > cmd.lapse_seconds:
                    cmd.lapse_seconds = lapse
                continue
            # Memory - use max across multiple phases
            m = RE_MEMORY.match(stripped)
            if m:
                cmd.memory_cmd_mb = max(cmd.memory_cmd_mb, float(m.group(1)))
                cmd.memory_proc_mb = max(cmd.memory_proc_mb, float(m.group(2)))
                continue
            # RPC - use max across multiple phases
            m = RE_RPC.match(stripped)
            if m:
                cmd.rpc_msgs_in = max(cmd.rpc_msgs_in, int(m.group(1)))
                cmd.rpc_msgs_out = max(cmd.rpc_msgs_out, int(m.group(2)))
                cmd.rpc_size_in_mb = max(cmd.rpc_size_in_mb, float(m.group(3)))
                cmd.rpc_size_out_mb = max(cmd.rpc_size_out_mb, float(m.group(4)))
                if m.group(5):
                    # Optional snd/rcv timings were present.
                    cmd.rpc_snd_seconds = max(
                        cmd.rpc_snd_seconds, float(m.group(5)))
                    cmd.rpc_rcv_seconds = max(
                        cmd.rpc_rcv_seconds, float(m.group(6)))
                continue
            # Virtual lock entry (meta/commit(W), clients/...(W), etc.)
            m = RE_VIRTUAL_LOCK.match(stripped)
            if m:
                current_db_table = None  # Reset db table context
                continue
            # DB table header
            m = RE_DB_TABLE.match(stripped)
            if m:
                current_db_table = m.group(1)
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                continue
            # Pages
            m = RE_PAGES.match(stripped)
            if m and current_db_table and current_db_table in cmd.table_locks:
                cmd.table_locks[current_db_table].pages_in += int(m.group(1))
                continue
            # Locks and rows (counters accumulate across phases)
            m = RE_LOCKS_ROWS.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                info.read_locks += int(m.group(1))
                info.write_locks += int(m.group(2))
                info.rows_get += int(m.group(3))
                info.rows_pos += int(m.group(4))
                info.rows_scan += int(m.group(5))
                info.rows_put += int(m.group(6))
                info.rows_del += int(m.group(7))
                continue
            # Total lock times
            m = RE_TOTAL_LOCK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                rw = float(m.group(1))
                rh = float(m.group(2))
                ww = float(m.group(3))
                wh = float(m.group(4))
                # Use max since the same table can appear multiple times
                # (different phases of a submit)
                info.total_read_wait_ms = max(info.total_read_wait_ms, rw)
                info.total_read_held_ms = max(info.total_read_held_ms, rh)
                info.total_write_wait_ms = max(info.total_write_wait_ms, ww)
                info.total_write_held_ms = max(info.total_write_held_ms, wh)
                continue
            # Max lock times
            m = RE_MAX_LOCK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                info.max_read_wait_ms = max(info.max_read_wait_ms, float(m.group(1)))
                info.max_read_held_ms = max(info.max_read_held_ms, float(m.group(2)))
                info.max_write_wait_ms = max(info.max_write_wait_ms, float(m.group(3)))
                info.max_write_held_ms = max(info.max_write_held_ms, float(m.group(4)))
                continue
            # Peek
            m = RE_PEEK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                info.peek_count += int(stripped.split()[3])  # count from raw
                info.peek_held_total_ms += float(m.group(2))
                info.peek_held_max_ms = max(info.peek_held_max_ms, float(m.group(4)))
                continue
            if stripped.startswith('locks acquired'):
                continue
    elapsed = time.time() - t0
    if not quiet:
        print(f" Pass 2: Scanned {line_count:,} lines in "
              f"{elapsed - elapsed_pass1:.1f}s", file=sys.stderr)
    # No header was ever matched for the anchor -> nothing usable.
    if not cmd.command:
        return None
    return cmd
def find_blockers(filepath: str, target: CommandInfo,
                  blocked_tables: List[Tuple[str, str, float]],
                  quiet: bool = False) -> List[CommandInfo]:
    """Find commands that held locks blocking the target command.

    For each table the target waited on:
    - If target waited for a READ lock, find commands that held WRITE locks
    - If target waited for a WRITE lock, find commands that held READ or WRITE locks

    We look for commands that:
    1. Overlapped in time with the target
    2. Held the opposing lock type on the same table
    3. Had significant lock held time

    Args:
        filepath: path to the P4LOG file.
        target: the parsed slow command (from find_command_block).
        blocked_tables: (table, wait_type, wait_ms) tuples from
            target.get_lock_waits().
        quiet: suppress progress output on stderr.

    Returns a list of candidate CommandInfo records (unranked).
    """
    if not blocked_tables:
        return []
    # Determine the time window to search
    # The target started at timestamp and ran for lapse_seconds
    if target.timestamp and target.end_timestamp:
        search_start = target.timestamp - timedelta(minutes=5)
        search_end = target.end_timestamp + timedelta(minutes=5)
    elif target.end_timestamp:
        # Only the end is known: back off by lapse plus a 5-minute margin.
        search_start = target.end_timestamp - timedelta(
            seconds=target.lapse_seconds + 300
        )
        search_end = target.end_timestamp + timedelta(minutes=5)
    else:
        return []
    # Tables we're interested in and what lock type to look for
    search_tables: Dict[str, str] = {}
    for table, wait_type, wait_ms in blocked_tables:
        if wait_type == 'read':
            # Target waited for read lock -> blocker held write lock
            search_tables[table] = 'write'
        else:
            # Target waited for write lock -> blocker held read or write lock
            search_tables[table] = 'any'
    if not quiet:
        print(f"\n Searching for blockers on: "
              f"{', '.join(f'{t} ({lt} lock)' for t, lt in search_tables.items())}",
              file=sys.stderr)
        print(f" Time window: {search_start} to {search_end}", file=sys.stderr)
    # Now scan the log for candidate blockers
    candidates: List[CommandInfo] = []
    by_pid: Dict[int, CommandInfo] = {}      # open records, keyed by PID
    db_table_ctx: Dict[int, str] = {}        # current '--- db.xxx' per PID
    current_pid: Optional[int] = None
    line_count = 0
    in_window = False  # NOTE(review): set below but never read
    past_window = False
    t0 = time.time()

    def emit_candidate(pid: int):
        """Check if this PID's command is a blocker candidate and save it."""
        if pid not in by_pid:
            return
        rec = by_pid.pop(pid)
        db_table_ctx.pop(pid, None)
        if rec.pid == target.pid:
            return  # Skip the target itself
        if rec.lapse_seconds <= 0:
            return
        # Check if this command held locks on any of our target tables
        for table, needed_lock in search_tables.items():
            if table not in rec.table_locks:
                continue
            info = rec.table_locks[table]
            is_blocker = False
            # 100ms threshold filters out momentary lock holds.
            if needed_lock == 'write' and info.total_write_held_ms > 100:
                is_blocker = True
            elif needed_lock == 'any' and (
                info.total_read_held_ms > 100 or
                info.total_write_held_ms > 100
            ):
                is_blocker = True
            if is_blocker:
                candidates.append(rec)
                return

    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue
            # --- Header ---
            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                # Check time window
                if ts:
                    if ts > search_end:
                        past_window = True
                    elif ts >= search_start:
                        in_window = True
                # Once past the window, stop opening new records (but keep
                # feeding already-open ones so their stats complete).
                if past_window and pid not in by_pid:
                    continue
                if pid in by_pid:
                    old = by_pid[pid]
                    full_cmd = m.group(7)
                    # Same PID, different command text after a completed
                    # lapse -> PID reuse: flush the old record first.  An
                    # identical repeated header continues the same command.
                    if old.lapse_seconds > 0 and old.args != full_cmd:
                        emit_candidate(pid)
                    else:
                        continue
                rec = CommandInfo(pid=pid, line_number=line_count)
                rec.timestamp = ts
                rec.user = m.group(3)
                rec.client = m.group(4)
                rec.ip_address = m.group(5)
                rec.app = m.group(6)
                full_cmd = m.group(7)
                rec.args = full_cmd
                parts = full_cmd.split(None, 1)
                rec.command = parts[0] if parts else full_cmd
                trailing = m.group(8).strip()
                if trailing:
                    rec.trigger_info = trailing
                by_pid[pid] = rec
                continue
            # --- Completed ---
            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                if ts:
                    if ts > search_end:
                        past_window = True
                    elif ts >= search_start:
                        in_window = True
                # A completion without a seen header still opens a record so
                # trailing tracking lines can attach to it.
                if pid not in by_pid:
                    by_pid[pid] = CommandInfo(pid=pid)
                rec = by_pid[pid]
                val = float(m.group(3))
                unit = m.group(4)
                rec.lapse_seconds = val if unit == 's' else val / 1000.0
                rec.end_timestamp = ts
                continue
            # --- Tracking lines ---
            if not stripped.startswith('---'):
                continue
            # Tracking lines carry no PID; attribute them to the PID of the
            # most recent header/completed line.
            pid = current_pid
            if pid is None or pid not in by_pid:
                continue
            rec = by_pid[pid]
            m = RE_LAPSE.match(stripped)
            if m:
                val = float(m.group(1))
                unit = m.group(2)
                rec.lapse_seconds = val if unit == 's' else val / 1000.0
                continue
            m = RE_MEMORY.match(stripped)
            if m:
                rec.memory_cmd_mb = float(m.group(1))
                rec.memory_proc_mb = float(m.group(2))
                continue
            m = RE_RPC.match(stripped)
            if m:
                rec.rpc_msgs_in = int(m.group(1))
                rec.rpc_msgs_out = int(m.group(2))
                rec.rpc_size_in_mb = float(m.group(3))
                rec.rpc_size_out_mb = float(m.group(4))
                if m.group(5):
                    rec.rpc_snd_seconds = float(m.group(5))
                    rec.rpc_rcv_seconds = float(m.group(6))
                continue
            m = RE_VIRTUAL_LOCK.match(stripped)
            if m:
                # Virtual lock entry ends any per-table context.
                db_table_ctx.pop(pid, None)
                continue
            m = RE_DB_TABLE.match(stripped)
            if m:
                tbl = m.group(1)
                db_table_ctx[pid] = tbl
                if tbl not in rec.table_locks:
                    rec.table_locks[tbl] = TableLockInfo(table=tbl)
                continue
            current_table = db_table_ctx.get(pid)
            m = RE_PAGES.match(stripped)
            if m and current_table and current_table in rec.table_locks:
                rec.table_locks[current_table].pages_in += int(m.group(1))
                continue
            m = RE_LOCKS_ROWS.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                info.read_locks += int(m.group(1))
                info.write_locks += int(m.group(2))
                info.rows_get += int(m.group(3))
                info.rows_pos += int(m.group(4))
                info.rows_scan += int(m.group(5))
                info.rows_put += int(m.group(6))
                info.rows_del += int(m.group(7))
                continue
            m = RE_TOTAL_LOCK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                # max across phases, same rationale as find_command_block
                info.total_read_wait_ms = max(
                    info.total_read_wait_ms, float(m.group(1)))
                info.total_read_held_ms = max(
                    info.total_read_held_ms, float(m.group(2)))
                info.total_write_wait_ms = max(
                    info.total_write_wait_ms, float(m.group(3)))
                info.total_write_held_ms = max(
                    info.total_write_held_ms, float(m.group(4)))
                continue
            m = RE_MAX_LOCK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                info.max_read_wait_ms = max(
                    info.max_read_wait_ms, float(m.group(1)))
                info.max_read_held_ms = max(
                    info.max_read_held_ms, float(m.group(2)))
                info.max_write_wait_ms = max(
                    info.max_write_wait_ms, float(m.group(3)))
                info.max_write_held_ms = max(
                    info.max_write_held_ms, float(m.group(4)))
                continue
            m = RE_PEEK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                info.peek_held_total_ms += float(m.group(2))
                info.peek_held_max_ms = max(
                    info.peek_held_max_ms, float(m.group(4)))
                continue
            if stripped.startswith('locks acquired'):
                continue
    # Flush remaining
    for pid in list(by_pid.keys()):
        emit_candidate(pid)
    elapsed = time.time() - t0
    if not quiet:
        print(f" Scanned {line_count:,} lines in {elapsed:.1f}s, "
              f"found {len(candidates)} candidates", file=sys.stderr)
    return candidates
# ---------------------------------------------------------------------------
# Analysis and ranking
# ---------------------------------------------------------------------------
def rank_blockers(target: CommandInfo,
                  blocked_tables: List[Tuple[str, str, float]],
                  candidates: List[CommandInfo]) -> List[Tuple[CommandInfo, str, float]]:
    """Rank blocker candidates by how likely they are the actual blocker.

    Prioritises the table with the LARGEST wait time on the target, because
    that is the dominant contributor to the target's slowness.

    Args:
        target: the slow command being analysed; its timestamp is used for
            time-overlap scoring.
        blocked_tables: (table, wait_type, wait_ms) tuples, largest wait
            first (as produced by CommandInfo.get_lock_waits).
        candidates: commands that held opposing locks (from find_blockers).

    Returns list of (command, blocking_description, score) sorted by score desc.
    """
    if not blocked_tables:
        return []
    # Build search map: table -> (needed_lock_type, target_wait_ms).
    # A read wait is blocked only by a WRITE holder; a write wait by any holder.
    search_tables: Dict[str, Tuple[str, float]] = {}
    for table, wait_type, wait_ms in blocked_tables:
        needed = 'write' if wait_type == 'read' else 'any'
        # Keep the entry with the largest wait if duplicated
        if table not in search_tables or wait_ms > search_tables[table][1]:
            search_tables[table] = (needed, wait_ms)
    # The table the target waited on the longest drives the score weighting
    primary_wait = blocked_tables[0][2]
    results = []
    for cand in candidates:
        # Score each table the candidate could have blocked; keep its best.
        best_score = 0.0
        best_desc = ""
        for table, (needed_lock, target_wait) in search_tables.items():
            if table not in cand.table_locks:
                continue
            info = cand.table_locks[table]
            held_ms = 0.0
            lock_type_str = ""
            if needed_lock == 'write' and info.total_write_held_ms > 0:
                held_ms = info.total_write_held_ms
                lock_type_str = "WRITE"
            elif needed_lock == 'any':
                # Report whichever lock type the candidate held longer.
                if info.total_write_held_ms >= info.total_read_held_ms:
                    held_ms = info.total_write_held_ms
                    lock_type_str = "WRITE"
                else:
                    held_ms = info.total_read_held_ms
                    lock_type_str = "READ"
            if held_ms <= 0:
                continue
            score = 0.0
            # Weight by which table this is -- primary table gets full
            # weight, secondary tables get proportionally less.
            table_weight = target_wait / primary_wait if primary_wait > 0 else 1.0
            # How well does the held time cover the target's wait?
            if target_wait > 0:
                coverage = min(held_ms / target_wait, 2.0)
                score += coverage * 50 * table_weight  # up to 100 pts
            # Longer held = more impactful
            score += min(held_ms / 1000, 50) * table_weight
            # Time overlap with target
            if cand.end_timestamp and target.timestamp:
                if cand.end_timestamp >= target.timestamp:
                    score += 25  # ended after target started
                cand_start = cand.timestamp
                if not cand_start and cand.end_timestamp and cand.lapse_seconds:
                    # No header was seen: back-compute the start from lapse.
                    cand_start = cand.end_timestamp - timedelta(
                        seconds=cand.lapse_seconds)
                if cand_start and cand_start <= target.timestamp:
                    score += 25  # started before target
            if score > best_score:
                best_score = score
                best_desc = (f"{lock_type_str} lock on {table} "
                             f"held {format_ms(held_ms)}")
        if best_score > 0:
            results.append((cand, best_desc, best_score))
    results.sort(key=lambda x: -x[2])
    return results
# ---------------------------------------------------------------------------
# Pile-up detection
# ---------------------------------------------------------------------------
def detect_pileup(filepath: str, release_time: datetime,
                  window_seconds: int = 3,
                  quiet: bool = False) -> int:
    """Count how many commands completed within window_seconds of release_time."""
    window_start = release_time - timedelta(seconds=window_seconds)
    window_end = release_time + timedelta(seconds=window_seconds)
    total = 0
    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for raw_line in fh:
            match = RE_COMPLETED.match(raw_line.strip())
            if not match:
                continue
            ts = parse_timestamp(match.group(1))
            if ts is not None and window_start <= ts <= window_end:
                total += 1
    return total
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------
def format_ms(ms: float) -> str:
    """Format milliseconds human-readably (ms, s, minutes, or hours)."""
    if ms >= 3_600_000:
        return f"{ms / 3_600_000:.1f}h"
    if ms >= 60_000:
        return f"{ms / 60_000:.1f}m"
    if ms >= 1000:
        return f"{ms / 1000:.0f}s"
    return f"{ms:.0f}ms"
def format_count(n: int) -> str:
    """Render an integer with thousands separators."""
    return format(n, ',')
def truncate(s: str, max_len: int = 100) -> str:
    """Clip s to max_len characters, marking a clipped string with '...'."""
    return s if len(s) <= max_len else s[:max_len - 3] + "..."
def short_user(cmd: CommandInfo) -> str:
    """Short display name for a command's user (text before any '@')."""
    name, _, _ = cmd.user.partition('@')
    return name
# ---------------------------------------------------------------------------
# Narrative report generation
# ---------------------------------------------------------------------------
def generate_report(target: CommandInfo,
                    ranked: List[Tuple[CommandInfo, str, float]],
                    blocked_tables: List[Tuple[str, str, float]],
                    root_chain: Optional[List[Tuple[CommandInfo, str]]] = None,
                    pileup_count: int = 0):
    """Generate a narrative-style analysis report.

    Prints to stdout.  Sections: root cause, the blocking lock, why the
    blocker was slow, the blocking chain, a time breakdown for the target,
    optional pile-up impact, and recommendations.

    Args:
        target: the slow command that was analysed.
        ranked: (command, description, score) from rank_blockers, best first.
        blocked_tables: (table, wait_type, wait_ms) from get_lock_waits,
            largest wait first.
        root_chain: optional deeper chain from --chain re-analysis; the
            LAST element is treated as the ultimate root cause.
        pileup_count: completions near the lock release (detect_pileup).
    """
    w = print  # shorthand
    if not ranked:
        # No candidate blocker found: explain the likely reasons and bail.
        w("=" * 78)
        w(" PERFORCE LOCK BLOCKER ANALYSIS")
        w("=" * 78)
        w(f"\n Could not identify the blocking command for pid {target.pid}.")
        w(f" Possible reasons:")
        w(f" - The lock holder completed before this log's time range")
        w(f" - The blocking was caused by a checkpoint or journal rotation")
        w(f" - The log level (server=3) may not capture enough detail")
        return
    top_cmd, top_desc, _ = ranked[0]
    primary_table, primary_lock_type, primary_wait_ms = blocked_tables[0]
    # ===== Root Cause =====
    w("")
    w("=" * 78)
    w(" Root Cause: pid {pid} '{cmd}' blocked your command".format(
        pid=top_cmd.pid, cmd=truncate(top_cmd.args, 50)))
    w("=" * 78)
    w(f"""
 The command that caused pid {target.pid}'s '{target.command}' to take
 {target.lapse_seconds:.0f} seconds was:
     pid {top_cmd.pid} -- {truncate(top_cmd.args, 70)}
     User: {top_cmd.user}
     App: {top_cmd.app}
     IP: {top_cmd.ip_address}""")
    if top_cmd.timestamp:
        w(f" Started: {top_cmd.timestamp}")
    if top_cmd.end_timestamp:
        end_str = str(top_cmd.end_timestamp)
        w(f" Completed: {end_str} ({top_cmd.lapse_seconds:.0f} seconds)")
    # ===== The blocking lock =====
    w(f"\n The blocking lock: {primary_table}\n")
    # Describe the mechanism
    lock_needed = primary_lock_type.upper()
    # NOTE(review): 'opposing' is computed but never used below.
    opposing = "WRITE" if primary_lock_type == 'read' else "READ or WRITE"
    # Find the blocker's held time on the primary table
    blocker_held_ms = 0.0
    blocker_lock_type = ""
    if primary_table in top_cmd.table_locks:
        info = top_cmd.table_locks[primary_table]
        # Report whichever lock type the blocker held longer.
        if info.total_write_held_ms > info.total_read_held_ms:
            blocker_held_ms = info.total_write_held_ms
            blocker_lock_type = "write"
        else:
            blocker_held_ms = info.total_read_held_ms
            blocker_lock_type = "read"
    if blocker_held_ms > 0:
        w(f" pid {top_cmd.pid}'s '{top_cmd.command}' held a "
          f"{blocker_lock_type.upper()} lock on {primary_table} for "
          f"{format_ms(blocker_held_ms)} (~{blocker_held_ms/1000:.0f}s).")
    w(f"\n Your pid {target.pid} needed a {lock_needed} lock on "
      f"{primary_table} and waited {format_ms(primary_wait_ms)} "
      f"(~{primary_wait_ms/1000:.0f}s) for it — blocked by pid "
      f"{top_cmd.pid}'s {blocker_lock_type} lock.")
    # ===== Why was the blocker slow? =====
    blocker_waits = top_cmd.get_lock_waits()
    # NOTE: the comprehension's `w` is scoped to the comprehension and does
    # not clobber the `w = print` alias above.
    significant_waits = [(t, lt, w) for t, lt, w in blocker_waits
                         if w > 10000]
    blocker_helds = top_cmd.get_lock_helds()
    if significant_waits or blocker_helds:
        w(f"\n Why did pid {top_cmd.pid}'s {top_cmd.command} take "
          f"{top_cmd.lapse_seconds:.0f} seconds?\n")
        if blocker_helds:
            w(f" It held locks on:")
            for table, lt, hms in blocker_helds:
                info = top_cmd.table_locks.get(table)
                scan = (info.rows_get + info.rows_pos + info.rows_scan
                        if info else 0)
                extra = ""
                if scan > 0:
                    extra = f" scanning {format_count(scan)} rows"
                w(f" - {table}: {lt.upper()} held "
                  f"{format_ms(hms)}{extra}")
        if significant_waits:
            w(f"\n It was itself blocked waiting for locks:")
            for table, lt, wms in significant_waits:
                w(f" - {table}: waited {format_ms(wms)} for "
                  f"{lt.upper()} lock")
            w(f"\n This means pid {top_cmd.pid} was not inherently slow — "
              f"it was waiting for locks the whole time.")
    # ===== Chain of blocking =====
    w("\n\n Chain of blocking\n")
    # Build the chain
    chain_items = []  # list of (cmd, wait_desc, held_desc)
    # Root cause (if --chain found one)
    if root_chain:
        for rc_cmd, rc_desc in root_chain:
            rc_helds = rc_cmd.get_lock_helds()
            rc_held_desc = ""
            if rc_helds:
                # Summarise the root cause's longest lock hold.
                t, lt, hms = rc_helds[0]
                info = rc_cmd.table_locks.get(t)
                scan = (info.rows_get + info.rows_pos + info.rows_scan
                        if info else 0)
                scan_str = f" (scanned {format_count(scan)} rows!)" if scan > 100000 else ""
                rc_held_desc = (f"held {lt.upper()} lock on {t} for "
                                f"{format_ms(hms)}{scan_str}")
            chain_items.append((rc_cmd, "", rc_held_desc))
    # The blocker
    blocker_wait_str = ""
    if significant_waits:
        t, lt, wms = significant_waits[0]
        blocker_wait_str = (f"which waited {format_ms(wms)} for "
                            f"{lt.upper()} lock on {t}")
    blocker_held_str = ""
    if blocker_held_ms > 0:
        blocker_held_str = (f"then held {blocker_lock_type.upper()} lock on "
                            f"{primary_table} for {format_ms(blocker_held_ms)}")
    chain_items.append((top_cmd, blocker_wait_str, blocker_held_str))
    # The target
    target_wait_str = (f"which waited {format_ms(primary_wait_ms)} for "
                       f"{lock_needed} lock on {primary_table}")
    chain_items.append((target, target_wait_str, ""))
    # Print the chain as a tree
    for i, (cmd, wait_str, held_str) in enumerate(chain_items):
        indent = " " + " " * i
        arrow = ""  # NOTE(review): assigned but never used
        if i > 0:
            prev_indent = " " + " " * (i - 1)
            w(f"{prev_indent} |")
            w(f"{prev_indent} +-- blocked -->")
        label = f"pid {cmd.pid} ({short_user(cmd)}, {truncate(cmd.args, 55)})"
        w(f"{indent}{label}")
        if wait_str:
            w(f"{indent} {wait_str}")
        if held_str:
            w(f"{indent} {held_str}")
    # ===== Time breakdown =====
    w(f"\n\n Time breakdown for pid {target.pid} "
      f"({target.lapse_seconds:.0f}s total)\n")
    total_ms = target.lapse_seconds * 1000
    rpc_ms = (target.rpc_snd_seconds + target.rpc_rcv_seconds) * 1000
    total_wait_ms = sum(wms for _, _, wms in blocked_tables)
    # Don't double count — lock waits and RPC can overlap
    accounted = max(rpc_ms, total_wait_ms)
    other_ms = max(0, total_ms - accounted)
    components = []
    if rpc_ms > 1000:
        rpc_desc = "RPC receive"
        if target.rpc_size_in_mb > 1:
            rpc_desc += f" ({target.rpc_size_in_mb:.0f}MB from edge)"
        components.append((rpc_desc, rpc_ms))
    if total_wait_ms > 1000:
        components.append(
            (f"Waiting for {primary_table} {lock_needed} lock",
             total_wait_ms))
    if other_ms > 1000:
        components.append(("Actual processing", other_ms))
    w(f" {'Component':<45s} {'Duration':>10s}")
    w(f" {'-'*45} {'-'*10}")
    for desc, ms in components:
        w(f" {desc:<45s} ~{format_ms(ms):>8s}")
    w(f" {'TOTAL':<45s} {format_ms(total_ms):>9s}")
    # ===== Impact (pile-up) =====
    if pileup_count > 5:
        release_ts = top_cmd.end_timestamp or target.end_timestamp
        w(f"\n\n Impact\n")
        w(f" This created a massive pile-up — {pileup_count} commands all "
          f"completed simultaneously")
        if release_ts:
            w(f" at {release_ts} when pid {top_cmd.pid}'s locks "
              f"were released.")
    # ===== Recommendations =====
    w(f"\n\n Recommendations\n")
    rec_num = 1
    # Check root chain for the ultimate cause
    root = root_chain[-1][0] if root_chain else top_cmd
    # Wildcard pattern detection
    if '...' in root.args and root.total_scan_rows > 1_000_000:
        w(f" {rec_num}. The '{root.command}' command by {root.user} "
          f"is the root cause — scanning {format_count(root.total_scan_rows)} "
          f"rows")
        # Try to extract the depot path
        parts = root.args.split()
        for p in parts:
            if p.startswith('//') or p.startswith('...'):
                w(f" using broad wildcard path: {truncate(p, 80)}")
                break
        w(f" Consider restricting broad '...' wildcard usage or "
          f"setting group limits")
        w(f" (MaxScanRows, MaxLockTime) to prevent these queries "
          f"from impacting the server.")
        rec_num += 1
    # Memory-intensive commands
    if root.memory_cmd_mb > 2000:
        w(f" {rec_num}. The command consumed {root.memory_cmd_mb:.0f}MB "
          f"of memory — consider MaxMemory limits.")
        rec_num += 1
    # Submit was blocked (not inherently slow)
    if top_cmd.command in ('dm-CommitSubmit', 'user-submit') and significant_waits:
        w(f" {rec_num}. The submit (pid {top_cmd.pid}) itself was fast "
          f"— it was just waiting for locks the whole time.")
        w(f" The fix is to address the root cause that held the "
          f"locks, not the submit.")
        rec_num += 1
    # Long-running fstat/sync blocking submits
    if (root.command in ('user-fstat', 'user-files', 'user-sync',
                         'user-changes')
            and top_cmd.command in ('dm-CommitSubmit', 'user-submit')):
        w(f" {rec_num}. Long-running read operations "
          f"('{root.command}') are blocking submits.")
        w(f" This is a common pattern where broad read queries hold "
          f"shared locks that prevent")
        w(f" exclusive write locks needed by submits. Solutions:")
        w(f" - Set MaxScanRows limits to prevent unbounded scans")
        w(f" - Review the client application "
          f"({root.app}) for optimization")
        w(f" - Consider edge servers to offload read traffic")
        rec_num += 1
    if rec_num == 1:
        # Nothing specific matched: fall back to a generic recommendation.
        w(f" 1. Review the blocking command (pid {top_cmd.pid}) "
          f"to determine if it can be optimized or scheduled "
          f"during off-peak hours.")
    w("")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Construct the CLI parser for the blocker finder.

    Returns:
        Configured ArgumentParser; `--completed` supersedes `--timestamp`,
        which supersedes `--start-time` (see _resolve_target).
    """
    parser = argparse.ArgumentParser(
        description='Find what command blocked a slow Perforce command.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Find blocker by PID and completion timestamp
python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28"
# Find blocker by PID and start timestamp
python find_blocker.py server.log --pid 7432 --start-time "2026/04/01 11:36:53"
# Parse a "completed" line directly
python find_blocker.py server.log --completed "2026/04/01 11:44:28 pid 7432 completed 454s"
# Trace the blocking chain deeper (re-analyze the blocker)
python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28" --chain
"""
    )
    parser.add_argument('logfile', help='Path to the Perforce server log file')
    parser.add_argument('--pid', type=int, help='PID of the slow command')
    parser.add_argument(
        '--timestamp',
        help='Completion timestamp (YYYY/MM/DD HH:MM:SS)'
    )
    parser.add_argument(
        '--start-time',
        help='Start timestamp of the command (YYYY/MM/DD HH:MM:SS)'
    )
    parser.add_argument(
        '--completed',
        help='The full "completed" line from the log'
    )
    parser.add_argument(
        '--chain', action='store_true',
        help='Trace the blocking chain (re-analyze the blocker too)'
    )
    parser.add_argument(
        '--quiet', action='store_true',
        help='Suppress progress messages'
    )
    return parser


def _resolve_target(parser, args):
    """Derive (target_pid, target_time, start_time) from parsed args.

    A `--completed` line supplies both pid and completion time and takes
    precedence over `--timestamp` / `--start-time`.

    Exits via sys.exit(1) on an unparseable `--completed` line, or via
    parser.error() when required parameters are missing.
    """
    target_pid = args.pid
    target_time = None
    start_time = None
    if args.completed:
        m = RE_COMPLETED.match(args.completed.strip())
        if not m:
            print("Error: Could not parse --completed line. Expected format:",
                  file=sys.stderr)
            print(" YYYY/MM/DD HH:MM:SS pid NNNN completed NNNs",
                  file=sys.stderr)
            sys.exit(1)
        target_time = parse_timestamp(m.group(1))
        target_pid = int(m.group(2))
    elif args.timestamp:
        target_time = parse_timestamp(args.timestamp)
    elif args.start_time:
        start_time = parse_timestamp(args.start_time)
    if not target_pid:
        parser.error("Must specify --pid or --completed")
    if not target_time and not start_time:
        parser.error(
            "Must specify --timestamp, --start-time, or --completed"
        )
    return target_pid, target_time, start_time


def _report_no_lock_waits(target, target_pid):
    """Report on a slow command that recorded no lock waits, then exit(0).

    When a command has no lock waits the slowness must come from elsewhere
    (data transfer, CPU, triggers); print a time breakdown and stop — the
    blocker search phases do not apply.
    """
    print(f"\nWarning: pid {target_pid} has no recorded lock waits.",
          file=sys.stderr)
    print("The command may have been slow due to:", file=sys.stderr)
    rpc_s = target.rpc_snd_seconds + target.rpc_rcv_seconds
    if rpc_s > 1:
        print(f" - Large data transfer (RPC: {rpc_s:.1f}s, "
              f"{target.rpc_size_in_mb:.0f}MB in / "
              f"{target.rpc_size_out_mb:.0f}MB out)", file=sys.stderr)
    print(" - CPU-intensive processing", file=sys.stderr)
    print(" - Waiting on triggers", file=sys.stderr)
    print("\n" + "=" * 78)
    print(" PERFORCE LOCK BLOCKER ANALYSIS")
    print("=" * 78)
    print(f"\n pid {target.pid} '{target.command}' "
          f"({target.lapse_seconds:.0f}s) had no significant lock waits.")
    print(f"\n Command: {truncate(target.args, 100)}")
    print(f" User: {target.user}")
    total_ms = target.lapse_seconds * 1000
    rpc_ms = rpc_s * 1000
    print("\n Time breakdown:")
    print(f" {'Component':<45s} {'Duration':>10s}")
    print(f" {'-'*45} {'-'*10}")
    if rpc_ms > 1000:
        desc = "RPC transfer"
        if target.rpc_size_in_mb > 1:
            desc += f" ({target.rpc_size_in_mb:.0f}MB in)"
        if target.rpc_size_out_mb > 1:
            desc += f" ({target.rpc_size_out_mb:.0f}MB out)"
        print(f" {desc:<45s} ~{format_ms(rpc_ms):>8s}")
    other = max(0, total_ms - rpc_ms)
    print(f" {'Processing / other':<45s} ~{format_ms(other):>8s}")
    print(f" {'TOTAL':<45s} {format_ms(total_ms):>9s}")
    if target.total_scan_rows > 100000:
        print(f"\n The command scanned "
              f"{format_count(target.total_scan_rows)} rows — "
              f"this may be the bottleneck.")
    print()
    sys.exit(0)


def _trace_chain(logfile, top_cmd, quiet):
    """Phase 3: re-analyze the top blocker to find what blocked *it*.

    Returns a one-element [(command, table)] chain for the root blocker,
    or None when the top blocker had no significant lock waits of its own
    or no candidate was found.
    """
    blocker_waits = top_cmd.get_lock_waits()
    # Keep only substantial waits; 10000 is the original threshold
    # (presumably milliseconds, i.e. > 10s — units per get_lock_waits).
    significant = [(t, lt, w) for t, lt, w in blocker_waits
                   if w > 10000]
    if not significant:
        return None
    if not quiet:
        print(f"\nPhase 3: Tracing blocking chain (pid "
              f"{top_cmd.pid} was itself blocked) ...",
              file=sys.stderr)
    b2_candidates = find_blockers(
        logfile, top_cmd, significant, quiet=quiet
    )
    ranked2 = rank_blockers(top_cmd, significant, b2_candidates)
    if ranked2:
        return [(ranked2[0][0], ranked2[0][1])]
    return None


def main():
    """Entry point: locate a slow command in a P4LOG, find its blockers,
    optionally trace the blocking chain and pile-up, and print a report.
    """
    parser = _build_parser()
    args = parser.parse_args()
    target_pid, target_time, start_time = _resolve_target(parser, args)
    if not args.quiet:
        print("=" * 78, file=sys.stderr)
        print(f" Finding blocker for pid {target_pid}", file=sys.stderr)
        if target_time:
            print(f" Completion time: {target_time}", file=sys.stderr)
        if start_time:
            print(f" Start time: {start_time}", file=sys.stderr)
        print("=" * 78, file=sys.stderr)
    # Phase 1: Find the target command
    if not args.quiet:
        print("\nPhase 1: Finding target command ...", file=sys.stderr)
    target = find_command_block(
        args.logfile, target_pid,
        target_time=target_time,
        start_time=start_time,
        quiet=args.quiet
    )
    if not target:
        print(f"Error: Could not find pid {target_pid} in the log file.",
              file=sys.stderr)
        sys.exit(1)
    if not target.lapse_seconds:
        print(f"Error: Found pid {target_pid} but no lapse/completion time.",
              file=sys.stderr)
        sys.exit(1)
    # Identify which locks the target was waiting on
    blocked_tables = target.get_lock_waits()
    if not blocked_tables:
        _report_no_lock_waits(target, target_pid)  # does not return
    # Phase 2: Find blockers
    if not args.quiet:
        print("\nPhase 2: Searching for blocking commands ...",
              file=sys.stderr)
    candidates = find_blockers(
        args.logfile, target, blocked_tables, quiet=args.quiet
    )
    ranked = rank_blockers(target, blocked_tables, candidates)
    # Phase 3: Chain analysis (only on request, and only if we found a blocker)
    root_chain: Optional[List[Tuple[CommandInfo, str]]] = None
    if args.chain and ranked:
        root_chain = _trace_chain(args.logfile, ranked[0][0], args.quiet)
    # Phase 4 (or 3 without --chain): Pile-up detection around lock release
    pileup_count = 0
    if ranked:
        top_cmd, _, _ = ranked[0]
        release_ts = top_cmd.end_timestamp
        if release_ts and not args.quiet:
            print(f"\nPhase {'4' if args.chain else '3'}: "
                  f"Detecting pile-up at {release_ts} ...", file=sys.stderr)
        pileup_count = detect_pileup(
            args.logfile, release_ts, window_seconds=3, quiet=args.quiet
        )
        if not args.quiet:
            print(f" {pileup_count} commands completed within 3s of "
                  f"lock release", file=sys.stderr)
    # Generate report
    generate_report(target, ranked, blocked_tables,
                    root_chain=root_chain,
                    pileup_count=pileup_count)
# Script entry point: run the analysis only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()