#!/usr/bin/env python3
"""
circuit-breaker-watchdog.py — RFC-002 §20 Breakers §1–§4 (cost-rate, self-traffic, per-thread + global write-rate).

All four breakers run on a single 5-minute LaunchAgent tick. The script reads
three data sources, evaluates each breaker independently, and shares the
sentinel + Telegram path:

  §1 cost-rate     — reads ~/.openclaw/workspace/state/oauth-usage-history.jsonl
                     produced by oauth_usage_collect.py (5-min cadence).
                     Tiers: STALE → CRITICAL → CRITICAL-PROJECTED → WARNING → NORMAL.

  §2 self-traffic  — reads /Users/openclaw/event-router/router.log produced by
                     event-router/router.js. Counts decision=drop AND
                     reason=self_authored entries in the last 5 min.
                     Tiers: STALE → CRITICAL → NORMAL.

§1 and §2 share:
  - Sentinel at ~/.openclaw/workspace/state/breaker-paused.flag
    (CRITICAL writers; sentinel-present short-circuits both evaluators).
  - Telegram via `openclaw message send --channel telegram --target 8032472383`.
  - Log file /tmp/openclaw/circuit-breaker-watchdog.log.

§2 trigger shape — DELIBERATE §20 TEXT DEVIATION:
  RFC-002 §20 specifies "≥80% of decisions in a rolling 5-minute window".
  This implementation uses pure absolute-count: ≥5 self_authored drops in
  5 min. Rationale documented in THR-39 Phase 2 dialogue-log entry.
  Baseline filter_decision rate is near-zero in normal operation
  (Phase 1 inspection: ~1.7 self_authored/day), so any ratio computation
  requires a min-N gate that real loops will breach in absolute-count
  terms first. Loop signature is volume-shaped, not ratio-shaped.

Sentinel pause semantics still as documented for §1 (no external readers
yet; alert-suppression mechanism within this script). §2 STALE has no
internal dedup — re-fires every tick while router.log is stale. Tracked
for the same followup worker as §1's WARNING dedup (Point 9/10 thread).

Dedup sentinels (Worker B.42 / THR-42 four-way bundle — alert spam suppression):
  ~/.openclaw/workspace/state/breaker-warning-last-fire.flag
      §1 WARNING re-fire dedup.
  ~/.openclaw/workspace/state/breaker-stale-b1-last-fire.flag
      §1 STALE-FRESHNESS re-fire dedup.
  ~/.openclaw/workspace/state/breaker-stale-last-fire.flag
      §2 STALE re-fire dedup.
  All three share a 60-min cooldown (BREAKER_DEDUP_COOLDOWN_SECS).
  Schema: {ts, tier, cooldown_secs, values}. Distinct from the
  operator-facing breaker-paused.flag — these are dedup-internal.
  Fire on tier entry; suppress within cooldown; re-fire after cooldown;
  delete on tier recovery or escalation-to-CRITICAL.

Test mode:
  --simulate {normal,warning,critical,projected,stale,
              self_traffic_critical,router_stale}
    normal              — both §1 and §2 see synthetic normal data.
    warning|critical|
    projected|stale     — §1-focused; §2 sees synthetic normal.
    self_traffic_critical|
    router_stale        — §2-focused; §1 sees synthetic normal.
  --jsonl-path PATH            Override §1 JSONL path.
  --router-log-path PATH       Override §2 router.log path.
  --sentinel-path PATH         Override CRITICAL pause-flag path.
  --warning-sentinel-path PATH Override §1 WARNING dedup sentinel path.
  --stale-b1-sentinel-path PATH Override §1 STALE-FRESHNESS dedup path.
  --stale-b2-sentinel-path PATH Override §2 STALE dedup path.
  --log-path PATH              Override log path.
  --telegram-dry-run           Pass --dry-run to `openclaw message send`.
  --test-suffix STR            Appended to alert messages.

Exit codes:
  0  normal run (any tier)
  2  any CRITICAL fired but its Telegram delivery failed; max across §1/§2
"""

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# §1 Constants (cost-rate)
# ---------------------------------------------------------------------------

# Production paths; each is overridable via a CLI flag in test mode
# (see --jsonl-path / --sentinel-path / --log-path in the module docstring).
REAL_JSONL_PATH    = "/Users/openclaw/.openclaw/workspace/state/oauth-usage-history.jsonl"
REAL_SENTINEL_PATH = "/Users/openclaw/.openclaw/workspace/state/breaker-paused.flag"
REAL_LOG_PATH      = "/tmp/openclaw/circuit-breaker-watchdog.log"

# Telegram delivery settings, consumed by send_telegram().
OPENCLAW_BIN     = "/opt/homebrew/bin/openclaw"
TELEGRAM_CHANNEL = "telegram"
TELEGRAM_TARGET  = "8032472383"
SEND_TIMEOUT_SECS = 15              # subprocess timeout for `openclaw message send`

START_INTERVAL_SECS    = 300        # must match plist StartInterval
STALE_FRESHNESS_SECS   = 2 * START_INTERVAL_SECS  # 600s
N_WINDOW               = 12         # ~60 min at 5-min cadence
N_RECENT_FOR_QUALITY   = 10         # rows inspected by check_stale()'s error-ratio gate
ERROR_RATIO_THRESHOLD  = 0.5        # >50%

# day_pct tier thresholds — see check_warning / check_critical / check_projected.
WARNING_DAY_PCT          = 20.0
CRITICAL_DAY_PCT         = 80.0
PROJECTION_TARGET_PCT    = 100.0    # day_pct cap; project breach before reset
PROJECTION_MIN_ROWS      = 3        # need at least 3 evaluable rows for slope

# ---------------------------------------------------------------------------
# §2 Constants (self-traffic)
# ---------------------------------------------------------------------------

# Applied by check_b2_critical / check_b2_stale; path overridable via
# --router-log-path in test mode.
REAL_ROUTER_LOG_PATH = "/Users/openclaw/event-router/router.log"

B2_WINDOW_SECS              = 300       # 5-minute rolling window
B2_ABSOLUTE_COUNT_THRESHOLD = 5         # ≥K self_authored drops in window → CRITICAL
B2_STALE_LATEST_SECS        = 1800      # 3× inferred ~600s upstream health-check cadence
                                        # (cadence inferred from active router.log timestamps,
                                        #  not specified in router.js)
B2_TAIL_LINES               = 500       # max lines tailed per tick — safety margin
                                        # against a loop burst (baseline = 1-2 entries/window)

# ---------------------------------------------------------------------------
# Dedup-sentinel constants (Worker B.42 / THR-42 four-way bundle)
# ---------------------------------------------------------------------------

# Sentinel files are read/written through read_dedup_sentinel /
# write_dedup_sentinel; schema is {ts, tier, cooldown_secs, values}.
REAL_WARNING_SENTINEL_PATH    = "/Users/openclaw/.openclaw/workspace/state/breaker-warning-last-fire.flag"
REAL_STALE_B1_SENTINEL_PATH   = "/Users/openclaw/.openclaw/workspace/state/breaker-stale-b1-last-fire.flag"
REAL_STALE_B2_SENTINEL_PATH   = "/Users/openclaw/.openclaw/workspace/state/breaker-stale-last-fire.flag"

BREAKER_DEDUP_COOLDOWN_SECS   = 3600    # 60 min — shared across §1 WARNING,
                                        # §1 STALE-FRESHNESS, §2 STALE.
                                        # Not used by breaker-paused.flag
                                        # (CRITICAL pause-flag has its own
                                        # write-once semantics).

# ---------------------------------------------------------------------------
# §3 Constants (per-thread write-rate — RFC-002 §20 §3, alert-only)
# ---------------------------------------------------------------------------

# Applied by check_b3_critical / check_b3_stale.
REAL_WRITE_LOG_PATH         = "/tmp/openclaw/notion-writes.jsonl"
REAL_WRITE_SCRAPER_STATE    = "/Users/openclaw/.openclaw/workspace/state/notion-write-scraper-state.json"
REAL_B3_SENTINEL_PATH       = "/Users/openclaw/.openclaw/workspace/state/breaker-b3-last-fire.flag"
REAL_STALE_B3_SENTINEL_PATH = "/Users/openclaw/.openclaw/workspace/state/breaker-stale-b3-last-fire.flag"

B3_WINDOW_SECS              = 300       # 5-minute rolling window (spec §20 §3)
B3_THRESHOLD                = 3         # strict >: fire on the 4th write in window
B3_TAIL_LINES               = 2000      # defense-in-depth; 3x headroom over
                                        # plausible 5-min loop burst. Paired with
                                        # B.4 global-rate watcher + 60-min cooldown.
B3_STATE_STALE_SECS         = 300       # 5x scraper 60s cadence — clear failure signal
B3_CRITICAL_COOLDOWN_SECS   = 3600      # 60-min re-fire cooldown (matches B.42 pattern)
B3_STALE_COOLDOWN_SECS      = 3600      # 60-min re-fire cooldown

# ---------------------------------------------------------------------------
# §4 Constants (global write-rate — RFC-002 §20 §4, alert-only, per-consumer)
# ---------------------------------------------------------------------------

# Applied by check_b4_critical / check_b4_stale; shares the §3 write log
# and scraper state file, with its own sentinels and window/threshold.
REAL_B4_SENTINEL_PATH       = "/Users/openclaw/.openclaw/workspace/state/breaker-b4-last-fire.flag"
REAL_STALE_B4_SENTINEL_PATH = "/Users/openclaw/.openclaw/workspace/state/breaker-stale-b4-last-fire.flag"

