#!/usr/bin/env python3
"""
circuit-breaker-watchdog.py — RFC-002 §20 Breakers §1 (cost-rate) + §2 (self-traffic).

Both breakers run on a single 5-minute LaunchAgent tick. The script reads
two data sources, evaluates each breaker independently, and shares the
sentinel + Telegram path:

  §1 cost-rate     — reads ~/.openclaw/workspace/state/oauth-usage-history.jsonl
                     produced by oauth_usage_collect.py (5-min cadence).
                     Tiers: STALE → CRITICAL → CRITICAL-PROJECTED → WARNING → NORMAL.

  §2 self-traffic  — reads /Users/openclaw/event-router/router.log produced by
                     event-router/router.js. Counts decision=drop AND
                     reason=self_authored entries in the last 5 min.
                     Tiers: STALE → CRITICAL → NORMAL.

Both share:
  - Sentinel at ~/.openclaw/workspace/state/breaker-paused.flag
    (CRITICAL writers; sentinel-present short-circuits both evaluators).
  - Telegram via `openclaw message send --channel telegram --target 8032472383`.
  - Log file /tmp/openclaw/circuit-breaker-watchdog.log.

§2 trigger shape — DELIBERATE §20 TEXT DEVIATION:
  RFC-002 §20 specifies "≥80% of decisions in a rolling 5-minute window".
  This implementation uses pure absolute-count: ≥5 self_authored drops in
  5 min. Rationale documented in THR-39 Phase 2 dialogue-log entry.
  Baseline filter_decision rate is near-zero in normal operation
  (Phase 1 inspection: ~1.7 self_authored/day), so any ratio computation
  requires a min-N gate that real loops will breach in absolute-count
  terms first. Loop signature is volume-shaped, not ratio-shaped.

Sentinel pause semantics are unchanged from §1: the sentinel has no external
readers yet and acts only as an alert-suppression mechanism within this
script. §2 STALE has no internal dedup — it re-fires every tick while
router.log is stale. This is tracked for the same followup worker as §1's
WARNING dedup (Point 9/10 thread).

Test mode:
  --simulate {normal,warning,critical,projected,stale,
              self_traffic_critical,router_stale}
    normal              — both §1 and §2 see synthetic normal data.
    warning|critical|
    projected|stale     — §1-focused; §2 sees synthetic normal.
    self_traffic_critical|
    router_stale        — §2-focused; §1 sees synthetic normal.
  --jsonl-path PATH       Override §1 JSONL path.
  --router-log-path PATH  Override §2 router.log path.
  --sentinel-path PATH    Override sentinel path.
  --log-path PATH         Override log path.
  --telegram-dry-run      Pass --dry-run to `openclaw message send`.
  --test-suffix STR       Appended to alert messages.

Exit codes:
  0  normal run (any tier)
  2  any CRITICAL fired but its Telegram delivery failed (max across §1/§2),
     or an unhandled exception reached the top-level handler (logged as FATAL)
"""

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# §1 Constants (cost-rate)
# ---------------------------------------------------------------------------

# Default file locations; each is the default of the matching CLI override
# (--jsonl-path, --sentinel-path, --log-path).
REAL_JSONL_PATH    = "/Users/openclaw/.openclaw/workspace/state/oauth-usage-history.jsonl"
REAL_SENTINEL_PATH = "/Users/openclaw/.openclaw/workspace/state/breaker-paused.flag"
REAL_LOG_PATH      = "/tmp/openclaw/circuit-breaker-watchdog.log"

# Alert delivery: send_telegram() runs
# `OPENCLAW_BIN message send --channel TELEGRAM_CHANNEL --target TELEGRAM_TARGET`.
OPENCLAW_BIN     = "/opt/homebrew/bin/openclaw"
TELEGRAM_CHANNEL = "telegram"
TELEGRAM_TARGET  = "8032472383"
# Timeout (seconds) passed to subprocess.run() for one send attempt.
SEND_TIMEOUT_SECS = 15

START_INTERVAL_SECS    = 300        # must match plist StartInterval
# The latest row may be at most this old before §1 reports STALE (freshness).
STALE_FRESHNESS_SECS   = 2 * START_INTERVAL_SECS  # 600s
N_WINDOW               = 12         # ~60 min at 5-min cadence
# Number of trailing rows inspected for the error-ratio (quality) check.
N_RECENT_FOR_QUALITY   = 10
# Quality is degraded when the error ratio is strictly greater than this.
ERROR_RATIO_THRESHOLD  = 0.5        # >50%

# WARNING: the two newest evaluable rows both have day_pct >= this value.
WARNING_DAY_PCT          = 20.0
# CRITICAL: the newest evaluable row has day_pct >= this value.
CRITICAL_DAY_PCT         = 80.0
PROJECTION_TARGET_PCT    = 100.0    # day_pct cap; project breach before reset
PROJECTION_MIN_ROWS      = 3        # need at least 3 evaluable rows for slope

# ---------------------------------------------------------------------------
# §2 Constants (self-traffic)
# ---------------------------------------------------------------------------

# Default router log location; overridable with --router-log-path.
REAL_ROUTER_LOG_PATH = "/Users/openclaw/event-router/router.log"

