# --- Web-export residue (Swarm download-page breadcrumbs) preserved below,
# --- converted to comments so the file remains valid Python.
# find_blocker.py #1
#
#   • //
#   • guest/
#   • russell_jackson/
#   • sdp/
#   • Server/
#   • Unix/
#   • p4/
#   • common/
#   • bin/
#   • find_blocker.py
#   • View
#   • Commits
#   • Open Download .zip Download (53 KB)
#!/usr/bin/env python3
"""
Perforce Lock Blocker Finder

Given a Perforce server log (P4LOG with server=3 tracking) and a slow
command, this script identifies what other command(s) held the locks
that caused the slow command to wait.

It traces the full blocking chain: if the blocker was itself blocked,
it continues up the chain to find the root cause.

Usage:
    python find_blocker.py <logfile> --pid <PID> --timestamp <YYYY/MM/DD HH:MM:SS>
    python find_blocker.py <logfile> --completed "2026/04/01 11:44:28 pid 7432 completed 454s"

Examples:
    python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28"
    python find_blocker.py server.log --pid 7432 --start-time "2026/04/01 11:36:53"
"""

import argparse
import re
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Tuple


# ---------------------------------------------------------------------------
# Regex patterns for P4LOG parsing
# ---------------------------------------------------------------------------

# Command start (header) line, e.g.:
#   2026/04/01 11:36:53 pid 7432 user@client 10.0.0.5 [p4/2023.1] 'user-sync ...'
# Groups: 1=timestamp, 2=pid, 3=user, 4=client, 5=ip (optionally a
# proxy/real pair joined by '/'), 6=app/version, 7=command + args,
# 8=trailing text after the quoted command (e.g. trigger info).
RE_HEADER = re.compile(
    r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+'
    r'pid\s+(\d+)\s+'
    r'(\S+?)@(\S+)\s+'
    r'([\d.:/a-fA-F]+(?:/[\d.:/a-fA-F]+)?)\s+'
    r'\[(.+?)\]\s*'
    r"'(.+)'(.*)$"
)

# Command completion line: groups 1=timestamp, 2=pid, 3=elapsed value,
# 4=unit ('s' or 'ms').
RE_COMPLETED = re.compile(
    r'^\s*(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s+'
    r'pid\s+(\d+)\s+completed\s+(\d+(?:\.\d+)?)(s|ms)'
)

