"""Anomaly detection layer — pluggable detector classes. Each detector consumes the rolling SQLite window and emits CandidateIncident events when thresholds are exceeded. """ from __future__ import annotations import logging from abc import ABC, abstractmethod from pathlib import Path from typing import Iterator from .models import CandidateIncident logger = logging.getLogger(__name__) class BaseDetector(ABC): """Abstract base for all detectors.""" @abstractmethod def scan(self, db_path: Path) -> Iterator[CandidateIncident]: """Scan the rolling SQLite window and yield any triggered incidents.""" ... class WedgeDetector(BaseDetector): """Detects database wedge scenarios. Signature: write lock on a specific db.* table with lock wait time exceeding threshold, combined with a process pile-up count above min_waiting_processes. """ def __init__( self, lock_wait_threshold_ms: float = 5000.0, min_waiting_processes: int = 3, ) -> None: """ Args: lock_wait_threshold_ms: Write lock wait time that triggers detection. min_waiting_processes: Minimum number of waiting processes required. """ self.lock_wait_threshold_ms = lock_wait_threshold_ms self.min_waiting_processes = min_waiting_processes def scan(self, db_path: Path) -> Iterator[CandidateIncident]: raise NotImplementedError class SlowCommandDetector(BaseDetector): """Detects individual commands exceeding compute or lock time thresholds.""" def __init__(self, compute_threshold_ms: float = 30000.0) -> None: self.compute_threshold_ms = compute_threshold_ms def scan(self, db_path: Path) -> Iterator[CandidateIncident]: raise NotImplementedError class ConnectionSpikeDetector(BaseDetector): """Detects abnormal spikes in connection count (configurable σ threshold).""" def __init__(self, sigma_threshold: float = 3.0) -> None: self.sigma_threshold = sigma_threshold def scan(self, db_path: Path) -> Iterator[CandidateIncident]: raise NotImplementedError