#!/usr/bin/env python3
"""
notion-write-scraper.py — Structured extraction of openclaw-side
Notion writes from per-agent session .jsonl files. Emits one JSON
line per resolved write to /tmp/openclaw/notion-writes.jsonl.

Part of the Notion write instrumentation effort (INS.2). Read-only
with respect to all input files (session .jsonl); only writes its
own state file and output JSONL.

Invoked by LaunchAgent ai.openclaw.notion-write-scraper every 60s.
Safe to run manually for testing via --simulate or --real-read.

See INS.1 Phase 1 inspection for rationale and INS.2 Phase 1 for
schema/invariant verifications.
"""

import argparse
import glob
import json
import os
import re
import shutil
import sys
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path

# --- Constants ---------------------------------------------------

# Agent ids whose <AGENTS_BASE>/<id>/sessions dirs are scanned.
AGENTS = [
    "main", "worker", "ops-maintainer",
    "research-intelligence", "systems-architect", "ops-manager",
]
AGENTS_BASE = Path("/Users/openclaw/.openclaw/agents")
# Persisted scraper state: per-file byte offsets + unresolved toolCalls.
STATE_PATH = Path(
    "/Users/openclaw/.openclaw/workspace/state/notion-write-scraper-state.json"
)
# Output: one JSON object per resolved (or orphaned) Notion write.
OUTPUT_PATH = Path("/tmp/openclaw/notion-writes.jsonl")

# Orphan timeout: if a toolCall is seen but no matching toolResult
# arrives within this window, emit an orphaned entry and drop it
# from unresolved_calls so the dict cannot grow unbounded.
ORPHAN_TIMEOUT_SECONDS = 600  # 10 minutes

# Filename markers for archived/retired sessions. INS.1 confirmed
# active files carry none of these.
ARCHIVE_MARKERS = (".reset.", ".deleted.", ".bak-")

# --- Regexes -----------------------------------------------------

# URL inside a curl command: captures the /v1/... path.
NOTION_URL_RE = re.compile(r"https?://api\.notion\.com(/v1/[^\s\"']+)")
# The -X flag with its method (write verbs only; -X GET never matches).
METHOD_RE = re.compile(r"-X\s+(POST|PATCH|DELETE)\b")
# Read-shape POSTs that must NOT be counted as writes.
READ_POST_RE = re.compile(
    r"/v1/(search\b|databases/[0-9a-f-]+/query\b|data_sources/[0-9a-f-]+/query\b)"
)
# UUID embedded in a notion /v1 path (dashed 36 chars or undashed 32).
UUID_IN_PATH_RE = re.compile(
    r"/v1/(?:pages|blocks|comments|databases|data_sources)/([0-9a-fA-F-]{32,36})"
)
# Canonical dashed UUID form (for validation in tests).
UUID_DASHED_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)
# Matches every NON-hex character; used to strip dashes/junk before
# canonicalizing a UUID.
HEX_ONLY_RE = re.compile(r"[^0-9a-fA-F]")
# Session UUID inside filename like <uuid>.jsonl
SESSION_UUID_RE = re.compile(r"([0-9a-fA-F-]{36})\.jsonl")

# --- Utility -----------------------------------------------------