B4_WINDOW_SECS              = 60        # 60-second window (RFC-002 §20 §4 spec, verbatim)
B4_GLOBAL_THRESHOLD         = 20        # strict >: fire on the 21st write per consumer in window
B4_TAIL_LINES               = 2000      # parity with §3 (cheap; covers loop bursts)
B4_STATE_STALE_SECS         = 300       # parity with §3 (5x scraper 60s cadence)
B4_CRITICAL_COOLDOWN_SECS   = 3600      # 60-min re-fire cooldown (B.42 pattern)
B4_STALE_COOLDOWN_SECS      = 3600      # 60-min re-fire cooldown

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

# Path that log() appends to; log() creates the parent directory on demand.
_LOG_PATH = REAL_LOG_PATH  # mutated by main() once args are parsed

# Set by run_b3 ONLY when it actually fires B3_STALE this tick (not when
# it suppresses by cooldown). Read by run_b4 to dedup the shared-stale
# Telegram path. Reset at top of main() defensively (launchd already
# spawns a fresh process per tick).
_B3_FIRED_STALE_THIS_TICK = False

def log(msg):
    """Append a UTC-timestamped line to the watchdog log file."""
    # _LOG_PATH is a module global so main() can repoint it after arg parsing.
    os.makedirs(os.path.dirname(_LOG_PATH), exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    with open(_LOG_PATH, 'a') as fh:
        fh.write(f"[{stamp}] {msg}\n")

# ---------------------------------------------------------------------------
# Tail-read helper (shared by §1 JSONL + §2 router log)
# ---------------------------------------------------------------------------

def tail_lines(path, n):
    """
    Return roughly the last n lines of a text file, decoded leniently.

    Reads backwards in 64 KiB blocks so large files are never fully
    loaded. A missing file yields an empty list.
    """
    if not os.path.exists(path):
        return []
    chunk = 64 * 1024
    buf = b''
    with open(path, 'rb') as fh:
        fh.seek(0, 2)
        remaining = fh.tell()
        # Keep prepending blocks until we have more than n newlines
        # (or the whole file).
        while remaining > 0 and buf.count(b'\n') <= n:
            step = min(chunk, remaining)
            remaining -= step
            fh.seek(remaining)
            buf = fh.read(step) + buf
    return [ln.decode('utf-8', errors='replace') for ln in buf.splitlines()[-n:]]

def parse_rows(lines):
    """Decode JSONL lines into dicts; malformed lines are logged and skipped."""
    decoded = []
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        try:
            obj = json.loads(stripped)
        except json.JSONDecodeError:
            log(f"SKIP malformed JSON: {stripped[:120]}")
            continue
        decoded.append(obj)
    return decoded

# ---------------------------------------------------------------------------
# Router log parsing (§2)
# ---------------------------------------------------------------------------

# Bracketed ISO-Z timestamp emitted by router.js logInfo/logDecision.
# Active log lines also carry a leading bare timestamp (node prepends),
# but rotated files sometimes keep only the bracketed form — so the
# bracketed form is the one we anchor on.
_ROUTER_TS_RE = re.compile(r'\[(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\]')

def parse_router_lines(lines):
    """
    Turn raw router.log lines into structured records.

    Each record: {ts: datetime|None, kind: str,
                  is_self_authored_drop: bool, raw: str (truncated)}.
    `kind` is the token after '[notion]' (e.g. 'filter_decision',
    'ignored', 'CRITICAL_fail_closed_enabled', 'fail_closed_alert'),
    or 'other' when the tag is absent. is_self_authored_drop is True
    only for filter_decision lines whose JSON payload has
    decision=drop AND reason=self_authored.
    Lines without a bracketed timestamp are dropped silently.
    """
    records = []
    for line in lines:
        line = line.rstrip('\n')
        if not line.strip():
            continue
        ts_match = _ROUTER_TS_RE.search(line)
        if ts_match is None:
            continue
        when = parse_ts(ts_match.group(1))

        # Event kind: first token after the '[notion]' tag.
        tag_pos = line.find('[notion]')
        if tag_pos == -1:
            kind = 'other'
        else:
            rest = line[tag_pos + len('[notion]'):].lstrip()
            kind = rest.split(' ', 1)[0] if rest else 'other'

        self_drop = False
        if kind == 'filter_decision':
            # JSON payload spans from the first '{' after the tag to the
            # final '}' on the line.
            payload_start = line.find('{', tag_pos)
            payload_end = line.rfind('}')
            if 0 <= payload_start < payload_end:
                try:
                    payload = json.loads(line[payload_start:payload_end + 1])
                except json.JSONDecodeError:
                    payload = None
                if (payload is not None
                        and payload.get('decision') == 'drop'
                        and payload.get('reason') == 'self_authored'):
                    self_drop = True

        records.append({
            'ts': when,
            'kind': kind,
            'is_self_authored_drop': self_drop,
            'raw': line[:200],  # truncated for log safety
        })
    return records

# ---------------------------------------------------------------------------
# Scraper write-log parsing (§3)
# ---------------------------------------------------------------------------

def parse_write_log_rows(lines):
    """
    Decode notion-writes.jsonl lines into structured write records.

    Each record: {ts_dt: datetime, agent_id, page_id_canonical, method,
                  endpoint, raw_ts}. Rows failing JSON decode, missing a
    parseable ts, or missing agent_id are dropped. Rows with an empty
    page_id_canonical are kept here; §3 grouping in check_b3_critical
    filters them out later.
    """
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        raw_ts = obj.get('ts') or ''
        when = parse_ts(raw_ts)
        agent = obj.get('agent_id') or ''
        if when is None or not agent:
            continue
        records.append({
            'ts_dt': when,
            'agent_id': agent,
            'page_id_canonical': obj.get('page_id_canonical') or '',
            'method': obj.get('method') or '',
            'endpoint': obj.get('endpoint') or '',
            'raw_ts': raw_ts,
        })
    return records

# ---------------------------------------------------------------------------
# Row classification (§1)
# ---------------------------------------------------------------------------

def is_evaluable(row):
    """True iff the row carries usable data: no error flag set True,
    and both day_pct and week_pct present (non-None)."""
    return (row.get('error') is not True
            and row.get('day_pct') is not None
            and row.get('week_pct') is not None)

def parse_ts(s):
    """
    Parse an ISO 8601 timestamp into an aware datetime, or None.

    Tolerates two format variants:
      - a trailing 'Z' is rewritten to '+00:00' before parsing
        (pre-3.11 fromisoformat() rejects 'Z'); the §2 router.js
        timestamps depend on this branch, while the §1 collector
        (oauth_usage_collect.py:154) already emits +00:00.
      - a naive result is assumed to be UTC.
    Empty/None input and any parse failure return None.
    """
    if not s:
        return None
    try:
        text = s
        if isinstance(text, str) and text.endswith('Z'):
            text = text[:-1] + '+00:00'
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    except (ValueError, TypeError):
        return None

# ---------------------------------------------------------------------------
# §1 tier evaluation (cost-rate)
# ---------------------------------------------------------------------------

def check_stale(rows, now):
    """
    Evaluate §1 data-health.

    Returns ('freshness'|'quality'|'both', reason_str) or (None, None).
    'freshness' — newest row too old; caller should NOT proceed to tier
    evaluation. 'quality' — error ratio too high, but the latest row may
    still be fresh; caller MAY proceed.
    """
    if not rows:
        return ('freshness', 'no rows in JSONL')

    newest_ts = parse_ts(rows[-1].get('timestamp'))
    age_secs = None if newest_ts is None else (now - newest_ts).total_seconds()
    # Unparseable timestamp counts as stale.
    too_old = age_secs is None or age_secs > STALE_FRESHNESS_SECS

    recent = rows[-N_RECENT_FOR_QUALITY:]
    if recent:
        err_ratio = sum(1 for r in recent if r.get('error') is True) / len(recent)
    else:
        err_ratio = 0.0
    too_noisy = err_ratio > ERROR_RATIO_THRESHOLD

    freshness_msg = f'latest_age={age_secs}s (>{STALE_FRESHNESS_SECS}s)'
    quality_msg = f'error_ratio={err_ratio:.0%} of last {len(recent)}'
    if too_old and too_noisy:
        return ('both', f'{freshness_msg} AND {quality_msg}')
    if too_old:
        return ('freshness', freshness_msg)
    if too_noisy:
        return ('quality', quality_msg)
    return (None, None)

def check_critical(evaluable):
    """Return the newest evaluable row if its day_pct is at/above the
    CRITICAL threshold; None otherwise (including empty input)."""
    if evaluable and evaluable[-1]['day_pct'] >= CRITICAL_DAY_PCT:
        return evaluable[-1]
    return None

def check_projected(evaluable):
    """
    Project day_pct forward via a least-squares linear fit over the
    evaluable window.

    Returns (latest_row, projected_hours_to_100) if the fitted slope
    reaches PROJECTION_TARGET_PCT before the day's reset, else None.
    Requires at least PROJECTION_MIN_ROWS rows with parseable timestamps
    and a latest row carrying hours_until_reset.

    Numerical fix: x values are epoch seconds (~1.7e9), so the classic
    uncentered normal-equation denominator n*sum_xx - sum_x**2 subtracts
    two ~1e20 quantities to recover a ~1e8 difference — catastrophic
    cancellation that erodes the slope's precision. Centering x on its
    mean gives the mathematically identical slope
        sum((x - mean_x) * y) / sum((x - mean_x)**2)
    computed at full float precision.
    """
    if len(evaluable) < PROJECTION_MIN_ROWS:
        return None

    pts = []
    for r in evaluable:
        ts = parse_ts(r.get('timestamp'))
        if ts is None:
            continue
        pts.append((ts.timestamp(), float(r['day_pct'])))
    if len(pts) < PROJECTION_MIN_ROWS:
        return None

    n = len(pts)
    mean_x = sum(p[0] for p in pts) / n
    # Centered least squares: slope = sum(dx*y) / sum(dx^2), in %/sec.
    sum_dxy = sum((p[0] - mean_x) * p[1] for p in pts)
    sum_dxx = sum((p[0] - mean_x) ** 2 for p in pts)
    if sum_dxx == 0:
        return None  # all points share one timestamp — slope undefined
    slope_per_sec = sum_dxy / sum_dxx
    if slope_per_sec <= 0:
        return None  # flat or decreasing — no projected breach

    latest = evaluable[-1]
    headroom_pct = PROJECTION_TARGET_PCT - latest['day_pct']
    if headroom_pct <= 0:
        # already at/above 100% — CRITICAL would have caught this
        return None
    hours_to_100 = (headroom_pct / slope_per_sec) / 3600.0

    hours_until_reset = latest.get('hours_until_reset')
    if hours_until_reset is None:
        return None
    if hours_to_100 < hours_until_reset:
        return (latest, hours_to_100)
    return None

def check_warning(evaluable):
    """Two consecutive evaluable rows at/above WARNING_DAY_PCT → return
    the latest row; otherwise None. Needs at least two rows."""
    if len(evaluable) < 2:
        return None
    last_two = evaluable[-2:]
    if all(r['day_pct'] >= WARNING_DAY_PCT for r in last_two):
        return last_two[-1]
    return None

# ---------------------------------------------------------------------------
# §2 tier evaluation (self-traffic)
# ---------------------------------------------------------------------------

def check_b2_critical(router_records, now):
    """
    Count self_authored drops inside the rolling B2_WINDOW_SECS window.
    Returns (count, matching_records) when count reaches the absolute
    threshold, else (count, []).
    """
    oldest_allowed = now.timestamp() - B2_WINDOW_SECS
    hits = []
    for rec in router_records:
        if not rec['is_self_authored_drop']:
            continue
        if rec['ts'] is None or rec['ts'].timestamp() < oldest_allowed:
            continue
        hits.append(rec)
    if len(hits) >= B2_ABSOLUTE_COUNT_THRESHOLD:
        return (len(hits), hits)
    return (len(hits), [])

def check_b2_stale(router_records, now):
    """
    Detect a silent router. Returns (age_secs, latest_ts) when the
    newest parseable entry is older than B2_STALE_LATEST_SECS,
    ('no_entries', None) when nothing parseable exists, else
    (None, None).

    Considers records of EVERY kind: filter_decisions are sparse, so
    any recent entry — even an `ignored` health-check — proves the
    router is still writing.
    """
    stamped = [r['ts'] for r in router_records if r['ts'] is not None]
    if not stamped:
        return ('no_entries', None)
    newest = max(stamped)
    age_secs = (now - newest).total_seconds()
    return (age_secs, newest) if age_secs > B2_STALE_LATEST_SECS else (None, None)

# ---------------------------------------------------------------------------
# §3 tier evaluation (per-thread write-rate)
# ---------------------------------------------------------------------------

def check_b3_critical(write_records, now):
    """
    Rank (agent_id, page_id_canonical) tuples by write volume inside
    the last B3_WINDOW_SECS.
    Returns (top_key_or_None, top_count, group_count, top_records);
    top_records is non-empty iff top_count > B3_THRESHOLD (strict >).

    Rows with an empty page_id_canonical are EXCLUDED from grouping
    (per Cowork Phase 1 ACK Q3; dangling-variable pattern from INS.2):
    "4 PATCHes to unknown pages" may be 4 different pages, so counting
    them as one destination would false-positive.
    """
    oldest_allowed = now.timestamp() - B3_WINDOW_SECS
    buckets = {}
    for rec in write_records:
        if rec['ts_dt'] is None or rec['ts_dt'].timestamp() < oldest_allowed:
            continue
        if not rec['page_id_canonical']:  # exclude empty (Q3 ACK)
            continue
        buckets.setdefault((rec['agent_id'], rec['page_id_canonical']), []).append(rec)
    if not buckets:
        return (None, 0, 0, [])
    top_key = max(buckets, key=lambda k: len(buckets[k]))
    top_records = buckets[top_key]
    top_count = len(top_records)
    if top_count > B3_THRESHOLD:
        return (top_key, top_count, len(buckets), top_records)
    return (top_key, top_count, len(buckets), [])


def check_b3_stale(state_file_path, now):
    """
    Flag a dead scraper via its state-file mtime.
    Returns (age_secs, latest_scan_ts_iso_or_None) when the mtime is
    older than B3_STATE_STALE_SECS, else (None, None).
    Missing/unstat-able file → treated as stale with
    age_secs = B3_STATE_STALE_SECS + 1.
    """
    try:
        age_secs = now.timestamp() - os.path.getmtime(state_file_path)
    except OSError:
        return (B3_STATE_STALE_SECS + 1, None)
    if age_secs <= B3_STATE_STALE_SECS:
        return (None, None)
    # Stale: best-effort extraction of the newest per-session scan ts
    # for the alert body; any read/parse failure degrades to None.
    latest_scan_ts = None
    try:
        with open(state_file_path, 'r') as fh:
            state = json.load(fh)
        stamps = [entry.get('last_scan_ts')
                  for entry in state.get('session_files', {}).values()
                  if entry.get('last_scan_ts')]
        if stamps:
            latest_scan_ts = max(stamps)
    except (OSError, json.JSONDecodeError):
        pass
    return (age_secs, latest_scan_ts)

# ---------------------------------------------------------------------------
# §4 tier evaluation (global write-rate, per-consumer)
# ---------------------------------------------------------------------------

def check_b4_critical(write_records, now):
    """
    Per-consumer (agent_id) write-rate over the last B4_WINDOW_SECS.
    Aggregation rule per RFC-002 §20 §4: total writes from a single
    consumer to any Notion destination, in any rolling 60-second window.
    Returns (top_agent_or_None, top_count, agent_count, top_records);
    top_records is non-empty iff top_count > B4_GLOBAL_THRESHOLD
    (strict >).

    Differs from §3 in three ways:
      - GROUP BY agent_id only (not (agent_id, page_id))
      - Window 60s (not 300s)
      - Rows with empty page_id_canonical ARE included (§4 measures
        consumer behavior; destination uniqueness is not the signature
        §4 catches — a runaway consumer hammering many destinations is
        exactly what §4 must detect, complementary to §3's same-page
        hammer)
    """
    oldest_allowed = now.timestamp() - B4_WINDOW_SECS
    by_agent = {}
    for rec in write_records:
        if rec['ts_dt'] is None or rec['ts_dt'].timestamp() < oldest_allowed:
            continue
        by_agent.setdefault(rec['agent_id'], []).append(rec)
    if not by_agent:
        return (None, 0, 0, [])
    top_agent = max(by_agent, key=lambda a: len(by_agent[a]))
    top_records = by_agent[top_agent]
    top_count = len(top_records)
    if top_count > B4_GLOBAL_THRESHOLD:
        return (top_agent, top_count, len(by_agent), top_records)
    return (top_agent, top_count, len(by_agent), [])


def check_b4_stale(state_file_path, now):
    """
    §4 STALE check over the same scraper state file as §3, thresholded
    by B4_STATE_STALE_SECS (currently equal to the §3 value, but owned
    independently for clean parameter ownership).
    Returns (age_secs, latest_scan_ts_iso_or_None) when the state-file
    mtime exceeds the threshold, else (None, None).
    Missing/unstat-able file → treated as stale with
    age_secs = B4_STATE_STALE_SECS + 1.
    """
    try:
        age_secs = now.timestamp() - os.path.getmtime(state_file_path)
    except OSError:
        return (B4_STATE_STALE_SECS + 1, None)
    if age_secs <= B4_STATE_STALE_SECS:
        return (None, None)
    # Best-effort: pull the newest per-session scan timestamp for the
    # alert body; degrade to None on any read/parse failure.
    latest_scan_ts = None
    try:
        with open(state_file_path, 'r') as fh:
            state = json.load(fh)
        stamps = [entry.get('last_scan_ts')
                  for entry in state.get('session_files', {}).values()
                  if entry.get('last_scan_ts')]
        if stamps:
            latest_scan_ts = max(stamps)
    except (OSError, json.JSONDecodeError):
        pass
    return (age_secs, latest_scan_ts)

# ---------------------------------------------------------------------------
# Side effects
# ---------------------------------------------------------------------------

def send_telegram(message, dry_run=False):
    """
    Deliver *message* via `openclaw message send`.

    Returns True on a zero exit code; False on non-zero exit, timeout,
    or any other failure. Every outcome is logged.
    """
    argv = [
        OPENCLAW_BIN, "message", "send",
        "--channel", TELEGRAM_CHANNEL,
        "--target", TELEGRAM_TARGET,
        "--message", message,
    ]
    if dry_run:
        argv.append("--dry-run")
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=SEND_TIMEOUT_SECS
        )
    except subprocess.TimeoutExpired:
        log(f"TELEGRAM_SEND_TIMEOUT after {SEND_TIMEOUT_SECS}s")
        return False
    except Exception as e:
        # Broad on purpose: a delivery failure must never crash the
        # watchdog tick; the caller maps False to exit-code handling.
        log(f"TELEGRAM_SEND_EXCEPTION {type(e).__name__}: {e}")
        return False
    if proc.returncode != 0:
        log(f"TELEGRAM_SEND_FAILED rc={proc.returncode} "
            f"stderr={proc.stderr.strip()[:200]}")
        return False
    log(f"TELEGRAM_SENT dry_run={dry_run} chars={len(message)}")
    return True