B2_WINDOW_SECS              = 300       # 5-minute rolling window
B2_ABSOLUTE_COUNT_THRESHOLD = 5         # ≥K self_authored drops in window → CRITICAL
# §2 STALE fires when the newest timestamped entry is strictly older than this.
B2_STALE_LATEST_SECS        = 1800      # 3× inferred ~600s upstream health-check cadence
                                        # (cadence inferred from active router.log timestamps,
                                        #  not specified in router.js)
B2_TAIL_LINES               = 500       # max lines tailed per tick — safety margin
                                        # against a loop burst (baseline = 1-2 entries/window)

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

# Destination file used by log(). Starts at the default location and is
# rebound by main() to the value of --log-path.
_LOG_PATH = REAL_LOG_PATH  # mutated by main() once args are parsed

def log(msg):
    """
    Append one UTC-timestamped line to the watchdog log file.

    The destination is the module-level _LOG_PATH (rebound by main() from
    --log-path). The parent directory is created on demand.

    Args:
        msg: Text to record; written as "[YYYY-MM-DD HH:MM:SS UTC] <msg>".

    Raises:
        OSError: if the directory or file cannot be created/written.
    """
    log_dir = os.path.dirname(_LOG_PATH)
    # A bare filename (e.g. --log-path watchdog.log) has an empty dirname and
    # os.makedirs('') raises FileNotFoundError — only create a real directory.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    # Messages contain non-ASCII characters (§, ≥); do not depend on the
    # locale encoding of the launching environment.
    with open(_LOG_PATH, 'a', encoding='utf-8') as f:
        f.write(f"[{ts}] {msg}\n")

# ---------------------------------------------------------------------------
# Tail-read helper (shared by §1 JSONL + §2 router log)
# ---------------------------------------------------------------------------

def tail_lines(path, n):
    """
    Return up to the last n lines of a text file without reading it all.

    The file is read backwards in 64 KiB blocks until more than n newline
    bytes have been seen (so the n trailing lines are complete) or the start
    of the file is reached.

    Args:
        path: File to read.
        n: Maximum number of trailing lines to return.

    Returns:
        List of decoded lines (UTF-8, undecodable bytes replaced), oldest
        first, without line terminators. Empty list if the file does not
        exist or n <= 0.
    """
    # lines[-0:] would be the whole list, so handle non-positive n up front.
    if n <= 0:
        return []
    try:
        f = open(path, 'rb')
    except FileNotFoundError:
        # Opening directly (instead of exists()-then-open) avoids crashing
        # when the file is rotated away between the check and the open.
        return []
    with f:
        f.seek(0, os.SEEK_END)
        remaining = f.tell()
        block = 64 * 1024
        chunks = []          # collected back-to-front
        newline_count = 0
        while remaining > 0 and newline_count <= n:
            read_size = min(block, remaining)
            remaining -= read_size
            f.seek(remaining)
            chunk = f.read(read_size)
            newline_count += chunk.count(b'\n')
            chunks.append(chunk)
    # Join as bytes before decoding so a multi-byte character split across
    # a block boundary is reassembled correctly.
    lines = b''.join(reversed(chunks)).splitlines()
    return [line.decode('utf-8', errors='replace') for line in lines[-n:]]

def parse_rows(lines):
    """
    Parse JSONL lines into row dicts.

    Blank lines are ignored. Lines that fail to decode, or that decode to
    something other than a JSON object, are logged (first 120 characters)
    and dropped: every consumer of the result calls row.get(...), so a
    stray scalar or array would otherwise crash the evaluator.

    Args:
        lines: Iterable of text lines.

    Returns:
        List of dicts, in input order.
    """
    rows = []
    for ln in lines:
        ln = ln.strip()
        if not ln:
            continue
        try:
            row = json.loads(ln)
        except json.JSONDecodeError:
            log(f"SKIP malformed JSON: {ln[:120]}")
            continue
        if not isinstance(row, dict):
            log(f"SKIP non-object JSON: {ln[:120]}")
            continue
        rows.append(row)
    return rows

# ---------------------------------------------------------------------------
# Router log parsing (§2)
# ---------------------------------------------------------------------------

# Match the bracketed ISO-Z timestamp produced by router.js logInfo/logDecision.
# Active log lines also have a leading bare timestamp (node prepends), but
# rotated files sometimes have only the bracketed form; the bracketed
# form is universally present.
# Group 1 captures the timestamp text without the brackets. The pattern
# requires a fractional-seconds part (".<digits>") before the trailing "Z".
_ROUTER_TS_RE = re.compile(r'\[(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\]')