def now_iso_utc():
    """Current UTC time as a second-resolution ISO-8601 'Z' string."""
    current = datetime.now(timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")


def now_epoch():
    """Whole-second Unix timestamp (truncated, not rounded)."""
    seconds = time.time()
    return int(seconds)


def log(msg):
    """Emit one timestamped line to stdout (plist redirects to a log file)."""
    line = f"{now_iso_utc()} {msg}"
    print(line, flush=True)


def canonicalize_uuid(raw):
    """Normalize a UUID to dashed lowercase 8-4-4-4-12 form.

    Accepts dashed or undashed 32-hex input. Returns '' for falsy
    input or anything that does not reduce to exactly 32 hex chars
    after stripping non-hex characters.
    """
    if not raw:
        return ""
    digits = re.sub(r"[^0-9a-fA-F]", "", str(raw)).lower()
    if len(digits) != 32:
        return ""
    groups = (digits[:8], digits[8:12], digits[12:16],
              digits[16:20], digits[20:])
    return "-".join(groups)


def read_state(path):
    """Load persisted state from *path*; fail-open to empty on any error.

    Both top-level sections are validated to be dicts; anything else
    (missing key, wrong type, unreadable/invalid file) degrades to {}.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
        loaded = {}
        for key in ("session_files", "unresolved_calls"):
            section = data.get(key, {})
            loaded[key] = section if isinstance(section, dict) else {}
        return loaded
    except Exception:
        return {"session_files": {}, "unresolved_calls": {}}


def write_state_atomic(path, data):
    """Persist *data* as pretty JSON via tmp file + same-fs rename.

    The tmp file lives in path's own directory so os.replace is an
    atomic rename; the tmp file is removed on any failure.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2, sort_keys=True) + "\n"
    fd, tmp_name = tempfile.mkstemp(
        prefix=".notion-write-scraper-state-", dir=str(path.parent)
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(payload)
        os.replace(tmp_name, path)
    except Exception:
        try:
            os.unlink(tmp_name)
        except Exception:
            pass
        raise


def list_active_sessions():
    """Glob active (non-archived) session .jsonl files for all agents.

    Agents without a sessions dir are skipped; files whose name carries
    any ARCHIVE_MARKERS substring are excluded. Sorted for determinism.
    """
    found = []
    for agent in AGENTS:
        sessions_dir = AGENTS_BASE / agent / "sessions"
        if not sessions_dir.is_dir():
            continue
        found.extend(
            str(candidate)
            for candidate in sessions_dir.glob("*.jsonl")
            if not any(marker in candidate.name for marker in ARCHIVE_MARKERS)
        )
    return sorted(found)


def agent_id_from_path(path):
    """Agent id is the path component right after 'agents'; '' if absent."""
    parts = Path(path).parts
    # Only scan up to the second-to-last part: 'agents' as the final
    # component has no follower, which maps to '' (as the original
    # IndexError path did).
    for idx, part in enumerate(parts[:-1]):
        if part == "agents":
            return parts[idx + 1]
    return ""


def session_id_from_path(path):
    """Extract the 36-char session UUID from a '<uuid>.jsonl' basename, or ''."""
    found = re.search(r"([0-9a-fA-F-]{36})\.jsonl", os.path.basename(path))
    if found is None:
        return ""
    return found.group(1)


# --- Parsing primitives ------------------------------------------


def extract_method(cmd):
    """First curl -X write verb (POST/PATCH/DELETE) in *cmd*, or ''."""
    found = re.search(r"-X\s+(POST|PATCH|DELETE)\b", cmd)
    if found is None:
        return ""
    return found.group(1)


def extract_endpoint_and_pageid(cmd):
    """Return (endpoint_path, page_id_canonical_from_url).

    endpoint is the raw /v1/... path with any query string and
    wrapping quotes stripped; page_id is '' unless the URL embeds a
    UUID after pages|blocks|comments|databases|data_sources.
    """
    url_match = NOTION_URL_RE.search(cmd)
    if url_match is None:
        return "", ""
    path = url_match.group(1).strip().strip('"').strip("'")
    path = path.split("?", 1)[0]
    uuid_match = UUID_IN_PATH_RE.search(path)
    page_id = canonicalize_uuid(uuid_match.group(1)) if uuid_match else ""
    return path, page_id


def is_write_command(cmd):
    """True iff *cmd* is a Notion write.

    Criteria:
      - Contains 'curl' and 'api.notion.com'
      - Has -X POST|PATCH|DELETE
      - Not a pure read-shape POST (/v1/search or /query), unless
        the command chains multiple -X flags (pipe/sequence).
    """
    if not isinstance(cmd, str):
        return False
    if "api.notion.com" not in cmd or "curl" not in cmd:
        return False
    verb = extract_method(cmd)
    if verb not in ("POST", "PATCH", "DELETE"):
        return False
    if verb == "POST" and READ_POST_RE.search(cmd):
        # A lone read-shape POST is not a write; chained commands
        # (several -X flags) still count as a write.
        return len(METHOD_RE.findall(cmd)) > 1
    return True


def extract_page_id_from_result_text(text):
    """Pull the created page/block id out of a Notion response body.

    Used for creates (POST /v1/pages) where the URL carries no UUID.
    Falls back to parsing the first {...} substring when the body is
    not pure JSON (e.g. wrapped in other tool output). Returns ''
    unless the parsed object is a dict with object in {page, block}.
    """
    if not text:
        return ""
    parsed = None
    try:
        parsed = json.loads(text)
    except Exception:
        # Loose recovery: first '{' through last '}'.
        start, end = text.find("{"), text.rfind("}")
        if start >= 0 and end > start:
            try:
                parsed = json.loads(text[start:end + 1])
            except Exception:
                parsed = None
    if isinstance(parsed, dict) and parsed.get("object") in ("page", "block"):
        return canonicalize_uuid(parsed.get("id", ""))
    return ""


def parse_jsonl_bytes(raw_bytes):
    """Parse raw bytes into (list_of_json_objs, consumed_byte_count).

    Discards any trailing partial line (no terminating newline) so a
    write caught mid-line is retried next tick. consumed_byte_count
    is the length of the consumed prefix in raw bytes, used to update
    the file's byte_offset safely.

    The newline search and the consumed count are computed on the RAW
    bytes, not on decoded text: decoding with errors="replace" maps
    each invalid byte to U+FFFD (3 bytes when re-encoded as UTF-8),
    so measuring len(decoded.encode("utf-8")) would over-count and
    push byte_offset past real file data. A raw 0x0A byte is always a
    genuine newline in UTF-8 (never part of a multi-byte sequence),
    so splitting on it is safe.
    """
    if not raw_bytes:
        return [], 0
    last_nl = raw_bytes.rfind(b"\n")
    if last_nl < 0:
        # No complete line yet; consume nothing.
        return [], 0
    consumed = last_nl + 1
    text = raw_bytes[:consumed].decode("utf-8", errors="replace")
    parsed = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            parsed.append(json.loads(line))
        except Exception as e:
            # Log-and-skip: one malformed line must not block the scan.
            log(f"WARN json_parse_failed line_len={len(line)} err={type(e).__name__}")
    return parsed, consumed


# --- Two-pass extraction -----------------------------------------


def process_new_objs(parsed_objs, session_path, unresolved):
    """Two-pass extraction over new objects from one session file.

    Pass A collects candidate toolCall writes into *unresolved*
    (keyed by tool-call id); Pass B pops matching toolResults —
    including calls first seen on earlier ticks, since *unresolved*
    is shared persisted state. Mutates *unresolved* in place.

    Each element of parsed_objs is whatever json.loads returned for
    one line, so non-dict objects/messages/items are skipped
    defensively: a stray scalar or array line must not crash the
    whole tick (fail-open, matching read_state's philosophy).

    Returns the list of resolved write records.
    """
    agent_id = agent_id_from_path(session_path)
    session_id = session_id_from_path(session_path)
    resolved = []

    # Pass A: toolCalls (role == "assistant", content[].type in
    # {"toolCall", "tool_use"}, name == "exec"). INS.2 Phase 1
    # observed only "toolCall" in current data; "tool_use" accepted
    # for forward-compat with potential codex-harness format drift.
    for obj in parsed_objs:
        if not isinstance(obj, dict):
            continue
        msg = obj.get("message") or {}
        if not isinstance(msg, dict) or msg.get("role") != "assistant":
            continue
        ts = obj.get("timestamp") or ""
        content = msg.get("content") or []
        if not isinstance(content, list):
            continue
        for item in content:
            if not isinstance(item, dict):
                continue
            if item.get("type") not in ("toolCall", "tool_use"):
                continue
            if item.get("name") != "exec":
                continue
            call_id = item.get("id") or item.get("toolCallId")
            if not call_id:
                continue
            args = item.get("arguments") or {}
            if not isinstance(args, dict):
                continue
            cmd = args.get("command", "")
            if not is_write_command(cmd):
                continue
            method = extract_method(cmd)
            endpoint, page_id = extract_endpoint_and_pageid(cmd)
            unresolved[call_id] = {
                "agent_id": agent_id,
                "session_id": session_id,
                "session_path": session_path,
                "method": method,
                "endpoint": endpoint,
                "page_id_canonical": page_id,
                "first_seen_ts": ts,
                "first_seen_epoch": now_epoch(),
            }

    # Pass B: toolResults. Match by toolCallId into unresolved.
    for obj in parsed_objs:
        if not isinstance(obj, dict):
            continue
        msg = obj.get("message") or {}
        if not isinstance(msg, dict) or msg.get("role") != "toolResult":
            continue
        if msg.get("toolName") != "exec":
            continue
        call_id = msg.get("toolCallId")
        if not call_id or call_id not in unresolved:
            continue
        pending = unresolved.pop(call_id)
        details = msg.get("details") or {}
        if not isinstance(details, dict):
            details = {}
        page_id = pending.get("page_id_canonical") or ""
        if not page_id:
            # Create-page case (POST /v1/pages): the new page's id is
            # only available in the response body text.
            contents = msg.get("content") or []
            body = ""
            if (isinstance(contents, list) and contents
                    and isinstance(contents[0], dict)):
                body = contents[0].get("text", "") or ""
            page_id = extract_page_id_from_result_text(body)
        exit_code = details.get("exitCode")
        dur_ms = details.get("durationMs")
        resolved.append({
            "ts": pending.get("first_seen_ts", ""),
            "agent_id": pending.get("agent_id", ""),
            "session_id": pending.get("session_id", ""),
            "tool_call_id": call_id,
            "method": pending.get("method", ""),
            "endpoint": pending.get("endpoint", ""),
            "page_id_canonical": page_id,
            # Non-numeric (or missing) values degrade to -1 sentinels.
            "exit_code": int(exit_code)
                if isinstance(exit_code, (int, float)) else -1,
            "duration_ms": int(dur_ms)
                if isinstance(dur_ms, (int, float)) else -1,
            "cwd": details.get("cwd", "") or "",
        })
    return resolved


def sweep_orphans(unresolved, cutoff_epoch):
    """Evict entries older than *cutoff_epoch* and emit them as orphans.

    Mutates *unresolved* in place; returns the orphan records (same
    schema as resolved writes, with -1/'' sentinels for result fields).
    """
    orphan_records = []
    expired = [
        call_id
        for call_id, rec in unresolved.items()
        if rec.get("first_seen_epoch", 0) < cutoff_epoch
    ]
    for call_id in expired:
        rec = unresolved.pop(call_id)
        orphan_records.append({
            "ts": rec.get("first_seen_ts", ""),
            "agent_id": rec.get("agent_id", ""),
            "session_id": rec.get("session_id", ""),
            "tool_call_id": call_id,
            "method": rec.get("method", ""),
            "endpoint": rec.get("endpoint", ""),
            "page_id_canonical": rec.get("page_id_canonical", ""),
            "exit_code": -1,
            "duration_ms": -1,
            "cwd": "",
        })
    if orphan_records:
        log(f"NOTE orphaned_toolcalls count={len(orphan_records)}")
    return orphan_records


# --- Output ------------------------------------------------------


def append_output(output_path, records):
    """Append one sorted-key JSON line per record; no-op on empty input."""
    if not records:
        return
    output_path.parent.mkdir(parents=True, exist_ok=True)
    lines = [json.dumps(rec, sort_keys=True) + "\n" for rec in records]
    with output_path.open("a", encoding="utf-8") as out:
        out.writelines(lines)


# --- Main tick ---------------------------------------------------


def run_once(state_path, output_path, session_files_override=None):
    """One scan tick. Returns summary dict.

    Steps: load state -> reconcile against currently-active session
    files -> tail each file from its persisted byte_offset -> append
    resolved write records to output -> sweep orphans -> persist state
    atomically.

    session_files_override: explicit list of session paths (test hook
    used by --simulate); None means glob the real agent directories.
    """
    state = read_state(state_path)
    # No per-file offsets recorded yet => first-ever (or post-reset) run.
    is_first_run = not state["session_files"]

    active = (session_files_override
              if session_files_override is not None
              else list_active_sessions())
    active_set = set(active)

    # Reconcile: drop state for gone files, drop orphans rooted in gone files.
    for p in list(state["session_files"].keys()):
        if p not in active_set:
            state["session_files"].pop(p, None)
    for cid in list(state["unresolved_calls"].keys()):
        sp = state["unresolved_calls"][cid].get("session_path")
        if sp and sp not in active_set:
            state["unresolved_calls"].pop(cid, None)

    total_new = 0
    bytes_read_first = 0  # only accumulated/reported on the first run
    per_agent = {a: 0 for a in AGENTS}

    for path in active:
        try:
            size = os.path.getsize(path)
        except OSError as e:
            # File vanished between glob and stat; retry next tick.
            log(f"WARN stat_failed path={path} err={type(e).__name__}")
            continue
        entry = state["session_files"].get(
            path, {"byte_offset": 0, "last_scan_ts": None})
        offset = int(entry.get("byte_offset") or 0)
        if size < offset:
            # File shrunk/rotated in place. Rescan from 0.
            offset = 0
        if size == offset:
            # No new bytes; just refresh the scan timestamp.
            entry["last_scan_ts"] = now_iso_utc()
            state["session_files"][path] = entry
            continue
        try:
            with open(path, "rb") as f:
                f.seek(offset)
                raw = f.read(size - offset)
        except OSError as e:
            log(f"WARN read_failed path={path} err={type(e).__name__}")
            continue
        if is_first_run:
            bytes_read_first += len(raw)
        # consumed < len(raw) when the file ends mid-line; the partial
        # tail stays unconsumed so the next tick re-reads it whole.
        parsed, consumed = parse_jsonl_bytes(raw)
        new_offset = offset + consumed
        records = process_new_objs(
            parsed, path, state["unresolved_calls"])
        for r in records:
            ag = r.get("agent_id", "")
            if ag in per_agent:
                per_agent[ag] += 1
        total_new += len(records)
        append_output(output_path, records)

        entry["byte_offset"] = new_offset
        entry["last_scan_ts"] = now_iso_utc()
        state["session_files"][path] = entry

    # Global orphan sweep: emit calls with no toolResult within the
    # ORPHAN_TIMEOUT_SECONDS window.
    orphaned_records = sweep_orphans(
        state["unresolved_calls"],
        cutoff_epoch=now_epoch() - ORPHAN_TIMEOUT_SECONDS,
    )
    total_orphaned = len(orphaned_records)
    append_output(output_path, orphaned_records)

    # Persist offsets/pending calls only after output has been appended.
    write_state_atomic(state_path, state)

    summary = {
        "files": len(active),
        "new_writes": total_new,
        "orphaned": total_orphaned,
        "unresolved_pending": len(state["unresolved_calls"]),
        "first_run": is_first_run,
        "bytes_read_first": bytes_read_first,
        "writes_per_agent": per_agent,
    }
    if is_first_run:
        log(
            f"SCAN_FIRST files={summary['files']} "
            f"bytes_read={summary['bytes_read_first']} "
            f"writes_extracted_per_agent="
            f"{json.dumps(summary['writes_per_agent'], sort_keys=True)}"
        )
    else:
        log(
            f"SCAN files={summary['files']} "
            f"new_writes={summary['new_writes']} "
            f"orphaned={summary['orphaned']} "
            f"unresolved_pending={summary['unresolved_pending']}"
        )
    return summary


# --- Simulation / test modes -------------------------------------


def _write_jsonl(path, objs, trailing_partial=None):
    """Serialize *objs* one-per-line; optionally append an unterminated tail."""
    lines = [json.dumps(o) + "\n" for o in objs]
    with path.open("w", encoding="utf-8") as out:
        out.writelines(lines)
        if trailing_partial is not None:
            # Deliberately no newline: models a write caught mid-line.
            out.write(trailing_partial)


def _simulate_fixture(tmp):
    """Build a mock agents/worker/sessions tree with a known mix.

    Eight cases cover: skipped reads (GET; POST /v1/search; POST
    .../query), four true writes (create / patch / append-children /
    delete), and one write whose toolResult never arrives (CASE 8,
    expected to stay in unresolved_calls). Uses valid 32-hex UUIDs
    (dashed and undashed) to exercise canonicalization both ways.
    A trailing partial line (no newline) is appended so cmd_simulate
    can assert the scraper's byte-offset back-off.

    Returns the Path of the created session .jsonl file."""
    sess_dir = tmp / "agents" / "worker" / "sessions"
    sess_dir.mkdir(parents=True)
    sess = sess_dir / "11111111-1111-4111-8111-111111111111.jsonl"

    objs = [
        {"type": "session", "version": 3,
         "id": "11111111-1111-4111-8111-111111111111",
         "timestamp": "2026-04-22T12:00:00.000Z"},

        # CASE 1: GET (skip)
        {"type": "message", "timestamp": "2026-04-22T12:00:01.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_GET001|fc_get001",
              "arguments": {"command":
                  "curl -s https://api.notion.com/v1/pages/"
                  "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:01.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_GET001|fc_get001",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 50, "cwd": "/tmp/x"}}},

        # CASE 2: POST /v1/search (read-shape, skip)
        {"type": "message", "timestamp": "2026-04-22T12:00:02.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_SRCH01|fc_srch01",
              "arguments": {"command":
                  "curl -s -X POST https://api.notion.com/v1/search "
                  "-d '{\"query\":\"x\"}'"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:02.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_SRCH01|fc_srch01",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 60, "cwd": "/tmp/x"}}},

        # CASE 3: POST /v1/data_sources/{id}/query (read-shape, skip)
        {"type": "message", "timestamp": "2026-04-22T12:00:03.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_QRY001|fc_qry001",
              "arguments": {"command":
                  "curl -s -X POST https://api.notion.com/v1/"
                  "data_sources/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/"
                  "query -d '{}'"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:03.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_QRY001|fc_qry001",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 70, "cwd": "/tmp/x"}}},

        # CASE 4: TRUE WRITE - POST /v1/pages; page_id from body.
        {"type": "message", "timestamp": "2026-04-22T12:00:04.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_CREATE01|fc_create01",
              "arguments": {"command":
                  "curl -s -X POST https://api.notion.com/v1/pages "
                  "-H 'Authorization: Bearer X' -d "
                  "'{\"parent\":{\"database_id\":"
                  "\"6c9649ff-a102-4e6e-bb49-80437714318a\"}}'"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:04.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_CREATE01|fc_create01",
                     "toolName": "exec",
                     "content": [{"type": "text", "text":
                         "{\"object\":\"page\",\"id\":"
                         "\"cccccccccccccccccccccccccccccccc\"}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 592,
                                 "cwd":
                                 "/Users/openclaw/.openclaw/workspace"}}},

        # CASE 5: TRUE WRITE - PATCH /v1/pages/{undashed_id}
        {"type": "message", "timestamp": "2026-04-22T12:00:05.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_PATCH01|fc_patch01",
              "arguments": {"command":
                  "curl -s -X PATCH 'https://api.notion.com/v1/"
                  "pages/dddddddddddddddddddddddddddddddd' "
                  "-d '{}'"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:05.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_PATCH01|fc_patch01",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 100, "cwd": "/tmp/x"}}},

        # CASE 6: TRUE WRITE - PATCH /v1/blocks/{dashed}/children
        {"type": "message", "timestamp": "2026-04-22T12:00:06.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_APPEND01|fc_append01",
              "arguments": {"command":
                  "curl -s -X PATCH 'https://api.notion.com/v1/"
                  "blocks/eeeeeeee-eeee-4eee-8eee-eeeeeeeeeeee/"
                  "children' -d '{\"children\":[]}'"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:06.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_APPEND01|fc_append01",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 80, "cwd": "/tmp/x"}}},

        # CASE 7: TRUE WRITE - DELETE /v1/blocks/{dashed}
        {"type": "message", "timestamp": "2026-04-22T12:00:07.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_DEL01|fc_del01",
              "arguments": {"command":
                  "curl -s -X DELETE https://api.notion.com/v1/"
                  "blocks/ffffffff-ffff-4fff-8fff-ffffffffffff"}}]}},
        {"type": "message", "timestamp": "2026-04-22T12:00:07.500Z",
         "message": {"role": "toolResult",
                     "toolCallId": "call_DEL01|fc_del01",
                     "toolName": "exec",
                     "content": [{"type": "text", "text": "{}"}],
                     "details": {"status": "completed", "exitCode": 0,
                                 "durationMs": 40, "cwd": "/tmp/x"}}},

        # CASE 8: WRITE call with NO matching result (stays unresolved
        # within the 10-min window - tests unresolved_pending=1).
        {"type": "message", "timestamp": "2026-04-22T12:00:08.000Z",
         "message": {"role": "assistant", "content": [
             {"type": "toolCall", "name": "exec",
              "id": "call_ORPH01|fc_orph01",
              "arguments": {"command":
                  "curl -s -X POST 'https://api.notion.com/v1/"
                  "comments' -d '{}'"}}]}},
    ]
    # Trailing partial line (no newline) - scraper must back off.
    trailing = json.dumps({"type": "message", "partial": True})
    _write_jsonl(sess, objs, trailing_partial=trailing)
    return sess


def cmd_simulate():
    """Self-test: run one tick against the synthetic fixture.

    Builds the fixture in a throwaway temp dir, runs run_once with a
    session override, then checks: exactly the four true-write tool
    call ids were emitted, every record's fields are well-formed, the
    persisted byte offset stops short of the trailing partial line,
    and CASE 8 remains the single unresolved call. Prints a PASS/FAIL
    report; returns 0 on PASS, 1 on FAIL. Always removes the temp dir.
    """
    tmp = Path(tempfile.mkdtemp(prefix="ins2-simulate-"))
    try:
        sess = _simulate_fixture(tmp)
        state_path = tmp / "state.json"
        output_path = tmp / "notion-writes.jsonl"
        summary = run_once(
            state_path, output_path,
            session_files_override=[str(sess)],
        )
        # Re-read what the tick actually appended to the output file.
        emitted = []
        if output_path.exists():
            for line in output_path.read_text(encoding="utf-8").splitlines():
                if line.strip():
                    emitted.append(json.loads(line))
        expected_ids = {
            "call_CREATE01|fc_create01",
            "call_PATCH01|fc_patch01",
            "call_APPEND01|fc_append01",
            "call_DEL01|fc_del01",
        }
        got_ids = {r["tool_call_id"] for r in emitted}
        st = json.loads(state_path.read_text())
        fsize = os.path.getsize(sess)
        offset = st["session_files"][str(sess)]["byte_offset"]

        # Collect every assertion failure rather than stopping at the
        # first, so one run reports all regressions.
        failures = []
        if got_ids != expected_ids:
            failures.append(
                f"ids mismatch got={got_ids} expected={expected_ids}")
        for r in emitted:
            if not UUID_DASHED_RE.match(r["page_id_canonical"]):
                failures.append(
                    f"page_id not dashed: {r['tool_call_id']} "
                    f"-> {r['page_id_canonical']}")
            if r["method"] not in ("POST", "PATCH", "DELETE"):
                failures.append(
                    f"bad method: {r['tool_call_id']} {r['method']}")
            if not r["endpoint"].startswith("/v1/"):
                failures.append(
                    f"bad endpoint: {r['tool_call_id']} {r['endpoint']}")
            if r["agent_id"] != "worker":
                failures.append(f"bad agent_id: {r['agent_id']}")
            if r["session_id"] != "11111111-1111-4111-8111-111111111111":
                failures.append(f"bad session_id: {r['session_id']}")
            if r["exit_code"] != 0:
                failures.append(
                    f"bad exit_code: {r['tool_call_id']} {r['exit_code']}")
        if offset >= fsize:
            failures.append(
                f"offset {offset} should be < filesize {fsize} "
                f"(trailing partial must not be consumed)")
        if len(st["unresolved_calls"]) != 1:
            failures.append(
                f"unresolved count {len(st['unresolved_calls'])} "
                f"expected 1 (CASE 8)")

        print(f"--- SIMULATE summary: {summary}")
        print(f"--- emitted ({len(emitted)} records):")
        for r in emitted:
            print("  " + json.dumps(r, sort_keys=True))
        print(f"--- offset persisted: {offset} / file size: {fsize}")
        print(f"--- unresolved_pending: "
              f"{list(st['unresolved_calls'].keys())}")
        if not failures:
            print("SIMULATE: PASS")
            return 0
        print("SIMULATE: FAIL")
        for f in failures:
            print("  " + f)
        return 1
    finally:
        shutil.rmtree(tmp, ignore_errors=True)


def cmd_real_read(output_override):
    """Run once against real sessions with throwaway state.

    A fresh temp state dir plus the caller-supplied output path mean
    the canonical state/output files are never touched."""
    scratch = Path(tempfile.mkdtemp(prefix="ins2-realread-"))
    summary = run_once(scratch / "state.json", Path(output_override))
    print(f"--- REAL-READ summary: {summary}")
    return 0


def cmd_tick():
    """Production entry point: one scan tick on the canonical paths."""
    run_once(STATE_PATH, OUTPUT_PATH)
    return 0


def main():
    """CLI dispatch: --simulate, --real-read OUTPUT, or the default tick.

    Any uncaught exception is logged as FATAL and exits with code 2;
    the clean exit code comes from the chosen subcommand."""
    parser = argparse.ArgumentParser(
        description="Notion write scraper (INS.2)")
    parser.add_argument("--simulate", action="store_true",
                        help="Run synthetic fixture test and exit.")
    parser.add_argument("--real-read", metavar="OUTPUT",
                        help="Scan real sessions once; write output to "
                             "OUTPUT (not the canonical path).")
    opts = parser.parse_args()
    try:
        if opts.simulate:
            code = cmd_simulate()
        elif opts.real_read:
            code = cmd_real_read(opts.real_read)
        else:
            code = cmd_tick()
    except Exception as e:
        log(f"FATAL err={type(e).__name__} msg={e}")
        sys.exit(2)
    sys.exit(code)


# Entry point when executed directly (LaunchAgent tick or manual run).
if __name__ == "__main__":
    main()
