#!/usr/bin/env python3
"""OAuth credential expiry/quota monitor."""
from __future__ import annotations

from datetime import datetime, timezone

# Temporary kill switch: on the UTC date 2026-03-25 the monitor announces that
# it is paused and exits successfully without checking anything.
# It runs at import time, before any other module-level code.
# TODO(review): remove this guard once that date has passed.
now = datetime.now(timezone.utc).strftime("%Y-%m-%d")
if "2026-03-25" in now:
    print("OAuth monitor PAUSED 24h")
    # SystemExit instead of the site-provided exit(), which is not guaranteed
    # to exist (e.g. under `python -S`).
    raise SystemExit(0)

import json
import math
import pathlib
import re
import subprocess
from datetime import datetime, timezone
from typing import Dict, List
from urllib import request, error

# Notion block id substituted into NOTION_API_URL; log entries are appended
# as children of this block.
THREAD_PAGE_ID = "32af8b3e-f83d-81ab-a82c-e006e4616b96"  # THR-6
# File holding the Notion API key; Notion logging is skipped when it is
# missing or empty.
NOTION_API_KEY_PATH = pathlib.Path("~/.config/notion/api_key").expanduser()
# "Block children" endpoint template; `{}` is filled with a block id.
NOTION_API_URL = "https://api.notion.com/v1/blocks/{}/children"
# Value sent in the "Notion-Version" request header.
NOTION_VERSION = "2022-06-28"

# Unindented "- <provider> usage: ... <N>% left" line.
# Group 1: provider name; group 2: the first number followed by "% left".
PROVIDER_USAGE_RE = re.compile(r"^- ([\w-]+) usage: .*?(\d+)% left", re.IGNORECASE)
# Indented "- <provider>:<id> <rest>" line.
# Group 1: credential label (text before the colon is the provider name);
# group 2: the rest of the line.
CREDENTIAL_LINE_RE = re.compile(r"^\s+- ([\w-]+:[^\s]+)\s+(.*)$")
# "expires in <N>d" or "expires in <N>h" anywhere in the text.
# Group 1: amount; group 2: unit. Only days and hours are recognised.
EXPIRES_RE = re.compile(r"expires in (\d+)([dh])", re.IGNORECASE)

# Expiry at or below this many days raises a CRITICAL alert.
CRITICAL_THRESHOLD_DAYS = 2
# Expiry at or below this many days (but above critical) raises a WARNING.
WARNING_THRESHOLD_DAYS = 7
# Remaining provider quota at or below this percentage raises a WARNING.
QUOTA_WARNING_PERCENT = 20