def parse_router_lines(lines):
    """
    Turn raw router.log text lines into structured records.

    Each returned dict has the keys:
      ts                     aware datetime from the bracketed timestamp, or
                             None if parse_ts() rejects it
      kind                   first space-delimited token after '[notion]'
                             (e.g. 'filter_decision', 'ignored'); 'other'
                             when the marker or the token is absent
      is_self_authored_drop  True only for a filter_decision whose JSON
                             payload has decision=drop, reason=self_authored
      raw                    the line, truncated to 200 characters

    Blank lines and lines without a bracketed timestamp are dropped silently.
    """
    marker = '[notion]'
    records = []
    for line in lines:
        text = line.rstrip('\n')
        if not text.strip():
            continue
        ts_match = _ROUTER_TS_RE.search(text)
        if not ts_match:
            continue
        stamp = parse_ts(ts_match.group(1))

        # Event type = first token following the '[notion]' marker.
        marker_pos = text.find(marker)
        kind = 'other'
        if marker_pos != -1:
            remainder = text[marker_pos + len(marker):].lstrip()
            if remainder:
                kind = remainder.split(' ', 1)[0]

        self_drop = False
        if kind == 'filter_decision':
            # JSON payload spans the first '{' after the marker through the
            # last '}' on the line.
            open_at = text.find('{', marker_pos)
            close_at = text.rfind('}')
            if 0 <= open_at < close_at:
                try:
                    payload = json.loads(text[open_at:close_at + 1])
                    self_drop = (payload.get('decision') == 'drop' and
                                 payload.get('reason') == 'self_authored')
                except json.JSONDecodeError:
                    pass  # unparseable payload: not counted as a self-drop

        records.append({
            'ts': stamp,
            'kind': kind,
            'is_self_authored_drop': self_drop,
            'raw': text[:200],  # truncated for log safety
        })
    return records

# ---------------------------------------------------------------------------
# Row classification (§1)
# ---------------------------------------------------------------------------

def is_evaluable(row):
    """
    Decide whether a JSONL row can take part in §1 tier evaluation.

    A row is evaluable iff its 'error' field is not True and both 'day_pct'
    and 'week_pct' are real numbers (int or float; not bool, not NaN).
    Missing, null, string or otherwise non-numeric metrics make the row
    non-evaluable, so the tier checks never compare a non-number against
    a threshold.

    Args:
        row: Parsed JSONL row (dict).

    Returns:
        True if the row is evaluable, else False.
    """
    if row.get('error') is True:
        return False
    for key in ('day_pct', 'week_pct'):
        value = row.get(key)
        # bool is a subclass of int; a True/False metric is corrupt data.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        if value != value:  # NaN is the only value unequal to itself
            return False
    return True

def parse_ts(s):
    """
    Convert an ISO 8601 timestamp string to a timezone-aware datetime.

    Returns None for falsy input and for anything fromisoformat() rejects
    (including non-string values).

    Format tolerance:
      - A trailing 'Z' is rewritten to '+00:00' first, because Python 3.9's
        fromisoformat() does not accept 'Z'. The §1 collector
        (oauth_usage_collect.py:154) already emits '+00:00', so for §1 this
        is preventive; router.js emits 'Z', so for §2 it is load-bearing.
      - A naive result is assumed to be UTC.
    """
    if not s:
        return None
    text = s
    if isinstance(text, str) and text.endswith('Z'):
        text = text[:-1] + '+00:00'
    try:
        parsed = datetime.fromisoformat(text)
    except (ValueError, TypeError):
        return None
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=timezone.utc)

# ---------------------------------------------------------------------------
# §1 tier evaluation (cost-rate)
# ---------------------------------------------------------------------------

def check_stale(rows, now):
    """
    Classify §1 input staleness.

    Args:
        rows: Parsed JSONL rows, oldest first.
        now: Aware datetime used as the evaluation instant.

    Returns:
        (kind, reason) where kind is 'freshness', 'quality' or 'both', or
        (None, None) when the data looks healthy.
          'freshness' — there are no rows, or the latest row's timestamp is
                        missing/unparseable or older than
                        STALE_FRESHNESS_SECS; the caller should NOT proceed
                        to tier evaluation.
          'quality'   — more than ERROR_RATIO_THRESHOLD of the last
                        N_RECENT_FOR_QUALITY rows have error=true, but the
                        latest row is fresh; the caller MAY proceed.
          'both'      — both conditions hold.
    """
    if not rows:
        return ('freshness', 'no rows in JSONL')

    latest_ts = parse_ts(rows[-1].get('timestamp'))
    if latest_ts is None:
        # No usable timestamp means freshness cannot be established. Say so
        # explicitly rather than reporting "latest_age=Nones".
        freshness_bad = True
        freshness_reason = 'latest row has missing or unparseable timestamp'
    else:
        age_secs = (now - latest_ts).total_seconds()
        freshness_bad = age_secs > STALE_FRESHNESS_SECS
        freshness_reason = f'latest_age={age_secs}s (>{STALE_FRESHNESS_SECS}s)'

    # rows is non-empty here, so the slice always has at least one row.
    recent = rows[-N_RECENT_FOR_QUALITY:]
    err_ratio = sum(1 for r in recent if r.get('error') is True) / len(recent)
    quality_bad = err_ratio > ERROR_RATIO_THRESHOLD
    quality_reason = f'error_ratio={err_ratio:.0%} of last {len(recent)}'

    if freshness_bad and quality_bad:
        return ('both', f'{freshness_reason} AND {quality_reason}')
    if freshness_bad:
        return ('freshness', freshness_reason)
    if quality_bad:
        return ('quality', quality_reason)
    return (None, None)