def write_sentinel(path, payload):
    """Atomically write the JSON sentinel: dump to <path>.tmp, then
    rename over the target so readers never observe a partial file."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    tmp_path = path + ".tmp"
    with open(tmp_path, 'w') as fh:
        json.dump(payload, fh, indent=2)
        fh.write("\n")
    os.rename(tmp_path, path)
    log(f"SENTINEL_WRITTEN {path}")

# ---------------------------------------------------------------------------
# Time formatting (Worker B.42 / THR-42 fmt_hours)
# ---------------------------------------------------------------------------

def fmt_hours(h):
    """
    Format float hours as a scannable string: 'Xh YYm', 'YYm',
    'reset due' (h <= 0), or 'unknown' (None). Sub-minute precision is
    deliberately dropped — the tick cadence is 5 minutes.
    """
    if h is None:
        return "unknown"
    if h <= 0:
        return "reset due"
    minutes = int(round(h * 60))
    whole_hours, rem_mins = divmod(minutes, 60)
    if whole_hours:
        return f"{whole_hours}h {rem_mins:02d}m"
    return f"{rem_mins}m"

# ---------------------------------------------------------------------------
# Dedup-sentinel helpers (Worker B.42 / THR-42 four-way bundle)
# ---------------------------------------------------------------------------

def read_dedup_sentinel(path):
    """
    Load and validate a dedup sentinel. Returns the parsed dict, or
    None on ANY failure (missing file, bad JSON, missing schema key) —
    fail-open so a corrupt sentinel never suppresses an alert.
    Expected schema: {ts, tier, cooldown_secs, values}.
    """
    try:
        with open(path, 'r') as fh:
            data = json.load(fh)
    except FileNotFoundError:
        return None
    except (json.JSONDecodeError, OSError) as e:
        log(f"SENTINEL_READ_FAILED path={path} error={type(e).__name__}: {e}")
        return None
    for required in ('ts', 'tier', 'cooldown_secs', 'values'):
        if required not in data:
            log(f"SENTINEL_READ_FAILED path={path} reason=missing_key:{required}")
            return None
    return data

def write_dedup_sentinel(path, tier, values, cooldown_secs):
    """Atomically persist a dedup sentinel — schema {ts, tier,
    cooldown_secs, values}, ts = current UTC time. Uses the same
    tmp-then-rename atomicity as write_sentinel()."""
    doc = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "tier": tier,
        "cooldown_secs": cooldown_secs,
        "values": values,
    }
    os.makedirs(os.path.dirname(path), exist_ok=True)
    tmp_path = path + ".tmp"
    with open(tmp_path, 'w') as fh:
        json.dump(doc, fh, indent=2)
        fh.write("\n")
    os.rename(tmp_path, path)

def is_within_cooldown(sentinel, now, cooldown_secs):
    """True iff the sentinel's ts parses and is younger than
    cooldown_secs. Fail-open: unparseable ts → False (treat as expired)."""
    written_at = parse_ts(sentinel.get('ts'))
    if written_at is None:
        return False
    return (now - written_at).total_seconds() < cooldown_secs

def cooldown_elapsed_secs(sentinel, now):
    """Seconds since the sentinel was written; 0 when ts is unparseable
    (fail-open, matching is_within_cooldown)."""
    written_at = parse_ts(sentinel.get('ts'))
    return 0 if written_at is None else (now - written_at).total_seconds()

def delete_dedup_sentinel(path):
    """
    Remove a dedup sentinel without ever raising.

    A missing file is the idempotent success case (silently ignored);
    any other OSError is logged and swallowed.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        log(f"SENTINEL_DELETE_FAILED path={path} error={type(exc).__name__}: {exc}")