def run_openclaw_models() -> List[str]:
    """Run ``openclaw models`` and return its standard output as lines.

    Returns:
        The command's stdout, decoded as text and split on line boundaries
        (line terminators removed).

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    command = ["openclaw", "models"]
    completed = subprocess.run(command, capture_output=True, text=True, check=True)
    return completed.stdout.splitlines()


def parse_alerts(lines: List[str]) -> List[Dict[str, str]]:
    """Scan ``openclaw models`` output lines for expiry and quota problems.

    Provider usage lines update the remaining-quota percentage remembered for
    that provider. Each credential line is then checked twice: against the
    expiry thresholds (if it carries an "expires in ..." note) and against the
    quota recorded so far for its provider. Lines matching neither pattern
    are ignored.

    Args:
        lines: Output lines to scan; trailing whitespace is ignored.

    Returns:
        Alert dicts with the keys ``"level"``, ``"credential"`` and
        ``"message"``, in the order the credentials were encountered. For a
        single credential the expiry alert comes before the quota alert.
    """
    remaining_by_provider: Dict[str, int] = {}
    collected: List[Dict[str, str]] = []

    def record(level: str, credential: str, message: str) -> None:
        # Single place that fixes the shape of an alert entry.
        collected.append(
            {"level": level, "credential": credential, "message": message}
        )

    for raw_line in lines:
        stripped = raw_line.rstrip()

        usage = PROVIDER_USAGE_RE.match(stripped)
        if usage is not None:
            remaining_by_provider[usage.group(1)] = int(usage.group(2))
            continue

        credential = CREDENTIAL_LINE_RE.match(stripped)
        if credential is None:
            continue
        label, details = credential.group(1), credential.group(2)

        # Normalise the expiry note to whole days; hours are rounded up so a
        # partial day still counts as one day.
        days_left = None
        expiry = EXPIRES_RE.search(details)
        if expiry is not None:
            amount = int(expiry.group(1))
            unit = expiry.group(2).lower()
            if unit == "h":
                days_left = math.ceil(amount / 24)
            elif unit == "d":
                days_left = amount

        if days_left is not None and days_left <= CRITICAL_THRESHOLD_DAYS:
            record(
                "CRITICAL",
                label,
                f"🔴 {label} OAuth expires in {days_left} day(s). Renew immediately.",
            )
        elif days_left is not None and days_left <= WARNING_THRESHOLD_DAYS:
            record(
                "WARNING",
                label,
                f"🟡 {label} OAuth expires in {days_left} day(s). Renewal recommended.",
            )

        # The provider is the part of the label before the first colon; only
        # quota seen on an earlier usage line is known at this point.
        remaining = remaining_by_provider.get(label.partition(":")[0])
        if remaining is not None and remaining <= QUOTA_WARNING_PERCENT:
            record(
                "WARNING",
                label,
                f"🟡 {label} quota low: {remaining}% remaining.",
            )

    return collected


def append_notion_log(text: str) -> None:
    """Append *text* as a paragraph block under the Notion thread page.

    This is best-effort logging: it never raises for a missing or unreadable
    API key or for a failed request, so a logging problem cannot stop the
    caller from reporting alerts. Failures other than "no key configured"
    are reported as a warning on stderr.

    Args:
        text: Content of the paragraph to append.

    Returns:
        None. Nothing is sent when the key file is absent or empty.
    """
    import sys  # local import: only needed for the warnings below

    try:
        api_key = NOTION_API_KEY_PATH.read_text().strip()
    except FileNotFoundError:
        # No key file means Notion logging is simply not configured.
        return
    except (OSError, UnicodeDecodeError) as exc:
        print(f"warning: cannot read Notion API key: {exc}", file=sys.stderr)
        return
    if not api_key:
        return

    # NOTE(review): Notion limits the length of a single rich_text item; a
    # very long entry may be rejected by the API - confirm against the docs.
    paragraph = {
        "object": "block",
        "type": "paragraph",
        "paragraph": {
            "rich_text": [
                {
                    "type": "text",
                    "text": {
                        "content": text,
                    },
                }
            ]
        },
    }
    payload = json.dumps({"children": [paragraph]}).encode("utf-8")

    req = request.Request(
        NOTION_API_URL.format(THREAD_PAGE_ID),
        data=payload,
        method="PATCH",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Notion-Version": NOTION_VERSION,
        },
    )
    try:
        # The context manager closes the HTTP response instead of leaking it.
        with request.urlopen(req, timeout=10):
            pass
    except OSError as exc:
        # URLError/HTTPError are OSError subclasses; catching OSError also
        # covers socket timeouts that urllib does not wrap in URLError.
        print(f"warning: Notion log append failed: {exc}", file=sys.stderr)


def build_log_entry(alerts: List[Dict[str, str]], timestamp: str) -> str:
    """Format the log text for one monitor run.

    Args:
        alerts: Alert dicts; only each alert's ``"message"`` value is used.
        timestamp: Text placed in square brackets at the start of the entry.

    Returns:
        A single "no alerts" line when *alerts* is empty; otherwise a header
        line giving the alert count followed by one "- <message>" line per
        alert, joined with newlines.
    """
    if not alerts:
        return f"[{timestamp}] OAuth monitor — No alerts triggered."

    header = f"[{timestamp}] OAuth monitor alerts fired ({len(alerts)}):"
    bullets = [f"- {item['message']}" for item in alerts]
    return "\n".join([header, *bullets])


def main() -> None:
    """Run one monitoring pass: collect alerts, log the run, print alerts.

    The run is always logged to Notion (with or without alerts). Output is
    printed to stdout only when at least one alert fired: a title line, then
    for each alert its message followed by a renewal hint naming the
    credential.
    """
    alerts = parse_alerts(run_openclaw_models())

    # Timestamp in the local timezone, including the zone name.
    stamp = datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M %Z")
    append_notion_log(build_log_entry(alerts, stamp))

    if not alerts:
        return

    report: List[str] = ["OAuth Credential Monitor"]
    for entry in alerts:
        credential, message = entry["credential"], entry["message"]
        report += [
            message,
            f"Renewal: openclaw auth refresh {credential}\n          openclaw models",
        ]
    print("\n".join(report))


# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