def check_critical(evaluable):
    """
    Return the newest evaluable row if its day_pct is at or above
    CRITICAL_DAY_PCT; otherwise (or when there are no rows) return None.
    """
    if evaluable:
        newest = evaluable[-1]
        if newest['day_pct'] >= CRITICAL_DAY_PCT:
            return newest
    return None

def check_projected(evaluable):
    """
    Project when day_pct reaches PROJECTION_TARGET_PCT from a least-squares
    line fitted to day_pct vs. time over the evaluable window.

    Args:
        evaluable: Evaluable rows, oldest first. Rows whose timestamp cannot
            be parsed are left out of the fit.

    Returns:
        (latest_row, projected_hours_to_target) when the fitted slope is
        positive and the projected breach comes before the latest row's
        hours_until_reset; otherwise None. None is also returned when there
        are fewer than PROJECTION_MIN_ROWS usable points, when all points
        share one timestamp, when the latest row is already at/above the
        target, or when hours_until_reset is missing or not a number.
    """
    if len(evaluable) < PROJECTION_MIN_ROWS:
        return None

    pts = []
    for r in evaluable:
        ts = parse_ts(r.get('timestamp'))
        if ts is not None:
            pts.append((ts.timestamp(), float(r['day_pct'])))
    # At least two points are needed for a slope regardless of configuration.
    if len(pts) < max(PROJECTION_MIN_ROWS, 2):
        return None

    # Fit on time offsets from the first point, centred on the means.
    # Feeding raw epoch seconds (~1.7e9) into n*Σx² − (Σx)² subtracts two
    # ~1e19 numbers to obtain a result of ~1e5, which loses most of the
    # significant digits in double precision and distorts the slope.
    n = len(pts)
    t0 = pts[0][0]
    xs = [t - t0 for t, _ in pts]
    ys = [y for _, y in pts]
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        return None  # every point has the same timestamp
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope_per_sec = sxy / sxx  # %/sec
    if slope_per_sec <= 0:
        return None  # flat or decreasing — no projected breach

    latest = evaluable[-1]
    headroom_pct = PROJECTION_TARGET_PCT - latest['day_pct']
    if headroom_pct <= 0:
        # already at/above 100% — CRITICAL would have caught this
        return None
    hours_to_100 = headroom_pct / slope_per_sec / 3600.0

    hours_until_reset = latest.get('hours_until_reset')
    if (isinstance(hours_until_reset, bool)
            or not isinstance(hours_until_reset, (int, float))):
        return None  # cannot compare against a missing/non-numeric reset
    if hours_to_100 < hours_until_reset:
        return (latest, hours_to_100)
    return None

def check_warning(evaluable):
    """
    Return the newest evaluable row when it and the row before it both have
    day_pct >= WARNING_DAY_PCT; return None otherwise, including when fewer
    than two evaluable rows exist.
    """
    if len(evaluable) < 2:
        return None
    newest, previous = evaluable[-1], evaluable[-2]
    sustained = all(r['day_pct'] >= WARNING_DAY_PCT for r in (newest, previous))
    return newest if sustained else None

# ---------------------------------------------------------------------------
# §2 tier evaluation (self-traffic)
# ---------------------------------------------------------------------------

def check_b2_critical(router_records, now):
    """
    Count self_authored drops with a timestamp inside the last
    B2_WINDOW_SECS seconds (window start inclusive).

    Returns (count, records). `records` holds the matching records when
    count >= B2_ABSOLUTE_COUNT_THRESHOLD (CRITICAL fires) and is an empty
    list otherwise; `count` is reported in both cases.
    """
    window_start = now.timestamp() - B2_WINDOW_SECS
    recent_self_drops = []
    for rec in router_records:
        if not rec['is_self_authored_drop']:
            continue
        stamp = rec['ts']
        if stamp is None:
            continue
        if stamp.timestamp() >= window_start:
            recent_self_drops.append(rec)

    hits = len(recent_self_drops)
    if hits >= B2_ABSOLUTE_COUNT_THRESHOLD:
        return (hits, recent_self_drops)
    return (hits, [])

def check_b2_stale(router_records, now):
    """
    Detect a router log that has stopped receiving entries.

    Every record kind counts, not just filter_decisions: those are sparse,
    and any recent entry — including an `ignored` health-check — proves the
    router is still writing.

    Returns:
        ('no_entries', None)   no record has a parseable timestamp
        (age_secs, latest_ts)  newest timestamp is more than
                               B2_STALE_LATEST_SECS seconds old
        (None, None)           not stale
    """
    stamps = [rec['ts'] for rec in router_records if rec['ts'] is not None]
    if not stamps:
        return ('no_entries', None)
    newest = max(stamps)
    age = (now - newest).total_seconds()
    return (age, newest) if age > B2_STALE_LATEST_SECS else (None, None)

# ---------------------------------------------------------------------------
# Side effects
# ---------------------------------------------------------------------------