# ---------------------------------------------------------------------------
# Simulation (test mode)
# ---------------------------------------------------------------------------

def synth_rows(scenario, now):
    """Build synthetic §1 JSONL rows for the --simulate scenarios."""
    anchor = now.timestamp()

    def make(offset_secs, **overrides):
        # One collector row, offset_secs relative to `now`, with
        # scenario-specific fields layered over the defaults.
        stamp = datetime.fromtimestamp(anchor + offset_secs, tz=timezone.utc).isoformat()
        rec = {
            "timestamp": stamp,
            "day_pct": 0.0, "week_pct": 1.0,
            "hours_until_reset": 4.0,
            "credits_remaining": None,
            "raw_output": "<synthetic>"
        }
        rec.update(overrides)
        return rec

    if scenario == 'normal':
        return [make(-i * 300, day_pct=1.0) for i in range(12, 0, -1)]
    if scenario == 'warning':
        # Flat 22.5% — zero slope guarantees PROJECTED does NOT trip first.
        return [make(-i * 300, day_pct=22.5) for i in range(12, 0, -1)]
    if scenario == 'critical':
        out = [make(-i * 300, day_pct=70.0) for i in range(12, 0, -1)]
        out[-1]['day_pct'] = 85.0
        return out
    if scenario == 'projected':
        # Steep linear climb that breaches 100% before reset
        # (hours_until_reset=2.0): day_pct rises ~3%/5min = 36%/hr.
        return [make(-i * 300,
                     day_pct=max(0.0, 50.0 - i * 3.0),
                     hours_until_reset=2.0)
                for i in range(12, 0, -1)]
    if scenario == 'stale':
        # Newest row is 30 min old → freshness trigger.
        return [make(-i * 300 - 1800, day_pct=10.0) for i in range(12, 0, -1)]
    raise ValueError(f"unknown scenario: {scenario}")

def synth_router_lines(scenario, now):
    """Build synthetic router.log text lines for §2 --simulate.

    Output matches the real line shape produced by router.js so the §2
    parser is exercised end-to-end:
      <ts-iso-Z> [<ts-iso-Z>] [notion] <kind> <json>

    Raises:
        ValueError: on an unrecognized scenario name.
    """
    origin = now.timestamp()

    def iso_z(offset):
        stamp = datetime.fromtimestamp(origin + offset, tz=timezone.utc)
        return stamp.isoformat().replace('+00:00', 'Z')

    def health_check(ts, event_id):
        # Benign 'ignored' entry; keeps STALE quiet without counting as a drop.
        return (f'{ts} [{ts}] [notion] ignored '
                f'{{"reason":"event_type_not_forwarded","eventId":"{event_id}","eventType":"health-check"}}')

    if scenario == 'normal':
        # One recent health-check: §2 STALE stays quiet (fresh entry) and
        # §2 CRITICAL stays quiet (zero self_authored drops).
        return [health_check(iso_z(-300), 'evt-synth-1')]
    if scenario == 'self_traffic_critical':
        # A fresh health-check (so STALE doesn't co-fire) followed by
        # exactly K self_authored drops inside the 5-minute window.
        lines = [health_check(iso_z(-30), 'evt-synth-hc')]
        for n in range(B2_ABSOLUTE_COUNT_THRESHOLD):
            ts = iso_z(-(n * 30 + 60))  # spread across the last ~3 min
            lines.append(
                f'{ts} [{ts}] [notion] filter_decision '
                f'{{"eventId":"evt-synth-self-{n}","stage":"self_authorship",'
                f'"decision":"drop","reason":"self_authored","eventType":"page.content_updated"}}'
            )
        return lines
    if scenario == 'router_stale':
        # Newest entry is one hour old — breaches the 1800s freshness bound.
        return [health_check(iso_z(-3600), 'evt-synth-old')]
    raise ValueError(f"unknown §2 scenario: {scenario}")