# "--- lapse 1.2s" tracking line: groups 1=value, 2=unit ('s' or 'ms').
RE_LAPSE = re.compile(r'^---\s+lapse\s+(\d*\.?\d+)(s|ms)')
# "--- memory cmd/proc XXmb/YYmb": groups 1=command MB, 2=process MB.
RE_MEMORY = re.compile(
    r'^---\s+memory\s+cmd/proc\s+(\d+(?:\.\d+)?)mb/(\d+(?:\.\d+)?)mb'
)
# "--- rpc msgs/size in+out ...": groups 1-2=message counts in/out,
# 3-4=sizes in MB in/out; optional groups 5-6=send/receive seconds.
RE_RPC = re.compile(
    r'^---\s+rpc\s+msgs/size\s+in\+out\s+'
    r'(\d+)\+(\d+)/(\d+(?:\.\d+)?)mb\+(\d+(?:\.\d+)?)mb'
    r'(?:\s+himarks\s+\d+/\d+)?'
    r'(?:\s+snd/rcv\s+(\d*\.?\d+)s/(\d*\.?\d+)s)?'
)
# "--- db.have" — names the table that the following stats lines refer to.
RE_DB_TABLE = re.compile(r'^---\s+(db\.\w+)\s*$')
# "--- pages in+out+cached N+N+N": groups 1=in, 2=out, 3=cached.
RE_PAGES = re.compile(
    r'^---\s+pages\s+in\+out\+cached\s+(\d+)\+(\d+)\+(\d+)'
)
# "--- locks read/write R/W rows get+pos+scan put+del ...":
# groups 1=read locks, 2=write locks, 3-5=get/pos/scan rows, 6-7=put/del rows.
RE_LOCKS_ROWS = re.compile(
    r'^---\s+locks\s+read/write\s+(\d+)/(\d+)\s+'
    r'rows\s+get\+pos\+scan\s+put\+del\s+'
    r'(\d+)\+(\d+)\+(\d+)\s+(\d+)\+(\d+)'
)
# "--- total lock wait+held read/write ...": groups 1=read wait ms,
# 2=read held ms, 3=write wait ms, 4=write held ms.
RE_TOTAL_LOCK = re.compile(
    r'^---\s+total\s+lock\s+wait\+held\s+read/write\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# Same group layout as RE_TOTAL_LOCK, but for per-acquisition maxima.
RE_MAX_LOCK = re.compile(
    r'^---\s+max\s+lock\s+wait\+held\s+read/write\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# "--- peek count N wait+held total/max ...": groups 1=wait total ms,
# 2=held total ms, 3=wait max ms, 4=held max ms (the count itself is not
# captured by this pattern).
RE_PEEK = re.compile(
    r'^---\s+peek\s+count\s+\d+\s+'
    r'wait\+held\s+total/max\s+'
    r'(\d+)ms\+(\d+)ms/(\d+)ms\+(\d+)ms'
)
# Virtual lock entries: --- meta/commit(W), --- clients/...(W), etc.
# Groups: 1=lock name, 2='R' or 'W'.
RE_VIRTUAL_LOCK = re.compile(
    r'^---\s+(\S+)\(([RW])\)\s*$'
)
# "--- filetotals ..." summary line.
RE_FILETOTALS = re.compile(r'^---\s+filetotals')


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class TableLockInfo:
    """Lock details for a single database table within a command.

    The parsers accumulate (+=) the row/lock counters across the multiple
    stats blocks a command may emit for the same table, and keep the
    maximum of the lock timing fields.
    """
    table: str = ""        # table name, e.g. "db.have"
    read_locks: int = 0    # read locks taken
    write_locks: int = 0   # write locks taken
    rows_get: int = 0      # rows fetched
    rows_pos: int = 0      # rows positioned
    rows_scan: int = 0     # rows scanned
    rows_put: int = 0      # rows written
    rows_del: int = 0      # rows deleted
    pages_in: int = 0      # pages read in

    # Total lock times from "total lock" lines
    total_read_wait_ms: float = 0.0
    total_read_held_ms: float = 0.0
    total_write_wait_ms: float = 0.0
    total_write_held_ms: float = 0.0

    # Max lock times from "max lock" lines
    max_read_wait_ms: float = 0.0
    max_read_held_ms: float = 0.0
    max_write_wait_ms: float = 0.0
    max_write_held_ms: float = 0.0

    # Peek times
    peek_count: int = 0
    peek_held_total_ms: float = 0.0
    peek_held_max_ms: float = 0.0


@dataclass
class CommandInfo:
    """Everything parsed about one server command from the log."""
    line_number: int = 0
    timestamp: Optional[datetime] = None
    end_timestamp: Optional[datetime] = None
    pid: int = 0
    user: str = ""
    client: str = ""
    ip_address: str = ""
    app: str = ""
    command: str = ""
    args: str = ""
    trigger_info: str = ""

    lapse_seconds: float = 0.0
    memory_cmd_mb: float = 0.0
    memory_proc_mb: float = 0.0

    rpc_msgs_in: int = 0
    rpc_msgs_out: int = 0
    rpc_size_in_mb: float = 0.0
    rpc_size_out_mb: float = 0.0
    rpc_snd_seconds: float = 0.0
    rpc_rcv_seconds: float = 0.0

    # Per-table lock details
    table_locks: Dict[str, TableLockInfo] = field(default_factory=dict)

    # Virtual lock entries (meta/commit, clients/..., etc.)
    virtual_locks: List[Tuple[str, str, float]] = field(default_factory=list)
    # (name, R/W, held_ms)

    @property
    def total_scan_rows(self) -> int:
        """Grand total of get+pos+scan row counts across all tables."""
        total = 0
        for info in self.table_locks.values():
            total += info.rows_get + info.rows_pos + info.rows_scan
        return total

    def get_lock_waits(self) -> List[Tuple[str, str, float]]:
        """Return (table, 'read'|'write', wait_ms) for every non-zero
        lock wait, largest wait first."""
        entries = [
            (name, kind, ms)
            for name, info in self.table_locks.items()
            for kind, ms in (('read', info.total_read_wait_ms),
                             ('write', info.total_write_wait_ms))
            if ms > 0
        ]
        entries.sort(key=lambda e: e[2], reverse=True)
        return entries

    def get_lock_helds(self) -> List[Tuple[str, str, float]]:
        """Return (table, 'read'|'write', held_ms) for every lock held
        longer than 100ms, longest hold first."""
        entries = [
            (name, kind, ms)
            for name, info in self.table_locks.items()
            for kind, ms in (('read', info.total_read_held_ms),
                             ('write', info.total_write_held_ms))
            if ms > 100
        ]
        entries.sort(key=lambda e: e[2], reverse=True)
        return entries


# ---------------------------------------------------------------------------
# Parsing functions
# ---------------------------------------------------------------------------

def parse_timestamp(s: str) -> Optional[datetime]:
    """Parse a P4LOG 'YYYY/MM/DD HH:MM:SS' timestamp; None if malformed."""
    text = s.strip()
    try:
        result = datetime.strptime(text, '%Y/%m/%d %H:%M:%S')
    except ValueError:
        result = None
    return result


def find_command_block(filepath: str, target_pid: int,
                       target_time: Optional[datetime] = None,
                       start_time: Optional[datetime] = None,
                       quiet: bool = False) -> Optional[CommandInfo]:
    """Find and parse the full command block for a specific PID and time.

    This uses a two-pass approach to handle PID reuse:

    Pass 1: Find the exact "completed" or header line that anchors
    the target command. For completion time matching, we look for the
    "completed" line at the exact timestamp. For start time matching,
    we look for a header at the exact timestamp.

    Pass 2: Re-scan the file collecting ALL tracking lines for the
    target PID within the command's time range (start to completion).

    Args:
        filepath: Path to the P4LOG file.
        target_pid: Server PID of the command to locate.
        target_time: Completion timestamp anchor (matched exactly first,
            then within +/-2 seconds).
        start_time: Start (header) timestamp anchor, used when target_time
            is not given; same exact-then-fuzzy matching.
        quiet: If True, suppress progress messages on stderr.

    Returns the fully parsed CommandInfo with all table lock details.
    """
    if not quiet:
        print(f"  Searching for pid {target_pid} ...", file=sys.stderr)

    t0 = time.time()
    line_count = 0

    # --- Pass 1: Find the anchor point ---
    # Determine the command's start_time and end_time precisely.
    cmd_start: Optional[datetime] = None
    cmd_end: Optional[datetime] = None
    cmd_lapse: float = 0.0
    cmd_args: Optional[str] = None
    cmd_user: str = ""
    cmd_client: str = ""
    cmd_ip: str = ""
    cmd_app: str = ""
    anchor_line: int = 0

    # Collect all header timestamps and completed timestamps for this PID
    # headers: (line_no, ts, user, client, ip, app, args)
    headers: List[Tuple[int, datetime, str, str, str, str, str]] = []
    # completions: (line_no, ts, lapse_seconds)
    completions: List[Tuple[int, datetime, float]] = []

    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            # Skip the banner lines that precede each log record.
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue

            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                if pid == target_pid:
                    ts = parse_timestamp(m.group(1))
                    if ts:
                        headers.append((
                            line_count, ts, m.group(3), m.group(4),
                            m.group(5), m.group(6), m.group(7)
                        ))
                continue

            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                if pid == target_pid:
                    ts = parse_timestamp(m.group(1))
                    val = float(m.group(3))
                    unit = m.group(4)
                    # Normalise the reported lapse to seconds.
                    lapse = val if unit == 's' else val / 1000.0
                    if ts:
                        completions.append((line_count, ts, lapse))
                continue

    elapsed_pass1 = time.time() - t0
    if not quiet:
        print(f"  Pass 1: Scanned {line_count:,} lines in {elapsed_pass1:.1f}s, "
              f"found {len(headers)} headers and {len(completions)} "
              f"completions for pid {target_pid}", file=sys.stderr)

    # Find the matching completion or header
    if target_time:
        # Match on completion timestamp (exact second)
        for ln, ts, lapse in completions:
            if ts == target_time:
                cmd_end = ts
                cmd_lapse = lapse
                anchor_line = ln
                # Derive the start from end minus lapse.
                cmd_start = ts - timedelta(seconds=lapse)
                break
        if not cmd_end:
            # Try close match (within 2 seconds)
            for ln, ts, lapse in completions:
                if abs((ts - target_time).total_seconds()) <= 2:
                    cmd_end = ts
                    cmd_lapse = lapse
                    anchor_line = ln
                    cmd_start = ts - timedelta(seconds=lapse)
                    break
    elif start_time:
        # Match on header timestamp
        for ln, ts, user, client, ip, app, args in headers:
            if ts == start_time:
                cmd_start = ts
                anchor_line = ln
                break
        if not cmd_start:
            # Fall back to a fuzzy match within 2 seconds.
            for ln, ts, user, client, ip, app, args in headers:
                if abs((ts - start_time).total_seconds()) <= 2:
                    cmd_start = ts
                    anchor_line = ln
                    break

    if not cmd_end and not cmd_start:
        if not quiet:
            print(f"  Could not find matching entry for pid {target_pid}",
                  file=sys.stderr)
        return None

    # Find the first header for this PID at or near the command's start time
    if cmd_start:
        for ln, ts, user, client, ip, app, args in headers:
            if abs((ts - cmd_start).total_seconds()) <= 2:
                cmd_user = user
                cmd_client = client
                cmd_ip = ip
                cmd_app = app
                cmd_args = args
                if anchor_line == 0:
                    anchor_line = ln
                break

    # If we still don't have start/end, compute from what we have
    if cmd_start and not cmd_end and cmd_lapse > 0:
        cmd_end = cmd_start + timedelta(seconds=cmd_lapse)
    if cmd_end and not cmd_start and cmd_lapse > 0:
        cmd_start = cmd_end - timedelta(seconds=cmd_lapse)

    if not quiet:
        print(f"  Found command: {cmd_user}@{cmd_client} "
              f"'{(cmd_args or '')[:60]}...'", file=sys.stderr)
        print(f"  Start: {cmd_start}, End: {cmd_end}, "
              f"Lapse: {cmd_lapse:.1f}s", file=sys.stderr)

    # --- Pass 2: Collect all tracking data for this PID ---
    # Define the time window for collecting tracking lines
    # (padded so that second-level rounding cannot exclude relevant lines).
    if cmd_start and cmd_end:
        collect_start = cmd_start - timedelta(minutes=1)
        collect_end = cmd_end + timedelta(minutes=1)
    elif cmd_start:
        collect_start = cmd_start - timedelta(minutes=1)
        collect_end = cmd_start + timedelta(hours=2)
    elif cmd_end:
        collect_start = cmd_end - timedelta(hours=2)
        collect_end = cmd_end + timedelta(minutes=1)
    else:
        return None

    cmd = CommandInfo(
        pid=target_pid, line_number=anchor_line,
        timestamp=cmd_start, end_timestamp=cmd_end,
        lapse_seconds=cmd_lapse,
        user=cmd_user, client=cmd_client, ip_address=cmd_ip,
        app=cmd_app, args=cmd_args or "", command=""
    )
    if cmd_args:
        # First word of the args string is the command name.
        parts = cmd_args.split(None, 1)
        cmd.command = parts[0] if parts else cmd_args

    current_db_table: Optional[str] = None
    current_pid: Optional[int] = None
    in_target_block = False  # True while tracking lines belong to our PID/window
    line_count = 0
    # NOTE(review): t1 is never read; pass-2 timing below is derived from t0.
    t1 = time.time()

    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue

            # Header
            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                if pid == target_pid and ts:
                    # Only treat this as our command when inside the window,
                    # which guards against PID reuse elsewhere in the log.
                    if collect_start <= ts <= collect_end:
                        in_target_block = True
                        current_db_table = None
                    else:
                        in_target_block = False
                else:
                    in_target_block = False
                continue

            # Completed
            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid
                if pid == target_pid and ts:
                    if collect_start <= ts <= collect_end:
                        in_target_block = True
                        val = float(m.group(3))
                        unit = m.group(4)
                        cmd.lapse_seconds = val if unit == 's' else val / 1000.0
                        cmd.end_timestamp = ts
                    else:
                        in_target_block = False
                else:
                    in_target_block = False
                continue

            # Tracking lines
            if not stripped.startswith('---'):
                continue
            if not in_target_block:
                continue

            # Lapse - only update if larger (the final stats block has
            # the overall lapse)
            m = RE_LAPSE.match(stripped)
            if m:
                val = float(m.group(1))
                unit = m.group(2)
                lapse = val if unit == 's' else val / 1000.0
                if lapse > cmd.lapse_seconds:
                    cmd.lapse_seconds = lapse
                continue

            # Memory - use max across multiple phases
            m = RE_MEMORY.match(stripped)
            if m:
                cmd.memory_cmd_mb = max(cmd.memory_cmd_mb, float(m.group(1)))
                cmd.memory_proc_mb = max(cmd.memory_proc_mb, float(m.group(2)))
                continue

            # RPC - use max across multiple phases
            m = RE_RPC.match(stripped)
            if m:
                cmd.rpc_msgs_in = max(cmd.rpc_msgs_in, int(m.group(1)))
                cmd.rpc_msgs_out = max(cmd.rpc_msgs_out, int(m.group(2)))
                cmd.rpc_size_in_mb = max(cmd.rpc_size_in_mb, float(m.group(3)))
                cmd.rpc_size_out_mb = max(cmd.rpc_size_out_mb, float(m.group(4)))
                if m.group(5):
                    cmd.rpc_snd_seconds = max(
                        cmd.rpc_snd_seconds, float(m.group(5)))
                    cmd.rpc_rcv_seconds = max(
                        cmd.rpc_rcv_seconds, float(m.group(6)))
                continue

            # Virtual lock entry (meta/commit(W), clients/...(W), etc.)
            m = RE_VIRTUAL_LOCK.match(stripped)
            if m:
                current_db_table = None  # Reset db table context
                continue

            # DB table header
            m = RE_DB_TABLE.match(stripped)
            if m:
                current_db_table = m.group(1)
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                continue

            # Pages
            m = RE_PAGES.match(stripped)
            if m and current_db_table and current_db_table in cmd.table_locks:
                cmd.table_locks[current_db_table].pages_in += int(m.group(1))
                continue

            # Locks and rows
            m = RE_LOCKS_ROWS.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                info.read_locks += int(m.group(1))
                info.write_locks += int(m.group(2))
                info.rows_get += int(m.group(3))
                info.rows_pos += int(m.group(4))
                info.rows_scan += int(m.group(5))
                info.rows_put += int(m.group(6))
                info.rows_del += int(m.group(7))
                continue

            # Total lock times
            m = RE_TOTAL_LOCK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                rw = float(m.group(1))
                rh = float(m.group(2))
                ww = float(m.group(3))
                wh = float(m.group(4))
                # Use max since the same table can appear multiple times
                # (different phases of a submit)
                info.total_read_wait_ms = max(info.total_read_wait_ms, rw)
                info.total_read_held_ms = max(info.total_read_held_ms, rh)
                info.total_write_wait_ms = max(info.total_write_wait_ms, ww)
                info.total_write_held_ms = max(info.total_write_held_ms, wh)
                continue

            # Max lock times
            m = RE_MAX_LOCK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                info.max_read_wait_ms = max(info.max_read_wait_ms, float(m.group(1)))
                info.max_read_held_ms = max(info.max_read_held_ms, float(m.group(2)))
                info.max_write_wait_ms = max(info.max_write_wait_ms, float(m.group(3)))
                info.max_write_held_ms = max(info.max_write_held_ms, float(m.group(4)))
                continue

            # Peek
            m = RE_PEEK.match(stripped)
            if m and current_db_table:
                if current_db_table not in cmd.table_locks:
                    cmd.table_locks[current_db_table] = TableLockInfo(
                        table=current_db_table
                    )
                info = cmd.table_locks[current_db_table]
                # Token 3 of the raw line is the peek count; RE_PEEK does
                # not capture it, so it is pulled from the split line.
                info.peek_count += int(stripped.split()[3])  # count from raw
                info.peek_held_total_ms += float(m.group(2))
                info.peek_held_max_ms = max(info.peek_held_max_ms, float(m.group(4)))
                continue

            # "locks acquired" summary lines are recognised but ignored.
            if stripped.startswith('locks acquired'):
                continue

    elapsed = time.time() - t0
    if not quiet:
        print(f"  Pass 2: Scanned {line_count:,} lines in "
              f"{elapsed - elapsed_pass1:.1f}s", file=sys.stderr)

    # No header matched (cmd.command was never set): nothing usable found.
    if not cmd.command:
        return None

    return cmd


def find_blockers(filepath: str, target: CommandInfo,
                  blocked_tables: List[Tuple[str, str, float]],
                  quiet: bool = False) -> List[CommandInfo]:
    """Find commands that held locks blocking the target command.

    For each table the target waited on:
    - If target waited for a READ lock, find commands that held WRITE locks
    - If target waited for a WRITE lock, find commands that held READ or WRITE locks

    We look for commands that:
    1. Overlapped in time with the target
    2. Held the opposing lock type on the same table
    3. Had significant lock held time

    Args:
        filepath: Path to the P4LOG file.
        target: The already-parsed slow command.
        blocked_tables: (table, 'read'|'write', wait_ms) tuples describing
            what the target waited on (e.g. from get_lock_waits()).
        quiet: If True, suppress progress messages on stderr.

    Returns a list of candidate blocker CommandInfo records.
    """
    if not blocked_tables:
        return []

    # Determine the time window to search
    # The target started at timestamp and ran for lapse_seconds
    if target.timestamp and target.end_timestamp:
        search_start = target.timestamp - timedelta(minutes=5)
        search_end = target.end_timestamp + timedelta(minutes=5)
    elif target.end_timestamp:
        # Start unknown: back off by the lapse plus 5 minutes of slack.
        search_start = target.end_timestamp - timedelta(
            seconds=target.lapse_seconds + 300
        )
        search_end = target.end_timestamp + timedelta(minutes=5)
    else:
        return []

    # Tables we're interested in and what lock type to look for
    search_tables: Dict[str, str] = {}
    for table, wait_type, wait_ms in blocked_tables:
        if wait_type == 'read':
            # Target waited for read lock -> blocker held write lock
            search_tables[table] = 'write'
        else:
            # Target waited for write lock -> blocker held read or write lock
            search_tables[table] = 'any'

    if not quiet:
        print(f"\n  Searching for blockers on: "
              f"{', '.join(f'{t} ({lt} lock)' for t, lt in search_tables.items())}",
              file=sys.stderr)
        print(f"  Time window: {search_start} to {search_end}", file=sys.stderr)

    # Now scan the log for candidate blockers
    candidates: List[CommandInfo] = []
    by_pid: Dict[int, CommandInfo] = {}  # in-flight commands keyed by PID
    db_table_ctx: Dict[int, str] = {}    # per-PID current "--- db.xxx" context
    current_pid: Optional[int] = None
    line_count = 0
    # NOTE(review): in_window is assigned below but never read; explicit
    # time-overlap filtering of candidates appears to be deferred to the
    # ranking step -- confirm.
    in_window = False
    past_window = False
    t0 = time.time()

    def emit_candidate(pid: int):
        """Check if this PID's command is a blocker candidate and save it."""
        if pid not in by_pid:
            return
        rec = by_pid.pop(pid)
        db_table_ctx.pop(pid, None)

        if rec.pid == target.pid:
            return  # Skip the target itself

        # A command with no measured lapse cannot have held locks long.
        if rec.lapse_seconds <= 0:
            return

        # Check if this command held locks on any of our target tables
        for table, needed_lock in search_tables.items():
            if table not in rec.table_locks:
                continue
            info = rec.table_locks[table]
            is_blocker = False
            # 100ms threshold filters out insignificant lock holds.
            if needed_lock == 'write' and info.total_write_held_ms > 100:
                is_blocker = True
            elif needed_lock == 'any' and (
                info.total_read_held_ms > 100 or
                info.total_write_held_ms > 100
            ):
                is_blocker = True
            if is_blocker:
                candidates.append(rec)
                return

    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for line in fh:
            line_count += 1
            stripped = line.strip()
            if not stripped:
                continue
            if stripped in ('Perforce server info:', 'Perforce server error:'):
                continue

            # --- Header ---
            m = RE_HEADER.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid

                # Check time window
                if ts:
                    if ts > search_end:
                        past_window = True
                    elif ts >= search_start:
                        in_window = True

                # Once past the window, ignore brand-new PIDs (but keep
                # collecting stats for PIDs already being tracked).
                if past_window and pid not in by_pid:
                    continue

                if pid in by_pid:
                    old = by_pid[pid]
                    full_cmd = m.group(7)
                    # A different command on the same PID means the previous
                    # one finished (PID reuse): evaluate it as a candidate.
                    if old.lapse_seconds > 0 and old.args != full_cmd:
                        emit_candidate(pid)
                    else:
                        # Repeated header for the same command; keep the
                        # existing record.
                        continue

                rec = CommandInfo(pid=pid, line_number=line_count)
                rec.timestamp = ts
                rec.user = m.group(3)
                rec.client = m.group(4)
                rec.ip_address = m.group(5)
                rec.app = m.group(6)
                full_cmd = m.group(7)
                rec.args = full_cmd
                parts = full_cmd.split(None, 1)
                rec.command = parts[0] if parts else full_cmd
                trailing = m.group(8).strip()
                if trailing:
                    rec.trigger_info = trailing
                by_pid[pid] = rec
                continue

            # --- Completed ---
            m = RE_COMPLETED.match(stripped)
            if m:
                pid = int(m.group(2))
                ts = parse_timestamp(m.group(1))
                current_pid = pid

                if ts:
                    if ts > search_end:
                        past_window = True
                    elif ts >= search_start:
                        in_window = True

                if pid not in by_pid:
                    # Completion without a seen header (header may predate
                    # the log or the window); record what we can.
                    by_pid[pid] = CommandInfo(pid=pid)
                rec = by_pid[pid]
                val = float(m.group(3))
                unit = m.group(4)
                rec.lapse_seconds = val if unit == 's' else val / 1000.0
                rec.end_timestamp = ts
                continue

            # --- Tracking lines ---
            if not stripped.startswith('---'):
                continue

            # Tracking lines are attributed to the most recent header/completed
            # PID seen in the stream.
            pid = current_pid
            if pid is None or pid not in by_pid:
                continue
            rec = by_pid[pid]

            m = RE_LAPSE.match(stripped)
            if m:
                val = float(m.group(1))
                unit = m.group(2)
                rec.lapse_seconds = val if unit == 's' else val / 1000.0
                continue

            m = RE_MEMORY.match(stripped)
            if m:
                rec.memory_cmd_mb = float(m.group(1))
                rec.memory_proc_mb = float(m.group(2))
                continue

            m = RE_RPC.match(stripped)
            if m:
                rec.rpc_msgs_in = int(m.group(1))
                rec.rpc_msgs_out = int(m.group(2))
                rec.rpc_size_in_mb = float(m.group(3))
                rec.rpc_size_out_mb = float(m.group(4))
                if m.group(5):
                    rec.rpc_snd_seconds = float(m.group(5))
                    rec.rpc_rcv_seconds = float(m.group(6))
                continue

            # Virtual lock entry resets the db-table context for this PID.
            m = RE_VIRTUAL_LOCK.match(stripped)
            if m:
                db_table_ctx.pop(pid, None)
                continue

            m = RE_DB_TABLE.match(stripped)
            if m:
                tbl = m.group(1)
                db_table_ctx[pid] = tbl
                if tbl not in rec.table_locks:
                    rec.table_locks[tbl] = TableLockInfo(table=tbl)
                continue

            current_table = db_table_ctx.get(pid)

            m = RE_PAGES.match(stripped)
            if m and current_table and current_table in rec.table_locks:
                rec.table_locks[current_table].pages_in += int(m.group(1))
                continue

            m = RE_LOCKS_ROWS.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                info.read_locks += int(m.group(1))
                info.write_locks += int(m.group(2))
                info.rows_get += int(m.group(3))
                info.rows_pos += int(m.group(4))
                info.rows_scan += int(m.group(5))
                info.rows_put += int(m.group(6))
                info.rows_del += int(m.group(7))
                continue

            m = RE_TOTAL_LOCK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                # Same table can appear in several phases; keep the maximum.
                info.total_read_wait_ms = max(
                    info.total_read_wait_ms, float(m.group(1)))
                info.total_read_held_ms = max(
                    info.total_read_held_ms, float(m.group(2)))
                info.total_write_wait_ms = max(
                    info.total_write_wait_ms, float(m.group(3)))
                info.total_write_held_ms = max(
                    info.total_write_held_ms, float(m.group(4)))
                continue

            m = RE_MAX_LOCK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                info.max_read_wait_ms = max(
                    info.max_read_wait_ms, float(m.group(1)))
                info.max_read_held_ms = max(
                    info.max_read_held_ms, float(m.group(2)))
                info.max_write_wait_ms = max(
                    info.max_write_wait_ms, float(m.group(3)))
                info.max_write_held_ms = max(
                    info.max_write_held_ms, float(m.group(4)))
                continue

            m = RE_PEEK.match(stripped)
            if m and current_table:
                if current_table not in rec.table_locks:
                    rec.table_locks[current_table] = TableLockInfo(
                        table=current_table
                    )
                info = rec.table_locks[current_table]
                # NOTE(review): peek_count is not accumulated here, unlike
                # the equivalent branch in find_command_block.
                info.peek_held_total_ms += float(m.group(2))
                info.peek_held_max_ms = max(
                    info.peek_held_max_ms, float(m.group(4)))
                continue

            # "locks acquired" summary lines are recognised but ignored.
            if stripped.startswith('locks acquired'):
                continue

    # Flush remaining
    # Any PIDs still open at EOF get a final candidate evaluation.
    for pid in list(by_pid.keys()):
        emit_candidate(pid)

    elapsed = time.time() - t0
    if not quiet:
        print(f"  Scanned {line_count:,} lines in {elapsed:.1f}s, "
              f"found {len(candidates)} candidates", file=sys.stderr)

    return candidates


# ---------------------------------------------------------------------------
# Analysis and ranking
# ---------------------------------------------------------------------------

def rank_blockers(target: CommandInfo,
                  blocked_tables: List[Tuple[str, str, float]],
                  candidates: List[CommandInfo]) -> List[Tuple[CommandInfo, str, float]]:
    """Rank blocker candidates by how likely they are the actual blocker.

    Prioritises the table with the LARGEST wait time on the target, because
    that is the dominant contributor to the target's slowness.

    Args:
        target: The slow command being explained.
        blocked_tables: (table, wait_lock_type, wait_ms) tuples for the
            target; index 0 is expected to be the longest wait.
        candidates: Commands that held locks on the relevant tables.

    Returns:
        List of (command, blocking_description, score) sorted by score desc.
    """
    if not blocked_tables:
        return []

    # Build search map: table -> (needed_lock_type, target_wait_ms).
    # If the target waited for a READ lock, only a WRITE holder could have
    # blocked it; a WRITE wait can be blocked by either lock type ('any').
    search_tables: Dict[str, Tuple[str, float]] = {}
    for table, wait_type, wait_ms in blocked_tables:
        needed = 'write' if wait_type == 'read' else 'any'
        # Keep the entry with the largest wait if duplicated
        if table not in search_tables or wait_ms > search_tables[table][1]:
            search_tables[table] = (needed, wait_ms)

    # The wait on the first (longest-wait) table drives the score weighting.
    primary_wait = blocked_tables[0][2]

    results = []

    for cand in candidates:
        # Track only the candidate's best-scoring table; the dead locals
        # best_table/best_held from the earlier version were never read.
        best_score = 0.0
        best_desc = ""

        for table, (needed_lock, target_wait) in search_tables.items():
            if table not in cand.table_locks:
                continue
            info = cand.table_locks[table]

            # Select the held time of a lock type that could actually
            # block the target on this table.
            held_ms = 0.0
            lock_type_str = ""
            if needed_lock == 'write' and info.total_write_held_ms > 0:
                held_ms = info.total_write_held_ms
                lock_type_str = "WRITE"
            elif needed_lock == 'any':
                if info.total_write_held_ms >= info.total_read_held_ms:
                    held_ms = info.total_write_held_ms
                    lock_type_str = "WRITE"
                else:
                    held_ms = info.total_read_held_ms
                    lock_type_str = "READ"

            if held_ms <= 0:
                continue

            score = 0.0

            # Weight by which table this is -- primary table gets full
            # weight, secondary tables get proportionally less.
            table_weight = target_wait / primary_wait if primary_wait > 0 else 1.0

            # How well does the held time cover the target's wait?
            if target_wait > 0:
                coverage = min(held_ms / target_wait, 2.0)
                score += coverage * 50 * table_weight  # up to 100 pts

            # Longer held = more impactful
            score += min(held_ms / 1000, 50) * table_weight

            # Time overlap with target: reward candidates whose lifetime
            # spans the target's start time.
            if cand.end_timestamp and target.timestamp:
                if cand.end_timestamp >= target.timestamp:
                    score += 25  # ended after target started
                cand_start = cand.timestamp
                if not cand_start and cand.end_timestamp and cand.lapse_seconds:
                    # Reconstruct a missing start time from end - lapse.
                    cand_start = cand.end_timestamp - timedelta(
                        seconds=cand.lapse_seconds)
                if cand_start and cand_start <= target.timestamp:
                    score += 25  # started before target

            if score > best_score:
                best_score = score
                best_desc = (f"{lock_type_str} lock on {table} "
                             f"held {format_ms(held_ms)}")

        if best_score > 0:
            results.append((cand, best_desc, best_score))

    results.sort(key=lambda x: -x[2])
    return results


# ---------------------------------------------------------------------------
# Pile-up detection
# ---------------------------------------------------------------------------

def detect_pileup(filepath: str, release_time: datetime,
                  window_seconds: int = 3,
                  quiet: bool = False) -> int:
    """Count how many commands completed within window_seconds of release_time."""
    lo = release_time - timedelta(seconds=window_seconds)
    hi = release_time + timedelta(seconds=window_seconds)
    total = 0

    with open(filepath, 'r', encoding='utf-8', errors='replace') as fh:
        for raw in fh:
            match = RE_COMPLETED.match(raw.strip())
            if not match:
                continue
            ts = parse_timestamp(match.group(1))
            # Count completions landing inside the +/- window.
            if ts and lo <= ts <= hi:
                total += 1
    return total


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

def format_ms(ms: float) -> str:
    """Format milliseconds human-readably."""
    # Check the largest unit first and fall through to smaller ones.
    if ms >= 3_600_000:
        return f"{ms / 3_600_000:.1f}h"
    if ms >= 60_000:
        return f"{ms / 60_000:.1f}m"
    if ms >= 1000:
        return f"{ms / 1000:.0f}s"
    return f"{ms:.0f}ms"


def format_count(n: int) -> str:
    """Render an integer with thousands separators, e.g. 1234567 -> '1,234,567'."""
    return format(n, ',')


def truncate(s: str, max_len: int = 100) -> str:
    """Clip *s* to at most max_len characters, marking any cut with '...'."""
    return s if len(s) <= max_len else s[:max_len - 3] + "..."


def short_user(cmd: CommandInfo) -> str:
    """Short display name for a command's user."""
    # Keep only the part before the first '@', if any.
    user = cmd.user
    at = user.find('@')
    return user[:at] if at != -1 else user


# ---------------------------------------------------------------------------
# Narrative report generation
# ---------------------------------------------------------------------------

def generate_report(target: CommandInfo,
                    ranked: List[Tuple[CommandInfo, str, float]],
                    blocked_tables: List[Tuple[str, str, float]],
                    root_chain: Optional[List[Tuple[CommandInfo, str]]] = None,
                    pileup_count: int = 0):
    """Generate a narrative-style analysis report.

    Args:
        target: The slow command being explained.
        ranked: (command, blocking_description, score) tuples from
            rank_blockers(), best candidate first.
        blocked_tables: (table, wait_lock_type, wait_ms) tuples for the
            target; index 0 is treated as the primary (longest) wait.
        root_chain: Optional upstream blockers found by chain analysis,
            each as (command, description); the last entry is treated as
            the ultimate root cause.
        pileup_count: Commands that completed near the lock release time;
            an Impact section is printed only when this exceeds 5.

    Writes the report to stdout via print().
    """
    w = print  # shorthand

    # No ranked candidates: print a short "could not identify" report.
    if not ranked:
        w("=" * 78)
        w("  PERFORCE LOCK BLOCKER ANALYSIS")
        w("=" * 78)
        w(f"\n  Could not identify the blocking command for pid {target.pid}.")
        w(f"  Possible reasons:")
        w(f"    - The lock holder completed before this log's time range")
        w(f"    - The blocking was caused by a checkpoint or journal rotation")
        w(f"    - The log level (server=3) may not capture enough detail")
        return

    top_cmd, top_desc, _ = ranked[0]
    primary_table, primary_lock_type, primary_wait_ms = blocked_tables[0]

    # ===== Root Cause =====
    w("")
    w("=" * 78)
    w("  Root Cause: pid {pid} '{cmd}' blocked your command".format(
        pid=top_cmd.pid, cmd=truncate(top_cmd.args, 50)))
    w("=" * 78)

    w(f"""
  The command that caused pid {target.pid}'s '{target.command}' to take
  {target.lapse_seconds:.0f} seconds was:

  pid {top_cmd.pid} -- {truncate(top_cmd.args, 70)}
    User:      {top_cmd.user}
    App:       {top_cmd.app}
    IP:        {top_cmd.ip_address}""")
    if top_cmd.timestamp:
        w(f"    Started:   {top_cmd.timestamp}")
    if top_cmd.end_timestamp:
        end_str = str(top_cmd.end_timestamp)
        w(f"    Completed: {end_str} ({top_cmd.lapse_seconds:.0f} seconds)")

    # ===== The blocking lock =====
    w(f"\n  The blocking lock: {primary_table}\n")

    # Describe the mechanism
    lock_needed = primary_lock_type.upper()
    # `opposing` is currently unused in the narrative below but documents
    # which lock types conflict with the target's wait type.
    opposing = "WRITE" if primary_lock_type == 'read' else "READ or WRITE"

    # Find the blocker's held time on the primary table (prefer the lock
    # type it held the longest).
    blocker_held_ms = 0.0
    blocker_lock_type = ""
    if primary_table in top_cmd.table_locks:
        info = top_cmd.table_locks[primary_table]
        if info.total_write_held_ms > info.total_read_held_ms:
            blocker_held_ms = info.total_write_held_ms
            blocker_lock_type = "write"
        else:
            blocker_held_ms = info.total_read_held_ms
            blocker_lock_type = "read"

    if blocker_held_ms > 0:
        w(f"  pid {top_cmd.pid}'s '{top_cmd.command}' held a "
          f"{blocker_lock_type.upper()} lock on {primary_table} for "
          f"{format_ms(blocker_held_ms)} (~{blocker_held_ms/1000:.0f}s).")
        w(f"\n  Your pid {target.pid} needed a {lock_needed} lock on "
          f"{primary_table} and waited {format_ms(primary_wait_ms)} "
          f"(~{primary_wait_ms/1000:.0f}s) for it — blocked by pid "
          f"{top_cmd.pid}'s {blocker_lock_type} lock.")

    # ===== Why was the blocker slow? =====
    # NOTE: the comprehension's `w` loop variable lives in its own scope
    # (Python 3), so the `w = print` alias above is NOT clobbered.
    # Only waits longer than 10s count as significant.
    blocker_waits = top_cmd.get_lock_waits()
    significant_waits = [(t, lt, w) for t, lt, w in blocker_waits
                         if w > 10000]
    blocker_helds = top_cmd.get_lock_helds()

    if significant_waits or blocker_helds:
        w(f"\n  Why did pid {top_cmd.pid}'s {top_cmd.command} take "
          f"{top_cmd.lapse_seconds:.0f} seconds?\n")

        if blocker_helds:
            w(f"  It held locks on:")
            for table, lt, hms in blocker_helds:
                # Sum the per-table row counters to show scan volume.
                info = top_cmd.table_locks.get(table)
                scan = (info.rows_get + info.rows_pos + info.rows_scan
                        if info else 0)
                extra = ""
                if scan > 0:
                    extra = f" scanning {format_count(scan)} rows"
                w(f"    - {table}: {lt.upper()} held "
                  f"{format_ms(hms)}{extra}")

        if significant_waits:
            w(f"\n  It was itself blocked waiting for locks:")
            for table, lt, wms in significant_waits:
                w(f"    - {table}: waited {format_ms(wms)} for "
                  f"{lt.upper()} lock")
            w(f"\n  This means pid {top_cmd.pid} was not inherently slow — "
              f"it was waiting for locks the whole time.")

    # ===== Chain of blocking =====
    w("\n\n  Chain of blocking\n")

    # Build the chain
    chain_items = []  # list of (cmd, wait_desc, held_desc)

    # Root cause (if --chain found one)
    if root_chain:
        for rc_cmd, rc_desc in root_chain:
            # Summarise the root cause's first held lock; flag very large
            # row scans (>100k) prominently.
            rc_helds = rc_cmd.get_lock_helds()
            rc_held_desc = ""
            if rc_helds:
                t, lt, hms = rc_helds[0]
                info = rc_cmd.table_locks.get(t)
                scan = (info.rows_get + info.rows_pos + info.rows_scan
                        if info else 0)
                scan_str = f" (scanned {format_count(scan)} rows!)" if scan > 100000 else ""
                rc_held_desc = (f"held {lt.upper()} lock on {t} for "
                                f"{format_ms(hms)}{scan_str}")
            chain_items.append((rc_cmd, "", rc_held_desc))

    # The blocker
    blocker_wait_str = ""
    if significant_waits:
        t, lt, wms = significant_waits[0]
        blocker_wait_str = (f"which waited {format_ms(wms)} for "
                            f"{lt.upper()} lock on {t}")
    blocker_held_str = ""
    if blocker_held_ms > 0:
        blocker_held_str = (f"then held {blocker_lock_type.upper()} lock on "
                            f"{primary_table} for {format_ms(blocker_held_ms)}")
    chain_items.append((top_cmd, blocker_wait_str, blocker_held_str))

    # The target
    target_wait_str = (f"which waited {format_ms(primary_wait_ms)} for "
                       f"{lock_needed} lock on {primary_table}")
    chain_items.append((target, target_wait_str, ""))

    # Print the chain as a tree: each item indents one level deeper, with
    # a "blocked -->" connector from its predecessor.
    for i, (cmd, wait_str, held_str) in enumerate(chain_items):
        indent = "  " + "     " * i
        arrow = ""
        if i > 0:
            prev_indent = "  " + "     " * (i - 1)
            w(f"{prev_indent}  |")
            w(f"{prev_indent}  +-- blocked -->")
        label = f"pid {cmd.pid} ({short_user(cmd)}, {truncate(cmd.args, 55)})"
        w(f"{indent}{label}")
        if wait_str:
            w(f"{indent}  {wait_str}")
        if held_str:
            w(f"{indent}  {held_str}")

    # ===== Time breakdown =====
    w(f"\n\n  Time breakdown for pid {target.pid} "
      f"({target.lapse_seconds:.0f}s total)\n")

    total_ms = target.lapse_seconds * 1000
    rpc_ms = (target.rpc_snd_seconds + target.rpc_rcv_seconds) * 1000
    total_wait_ms = sum(wms for _, _, wms in blocked_tables)
    # Don't double count — lock waits and RPC can overlap
    accounted = max(rpc_ms, total_wait_ms)
    other_ms = max(0, total_ms - accounted)

    # Only show components contributing more than one second.
    components = []
    if rpc_ms > 1000:
        rpc_desc = "RPC receive"
        if target.rpc_size_in_mb > 1:
            rpc_desc += f" ({target.rpc_size_in_mb:.0f}MB from edge)"
        components.append((rpc_desc, rpc_ms))
    if total_wait_ms > 1000:
        components.append(
            (f"Waiting for {primary_table} {lock_needed} lock",
             total_wait_ms))
    if other_ms > 1000:
        components.append(("Actual processing", other_ms))

    w(f"  {'Component':<45s} {'Duration':>10s}")
    w(f"  {'-'*45} {'-'*10}")
    for desc, ms in components:
        w(f"  {desc:<45s} ~{format_ms(ms):>8s}")
    w(f"  {'TOTAL':<45s} {format_ms(total_ms):>9s}")

    # ===== Impact (pile-up) =====
    if pileup_count > 5:
        release_ts = top_cmd.end_timestamp or target.end_timestamp
        w(f"\n\n  Impact\n")
        w(f"  This created a massive pile-up — {pileup_count} commands all "
          f"completed simultaneously")
        if release_ts:
            w(f"  at {release_ts} when pid {top_cmd.pid}'s locks "
              f"were released.")

    # ===== Recommendations =====
    # Each heuristic below appends a numbered recommendation when its
    # pattern matches; a generic fallback is printed if none fire.
    w(f"\n\n  Recommendations\n")

    rec_num = 1

    # Check root chain for the ultimate cause
    root = root_chain[-1][0] if root_chain else top_cmd

    # Wildcard pattern detection
    if '...' in root.args and root.total_scan_rows > 1_000_000:
        w(f"  {rec_num}. The '{root.command}' command by {root.user} "
          f"is the root cause — scanning {format_count(root.total_scan_rows)} "
          f"rows")
        # Try to extract the depot path
        parts = root.args.split()
        for p in parts:
            if p.startswith('//') or p.startswith('...'):
                w(f"     using broad wildcard path: {truncate(p, 80)}")
                break
        w(f"     Consider restricting broad '...' wildcard usage or "
          f"setting group limits")
        w(f"     (MaxScanRows, MaxLockTime) to prevent these queries "
          f"from impacting the server.")
        rec_num += 1

    # Memory-intensive commands
    if root.memory_cmd_mb > 2000:
        w(f"  {rec_num}. The command consumed {root.memory_cmd_mb:.0f}MB "
          f"of memory — consider MaxMemory limits.")
        rec_num += 1

    # Submit was blocked (not inherently slow)
    if top_cmd.command in ('dm-CommitSubmit', 'user-submit') and significant_waits:
        w(f"  {rec_num}. The submit (pid {top_cmd.pid}) itself was fast "
          f"— it was just waiting for locks the whole time.")
        w(f"     The fix is to address the root cause that held the "
          f"locks, not the submit.")
        rec_num += 1

    # Long-running fstat/sync blocking submits
    if (root.command in ('user-fstat', 'user-files', 'user-sync',
                          'user-changes')
            and top_cmd.command in ('dm-CommitSubmit', 'user-submit')):
        w(f"  {rec_num}. Long-running read operations "
          f"('{root.command}') are blocking submits.")
        w(f"     This is a common pattern where broad read queries hold "
          f"shared locks that prevent")
        w(f"     exclusive write locks needed by submits. Solutions:")
        w(f"       - Set MaxScanRows limits to prevent unbounded scans")
        w(f"       - Review the client application "
          f"({root.app}) for optimization")
        w(f"       - Consider edge servers to offload read traffic")
        rec_num += 1

    # Generic fallback when no specific heuristic matched.
    if rec_num == 1:
        w(f"  1. Review the blocking command (pid {top_cmd.pid}) "
          f"to determine if it can be optimized or scheduled "
          f"during off-peak hours.")

    w("")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: find what blocked a slow Perforce command.

    Phases:
      1. Locate the target command's block in the log (by pid + time).
      2. Scan for candidate commands that held the locks it waited on.
      3. (--chain) Re-analyse the top blocker to trace the root cause.
      4. Count the pile-up of commands that completed when the lock freed.

    Exits with status 1 when the target pid cannot be found or has no
    recorded completion/lapse time; exits 0 (after a reduced report)
    when the target shows no lock waits at all.
    """
    parser = argparse.ArgumentParser(
        description='Find what command blocked a slow Perforce command.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Find blocker by PID and completion timestamp
  python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28"

  # Find blocker by PID and start timestamp
  python find_blocker.py server.log --pid 7432 --start-time "2026/04/01 11:36:53"

  # Parse a "completed" line directly
  python find_blocker.py server.log --completed "2026/04/01 11:44:28 pid 7432 completed 454s"

  # Trace the blocking chain deeper (re-analyze the blocker)
  python find_blocker.py server.log --pid 7432 --timestamp "2026/04/01 11:44:28" --chain
"""
    )
    parser.add_argument('logfile', help='Path to the Perforce server log file')
    parser.add_argument('--pid', type=int, help='PID of the slow command')
    parser.add_argument(
        '--timestamp',
        help='Completion timestamp (YYYY/MM/DD HH:MM:SS)'
    )
    parser.add_argument(
        '--start-time',
        help='Start timestamp of the command (YYYY/MM/DD HH:MM:SS)'
    )
    parser.add_argument(
        '--completed',
        help='The full "completed" line from the log'
    )
    parser.add_argument(
        '--chain', action='store_true',
        help='Trace the blocking chain (re-analyze the blocker too)'
    )
    parser.add_argument(
        '--quiet', action='store_true',
        help='Suppress progress messages'
    )

    args = parser.parse_args()

    # Parse input parameters. --completed supplies both pid and time;
    # otherwise the pid comes from --pid and the time from --timestamp
    # or --start-time.
    target_pid = args.pid
    target_time = None
    start_time = None

    if args.completed:
        m = RE_COMPLETED.match(args.completed.strip())
        if not m:
            print("Error: Could not parse --completed line. Expected format:",
                  file=sys.stderr)
            print("  YYYY/MM/DD HH:MM:SS pid NNNN completed NNNs",
                  file=sys.stderr)
            sys.exit(1)
        target_time = parse_timestamp(m.group(1))
        target_pid = int(m.group(2))
    elif args.timestamp:
        target_time = parse_timestamp(args.timestamp)
    elif args.start_time:
        start_time = parse_timestamp(args.start_time)

    if not target_pid:
        parser.error("Must specify --pid or --completed")

    if not target_time and not start_time:
        parser.error(
            "Must specify --timestamp, --start-time, or --completed"
        )

    if not args.quiet:
        print("=" * 78, file=sys.stderr)
        print(f"  Finding blocker for pid {target_pid}", file=sys.stderr)
        if target_time:
            print(f"  Completion time: {target_time}", file=sys.stderr)
        if start_time:
            print(f"  Start time: {start_time}", file=sys.stderr)
        print("=" * 78, file=sys.stderr)

    # Phase 1: Find the target command
    if not args.quiet:
        print("\nPhase 1: Finding target command ...", file=sys.stderr)

    target = find_command_block(
        args.logfile, target_pid,
        target_time=target_time,
        start_time=start_time,
        quiet=args.quiet
    )

    if not target:
        print(f"Error: Could not find pid {target_pid} in the log file.",
              file=sys.stderr)
        sys.exit(1)

    if not target.lapse_seconds:
        print(f"Error: Found pid {target_pid} but no lapse/completion time.",
              file=sys.stderr)
        sys.exit(1)

    # Identify which locks the target was waiting on. If it never waited,
    # print a reduced report (RPC / processing breakdown) and exit cleanly.
    blocked_tables = target.get_lock_waits()
    if not blocked_tables:
        print(f"\nWarning: pid {target_pid} has no recorded lock waits.",
              file=sys.stderr)
        print("The command may have been slow due to:", file=sys.stderr)
        rpc_s = target.rpc_snd_seconds + target.rpc_rcv_seconds
        if rpc_s > 1:
            print(f"  - Large data transfer (RPC: {rpc_s:.1f}s, "
                  f"{target.rpc_size_in_mb:.0f}MB in / "
                  f"{target.rpc_size_out_mb:.0f}MB out)", file=sys.stderr)
        print("  - CPU-intensive processing", file=sys.stderr)
        print("  - Waiting on triggers", file=sys.stderr)

        print("\n" + "=" * 78)
        print("  PERFORCE LOCK BLOCKER ANALYSIS")
        print("=" * 78)
        print(f"\n  pid {target.pid} '{target.command}' "
              f"({target.lapse_seconds:.0f}s) had no significant lock waits.")
        print(f"\n  Command: {truncate(target.args, 100)}")
        print(f"  User:    {target.user}")
        total_ms = target.lapse_seconds * 1000
        rpc_ms = rpc_s * 1000
        print(f"\n  Time breakdown:")
        print(f"  {'Component':<45s} {'Duration':>10s}")
        print(f"  {'-'*45} {'-'*10}")
        if rpc_ms > 1000:
            desc = "RPC transfer"
            if target.rpc_size_in_mb > 1:
                desc += f" ({target.rpc_size_in_mb:.0f}MB in)"
            if target.rpc_size_out_mb > 1:
                desc += f" ({target.rpc_size_out_mb:.0f}MB out)"
            print(f"  {desc:<45s} ~{format_ms(rpc_ms):>8s}")
        other = max(0, total_ms - rpc_ms)
        print(f"  {'Processing / other':<45s} ~{format_ms(other):>8s}")
        print(f"  {'TOTAL':<45s} {format_ms(total_ms):>9s}")
        if target.total_scan_rows > 100000:
            print(f"\n  The command scanned "
                  f"{format_count(target.total_scan_rows)} rows — "
                  f"this may be the bottleneck.")
        print()
        sys.exit(0)

    # Phase 2: Find blockers
    if not args.quiet:
        print("\nPhase 2: Searching for blocking commands ...",
              file=sys.stderr)

    candidates = find_blockers(
        args.logfile, target, blocked_tables, quiet=args.quiet
    )
    ranked = rank_blockers(target, blocked_tables, candidates)

    # Phase 3: Chain analysis — only when the top blocker itself had
    # significant (>10s) lock waits.
    root_chain: Optional[List[Tuple[CommandInfo, str]]] = None
    if args.chain and ranked:
        top_cmd, _, _ = ranked[0]
        blocker_waits = top_cmd.get_lock_waits()
        significant = [(t, lt, w) for t, lt, w in blocker_waits
                        if w > 10000]
        if significant:
            if not args.quiet:
                print(f"\nPhase 3: Tracing blocking chain (pid "
                      f"{top_cmd.pid} was itself blocked) ...",
                      file=sys.stderr)
            b2_candidates = find_blockers(
                args.logfile, top_cmd, significant, quiet=args.quiet
            )
            ranked2 = rank_blockers(top_cmd, significant, b2_candidates)
            if ranked2:
                root_chain = [(ranked2[0][0], ranked2[0][1])]

    # Phase 4: Pile-up detection.
    # BUG FIX: detection used to be nested under `not args.quiet`, so
    # --quiet silently disabled the Impact section of the report. Now
    # only the progress messages are gated on --quiet; the detection
    # itself always runs when a release timestamp is known.
    pileup_count = 0
    if ranked:
        top_cmd, _, _ = ranked[0]
        release_ts = top_cmd.end_timestamp
        if release_ts:
            if not args.quiet:
                print(f"\nPhase {'4' if args.chain else '3'}: "
                      f"Detecting pile-up at {release_ts} ...", file=sys.stderr)
            pileup_count = detect_pileup(
                args.logfile, release_ts, window_seconds=3, quiet=args.quiet
            )
            if not args.quiet:
                print(f"  {pileup_count} commands completed within 3s of "
                      f"lock release", file=sys.stderr)

    # Generate report
    generate_report(target, ranked, blocked_tables,
                    root_chain=root_chain,
                    pileup_count=pileup_count)

# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()
# Change User Description Committed
#1 32517 Russell C. Jackson (Rusty) New log analysis files.