def send_telegram(message, dry_run=False):
    """
    Deliver `message` by running `openclaw message send` as a subprocess.

    Args:
        message: Alert text, passed as the value of --message.
        dry_run: When true, --dry-run is appended to the command.

    Returns:
        True if the command exited with status 0. False on a non-zero exit,
        on a timeout after SEND_TIMEOUT_SECS, or on any other exception;
        each outcome is written to the log and nothing is raised for them.
    """
    argv = [
        OPENCLAW_BIN, "message", "send",
        "--channel", TELEGRAM_CHANNEL,
        "--target", TELEGRAM_TARGET,
        "--message", message,
    ] + (["--dry-run"] if dry_run else [])
    try:
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=SEND_TIMEOUT_SECS,
        )
        delivered = proc.returncode == 0
        if delivered:
            log(f"TELEGRAM_SENT dry_run={dry_run} chars={len(message)}")
        else:
            log(f"TELEGRAM_SEND_FAILED rc={proc.returncode} "
                f"stderr={proc.stderr.strip()[:200]}")
        return delivered
    except subprocess.TimeoutExpired:
        log(f"TELEGRAM_SEND_TIMEOUT after {SEND_TIMEOUT_SECS}s")
        return False
    except Exception as e:
        # e.g. the openclaw binary is missing; alerting must never crash the tick
        log(f"TELEGRAM_SEND_EXCEPTION {type(e).__name__}: {e}")
        return False

def write_sentinel(path, payload):
    """
    Atomically write the breaker sentinel file as pretty-printed JSON.

    The payload is written to "<path>.tmp" and then moved over `path`, so a
    reader never observes a partially written sentinel. If writing or the
    move fails, the temp file is removed and the error is re-raised.

    Args:
        path: Sentinel file location; its parent directory is created if
            needed.
        payload: JSON-serializable object to store.

    Raises:
        OSError: on filesystem failure.
        TypeError/ValueError: if `payload` is not JSON-serializable.
    """
    parent = os.path.dirname(path)
    # A bare filename has an empty dirname; os.makedirs('') would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    try:
        with open(tmp, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)
            f.write("\n")
        # os.replace overwrites an existing destination on every platform.
        os.replace(tmp, path)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error
        # is what the caller needs to see.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
    log(f"SENTINEL_WRITTEN {path}")

# ---------------------------------------------------------------------------
# Simulation (test mode)
# ---------------------------------------------------------------------------

def synth_rows(scenario, now):
    """
    Build the synthetic §1 JSONL rows used by --simulate.

    Every scenario yields 12 rows spaced 300 s apart, oldest first, with the
    newest row 300 s before `now` (the 'stale' scenario shifts all rows a
    further 1800 s into the past). Raises ValueError for an unknown scenario.
    """
    anchor = now.timestamp()
    offsets = [-step * 300 for step in range(12, 0, -1)]

    def make(offset_secs, **fields):
        stamp = datetime.fromtimestamp(anchor + offset_secs, tz=timezone.utc)
        record = {
            "timestamp": stamp.isoformat(),
            "day_pct": 0.0, "week_pct": 1.0,
            "hours_until_reset": 4.0,
            "credits_remaining": None,
            "raw_output": "<synthetic>"
        }
        record.update(fields)
        return record

    if scenario == 'normal':
        return [make(off, day_pct=1.0) for off in offsets]

    if scenario == 'warning':
        # Flat 22.5% — slope=0 ensures projected does NOT trip first.
        return [make(off, day_pct=22.5) for off in offsets]

    if scenario == 'critical':
        generated = [make(off, day_pct=70.0) for off in offsets]
        generated[-1]['day_pct'] = 85.0
        return generated

    if scenario == 'projected':
        # Linear climb of 3% per 5 min (36%/hr), ending at 47%: reaches 100%
        # before the reset that is 2.0 hours away.
        return [
            make(-step * 300,
                 day_pct=max(0.0, 50.0 - step * 3.0),
                 hours_until_reset=2.0)
            for step in range(12, 0, -1)
        ]

    if scenario == 'stale':
        # Newest row is 35 min old (300 s + 1800 s) → freshness trigger.
        return [make(off - 1800, day_pct=10.0) for off in offsets]

    raise ValueError(f"unknown scenario: {scenario}")