def synth_write_rows(scenario, now):
    """Build synthetic notion-writes.jsonl lines for §3/§4 --simulate.

    Each line is a JSON object shaped like a real write-scraper record.

    Raises:
        ValueError: on an unrecognized scenario name.
    """
    origin = now.timestamp()

    def entry(offset, agent_id='worker',
              page_id='aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
              method='PATCH', endpoint=None):
        stamp = datetime.fromtimestamp(
            origin + offset, tz=timezone.utc).isoformat().replace('+00:00', 'Z')
        return json.dumps({
            "ts": stamp, "agent_id": agent_id,
            "session_id": "11111111-1111-4111-8111-111111111111",
            "tool_call_id": f"call_synth{int(offset):+d}|fc_synth",
            "method": method,
            "endpoint": f'/v1/pages/{page_id}' if endpoint is None else endpoint,
            "page_id_canonical": page_id,
            "exit_code": 0, "duration_ms": 100, "cwd": "/tmp/synth",
        })

    if scenario == 'b3_normal':
        # Two writes to one (agent,page) — under K=3, §3 stays NORMAL.
        return [entry(-240), entry(-60)]
    if scenario == 'b3_critical':
        # Four writes to one (agent,page) inside the window — §3 fires.
        return [entry(-240), entry(-180), entry(-90), entry(-30)]
    if scenario == 'b3_critical_dangling':
        # Four writes whose page_id_canonical is EMPTY — MUST NOT fire
        # (Q3 ACK): dangling rows are excluded from §3 grouping.
        return [entry(off, page_id='',
                      endpoint='/v1/blocks/$PAGE_ID/children')
                for off in (-240, -180, -90, -30)]
    if scenario == 'b4_normal':
        # Five writes from one agent to five distinct pages in the last
        # 60s: 5 < K=20 so §4 is NORMAL; §3 sees ≤1 per (agent,page).
        return [entry(-(5 + 10 * n), agent_id='worker',
                      page_id=f'cccccccc-cccc-cccc-cccc-{n:012d}')
                for n in range(5)]
    if scenario == 'b4_critical':
        # 21 writes from one agent to 21 distinct pages in the last 60s:
        # 21 > K=20 fires B4_CRITICAL; one write per (agent,page) keeps
        # §3 quiet, so this scenario isolates §4 cleanly.
        return [entry(-(1 + 2 * n), agent_id='worker',
                      page_id=f'eeeeeeee-eeee-eeee-eeee-{n:012d}')
                for n in range(21)]
    if scenario == 'b4_stale':
        # No rows — run_b4 injects state_age artificially for this one.
        return []
    raise ValueError(f"unknown §3 scenario: {scenario}")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def fmt_values(row):
    """Render the §1 metric fields of *row* as one space-joined token run.

    Used in both log lines and Telegram alert bodies; missing keys
    render as None via dict.get.
    """
    day = row.get('day_pct')
    week = row.get('week_pct')
    hrs = fmt_hours(row.get('hours_until_reset'))
    ts = row.get('timestamp')
    return f"day_pct={day}% week_pct={week}% hours_until_reset={hrs} ts={ts}"

# ---------------------------------------------------------------------------
# §1 main body (cost-rate evaluator)
# ---------------------------------------------------------------------------

