#!/usr/bin/env python3
"""
circuit-breaker-watchdog.py — RFC-002 §20 Breaker §1: cost-rate (two-tier).

Reads ~/.openclaw/workspace/state/oauth-usage-history.jsonl produced by
oauth_usage_collect.py (5-min cadence). Evaluates four signals and alerts
via Telegram (using the same `openclaw message send` path as
verify-system-state.sh and gateway-watchdog.sh).

Tiers (priority order; only the highest of CRITICAL / CRITICAL-PROJECTED /
WARNING fires per run):
  CRITICAL           — latest day_pct >= 80% (immediate, no debounce)
  CRITICAL-PROJECTED — linear extrapolation of day_pct over the
                       last 60 min projects hitting 100% before
                       hours_until_reset elapses
  WARNING            — last 2 evaluable rows both have day_pct >= 20%
                       (debounce: requires 2 consecutive *evaluable* rows)
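  STALE              — data freshness/quality signal (informational only),
                       checked before the tiers above:
                         (a) no rows, or latest JSONL timestamp missing or
                             older than 600s (2x StartInterval): alert and
                             skip tier evaluation for this run, or
                         (b) >50% of last 10 rows are errors: alert, then
                             continue to tier evaluation (a tier alert may
                             follow in the same run)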

Side effects:
  CRITICAL / CRITICAL-PROJECTED:
    - Writes sentinel JSON at ~/.openclaw/workspace/state/breaker-paused.flag
    - Telegram alert via `openclaw message send --channel telegram
      --target 8032472383`
    - On Telegram-send failure: log CRITICAL_ALERT_NO_TELEGRAM and exit 2
      so launchd surfaces the failure. Sentinel still writes.
  WARNING:
    - Telegram alert only, no sentinel.
  STALE:
    - Telegram alert only, no sentinel, no other state change.
  Sentinel exists at start of run:
    - Heartbeat log entry, no alert, exit 0.

Pause semantics (honest): no other process reads the sentinel today
(verified by grep across ~/.openclaw/). The sentinel is a state marker
+ alert-suppression mechanism in this script only — not an automation
pause. Removal is manual: `rm ~/.openclaw/workspace/state/breaker-paused.flag`.

Test mode:
  --simulate {warning,critical,projected,stale,normal}
    Inject synthetic rows instead of reading the real JSONL.
  --jsonl-path PATH       Override JSONL path (default: real path).
  --sentinel-path PATH    Override sentinel path (default: real path).
  --log-path PATH         Override log path (default: /tmp/openclaw/...).
  --telegram-dry-run      Pass --dry-run to `openclaw message send`
                          (prints payload, skips actual send).
  --test-suffix STR       Appended to alert messages (e.g. " [TEST]").

Exit codes:
  0  normal run (any tier including CRITICAL when telegram succeeded)
  2  CRITICAL / CRITICAL-PROJECTED tier fired but telegram-send failed
  2  also returned on any uncaught exception (logged as FATAL first)
"""

import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Production file locations (each can be overridden from the command line).
# NOTE(review): these are absolute paths under /Users/openclaw, whereas the
# module docstring writes them as ~/.openclaw/...; confirm the watchdog
# always runs as that user.
REAL_JSONL_PATH    = "/Users/openclaw/.openclaw/workspace/state/oauth-usage-history.jsonl"
REAL_SENTINEL_PATH = "/Users/openclaw/.openclaw/workspace/state/breaker-paused.flag"
REAL_LOG_PATH      = "/tmp/openclaw/circuit-breaker-watchdog.log"

# Telegram delivery: send_telegram() shells out to the openclaw CLI.
OPENCLAW_BIN     = "/opt/homebrew/bin/openclaw"
TELEGRAM_CHANNEL = "telegram"
TELEGRAM_TARGET  = "8032472383"
SEND_TIMEOUT_SECS = 15  # subprocess timeout for one `openclaw message send`

# Data window / staleness tuning.
START_INTERVAL_SECS    = 300        # must match plist StartInterval
STALE_FRESHNESS_SECS   = 2 * START_INTERVAL_SECS  # 600s
N_WINDOW               = 12         # ~60 min at 5-min cadence
N_RECENT_FOR_QUALITY   = 10         # trailing rows inspected for the error ratio
ERROR_RATIO_THRESHOLD  = 0.5        # >50%

# Tier thresholds, all expressed in day_pct percentage points.
WARNING_DAY_PCT          = 20.0
CRITICAL_DAY_PCT         = 80.0
PROJECTION_TARGET_PCT    = 100.0    # day_pct cap; project breach before reset
PROJECTION_MIN_ROWS      = 3        # need at least 3 evaluable rows for slope

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

_LOG_PATH = REAL_LOG_PATH  # mutated by main() once args are parsed

def log(msg):
    """
    Append one UTC-timestamped line to the log file at _LOG_PATH.

    The parent directory is created on demand. Any OSError from creating
    the directory or writing the file propagates to the caller.
    """
    log_dir = os.path.dirname(_LOG_PATH)
    # dirname is '' for a bare relative filename (e.g. --log-path x.log) and
    # os.makedirs('') raises FileNotFoundError, so only create a real path.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    # Explicit UTF-8 so messages that echo raw input (which may contain
    # U+FFFD replacement characters) never depend on the process locale.
    with open(_LOG_PATH, 'a', encoding='utf-8') as f:
        f.write(f"[{ts}] {msg}\n")

# ---------------------------------------------------------------------------
# JSONL tail-read
# ---------------------------------------------------------------------------

def tail_lines(path, n):
    """
    Return roughly the last n lines of the file at `path` as decoded strings.

    The file is read backwards in 64 KiB chunks until more than n newline
    bytes have been collected (or the start of the file is reached), so the
    cost does not grow with the total file size. Bytes that are not valid
    UTF-8 are replaced rather than raising. A missing file yields [].
    """
    if not os.path.exists(path):
        return []

    chunk_size = 64 * 1024
    buf = b''
    with open(path, 'rb') as fh:
        # seek() returns the new absolute offset, i.e. the file size here.
        remaining = fh.seek(0, os.SEEK_END)
        while remaining > 0 and buf.count(b'\n') <= n:
            step = min(chunk_size, remaining)
            remaining -= step
            fh.seek(remaining)
            buf = fh.read(step) + buf

    tail = buf.splitlines()[-n:]
    return [raw.decode('utf-8', errors='replace') for raw in tail]

def parse_rows(lines):
    """
    Parse JSONL lines into a list of row dicts.

    Blank lines are ignored. Lines that fail to decode as JSON, and lines
    that decode to something other than a JSON object (e.g. `null`, a bare
    number, a list), are logged and dropped: every consumer of the result
    calls `row.get(...)`, so a non-dict row would otherwise crash the run.

    Returns the surviving rows in input order.
    """
    rows = []
    for ln in lines:
        ln = ln.strip()
        if not ln:
            continue
        try:
            obj = json.loads(ln)
        except json.JSONDecodeError:
            log(f"SKIP malformed JSON: {ln[:120]}")
            continue
        if not isinstance(obj, dict):
            log(f"SKIP non-object JSON row: {ln[:120]}")
            continue
        rows.append(obj)
    return rows

# ---------------------------------------------------------------------------
# Row classification
# ---------------------------------------------------------------------------

def is_evaluable(row):
    """
    Return True iff `row` can be used for tier evaluation.

    A row is evaluable when its 'error' field is not literally True and
    both 'day_pct' and 'week_pct' are real, finite numbers. Missing/None
    values, strings, booleans and NaN/inf are rejected so the threshold
    comparisons and the slope computation never see a value they would
    raise on or silently mis-evaluate.
    """
    import math  # local import: the file's import block does not include math

    if row.get('error') is True:
        return False
    for key in ('day_pct', 'week_pct'):
        value = row.get(key)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        if isinstance(value, float) and not math.isfinite(value):
            return False
    return True

def parse_ts(s):
    """
    Convert an ISO 8601 timestamp string to a timezone-aware datetime.

    Returns None for falsy input or anything that cannot be parsed.

    Two normalisations are applied:
      - a trailing 'Z' is rewritten to '+00:00' before parsing, because
        datetime.fromisoformat() on Python 3.9 rejects the 'Z' suffix;
      - a result without tzinfo is assumed to be UTC.
    """
    if not s:
        return None

    text = s
    if isinstance(text, str) and text.endswith('Z'):
        text = text[:-1] + '+00:00'

    try:
        parsed = datetime.fromisoformat(text)
    except (ValueError, TypeError):
        # ValueError: not ISO 8601; TypeError: not a string at all.
        return None

    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=timezone.utc)

# ---------------------------------------------------------------------------
# Tier evaluation
# ---------------------------------------------------------------------------

def check_stale(rows, now):
    """
    Classify data staleness from the tail of the usage history.

    Args:
        rows: parsed JSONL rows; the last element is treated as the latest.
        now:  timezone-aware datetime used as the reference time.

    Returns:
        (kind, reason) where kind is one of:
          'freshness' — no rows, or the latest row's timestamp is
                        missing/unparseable or older than
                        STALE_FRESHNESS_SECS; the caller should NOT
                        proceed to tier evaluation.
          'quality'   — more than ERROR_RATIO_THRESHOLD of the last
                        N_RECENT_FOR_QUALITY rows have error=True, but the
                        latest row is fresh; the caller MAY proceed.
          'both'      — both conditions hold.
        or (None, None) when the data looks healthy.
    """
    if not rows:
        return ('freshness', 'no rows in JSONL')

    latest_ts = parse_ts(rows[-1].get('timestamp'))
    if latest_ts is None:
        # Without a usable timestamp the age is unknown; treat as stale and
        # say so explicitly instead of reporting a meaningless "age=None".
        freshness_bad = True
        freshness_reason = 'latest row has missing/unparseable timestamp'
    else:
        age_secs = (now - latest_ts).total_seconds()
        freshness_bad = age_secs > STALE_FRESHNESS_SECS
        freshness_reason = f'latest_age={age_secs}s (>{STALE_FRESHNESS_SECS}s)'

    # rows is non-empty here, so the slice always has at least one element.
    recent = rows[-N_RECENT_FOR_QUALITY:]
    err_count = sum(1 for r in recent if r.get('error') is True)
    err_ratio = err_count / len(recent)
    quality_bad = err_ratio > ERROR_RATIO_THRESHOLD
    quality_reason = f'error_ratio={err_ratio:.0%} of last {len(recent)}'

    if freshness_bad and quality_bad:
        return ('both', f'{freshness_reason} AND {quality_reason}')
    if freshness_bad:
        return ('freshness', freshness_reason)
    if quality_bad:
        return ('quality', quality_reason)
    return (None, None)

def check_critical(evaluable):
    """
    CRITICAL tier: return the newest evaluable row when its day_pct has
    reached CRITICAL_DAY_PCT, otherwise None (also None for empty input).
    """
    if not evaluable:
        return None
    newest = evaluable[-1]
    return newest if newest['day_pct'] >= CRITICAL_DAY_PCT else None

def check_projected(evaluable):
    """
    CRITICAL-PROJECTED tier: least-squares fit of day_pct against time.

    Fits a straight line through (timestamp, day_pct) for every evaluable
    row with a parseable timestamp and extrapolates from the newest row's
    day_pct to PROJECTION_TARGET_PCT.

    Returns:
        (latest_row, projected_hours_to_100) when the projected breach
        happens before the newest row's `hours_until_reset`; otherwise
        None. None is also returned when there are fewer than
        PROJECTION_MIN_ROWS usable points, all timestamps coincide, the
        trend is flat or decreasing, day_pct is already at/above the
        target, or `hours_until_reset` is missing or not a number.
    """
    if len(evaluable) < PROJECTION_MIN_ROWS:
        return None

    pts = []
    for r in evaluable:
        ts = parse_ts(r.get('timestamp'))
        if ts is None:
            continue
        pts.append((ts.timestamp(), float(r['day_pct'])))
    if len(pts) < PROJECTION_MIN_ROWS:
        return None

    # Epoch seconds are ~1.7e9, so their squares exceed 2**53 and the
    # textbook n*Σx² − (Σx)² form subtracts two huge, rounded numbers.
    # Shifting to the first timestamp and centring on the means keeps every
    # intermediate value small and the slope accurate.
    n = len(pts)
    x0 = pts[0][0]
    xs = [x - x0 for x, _ in pts]
    ys = [y for _, y in pts]
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        return None  # all samples share one timestamp; slope undefined
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope_per_sec = sxy / sxx  # %/sec
    if slope_per_sec <= 0:
        return None  # flat or decreasing — no projected breach

    latest = evaluable[-1]
    headroom_pct = PROJECTION_TARGET_PCT - latest['day_pct']
    if headroom_pct <= 0:
        # already at/above 100% — CRITICAL would have caught this
        return None
    secs_to_100 = headroom_pct / slope_per_sec
    hours_to_100 = secs_to_100 / 3600.0

    hours_until_reset = latest.get('hours_until_reset')
    # Missing or non-numeric reset horizon: nothing to compare against.
    if (hours_until_reset is None
            or isinstance(hours_until_reset, bool)
            or not isinstance(hours_until_reset, (int, float))):
        return None
    if hours_to_100 < hours_until_reset:
        return (latest, hours_to_100)
    return None

def check_warning(evaluable):
    """
    WARNING tier: return the newest evaluable row when it and the one
    before it both have day_pct >= WARNING_DAY_PCT; otherwise None.
    Needs at least two evaluable rows (the debounce).
    """
    if len(evaluable) < 2:
        return None
    previous, newest = evaluable[-2:]
    # Newest first, matching the order in which the rows are inspected.
    if all(r['day_pct'] >= WARNING_DAY_PCT for r in (newest, previous)):
        return newest
    return None

# ---------------------------------------------------------------------------
# Side effects
# ---------------------------------------------------------------------------

def send_telegram(message, dry_run=False):
    """
    Deliver `message` through `openclaw message send` and report success.

    Args:
        message: alert text passed as the --message argument.
        dry_run: when true, --dry-run is appended to the command line.

    Returns:
        True when the command exits with status 0; False on a non-zero
        exit, on timeout (SEND_TIMEOUT_SECS) or on any other exception.
        Every outcome is written to the log; nothing is raised for a
        failed send.
    """
    argv = [OPENCLAW_BIN, "message", "send"]
    argv += ["--channel", TELEGRAM_CHANNEL]
    argv += ["--target", TELEGRAM_TARGET]
    argv += ["--message", message]
    if dry_run:
        argv += ["--dry-run"]

    try:
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=SEND_TIMEOUT_SECS,
        )
        if proc.returncode == 0:
            log(f"TELEGRAM_SENT dry_run={dry_run} chars={len(message)}")
            return True
        log(f"TELEGRAM_SEND_FAILED rc={proc.returncode} "
            f"stderr={proc.stderr.strip()[:200]}")
        return False
    except subprocess.TimeoutExpired:
        log(f"TELEGRAM_SEND_TIMEOUT after {SEND_TIMEOUT_SECS}s")
        return False
    except Exception as exc:
        # Deliberately broad: a failed alert must degrade to "return False"
        # (e.g. missing binary) so the caller can decide the exit code.
        log(f"TELEGRAM_SEND_EXCEPTION {type(exc).__name__}: {exc}")
        return False

def write_sentinel(path, payload):
    """
    Write `payload` as indented JSON to the sentinel file at `path`.

    The data is written to `<path>.tmp` first and then moved over `path`,
    so a reader never observes a half-written sentinel. The parent
    directory is created if needed. OSError (and json serialisation
    errors) propagate to the caller.
    """
    parent = os.path.dirname(path)
    # dirname is '' for a bare relative filename; os.makedirs('') would
    # raise FileNotFoundError, so only create a real directory.
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=2)
        f.write("\n")
    # os.replace overwrites an existing destination on every platform,
    # whereas os.rename only guarantees that on POSIX.
    os.replace(tmp, path)
    log(f"SENTINEL_WRITTEN {path}")

# ---------------------------------------------------------------------------
# Simulation (test mode)
# ---------------------------------------------------------------------------

def synth_rows(scenario, now):
    """
    Build the synthetic row list used by --simulate.

    Every scenario yields 12 rows spaced 300 s apart, oldest first, with
    timestamps expressed relative to `now` (an aware datetime).

    Raises:
        ValueError: if `scenario` is not one of
            normal / warning / critical / projected / stale.
    """
    epoch = now.timestamp()
    steps = range(12, 0, -1)  # 12 intervals back ... 1 interval back

    def make(offset_secs, **overrides):
        stamp = datetime.fromtimestamp(epoch + offset_secs, tz=timezone.utc)
        record = dict(
            timestamp=stamp.isoformat(),
            day_pct=0.0,
            week_pct=1.0,
            hours_until_reset=4.0,
            credits_remaining=None,
            raw_output="<synthetic>",
        )
        record.update(overrides)
        return record

    def flat(pct, lag_secs=0):
        # Constant day_pct series, optionally shifted further into the past.
        return [make(-k * 300 - lag_secs, day_pct=pct) for k in steps]

    if scenario == 'normal':
        return flat(1.0)

    if scenario == 'warning':
        # Flat 22.5% — slope=0 ensures projected does NOT trip first.
        return flat(22.5)

    if scenario == 'critical':
        series = flat(70.0)
        series[-1]['day_pct'] = 85.0
        return series

    if scenario == 'projected':
        # Steep linear climb (~3% per 5 min = 36%/hr) that breaches 100%
        # before reset (hours_until_reset=2.0).
        return [
            make(-k * 300,
                 day_pct=max(0.0, 50.0 - k * 3.0),
                 hours_until_reset=2.0)
            for k in steps
        ]

    if scenario == 'stale':
        # Last row is 30 min old → freshness trigger
        return flat(10.0, lag_secs=1800)

    raise ValueError(f"unknown scenario: {scenario}")

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def fmt_values(row):
    """
    Render a row's headline metrics as one space-separated line for logs
    and alert messages. Missing keys are rendered as None.
    """
    fields = (
        f"day_pct={row.get('day_pct')}%",
        f"week_pct={row.get('week_pct')}%",
        f"hours_until_reset={row.get('hours_until_reset')}",
        f"ts={row.get('timestamp')}",
    )
    return " ".join(fields)

def _sentinel_payload(trigger, now, row, sentinel_path, extra_values=None):
    """Build the JSON document stored in the breaker sentinel file."""
    values = {k: row.get(k) for k in
              ('day_pct', 'week_pct', 'hours_until_reset',
               'credits_remaining', 'timestamp')}
    if extra_values:
        values.update(extra_values)
    return {
        "trigger": trigger,
        "timestamp": now.isoformat(),
        "values": values,
        "reset_instructions": (
            f"Remove this file to re-arm Breaker §1: "
            f"rm {sentinel_path}"
        ),
    }

def _trip_breaker(args, payload, msg):
    """Write the sentinel, send the alert, and return the exit code (0 or 2)."""
    # Write sentinel FIRST so durable state is recorded even if telegram fails.
    sentinel_ok = True
    try:
        write_sentinel(args.sentinel_path, payload)
    except OSError as e:
        # A disk/permission problem must not swallow a CRITICAL alert:
        # record it, still attempt delivery, and exit non-zero below.
        sentinel_ok = False
        log(f"SENTINEL_WRITE_FAILED {type(e).__name__}: {e}")

    ok = send_telegram(msg, dry_run=args.telegram_dry_run)
    if not ok:
        if sentinel_ok:
            log("CRITICAL_ALERT_NO_TELEGRAM sentinel written but telegram delivery failed")
        else:
            log("CRITICAL_ALERT_NO_TELEGRAM sentinel write and telegram delivery both failed")
        return 2
    # Alert delivered; still surface a failed sentinel write to the caller.
    return 0 if sentinel_ok else 2

def main(argv=None):
    """
    Run one watchdog evaluation.

    Order of operations:
      1. If the sentinel file already exists, log a heartbeat and stop.
      2. Load rows (synthetic with --simulate, else the JSONL tail).
      3. STALE check: a freshness problem alerts and stops; a quality
         problem alerts and continues.
      4. Evaluate CRITICAL, then CRITICAL-PROJECTED, then WARNING on the
         evaluable rows; the first tier that matches fires and returns.

    Args:
        argv: argument list for argparse; None means sys.argv[1:].

    Returns:
        Process exit code: 0 for a normal run, 2 when a CRITICAL /
        CRITICAL-PROJECTED tier fired but the sentinel write or the
        Telegram delivery failed.
    """
    p = argparse.ArgumentParser()
    p.add_argument('--jsonl-path', default=REAL_JSONL_PATH)
    p.add_argument('--sentinel-path', default=REAL_SENTINEL_PATH)
    p.add_argument('--log-path', default=REAL_LOG_PATH)
    p.add_argument('--simulate', choices=['normal','warning','critical','projected','stale'])
    p.add_argument('--telegram-dry-run', action='store_true')
    p.add_argument('--test-suffix', default='')
    args = p.parse_args(argv)

    global _LOG_PATH
    _LOG_PATH = args.log_path

    now = datetime.now(timezone.utc)

    # Sentinel short-circuit
    if os.path.exists(args.sentinel_path):
        log(f"HEARTBEAT sentinel present at {args.sentinel_path}; suppressing alerts")
        return 0

    # Read rows
    if args.simulate:
        rows = synth_rows(args.simulate, now)
        log(f"SIMULATE scenario={args.simulate} rows={len(rows)}")
    else:
        lines = tail_lines(args.jsonl_path, max(N_WINDOW, N_RECENT_FOR_QUALITY))
        rows = parse_rows(lines)

    # STALE check first
    stale_kind, stale_reason = check_stale(rows, now)
    if stale_kind in ('freshness', 'both'):
        msg = f"[BREAKER §1 STALE] Cost-rate data not fresh: {stale_reason}{args.test_suffix}"
        log(f"FIRE STALE-FRESHNESS reason={stale_reason}")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0  # cannot trust tier evaluation when data is stale
    if stale_kind == 'quality':
        msg = f"[BREAKER §1 STALE] Cost-rate data quality degraded: {stale_reason}{args.test_suffix}"
        log(f"NOTE STALE-QUALITY reason={stale_reason}; continuing to tier eval")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        # continue — latest row may still be evaluable

    # Filter to evaluable rows; log skips
    evaluable = []
    for r in rows:
        if is_evaluable(r):
            evaluable.append(r)
        else:
            reason = 'error=true' if r.get('error') is True else 'null_metric'
            log(f"SKIP_ROW {reason} ts={r.get('timestamp')}")

    if not evaluable:
        log("NO_EVALUABLE_ROWS exiting normal")
        return 0

    # Tier checks (priority order)
    crit = check_critical(evaluable)
    if crit:
        msg = (f"[BREAKER §1 CRITICAL] Cost rate breached {CRITICAL_DAY_PCT}% threshold. "
               f"{fmt_values(crit)}{args.test_suffix}")
        log(f"FIRE CRITICAL {fmt_values(crit)}")
        return _trip_breaker(
            args,
            _sentinel_payload("CRITICAL", now, crit, args.sentinel_path),
            msg,
        )

    proj = check_projected(evaluable)
    if proj:
        latest, hrs = proj
        msg = (f"[BREAKER §1 CRITICAL-PROJECTED] Linear extrapolation projects "
               f"day_pct=100% in {hrs:.2f}h, before reset in "
               f"{latest.get('hours_until_reset'):.2f}h. "
               f"{fmt_values(latest)}{args.test_suffix}")
        log(f"FIRE CRITICAL-PROJECTED hrs_to_100={hrs:.2f} {fmt_values(latest)}")
        return _trip_breaker(
            args,
            _sentinel_payload(
                "CRITICAL-PROJECTED", now, latest, args.sentinel_path,
                extra_values={"projected_hours_to_100": hrs},
            ),
            msg,
        )

    warn = check_warning(evaluable)
    if warn:
        msg = (f"[BREAKER §1 WARNING] Cost rate >= {WARNING_DAY_PCT}% for 2 "
               f"consecutive evaluable data rows. {fmt_values(warn)}{args.test_suffix}")
        log(f"FIRE WARNING {fmt_values(warn)}")
        send_telegram(msg, dry_run=args.telegram_dry_run)
        return 0

    log(f"NORMAL latest day_pct={evaluable[-1]['day_pct']}% "
        f"week_pct={evaluable[-1]['week_pct']}% "
        f"evaluable_rows={len(evaluable)}/{len(rows)}")
    return 0

if __name__ == "__main__":
    # Script entry point: any uncaught exception becomes exit code 2.
    try:
        sys.exit(main())
    except Exception as e:
        fatal = f"FATAL {type(e).__name__}: {e}"
        try:
            log(fatal)
        except Exception as log_err:
            # The log file itself may be the thing that is broken. Fall
            # back to stderr so the failure is not lost without a trace.
            try:
                print(f"{fatal} (log write failed: "
                      f"{type(log_err).__name__}: {log_err})",
                      file=sys.stderr)
            except Exception:
                pass  # no channel left to report on; still exit 2 below
        sys.exit(2)