def synth_router_lines(scenario, now):
    """
    Generate synthetic router.log text lines for §2 --simulate.

    Lines match the real format produced by router.js so the parser is
    exercised end-to-end:
      <ts-iso-Z> [<ts-iso-Z>] [notion] <kind> <json>

    Timestamps always carry a three-digit millisecond fraction. Plain
    isoformat() omits the fraction entirely when `now` has microsecond == 0,
    and such a line would not match the router timestamp pattern. Lines are
    returned oldest first, as in an append-only log.

    Args:
        scenario: 'normal', 'self_traffic_critical' or 'router_stale'.
        now: Aware datetime the synthetic timestamps are offset from.

    Raises:
        ValueError: for an unknown scenario.
    """
    base = now.timestamp()

    def ts_at(offset_secs):
        stamp = datetime.fromtimestamp(base + offset_secs, tz=timezone.utc)
        return stamp.isoformat(timespec='milliseconds').replace('+00:00', 'Z')

    if scenario == 'normal':
        # One health-check from ~5 min ago → §2 STALE doesn't fire (recent),
        # §2 CRITICAL doesn't fire (zero self_authored drops).
        ts = ts_at(-300)
        return [
            f'{ts} [{ts}] [notion] ignored {{"reason":"event_type_not_forwarded","eventId":"evt-synth-1","eventType":"health-check"}}'
        ]
    if scenario == 'self_traffic_critical':
        # K=5 self_authored drops, all within the last 5 minutes,
        # plus a recent health-check so STALE doesn't co-fire.
        out = []
        for i in reversed(range(B2_ABSOLUTE_COUNT_THRESHOLD)):
            ts = ts_at(-(i * 30 + 60))  # spread across last ~3 min, oldest first
            out.append(
                f'{ts} [{ts}] [notion] filter_decision '
                f'{{"eventId":"evt-synth-self-{i}","stage":"self_authorship",'
                f'"decision":"drop","reason":"self_authored","eventType":"page.content_updated"}}'
            )
        recent = ts_at(-30)
        out.append(
            f'{recent} [{recent}] [notion] ignored {{"reason":"event_type_not_forwarded","eventId":"evt-synth-hc","eventType":"health-check"}}'
        )
        return out
    if scenario == 'router_stale':
        # Latest entry is 1 hour old → STALE-FRESHNESS trigger (>1800s).
        ts = ts_at(-3600)
        return [
            f'{ts} [{ts}] [notion] ignored {{"reason":"event_type_not_forwarded","eventId":"evt-synth-old","eventType":"health-check"}}'
        ]
    raise ValueError(f"unknown §2 scenario: {scenario}")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def fmt_values(row):
    """
    Render a row's key metrics as one space-separated "name=value" string
    (day_pct, week_pct, hours_until_reset, ts) for log and alert text.
    Missing fields render as None.
    """
    parts = [
        f"day_pct={row.get('day_pct')}%",
        f"week_pct={row.get('week_pct')}%",
        f"hours_until_reset={row.get('hours_until_reset')}",
        f"ts={row.get('timestamp')}",
    ]
    return " ".join(parts)

# ---------------------------------------------------------------------------
# §1 main body (cost-rate evaluator)
# ---------------------------------------------------------------------------

def _b1_load_rows(args, now):
    """Return the §1 input rows: synthetic under --simulate, else tailed from the JSONL."""
    if args.simulate in ('normal', 'warning', 'critical', 'projected', 'stale'):
        rows = synth_rows(args.simulate, now)
        log(f"SIMULATE scenario={args.simulate} rows={len(rows)}")
        return rows
    if args.simulate in ('self_traffic_critical', 'router_stale'):
        rows = synth_rows('normal', now)
        log(f"SIMULATE scenario=normal (companion to §2 simulate={args.simulate}) rows={len(rows)}")
        return rows
    lines = tail_lines(args.jsonl_path, max(N_WINDOW, N_RECENT_FOR_QUALITY))
    return parse_rows(lines)