def run_b1(args, now):
    """Run §1 cost-rate evaluator. Returns exit code (0 or 2).

    Log tokens here are bit-identical to the pre-B.2 implementation
    (no B1_ prefix) to preserve observability symmetry — existing greps,
    monitoring, and human-eyeball habits trained on the original tokens
    continue to work. Asymmetry with §2's B2_ prefix is deliberate.

    Evaluation order (first hit wins): STALE-FRESHNESS (early exit) →
    STALE-QUALITY (alert, then continue) → CRITICAL → CRITICAL-PROJECTED
    → WARNING → NORMAL fall-through.

    Args:
        args: parsed argparse namespace (data/sentinel paths, --simulate,
            --telegram-dry-run, --test-suffix).
        now: timezone-aware datetime for this tick; all ages and dedup
            cooldowns are measured against it.

    Returns:
        0 for every outcome except a CRITICAL-tier fire whose Telegram
        delivery failed (the pause sentinel is already written) → 2.
    """
    # Read rows — synthetic for any --simulate scenario (§2/§3/§4 scenarios
    # get a benign 'normal' §1 companion run), real JSONL tail otherwise.
    if args.simulate in ('normal', 'warning', 'critical', 'projected', 'stale'):
        rows = synth_rows(args.simulate, now)
        log(f"SIMULATE scenario={args.simulate} rows={len(rows)}")
    elif args.simulate in ('self_traffic_critical', 'router_stale'):
        rows = synth_rows('normal', now)
        log(f"SIMULATE scenario=normal (companion to §2 simulate={args.simulate}) rows={len(rows)}")
    elif args.simulate in ('b3_normal', 'b3_critical',
                           'b3_critical_dangling', 'b3_stale',
                           'b4_normal', 'b4_critical', 'b4_stale'):
        rows = synth_rows('normal', now)
        log(f"SIMULATE scenario=normal (companion to §3 simulate={args.simulate}) rows={len(rows)}")
    else:
        lines = tail_lines(args.jsonl_path, max(N_WINDOW, N_RECENT_FOR_QUALITY))
        rows = parse_rows(lines)

    # STALE check first
    stale_kind, stale_reason = check_stale(rows, now)
    if stale_kind in ('freshness', 'both'):
        # Dedup: suppress a repeat STALE-FRESHNESS alert within cooldown.
        sentinel = read_dedup_sentinel(args.stale_b1_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, BREAKER_DEDUP_COOLDOWN_SECS):
            elapsed_h = cooldown_elapsed_secs(sentinel, now) / 3600.0
            current_age = -1
            if rows:
                latest_ts = parse_ts(rows[-1].get('timestamp'))
                if latest_ts:
                    current_age = int((now - latest_ts).total_seconds())
            log(f"SUPPRESS STALE-FRESHNESS "
                f"elapsed={fmt_hours(elapsed_h)} "
                f"cooldown={fmt_hours(BREAKER_DEDUP_COOLDOWN_SECS / 3600)} "
                f"current_age_secs={current_age}")
            return 0
        # Fire (fresh OR past cooldown)
        msg = f"[BREAKER §1 STALE] Cost-rate data not fresh: {stale_reason}{args.test_suffix}"
        log(f"FIRE STALE-FRESHNESS reason={stale_reason}")
        latest_jsonl_ts = rows[-1].get('timestamp') if rows else None
        latest_age_secs = None
        if latest_jsonl_ts:
            parsed = parse_ts(latest_jsonl_ts)
            if parsed:
                latest_age_secs = int((now - parsed).total_seconds())
        # Sentinel is written before the Telegram send so the dedup
        # cooldown starts even if delivery fails.
        write_dedup_sentinel(args.stale_b1_sentinel_path, "STALE-FRESHNESS",
                             {"latest_jsonl_ts": latest_jsonl_ts,
                              "latest_age_secs": latest_age_secs,
                              "reason": stale_reason},
                             BREAKER_DEDUP_COOLDOWN_SECS)
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0  # cannot trust tier evaluation when data is stale

    # STALE-FRESHNESS recovered (kind is None or 'quality') — clear sentinel if present
    if read_dedup_sentinel(args.stale_b1_sentinel_path) is not None:
        delete_dedup_sentinel(args.stale_b1_sentinel_path)
        log("CLEAR STALE-FRESHNESS_SENTINEL reason=jsonl_fresh")

    # STALE-QUALITY 5th-sentinel candidate — see Notion followup
    # (no dedup yet: this alert re-fires every tick while quality is degraded)
    if stale_kind == 'quality':
        msg = f"[BREAKER §1 STALE] Cost-rate data quality degraded: {stale_reason}{args.test_suffix}"
        log(f"NOTE STALE-QUALITY reason={stale_reason}; continuing to tier eval")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        # continue — latest row may still be evaluable

    # Filter to evaluable rows; log skips
    evaluable = []
    for r in rows:
        if is_evaluable(r):
            evaluable.append(r)
        else:
            reason = 'error=true' if r.get('error') is True else 'null_metric'
            log(f"SKIP_ROW {reason} ts={r.get('timestamp')}")

    if not evaluable:
        log("NO_EVALUABLE_ROWS exiting normal")
        return 0

    # Tier checks (priority order)
    crit = check_critical(evaluable)
    if crit:
        # Defensive cleanup of lower-tier dedup sentinels on escalation
        if read_dedup_sentinel(args.warning_sentinel_path) is not None:
            delete_dedup_sentinel(args.warning_sentinel_path)
            log("CLEAR WARNING_SENTINEL reason=escalated_to_CRITICAL")
        if read_dedup_sentinel(args.stale_b1_sentinel_path) is not None:
            delete_dedup_sentinel(args.stale_b1_sentinel_path)
            log("CLEAR STALE-FRESHNESS_SENTINEL reason=escalated_to_CRITICAL")
        msg = (f"[BREAKER §1 CRITICAL] Cost rate breached {CRITICAL_DAY_PCT}% threshold. "
               f"{fmt_values(crit)}{args.test_suffix}")
        log(f"FIRE CRITICAL {fmt_values(crit)}")
        # Write sentinel FIRST so durable state is recorded even if telegram fails.
        write_sentinel(args.sentinel_path, {
            "trigger": "CRITICAL",
            "timestamp": now.isoformat(),
            "values": {k: crit.get(k) for k in
                       ('day_pct', 'week_pct', 'hours_until_reset',
                        'credits_remaining', 'timestamp')},
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"
            ),
        })
        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            log("CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
            return 2
        return 0

    proj = check_projected(evaluable)
    if proj:
        latest, hrs = proj
        # Defensive cleanup of lower-tier dedup sentinels on escalation
        if read_dedup_sentinel(args.warning_sentinel_path) is not None:
            delete_dedup_sentinel(args.warning_sentinel_path)
            log("CLEAR WARNING_SENTINEL reason=escalated_to_CRITICAL-PROJECTED")
        if read_dedup_sentinel(args.stale_b1_sentinel_path) is not None:
            delete_dedup_sentinel(args.stale_b1_sentinel_path)
            log("CLEAR STALE-FRESHNESS_SENTINEL reason=escalated_to_CRITICAL-PROJECTED")
        msg = (f"[BREAKER §1 CRITICAL-PROJECTED] Linear extrapolation projects "
               f"day_pct=100% in {fmt_hours(hrs)}, before reset in "
               f"{fmt_hours(latest.get('hours_until_reset'))}. "
               f"{fmt_values(latest)}{args.test_suffix}")
        log(f"FIRE CRITICAL-PROJECTED hrs_to_100={fmt_hours(hrs)} {fmt_values(latest)}")
        # Same sentinel-before-telegram ordering as the CRITICAL branch.
        write_sentinel(args.sentinel_path, {
            "trigger": "CRITICAL-PROJECTED",
            "timestamp": now.isoformat(),
            "values": {
                **{k: latest.get(k) for k in
                   ('day_pct', 'week_pct', 'hours_until_reset',
                    'credits_remaining', 'timestamp')},
                "projected_hours_to_100": hrs,
            },
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"
            ),
        })
        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            log("CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
            return 2
        return 0

    warn = check_warning(evaluable)
    if warn:
        # WARNING is dedup-cooled (unlike the CRITICAL tiers, which rely
        # on the pause sentinel for suppression).
        sentinel = read_dedup_sentinel(args.warning_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, BREAKER_DEDUP_COOLDOWN_SECS):
            elapsed_h = cooldown_elapsed_secs(sentinel, now) / 3600.0
            log(f"SUPPRESS WARNING "
                f"elapsed={fmt_hours(elapsed_h)} "
                f"cooldown={fmt_hours(BREAKER_DEDUP_COOLDOWN_SECS / 3600)} "
                f"current_day_pct={warn['day_pct']}%")
            return 0
        # Fire (fresh OR past cooldown)
        msg = (f"[BREAKER §1 WARNING] Cost rate >= {WARNING_DAY_PCT}% for 2 "
               f"consecutive evaluable data rows. {fmt_values(warn)}{args.test_suffix}")
        log(f"FIRE WARNING {fmt_values(warn)}")
        write_dedup_sentinel(args.warning_sentinel_path, "WARNING",
                             {"day_pct": warn['day_pct'],
                              "week_pct": warn['week_pct'],
                              "hours_until_reset": warn.get('hours_until_reset'),
                              "source_row_ts": warn.get('timestamp')},
                             BREAKER_DEDUP_COOLDOWN_SECS)
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0

    # NORMAL fall-through — clear WARNING sentinel if it survived from a prior tick
    if read_dedup_sentinel(args.warning_sentinel_path) is not None:
        delete_dedup_sentinel(args.warning_sentinel_path)
        log("CLEAR WARNING_SENTINEL reason=tier_returned_to_normal")
    log(f"NORMAL latest day_pct={evaluable[-1]['day_pct']}% "
        f"week_pct={evaluable[-1]['week_pct']}% "
        f"evaluable_rows={len(evaluable)}/{len(rows)}")
    return 0

# ---------------------------------------------------------------------------
# §2 main body (self-traffic evaluator)
# ---------------------------------------------------------------------------

def run_b2(args, now):
    """Run §2 self-traffic evaluator. Returns exit code (0 or 2).

    Evaluation order: STALE (early exit, dedup-cooled) → CRITICAL
    (absolute-count trigger: ≥K self_authored drops in the 5-min window;
    see the module header for the deliberate §20 ratio→count deviation)
    → NORMAL.

    Args:
        args: parsed argparse namespace (router log path, sentinel paths,
            --simulate, --telegram-dry-run, --test-suffix).
        now: timezone-aware datetime for this tick.

    Returns:
        0 normally; 2 only when B2_CRITICAL fired but Telegram delivery
        failed (the pause sentinel is already written in that case).
    """
    # Select input lines — synthetic for every --simulate scenario
    # (non-§2 scenarios get a benign 'normal' companion), real tail otherwise.
    if args.simulate in ('self_traffic_critical', 'router_stale'):
        router_lines = synth_router_lines(args.simulate, now)
        log(f"SIMULATE_B2 scenario={args.simulate} lines={len(router_lines)}")
    elif args.simulate == 'normal':
        router_lines = synth_router_lines('normal', now)
        log(f"SIMULATE_B2 scenario=normal lines={len(router_lines)}")
    elif args.simulate in ('warning', 'critical', 'projected', 'stale'):
        router_lines = synth_router_lines('normal', now)
        log(f"SIMULATE_B2 scenario=normal (companion to §1 simulate={args.simulate}) lines={len(router_lines)}")
    elif args.simulate in ('b3_normal', 'b3_critical',
                           'b3_critical_dangling', 'b3_stale',
                           'b4_normal', 'b4_critical', 'b4_stale'):
        router_lines = synth_router_lines('normal', now)
        log(f"SIMULATE_B2 scenario=normal (companion to §3 simulate={args.simulate}) lines={len(router_lines)}")
    else:
        router_lines = tail_lines(args.router_log_path, B2_TAIL_LINES)

    records = parse_router_lines(router_lines)

    # STALE check first — early-exit (parallels §1 STALE-FRESHNESS handling)
    # age_or_marker is either the sentinel string 'no_entries' or a numeric
    # age in seconds; None means the log is fresh.
    age_or_marker, latest_ts = check_b2_stale(records, now)
    if age_or_marker is not None:
        sentinel = read_dedup_sentinel(args.stale_b2_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, BREAKER_DEDUP_COOLDOWN_SECS):
            elapsed_h = cooldown_elapsed_secs(sentinel, now) / 3600.0
            current_age = -1 if age_or_marker == 'no_entries' else int(age_or_marker)
            log(f"SUPPRESS B2_STALE "
                f"elapsed={fmt_hours(elapsed_h)} "
                f"cooldown={fmt_hours(BREAKER_DEDUP_COOLDOWN_SECS / 3600)} "
                f"current_age_secs={current_age}")
            return 0
        # Fire (fresh OR past cooldown)
        if age_or_marker == 'no_entries':
            reason = 'no entries with parseable timestamp in tailed router log'
            latest_age_secs = None
        else:
            reason = (f'latest_age={age_or_marker:.0f}s (>{B2_STALE_LATEST_SECS}s); '
                      f'latest_ts={latest_ts.isoformat() if latest_ts else None}')
            latest_age_secs = int(age_or_marker)
        msg = f"[BREAKER §2 STALE] Router log not fresh: {reason}{args.test_suffix}"
        log(f"FIRE B2_STALE reason={reason}")
        # Dedup sentinel goes down before the send so cooldown holds even
        # if Telegram delivery fails.
        write_dedup_sentinel(args.stale_b2_sentinel_path, "B2_STALE",
                             {"latest_router_ts": latest_ts.isoformat() if latest_ts else None,
                              "latest_age_secs": latest_age_secs,
                              "reason": reason},
                             BREAKER_DEDUP_COOLDOWN_SECS)
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0  # cannot trust §2 evaluation when router data is stale (parallels §1 STALE-FRESHNESS)

    # Router fresh — clear §2 STALE sentinel if it survived from a prior tick
    if read_dedup_sentinel(args.stale_b2_sentinel_path) is not None:
        delete_dedup_sentinel(args.stale_b2_sentinel_path)
        log("CLEAR B2_STALE_SENTINEL reason=router_fresh")

    # CRITICAL check (absolute-count K self_authored drops in 5-min window)
    count, drops = check_b2_critical(records, now)
    if drops:
        # Defensive cleanup (unreachable today due to STALE early-exit; future-proof)
        if read_dedup_sentinel(args.stale_b2_sentinel_path) is not None:
            delete_dedup_sentinel(args.stale_b2_sentinel_path)
            log("CLEAR B2_STALE_SENTINEL reason=escalated_to_B2_CRITICAL")
        # Truncated raw samples give the on-call reader loop context
        # without flooding the Telegram message.
        sample_raws = [d.get('raw', '')[:80] for d in drops[:3]]
        msg = (f"[BREAKER §2 CRITICAL] Self-traffic loop detected: "
               f"{count} self_authored drops in last {B2_WINDOW_SECS}s "
               f"(threshold ≥{B2_ABSOLUTE_COUNT_THRESHOLD}). "
               f"Phase 1 filter is dropping correctly but loop shape suggests "
               f"upstream bypass or misclassification. "
               f"Sample raws: {sample_raws}{args.test_suffix}")
        log(f"FIRE B2_CRITICAL count={count} threshold={B2_ABSOLUTE_COUNT_THRESHOLD}")
        # Pause sentinel is written FIRST so durable state survives a
        # failed Telegram send (same ordering contract as §1 CRITICAL).
        write_sentinel(args.sentinel_path, {
            "trigger": "SELF_TRAFFIC",
            "timestamp": now.isoformat(),
            "values": {
                "self_authored_drop_count": count,
                "window_secs": B2_WINDOW_SECS,
                "threshold": B2_ABSOLUTE_COUNT_THRESHOLD,
                "latest_drop_ts": drops[-1]['ts'].isoformat() if drops[-1]['ts'] else None,
                "earliest_drop_ts": drops[0]['ts'].isoformat() if drops[0]['ts'] else None,
            },
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"
            ),
        })
        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            log("B2_CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
            return 2
        return 0

    log(f"B2_NORMAL self_authored_drops_in_window={count} "
        f"total_records_parsed={len(records)} "
        f"window_secs={B2_WINDOW_SECS}")
    return 0

# ---------------------------------------------------------------------------
# §3 main body (per-thread write-rate evaluator — alert-only)
# ---------------------------------------------------------------------------

def run_b3(args, now):
    """Run §3 per-thread write-rate evaluator. Returns exit code (0 or 2).

    ALERT-ONLY posture (RFC-002 §20 §3 deviation). All six openclaw
    agents share ai.openclaw.gateway as their LaunchAgent host;
    per-agent launchctl bootout is architecturally impossible (INS.1
    2026-04-22). §3 writes Telegram + breaker-paused.flag; Braden
    enforces the pause by hand. No launchctl.

    Evaluation order: STALE (scraper-state freshness, early exit,
    dedup-cooled) → CRITICAL (per-(agent_id, page_id_canonical) count
    over B3_WINDOW_SECS, dedup-cooled) → NORMAL.

    Side channel: on a B3_STALE fire this sets the module-global
    _B3_FIRED_STALE_THIS_TICK so run_b4 can skip its own STALE Telegram
    for the same scraper outage (one alert per failure).

    Returns:
        0 normally; 2 only when B3_CRITICAL fired but Telegram delivery
        failed (pause + dedup sentinels are already written).
    """
    state_latest_scan = None
    # Read write-log — synthetic rows for §3 scenarios, artificial
    # staleness for b3_stale, empty companions for all other simulates,
    # real tail + real scraper-state freshness on a live tick.
    if args.simulate in ('b3_normal', 'b3_critical', 'b3_critical_dangling'):
        lines = synth_write_rows(args.simulate, now)
        log(f"SIMULATE_B3 scenario={args.simulate} lines={len(lines)}")
        records = parse_write_log_rows(lines)
        state_age = None
    elif args.simulate == 'b3_stale':
        log("SIMULATE_B3 scenario=b3_stale (state-file artificially stale)")
        records = []
        state_age = B3_STATE_STALE_SECS + 60
    elif args.simulate in ('normal', 'warning', 'critical', 'projected',
                           'stale', 'self_traffic_critical', 'router_stale',
                           'b4_normal', 'b4_critical', 'b4_stale'):
        log(f"SIMULATE_B3 scenario=empty (companion to simulate={args.simulate})")
        records = []
        state_age = None
    else:
        # Real tick
        lines = tail_lines(args.write_log_path, B3_TAIL_LINES)
        records = parse_write_log_rows(lines)
        state_age, state_latest_scan = check_b3_stale(
            args.write_scraper_state_path, now)

    # STALE check first — scraper silent → cannot trust §3 view
    if state_age is not None:
        sentinel = read_dedup_sentinel(args.stale_b3_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, B3_STALE_COOLDOWN_SECS):
            elapsed_h = cooldown_elapsed_secs(sentinel, now) / 3600.0
            log(f"SUPPRESS B3_STALE "
                f"elapsed={fmt_hours(elapsed_h)} "
                f"cooldown={fmt_hours(B3_STALE_COOLDOWN_SECS / 3600)} "
                f"state_age_secs={int(state_age)}")
            return 0
        latest_str = state_latest_scan if state_latest_scan else "UNKNOWN"
        msg = (f"[BREAKER §3 STALE] notion-write-scraper state is not fresh: "
               f"state_file_age={int(state_age)}s "
               f"(>{B3_STATE_STALE_SECS}s); last_scan_ts={latest_str}. "
               f"§3 cannot evaluate per-thread write rate while scraper "
               f"is silent.{args.test_suffix}")
        log(f"FIRE B3_STALE state_file_age_secs={int(state_age)} "
            f"scraper_last_scan={latest_str}")
        # Dedup sentinel before the send, so cooldown holds on send failure.
        write_dedup_sentinel(args.stale_b3_sentinel_path, "B3_STALE",
                             {"state_file_age_secs": int(state_age),
                              "scraper_last_scan_ts_or_unknown": latest_str},
                             B3_STALE_COOLDOWN_SECS)
        send_telegram(msg, dry_run=args.telegram_dry_run)
        # Signal §4 that B3 already fired the shared-source STALE Telegram
        # this tick. §4 will skip its own STALE Telegram (DRY).
        global _B3_FIRED_STALE_THIS_TICK
        _B3_FIRED_STALE_THIS_TICK = True
        return 0

    # Scraper fresh — clear B3_STALE sentinel if it survived
    if read_dedup_sentinel(args.stale_b3_sentinel_path) is not None:
        delete_dedup_sentinel(args.stale_b3_sentinel_path)
        log("CLEAR B3_STALE_SENTINEL reason=scraper_fresh")

    # CRITICAL check
    # top_key is the hottest (agent_id, page_id_canonical) pair; violating
    # is the list of rows behind it when the threshold is breached.
    top_key, count, group_count, violating = check_b3_critical(records, now)
    if violating:
        agent_id, page_id = top_key
        # B3_CRITICAL has its own dedup cooldown (unlike §1/§2 CRITICAL,
        # which rely solely on the pause sentinel for suppression).
        sentinel = read_dedup_sentinel(args.b3_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, B3_CRITICAL_COOLDOWN_SECS):
            remaining = B3_CRITICAL_COOLDOWN_SECS - int(
                cooldown_elapsed_secs(sentinel, now))
            log(f"B3_SKIPPED_DEDUP last_fire={sentinel.get('ts')} "
                f"cooldown_remaining={remaining}s")
            return 0
        earliest_ts = min(r['ts_dt'] for r in violating).isoformat()
        latest_ts   = max(r['ts_dt'] for r in violating).isoformat()
        msg = (f"[BREAKER §3 CRITICAL] Per-thread write-rate breach: "
               f"agent={agent_id} page={page_id} write_count={count} "
               f"window_secs={B3_WINDOW_SECS} threshold=>{B3_THRESHOLD}. "
               f"Alert-only (RFC-002 §20 §3 deviation — shared gateway "
               f"LaunchAgent; bootout cannot partition). Manual pause "
               f"enforcement: inspect breaker-paused.flag.{args.test_suffix}")
        log(f"FIRE B3_CRITICAL agent_id={agent_id} page_id={page_id} "
            f"write_count={count} window_secs={B3_WINDOW_SECS} "
            f"threshold={B3_THRESHOLD} earliest_ts={earliest_ts} "
            f"latest_ts={latest_ts}")
        pause_values = {
            "agent_id": agent_id,
            "page_id_canonical": page_id,
            "write_count": count,
            "window_secs": B3_WINDOW_SECS,
            "threshold": B3_THRESHOLD,
            "earliest_write_ts": earliest_ts,
            "latest_write_ts": latest_ts,
        }
        # Pause sentinel + dedup sentinel before the send (durable state
        # first; same ordering contract as the other CRITICAL writers).
        write_sentinel(args.sentinel_path, {
            "trigger": "WRITE_RATE_PER_THREAD",
            "timestamp": now.isoformat(),
            "values": pause_values,
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"),
        })
        write_dedup_sentinel(args.b3_sentinel_path, "B3_CRITICAL",
                             pause_values, B3_CRITICAL_COOLDOWN_SECS)
        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            log("B3_CRITICAL_ALERT_NO_TELEGRAM sentinel written but "
                "telegram delivery failed")
            return 2
        return 0

    # NORMAL summary
    # Recompute the in-window, non-dangling rows purely for the log line.
    cutoff = now.timestamp() - B3_WINDOW_SECS
    window = [r for r in records
              if r['ts_dt'] and r['ts_dt'].timestamp() >= cutoff
              and r['page_id_canonical']]
    top_group_summary = "none"
    if window and top_key is not None:
        agent_id, page_id = top_key
        top_group_summary = f"{agent_id}/{page_id[:13]}…:{count}"
    log(f"B3_NORMAL window_writes={len(window)} "
        f"groups={group_count} top_group={top_group_summary}")
    return 0

# ---------------------------------------------------------------------------
# §4 main body (global write-rate evaluator — alert-only, per-consumer)
# ---------------------------------------------------------------------------

def run_b4(args, now):
    """Run §4 global write-rate evaluator. Returns exit code (0 or 2).

    ALERT-ONLY posture (RFC-002 §20 §4 deviation; same precedent as §3,
    documented in THR-56 Phase 5). Aggregation rule per spec verbatim:
    total writes from a single consumer (agent_id) to any Notion
    destination, in any rolling B4_WINDOW_SECS (60s) window. Threshold
    strict >: fire on the (B4_GLOBAL_THRESHOLD+1)-th write. Includes
    empty page_id_canonical rows since §4 measures consumer behavior,
    not destination uniqueness.

    Evaluation order: §3-already-fired-STALE short-circuit → STALE
    (independent scraper-state check, dedup-cooled) → CRITICAL
    (per-consumer count, dedup-cooled) → NORMAL.

    Returns:
        0 normally; 2 only when B4_CRITICAL fired but Telegram delivery
        failed (pause + dedup sentinels are already written).
    """
    state_latest_scan = None
    # Read write-log — mirrors run_b3's source selection for §4 scenarios.
    if args.simulate in ('b4_normal', 'b4_critical'):
        lines = synth_write_rows(args.simulate, now)
        log(f"SIMULATE_B4 scenario={args.simulate} lines={len(lines)}")
        records = parse_write_log_rows(lines)
        state_age = None
    elif args.simulate == 'b4_stale':
        log("SIMULATE_B4 scenario=b4_stale (state-file artificially stale)")
        records = []
        state_age = B4_STATE_STALE_SECS + 60
    elif args.simulate in ('normal', 'warning', 'critical', 'projected',
                           'stale', 'self_traffic_critical', 'router_stale',
                           'b3_normal', 'b3_critical',
                           'b3_critical_dangling', 'b3_stale'):
        log(f"SIMULATE_B4 scenario=empty (companion to simulate={args.simulate})")
        records = []
        state_age = None
    else:
        # Real tick
        lines = tail_lines(args.write_log_path, B4_TAIL_LINES)
        records = parse_write_log_rows(lines)
        state_age, state_latest_scan = check_b4_stale(
            args.write_scraper_state_path, now)

    # B3-stale dedup short-circuit: if §3 just FIRED B3_STALE this tick,
    # §4's data view is identically unreliable. Skip without firing
    # B4_STALE Telegram (one Telegram per scraper failure).
    # Note: §3-suppressed-by-cooldown leaves _B3_FIRED_STALE_THIS_TICK
    # False, so §4 still evaluates its own STALE+dedup independently.
    if _B3_FIRED_STALE_THIS_TICK:
        log("B4_SKIPPED_STALE_DEDUP_BY_B3")
        return 0

    # STALE check (independent)
    if state_age is not None:
        sentinel = read_dedup_sentinel(args.stale_b4_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, B4_STALE_COOLDOWN_SECS):
            elapsed_h = cooldown_elapsed_secs(sentinel, now) / 3600.0
            log(f"SUPPRESS B4_STALE "
                f"elapsed={fmt_hours(elapsed_h)} "
                f"cooldown={fmt_hours(B4_STALE_COOLDOWN_SECS / 3600)} "
                f"state_age_secs={int(state_age)}")
            return 0
        latest_str = state_latest_scan if state_latest_scan else "UNKNOWN"
        msg = (f"[BREAKER §4 STALE] notion-write-scraper state is not fresh: "
               f"state_file_age={int(state_age)}s "
               f"(>{B4_STATE_STALE_SECS}s); last_scan_ts={latest_str}. "
               f"§4 cannot evaluate global write rate while scraper "
               f"is silent.{args.test_suffix}")
        log(f"FIRE B4_STALE state_file_age_secs={int(state_age)} "
            f"scraper_last_scan={latest_str}")
        # Dedup sentinel before the send, so cooldown holds on send failure.
        write_dedup_sentinel(args.stale_b4_sentinel_path, "B4_STALE",
                             {"state_file_age_secs": int(state_age),
                              "scraper_last_scan_ts_or_unknown": latest_str},
                             B4_STALE_COOLDOWN_SECS)
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0

    # Scraper fresh — clear B4_STALE sentinel if it survived
    if read_dedup_sentinel(args.stale_b4_sentinel_path) is not None:
        delete_dedup_sentinel(args.stale_b4_sentinel_path)
        log("CLEAR B4_STALE_SENTINEL reason=scraper_fresh")

    # CRITICAL check (per-consumer aggregation)
    # top_agent is the busiest agent_id; violating is its row list when
    # the strict-> threshold is breached.
    top_agent, count, agent_count, violating = check_b4_critical(records, now)
    if violating:
        sentinel = read_dedup_sentinel(args.b4_sentinel_path)
        if sentinel and is_within_cooldown(sentinel, now, B4_CRITICAL_COOLDOWN_SECS):
            remaining = B4_CRITICAL_COOLDOWN_SECS - int(
                cooldown_elapsed_secs(sentinel, now))
            log(f"B4_SKIPPED_DEDUP last_fire={sentinel.get('ts')} "
                f"cooldown_remaining={remaining}s")
            return 0
        earliest_ts = min(r['ts_dt'] for r in violating).isoformat()
        latest_ts   = max(r['ts_dt'] for r in violating).isoformat()
        # unique_pages is diagnostic only — dangling (empty) page ids are
        # counted in the breach but excluded from this uniqueness figure.
        unique_pages = len({r['page_id_canonical'] for r in violating
                            if r['page_id_canonical']})
        msg = (f"[BREAKER §4 CRITICAL] Global write-rate breach: "
               f"agent={top_agent} write_count={count} "
               f"window_secs={B4_WINDOW_SECS} threshold=>{B4_GLOBAL_THRESHOLD} "
               f"unique_pages={unique_pages}. "
               f"Alert-only (RFC-002 §20 §4 deviation — shared gateway "
               f"LaunchAgent; bootout cannot partition). Manual pause "
               f"enforcement: inspect breaker-paused.flag.{args.test_suffix}")
        log(f"FIRE B4_CRITICAL agent={top_agent} write_count={count} "
            f"threshold={B4_GLOBAL_THRESHOLD} window_secs={B4_WINDOW_SECS} "
            f"earliest_ts={earliest_ts} latest_ts={latest_ts} "
            f"unique_pages={unique_pages} unique_agents={agent_count}")
        pause_values = {
            "agent_id": top_agent,
            "write_count": count,
            "window_secs": B4_WINDOW_SECS,
            "threshold": B4_GLOBAL_THRESHOLD,
            "earliest_write_ts": earliest_ts,
            "latest_write_ts": latest_ts,
            "unique_pages": unique_pages,
            "unique_agents": agent_count,
        }
        # Pause sentinel + dedup sentinel before the send (durable state
        # first; same ordering contract as the other CRITICAL writers).
        write_sentinel(args.sentinel_path, {
            "trigger": "WRITE_RATE_GLOBAL",
            "timestamp": now.isoformat(),
            "values": pause_values,
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"),
        })
        write_dedup_sentinel(args.b4_sentinel_path, "B4_CRITICAL",
                             pause_values, B4_CRITICAL_COOLDOWN_SECS)
        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            log("B4_CRITICAL_ALERT_NO_TELEGRAM sentinel written but "
                "telegram delivery failed")
            return 2
        return 0

    # NORMAL summary
    cutoff = now.timestamp() - B4_WINDOW_SECS
    window_size = len([r for r in records
                       if r['ts_dt'] and r['ts_dt'].timestamp() >= cutoff])
    log(f"B4_NORMAL window_writes={window_size} "
        f"threshold={B4_GLOBAL_THRESHOLD} top_agent_writes={count} "
        f"unique_agents={agent_count}")
    return 0

# ---------------------------------------------------------------------------
# Main (orchestrator)
# ---------------------------------------------------------------------------

def main(argv=None):
    """Single-tick orchestrator for Breakers §1–§4.

    Parses CLI overrides for every data-source / sentinel / log path,
    short-circuits all evaluators when the shared pause sentinel already
    exists, then runs B1..B4 in order — stopping early whenever an earlier
    breaker writes the sentinel during this same tick (a CRITICAL fire).

    Returns the maximum exit code among the evaluators that ran
    (0 = normal; non-zero = alert fired / delivery failure).
    """
    parser = argparse.ArgumentParser()
    # Path-style options share the same add_argument shape; keep them in a
    # table so the flag list reads as data rather than boilerplate.
    path_defaults = (
        ('--jsonl-path', REAL_JSONL_PATH),
        ('--router-log-path', REAL_ROUTER_LOG_PATH),
        ('--write-log-path', REAL_WRITE_LOG_PATH),
        ('--write-scraper-state-path', REAL_WRITE_SCRAPER_STATE),
        ('--sentinel-path', REAL_SENTINEL_PATH),
        ('--warning-sentinel-path', REAL_WARNING_SENTINEL_PATH),
        ('--stale-b1-sentinel-path', REAL_STALE_B1_SENTINEL_PATH),
        ('--stale-b2-sentinel-path', REAL_STALE_B2_SENTINEL_PATH),
        ('--b3-sentinel-path', REAL_B3_SENTINEL_PATH),
        ('--stale-b3-sentinel-path', REAL_STALE_B3_SENTINEL_PATH),
        ('--b4-sentinel-path', REAL_B4_SENTINEL_PATH),
        ('--stale-b4-sentinel-path', REAL_STALE_B4_SENTINEL_PATH),
        ('--log-path', REAL_LOG_PATH),
    )
    for flag, default in path_defaults:
        parser.add_argument(flag, default=default)
    parser.add_argument('--simulate',
                        choices=['normal', 'warning', 'critical', 'projected', 'stale',
                                 'self_traffic_critical', 'router_stale',
                                 'b3_normal', 'b3_critical',
                                 'b3_critical_dangling', 'b3_stale',
                                 'b4_normal', 'b4_critical', 'b4_stale'])
    parser.add_argument('--telegram-dry-run', action='store_true')
    parser.add_argument('--test-suffix', default='')
    args = parser.parse_args(argv)

    global _LOG_PATH
    _LOG_PATH = args.log_path

    # §3 STALE dedup flag is per-tick; reset it before any evaluator runs.
    global _B3_FIRED_STALE_THIS_TICK
    _B3_FIRED_STALE_THIS_TICK = False

    now = datetime.now(timezone.utc)

    # A pre-existing sentinel pauses every evaluator for the whole tick.
    if os.path.exists(args.sentinel_path):
        log(f"HEARTBEAT sentinel present at {args.sentinel_path}; suppressing alerts (§1, §2, §3, §4)")
        return 0

    codes = [run_b1(args, now)]

    # §1 wrote the sentinel this tick → §2/§3/§4 do not run.
    if os.path.exists(args.sentinel_path):
        log("B2_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1")
        log("B3_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1_OR_B2")
        log("B4_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1_OR_B2_OR_B3")
        return max(codes)

    codes.append(run_b2(args, now))

    # §2 wrote the sentinel this tick → §3/§4 do not run.
    if os.path.exists(args.sentinel_path):
        log("B3_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1_OR_B2")
        log("B4_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1_OR_B2_OR_B3")
        return max(codes)

    codes.append(run_b3(args, now))

    # §3 wrote the sentinel this tick → §4 does not run.
    if os.path.exists(args.sentinel_path):
        log("B4_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1_OR_B2_OR_B3")
        return max(codes)

    codes.append(run_b4(args, now))
    return max(codes)

if __name__ == "__main__":
    # Top-level crash barrier: any unhandled exception from main() is logged
    # as FATAL (best-effort — logging itself may fail) and mapped to exit 2
    # so the LaunchAgent records a non-zero status.
    try:
        exit_code = main()
    except Exception as exc:
        try:
            log(f"FATAL {type(exc).__name__}: {exc}")
        except Exception:
            pass
        exit_code = 2
    sys.exit(exit_code)