def _b1_trip(args, now, trigger, row, msg, extra_values=None):
    """
    Record a §1 CRITICAL-class trip: write the sentinel, then alert.

    Returns 0 when the sentinel was written and Telegram delivery succeeded,
    otherwise 2. A failed sentinel write is logged and does NOT stop the
    alert from being sent — an unwritable state directory must not silence
    a CRITICAL.
    """
    values = {k: row.get(k) for k in
              ('day_pct', 'week_pct', 'hours_until_reset',
               'credits_remaining', 'timestamp')}
    if extra_values:
        values.update(extra_values)

    # Write sentinel FIRST so durable state is recorded even if telegram fails.
    sentinel_ok = True
    try:
        write_sentinel(args.sentinel_path, {
            "trigger": trigger,
            "timestamp": now.isoformat(),
            "values": values,
            "reset_instructions": (
                f"Remove this file to re-arm Breakers: "
                f"rm {args.sentinel_path}"
            ),
        })
    except OSError as e:
        sentinel_ok = False
        log(f"SENTINEL_WRITE_FAILED {type(e).__name__}: {e}")

    ok = send_telegram(msg, dry_run=args.telegram_dry_run)
    if not ok:
        if sentinel_ok:
            log("CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
        else:
            log("CRITICAL_ALERT_NO_TELEGRAM_NO_SENTINEL sentinel write and telegram delivery both failed")
        return 2
    return 0 if sentinel_ok else 2


def run_b1(args, now):
    """Run §1 cost-rate evaluator. Returns exit code (0 or 2).

    Tier order: STALE → CRITICAL → CRITICAL-PROJECTED → WARNING → NORMAL.
    STALE-freshness ends the evaluation; STALE-quality alerts and continues.
    CRITICAL and CRITICAL-PROJECTED write the sentinel and then alert; the
    result is 2 if either of those two steps fails, else 0.

    Log tokens here are bit-identical to the pre-B.2 implementation
    (no B1_ prefix) to preserve observability symmetry — existing greps,
    monitoring, and human-eyeball habits trained on the original tokens
    continue to work. Asymmetry with §2's B2_ prefix is deliberate.

    Args:
        args: Parsed CLI namespace (simulate, jsonl_path, sentinel_path,
            telegram_dry_run, test_suffix).
        now: Aware datetime used as the evaluation instant.
    """
    rows = _b1_load_rows(args, now)

    # STALE check first
    stale_kind, stale_reason = check_stale(rows, now)
    if stale_kind in ('freshness', 'both'):
        msg = f"[BREAKER §1 STALE] Cost-rate data not fresh: {stale_reason}{args.test_suffix}"
        log(f"FIRE STALE-FRESHNESS reason={stale_reason}")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0  # cannot trust tier evaluation when data is stale
    if stale_kind == 'quality':
        msg = f"[BREAKER §1 STALE] Cost-rate data quality degraded: {stale_reason}{args.test_suffix}"
        log(f"NOTE STALE-QUALITY reason={stale_reason}; continuing to tier eval")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        # continue — latest row may still be evaluable

    # Filter to evaluable rows; log skips
    evaluable = []
    for r in rows:
        if is_evaluable(r):
            evaluable.append(r)
        else:
            reason = 'error=true' if r.get('error') is True else 'null_metric'
            log(f"SKIP_ROW {reason} ts={r.get('timestamp')}")

    if not evaluable:
        log("NO_EVALUABLE_ROWS exiting normal")
        return 0

    # Tier checks (priority order)
    crit = check_critical(evaluable)
    if crit:
        msg = (f"[BREAKER §1 CRITICAL] Cost rate breached {CRITICAL_DAY_PCT}% threshold. "
               f"{fmt_values(crit)}{args.test_suffix}")
        log(f"FIRE CRITICAL {fmt_values(crit)}")
        return _b1_trip(args, now, "CRITICAL", crit, msg)

    proj = check_projected(evaluable)
    if proj:
        latest, hrs = proj
        msg = (f"[BREAKER §1 CRITICAL-PROJECTED] Linear extrapolation projects "
               f"day_pct=100% in {hrs:.2f}h, before reset in "
               f"{latest.get('hours_until_reset'):.2f}h. "
               f"{fmt_values(latest)}{args.test_suffix}")
        log(f"FIRE CRITICAL-PROJECTED hrs_to_100={hrs:.2f} {fmt_values(latest)}")
        return _b1_trip(args, now, "CRITICAL-PROJECTED", latest, msg,
                        extra_values={"projected_hours_to_100": hrs})

    warn = check_warning(evaluable)
    if warn:
        msg = (f"[BREAKER §1 WARNING] Cost rate >= {WARNING_DAY_PCT}% for 2 "
               f"consecutive evaluable data rows. {fmt_values(warn)}{args.test_suffix}")
        log(f"FIRE WARNING {fmt_values(warn)}")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0

    log(f"NORMAL latest day_pct={evaluable[-1]['day_pct']}% "
        f"week_pct={evaluable[-1]['week_pct']}% "
        f"evaluable_rows={len(evaluable)}/{len(rows)}")
    return 0

# ---------------------------------------------------------------------------
# §2 main body (self-traffic evaluator)
# ---------------------------------------------------------------------------

def run_b2(args, now):
    """Run §2 self-traffic evaluator. Returns exit code (0 or 2).

    Tier order: STALE → CRITICAL → NORMAL. STALE alerts and ends the
    evaluation. CRITICAL writes the sentinel and then alerts; the result is
    2 if either of those two steps fails, else 0. A failed sentinel write is
    logged and does not stop the alert from being sent.

    Args:
        args: Parsed CLI namespace (simulate, router_log_path, sentinel_path,
            telegram_dry_run, test_suffix).
        now: Aware datetime used as the evaluation instant.
    """
    if args.simulate in ('self_traffic_critical', 'router_stale'):
        router_lines = synth_router_lines(args.simulate, now)
        log(f"SIMULATE_B2 scenario={args.simulate} lines={len(router_lines)}")
    elif args.simulate == 'normal':
        router_lines = synth_router_lines('normal', now)
        log(f"SIMULATE_B2 scenario=normal lines={len(router_lines)}")
    elif args.simulate in ('warning', 'critical', 'projected', 'stale'):
        router_lines = synth_router_lines('normal', now)
        log(f"SIMULATE_B2 scenario=normal (companion to §1 simulate={args.simulate}) lines={len(router_lines)}")
    else:
        router_lines = tail_lines(args.router_log_path, B2_TAIL_LINES)

    records = parse_router_lines(router_lines)

    # STALE check first — early-exit (parallels §1 STALE-FRESHNESS handling)
    age_or_marker, latest_ts = check_b2_stale(records, now)
    if age_or_marker is not None:
        if age_or_marker == 'no_entries':
            reason = 'no entries with parseable timestamp in tailed router log'
        else:
            reason = (f'latest_age={age_or_marker:.0f}s (>{B2_STALE_LATEST_SECS}s); '
                      f'latest_ts={latest_ts.isoformat() if latest_ts else None}')
        msg = f"[BREAKER §2 STALE] Router log not fresh: {reason}{args.test_suffix}"
        log(f"FIRE B2_STALE reason={reason}")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0  # cannot trust §2 evaluation when router data is stale (parallels §1 STALE-FRESHNESS)

    # CRITICAL check (absolute-count K self_authored drops in 5-min window)
    count, drops = check_b2_critical(records, now)
    if drops:
        sample_raws = [d.get('raw', '')[:80] for d in drops[:3]]
        msg = (f"[BREAKER §2 CRITICAL] Self-traffic loop detected: "
               f"{count} self_authored drops in last {B2_WINDOW_SECS}s "
               f"(threshold ≥{B2_ABSOLUTE_COUNT_THRESHOLD}). "
               f"Phase 1 filter is dropping correctly but loop shape suggests "
               f"upstream bypass or misclassification. "
               f"Sample raws: {sample_raws}{args.test_suffix}")
        log(f"FIRE B2_CRITICAL count={count} threshold={B2_ABSOLUTE_COUNT_THRESHOLD}")

        # Take latest/earliest from the timestamps themselves, not from list
        # position, so the labels stay correct even if the input lines are
        # not in chronological order.
        drop_stamps = [d['ts'] for d in drops if d['ts'] is not None]
        latest_drop = max(drop_stamps) if drop_stamps else None
        earliest_drop = min(drop_stamps) if drop_stamps else None

        # Sentinel first so durable state exists even if telegram fails; a
        # failed write must not prevent the alert from going out.
        sentinel_ok = True
        try:
            write_sentinel(args.sentinel_path, {
                "trigger": "SELF_TRAFFIC",
                "timestamp": now.isoformat(),
                "values": {
                    "self_authored_drop_count": count,
                    "window_secs": B2_WINDOW_SECS,
                    "threshold": B2_ABSOLUTE_COUNT_THRESHOLD,
                    "latest_drop_ts": latest_drop.isoformat() if latest_drop else None,
                    "earliest_drop_ts": earliest_drop.isoformat() if earliest_drop else None,
                },
                "reset_instructions": (
                    f"Remove this file to re-arm Breakers: "
                    f"rm {args.sentinel_path}"
                ),
            })
        except OSError as e:
            sentinel_ok = False
            log(f"B2_SENTINEL_WRITE_FAILED {type(e).__name__}: {e}")

        ok = send_telegram(msg, dry_run=args.telegram_dry_run)
        if not ok:
            if sentinel_ok:
                log("B2_CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
            else:
                log("B2_CRITICAL_ALERT_NO_TELEGRAM_NO_SENTINEL sentinel write and telegram delivery both failed")
            return 2
        return 0 if sentinel_ok else 2

    log(f"B2_NORMAL self_authored_drops_in_window={count} "
        f"total_records_parsed={len(records)} "
        f"window_secs={B2_WINDOW_SECS}")
    return 0

# ---------------------------------------------------------------------------
# Main (orchestrator)
# ---------------------------------------------------------------------------

def main(argv=None):
    """
    Parse CLI arguments and run one watchdog tick.

    Flow: if the sentinel file exists, log a heartbeat and return 0 without
    evaluating. Otherwise run §1; if §1 wrote the sentinel during this tick,
    skip §2; otherwise run §2.

    The two evaluators are isolated from each other: an unexpected exception
    in one is logged with a FATAL token and counted as exit code 2, and the
    other evaluator still runs.

    Args:
        argv: Argument list for argparse; None means sys.argv[1:].

    Returns:
        Process exit code: the maximum of the §1 and §2 results (0 or 2).
    """
    p = argparse.ArgumentParser()
    p.add_argument('--jsonl-path', default=REAL_JSONL_PATH)
    p.add_argument('--router-log-path', default=REAL_ROUTER_LOG_PATH)
    p.add_argument('--sentinel-path', default=REAL_SENTINEL_PATH)
    p.add_argument('--log-path', default=REAL_LOG_PATH)
    p.add_argument('--simulate', choices=['normal', 'warning', 'critical', 'projected', 'stale',
                                          'self_traffic_critical', 'router_stale'])
    p.add_argument('--telegram-dry-run', action='store_true')
    p.add_argument('--test-suffix', default='')
    args = p.parse_args(argv)

    global _LOG_PATH
    _LOG_PATH = args.log_path

    now = datetime.now(timezone.utc)

    # Sentinel short-circuit (covers BOTH evaluators)
    if os.path.exists(args.sentinel_path):
        log(f"HEARTBEAT sentinel present at {args.sentinel_path}; suppressing alerts (both §1 and §2)")
        return 0

    try:
        rc1 = run_b1(args, now)
    except Exception as e:
        # A §1 failure (e.g. unexpected data shape) must not blind §2.
        log(f"FATAL B1 {type(e).__name__}: {e}")
        rc1 = 2

    # If §1 just wrote sentinel during this tick, skip §2 — system is now paused.
    if os.path.exists(args.sentinel_path):
        log("B2_SKIPPED_SENTINEL_JUST_WRITTEN_BY_B1")
        return rc1

    try:
        rc2 = run_b2(args, now)
    except Exception as e:
        # Keep §1's result; report the §2 failure through the exit code.
        log(f"FATAL B2 {type(e).__name__}: {e}")
        rc2 = 2
    return max(rc1, rc2)

# Script entry point: exit with main()'s return code. Any exception that
# escapes main() is recorded as FATAL and turned into exit code 2.
if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as e:
        fatal = f"FATAL {type(e).__name__}: {e}"
        try:
            log(fatal)
        except Exception as log_err:
            # The log file itself is unusable; fall back to stderr so the
            # failure is not lost entirely.
            print(f"{fatal} (log write failed: {type(log_err).__name__}: {log_err})",
                  file=sys.stderr)
        sys.exit(2)
