diff --git a/docs/extensions.md b/docs/extensions.md new file mode 100644 index 000000000..f3ce4aa97 --- /dev/null +++ b/docs/extensions.md @@ -0,0 +1,239 @@ +# NeuralScape Extensions + +Extensions are self-contained Python packages that add API routes, hook into events, and extend NeuralScape's capabilities — all without modifying core code. + +## Why Extensions? + +NeuralScape core handles memory storage, retrieval, and knowledge graph management. Extensions let you build on top of that: + +- **React to events** — trigger actions when memories are stored, sessions start/end, etc. +- **Add API routes** — expose new endpoints under `/v1/extensions//` +- **Keep it modular** — each extension is isolated; failures don't affect core or other extensions + +## The NeuralscapeExtension Protocol + +Every extension implements this protocol (defined in `extensions/base.py`): + +```python +from typing import Optional, Protocol, runtime_checkable +from fastapi import APIRouter +from pydantic import BaseModel, Field + +class ExtensionManifest(BaseModel): + name: str # unique identifier + version: str # semantic version + description: str # human-readable description + author: str | None = None + hooks: list[str] = [] # event types to listen to + +@runtime_checkable +class NeuralscapeExtension(Protocol): + manifest: ExtensionManifest + + async def startup(self) -> None: ... + async def shutdown(self) -> None: ... + async def on_event(self, event_type: str, payload: dict) -> Optional[dict]: ... + def get_routes(self) -> Optional[APIRouter]: ... 
+``` + +## Creating an Extension + +### Step 1: Create the directory + +``` +neuralscape-service/extensions/my_extension/ + __init__.py +``` + +### Step 2: Implement the protocol + +```python +# extensions/my_extension/__init__.py + +from typing import Optional +from fastapi import APIRouter +from extensions.base import ExtensionManifest, NeuralscapeExtension + + +class MyExtension: + """Example NeuralScape extension.""" + + manifest = ExtensionManifest( + name="my-extension", + version="0.1.0", + description="Does something useful", + author="Your Name", + hooks=["memory_stored"], # events to listen to + ) + + async def startup(self) -> None: + # Initialize resources (DB connections, clients, etc.) + pass + + async def shutdown(self) -> None: + # Clean up resources + pass + + async def on_event(self, event_type: str, payload: dict) -> Optional[dict]: + if event_type == "memory_stored": + # React to a new memory being stored + memory_id = payload.get("memory_id") + # ... do something with it + return {"processed": memory_id} + return None + + def get_routes(self) -> Optional[APIRouter]: + router = APIRouter() + + @router.get("/status") + async def status(): + return {"status": "ok", "extension": self.manifest.name} + + return router +``` + +That's it. NeuralScape will auto-discover it on startup. + +### Step 3: Test it + +Start NeuralScape and verify: + +```bash +# Check it's registered +curl http://localhost:8199/v1/extensions + +# Hit your custom route +curl http://localhost:8199/v1/extensions/my-extension/status +``` + +## Extension Lifecycle + +``` +Discovery → Registration → Startup → Events → Shutdown +``` + +1. **Discovery** — on app startup, the registry scans `extensions/` subdirectories and the `NEURALSCAPE_EXTENSIONS` env var +2. **Registration** — each discovered extension is instantiated and its manifest recorded +3. **Startup** — `startup()` is called on every registered extension (failures are logged, not fatal) +4. 
**Events** — as NeuralScape operates, events are dispatched to extensions whose `manifest.hooks` includes the event type
5. **Shutdown** — `shutdown()` is called on all started extensions during app teardown

## Adding Routes

Return an `APIRouter` from `get_routes()`. Routes are mounted at `/v1/extensions/<name>/`.

```python
def get_routes(self) -> Optional[APIRouter]:
    router = APIRouter()

    @router.get("/items")
    async def list_items():
        return {"items": [...]}

    @router.post("/items")
    async def create_item(data: dict):
        # ...
        return {"created": True}

    return router
```

Return `None` if your extension doesn't need routes (event-only extensions).

## Listening to Events

Declare the event types in `manifest.hooks`, then handle them in `on_event()`:

```python
manifest = ExtensionManifest(
    name="my-listener",
    version="0.1.0",
    description="Listens to memory events",
    hooks=["memory_stored", "session_start"],
)

async def on_event(self, event_type: str, payload: dict) -> Optional[dict]:
    if event_type == "memory_stored":
        # payload has: user_id, memory_id, content, category, scope, project_id
        ...
    elif event_type == "session_start":
        # payload has: user_id, session_id, project_id, agent_id, metadata
        ...
+ return None +``` + +### Standard Event Types + +| Event Type | Description | Key Payload Fields | +|---|---|---| +| `conversation_turn` | Conversation turn to process | `user_id`, `messages`, `project_id` | +| `session_start` | New session began | `user_id`, `session_id`, `project_id` | +| `session_end` | Session ended | `user_id`, `session_id`, `duration_seconds` | +| `memory_stored` | Memory was stored | `user_id`, `memory_id`, `content`, `category` | +| `compile_requested` | Daily compilation requested | `user_id`, `project_id`, `requested_by` | + +### Posting Events Externally + +External systems (like OpenClaw hooks) can post events via the API: + +```bash +curl -X POST http://localhost:8199/v1/extensions/events \ + -H "Content-Type: application/json" \ + -d '{"event_type": "session_start", "payload": {"user_id": "alice", "session_id": "abc123"}}' +``` + +## Configuration + +### Auto-discovery + +Place your extension in `neuralscape-service/extensions//` with an `__init__.py` exporting a class that implements `NeuralscapeExtension`. It will be discovered automatically. + +### Explicit registration via env var + +For extensions installed as separate packages or located elsewhere: + +```bash +export NEURALSCAPE_EXTENSIONS="mypackage.ext_module,another_package.ext" +``` + +Comma-separated Python import paths. Each module should export a class implementing `NeuralscapeExtension`. 
+ +## Error Handling + +Extensions are isolated from core and from each other: + +- If `startup()` fails, the extension is marked `failed` and skipped for events +- If `on_event()` throws, the error is logged and other extensions still receive the event +- If `shutdown()` fails, the error is logged and other extensions still shut down +- If `get_routes()` fails, the error is logged and the extension runs without routes + +## Example: Skeleton Extension + +```python +# extensions/skeleton/__init__.py +"""Minimal NeuralScape extension skeleton.""" + +from typing import Optional +from fastapi import APIRouter +from extensions.base import ExtensionManifest + + +class SkeletonExtension: + manifest = ExtensionManifest( + name="skeleton", + version="0.1.0", + description="A minimal skeleton extension", + ) + + async def startup(self) -> None: + pass + + async def shutdown(self) -> None: + pass + + async def on_event(self, event_type: str, payload: dict) -> Optional[dict]: + return None + + def get_routes(self) -> Optional[APIRouter]: + return None +``` diff --git a/neuralscape-service/extensions/__init__.py b/neuralscape-service/extensions/__init__.py new file mode 100644 index 000000000..1ca8753b4 --- /dev/null +++ b/neuralscape-service/extensions/__init__.py @@ -0,0 +1,283 @@ +"""Extension registry for NeuralScape — discovery, lifecycle, and event dispatch. + +The ExtensionRegistry discovers, loads, and manages NeuralScape extensions. +Extensions are discovered from: + 1. Subdirectories of the extensions/ package (auto-discovery) + 2. Python import paths listed in the NEURALSCAPE_EXTENSIONS env var + +Usage in main.py: + registry = ExtensionRegistry() + await registry.discover() + await registry.startup_all() + registry.mount_routes(app) + ... 
+ await registry.shutdown_all() +""" + +import importlib +import logging +import os +import pkgutil +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from fastapi import APIRouter, FastAPI + +from extensions.base import ExtensionManifest, NeuralscapeExtension + +logger = logging.getLogger(__name__) + + +@dataclass +class ExtensionEntry: + """Internal record for a registered extension.""" + + instance: NeuralscapeExtension + manifest: ExtensionManifest + status: str = "registered" # registered | started | failed | stopped + + +class ExtensionRegistry: + """Manages discovery, lifecycle, and event dispatch for NeuralScape extensions. + + Thread-safe for event dispatch. Extension failures are logged and do not + propagate — a single broken extension cannot take down the service. + """ + + def __init__(self) -> None: + self._extensions: dict[str, ExtensionEntry] = {} + + @property + def extensions(self) -> dict[str, ExtensionEntry]: + """Read-only access to registered extensions.""" + return dict(self._extensions) + + async def discover(self) -> None: + """Discover and register extensions from subdirectories and env var. + + Scans extensions/ subdirectories for modules exporting a class that + implements NeuralscapeExtension, then loads any additional extensions + specified in the NEURALSCAPE_EXTENSIONS environment variable. 
+ """ + self._discover_local() + self._discover_env() + logger.info( + "Extension discovery complete", + extra={"extensions": list(self._extensions.keys())}, + ) + + def _discover_local(self) -> None: + """Discover extensions from subdirectories of the extensions/ package.""" + extensions_dir = Path(__file__).parent + for item in sorted(extensions_dir.iterdir()): + if not item.is_dir() or item.name.startswith(("_", ".")): + continue + init_file = item / "__init__.py" + if not init_file.exists(): + continue + module_name = f"extensions.{item.name}" + self._load_extension_module(module_name, source=f"local:{item.name}") + + def _discover_env(self) -> None: + """Discover extensions from NEURALSCAPE_EXTENSIONS env var.""" + env_extensions = os.environ.get("NEURALSCAPE_EXTENSIONS", "").strip() + if not env_extensions: + return + for import_path in env_extensions.split(","): + import_path = import_path.strip() + if not import_path: + continue + self._load_extension_module(import_path, source=f"env:{import_path}") + + def _load_extension_module(self, module_name: str, source: str) -> None: + """Import a module and register the first NeuralscapeExtension class found.""" + try: + module = importlib.import_module(module_name) + except Exception: + logger.exception( + "Failed to import extension module %s (source=%s)", + module_name, + source, + ) + return + + # Look for a class-level attribute or a factory + ext_instance = None + + # Check for a module-level 'extension' attribute (pre-instantiated) + if hasattr(module, "extension") and isinstance(module.extension, NeuralscapeExtension): + ext_instance = module.extension + + # Otherwise, find the first class implementing the protocol + if ext_instance is None: + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and attr is not NeuralscapeExtension + and issubclass_safe(attr) + ): + try: + ext_instance = attr() + except Exception: + logger.exception( + "Failed to instantiate 
extension class %s from %s", + attr_name, + module_name, + ) + return + break + + if ext_instance is None: + logger.warning( + "No NeuralscapeExtension found in module %s (source=%s)", + module_name, + source, + ) + return + + self.register(ext_instance) + + def register(self, extension: NeuralscapeExtension) -> None: + """Register a single extension instance. + + Args: + extension: An object implementing the NeuralscapeExtension protocol. + + Raises: + ValueError: If an extension with the same name is already registered. + """ + name = extension.manifest.name + if name in self._extensions: + logger.warning( + "Extension already registered, skipping duplicate", + extra={"extension": name}, + ) + return + self._extensions[name] = ExtensionEntry( + instance=extension, + manifest=extension.manifest, + ) + logger.info( + "Extension registered", + extra={ + "extension": name, + "version": extension.manifest.version, + "hooks": extension.manifest.hooks, + }, + ) + + async def startup_all(self) -> None: + """Call startup() on all registered extensions. + + Failures are logged; other extensions continue starting. + """ + for name, entry in self._extensions.items(): + try: + await entry.instance.startup() + entry.status = "started" + logger.info("Extension started", extra={"extension": name}) + except Exception: + entry.status = "failed" + logger.exception( + "Extension startup failed", + extra={"extension": name}, + ) + + async def shutdown_all(self) -> None: + """Call shutdown() on all started extensions. + + Failures are logged; other extensions continue shutting down. 
        """
        for name, entry in self._extensions.items():
            # Only extensions that started cleanly get a shutdown() call.
            if entry.status not in ("started",):
                continue
            try:
                await entry.instance.shutdown()
                entry.status = "stopped"
                logger.info("Extension stopped", extra={"extension": name})
            except Exception:
                logger.exception(
                    "Extension shutdown failed",
                    extra={"extension": name},
                )

    def mount_routes(self, app: FastAPI) -> None:
        """Mount all extension routes onto the FastAPI app.

        Each extension's routes are mounted at /v1/extensions/<name>/.
        A failing get_routes() is logged and the extension runs routeless.
        """
        for name, entry in self._extensions.items():
            try:
                router = entry.instance.get_routes()
                if router is not None:
                    app.include_router(
                        router,
                        prefix=f"/v1/extensions/{name}",
                        tags=[f"ext:{name}"],
                    )
                    logger.info(
                        "Extension routes mounted",
                        extra={"extension": name, "prefix": f"/v1/extensions/{name}"},
                    )
            except Exception:
                logger.exception(
                    "Failed to mount extension routes",
                    extra={"extension": name},
                )

    async def emit_event(self, event_type: str, payload: dict) -> list[dict]:
        """Broadcast an event to all extensions whose manifest.hooks includes the event type.

        Only extensions with status 'started' receive events; handler
        exceptions are logged and do not stop dispatch to the others.

        Args:
            event_type: The event type string (e.g. 'memory_stored').
            payload: Event payload as a dictionary.

        Returns:
            List of non-None responses from extensions that handled the event.
        """
        responses: list[dict] = []
        for name, entry in self._extensions.items():
            if entry.status != "started":
                continue
            if event_type not in entry.manifest.hooks:
                continue
            try:
                result = await entry.instance.on_event(event_type, payload)
                if result is not None:
                    responses.append(result)
            except Exception:
                logger.exception(
                    "Extension event handler failed",
                    extra={"extension": name, "event_type": event_type},
                )
        return responses

    def list_extensions(self) -> list[dict]:
        """Return a summary of all registered extensions.

        Returns:
            List of dicts with name, version, description, status, and hooks.
+ """ + return [ + { + "name": entry.manifest.name, + "version": entry.manifest.version, + "description": entry.manifest.description, + "author": entry.manifest.author, + "status": entry.status, + "hooks": entry.manifest.hooks, + } + for entry in self._extensions.values() + ] + + +def issubclass_safe(cls: type) -> bool: + """Check if cls has the NeuralscapeExtension protocol attributes. + + Uses duck-typing check rather than issubclass() to avoid issues + with Protocol runtime checking on classes that don't fully implement it. + """ + required = ("manifest", "startup", "shutdown", "on_event", "get_routes") + return all(hasattr(cls, attr) for attr in required) diff --git a/neuralscape-service/extensions/base.py b/neuralscape-service/extensions/base.py new file mode 100644 index 000000000..f719700d3 --- /dev/null +++ b/neuralscape-service/extensions/base.py @@ -0,0 +1,74 @@ +"""Extension protocol and manifest for NeuralScape extensions. + +Defines the interface that all NeuralScape extensions must implement. +Extensions are self-contained packages that can add API routes, hook into +events, and extend NeuralScape's capabilities without touching core code. +""" + +from typing import Optional, Protocol, runtime_checkable + +from fastapi import APIRouter +from pydantic import BaseModel, Field + + +class ExtensionManifest(BaseModel): + """Metadata describing a NeuralScape extension.""" + + name: str = Field(description="Unique extension identifier (lowercase, hyphens ok)") + version: str = Field(description="Semantic version string (e.g. '0.1.0')") + description: str = Field(description="Human-readable description of the extension") + author: Optional[str] = Field(default=None, description="Extension author name or org") + hooks: list[str] = Field( + default_factory=list, + description="Event types this extension listens to (e.g. 
['memory_stored', 'session_start'])", + ) + + +@runtime_checkable +class NeuralscapeExtension(Protocol): + """Protocol that all NeuralScape extensions must implement. + + Extensions provide a manifest, lifecycle hooks (startup/shutdown), + event handlers, and optional API routes. + """ + + manifest: ExtensionManifest + + async def startup(self) -> None: + """Called when NeuralScape starts up. + + Use this for initializing resources, connections, or state + that the extension needs throughout its lifetime. + """ + ... + + async def shutdown(self) -> None: + """Called when NeuralScape shuts down. + + Use this for cleaning up resources, closing connections, + or flushing buffers. + """ + ... + + async def on_event(self, event_type: str, payload: dict) -> Optional[dict]: + """Handle an event from NeuralScape core or other extensions. + + Only called for event types listed in manifest.hooks. + + Args: + event_type: The type of event (e.g. 'memory_stored'). + payload: Event-specific data as a dictionary. + + Returns: + Optional dict with any response data, or None. + """ + ... + + def get_routes(self) -> Optional[APIRouter]: + """Return an APIRouter to mount at /v1/extensions//. + + Returns: + An APIRouter with the extension's HTTP endpoints, or None + if the extension doesn't expose any routes. + """ + ... diff --git a/neuralscape-service/extensions/conversation_compiler/README.md b/neuralscape-service/extensions/conversation_compiler/README.md new file mode 100644 index 000000000..a784b1dbe --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/README.md @@ -0,0 +1,102 @@ +# Conversation Compiler Extension + +Automatic memory capture from conversations. Implements Karpathy's "LLM Wiki" pattern + coleam00's "Claude Memory Compiler" concept as a NeuralScape extension. + +## What It Does + +1. **Flush** — Extracts facts, decisions, preferences, and patterns from conversation turns using Gemini. 
Stores them in NeuralScape and appends to daily log files in the Obsidian vault. + +2. **Compile** — Synthesizes daily logs into structured articles: session summaries, project pages, decision records, and research articles. Updates the vault index and triggers dedup. + +3. **Lint** — Runs 7 health checks on the vault: broken links, orphan pages, stale content, missing cross-references, contradictions (LLM-powered), data gaps, and index drift. + +4. **Query** — Index-guided retrieval: reads the vault index to find relevant pages, retrieves content, and synthesizes answers via Gemini. Optionally files answers back as new pages. + +## Configuration + +All via environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `OBSIDIAN_VAULT_PATH` | `~/Documents/Obsidian/KITT/K.I.T.T.` | Root path to the Obsidian vault | +| `COMPILER_LLM_MODEL` | (NeuralScape default) | Gemini model for extraction/compilation | +| `COMPILE_AFTER_HOUR` | `18` | Hour (24h) after which auto-compilation runs | +| `AUTO_COMPILE` | `true` | Whether to auto-compile daily logs | +| `NEURALSCAPE_URL` | `http://localhost:8199` | NeuralScape API URL | + +## API Endpoints + +All routes are mounted at `/v1/extensions/conversation-compiler/`. + +### `POST /flush` +Submit a conversation turn for fact extraction. Returns 202 (async via ARQ) or sync result if ARQ unavailable. + +```json +{ + "user_message": "...", + "assistant_response": "...", + "session_id": "abc123", + "channel": "slack", + "timestamp": "2026-04-07T10:30:00", + "project_id": "neuralscape", + "user_id": "ehfaz" +} +``` + +### `POST /compile` +Trigger compilation for a date or all pending. + +```json +{ + "date": "2026-04-07", + "user_id": "ehfaz" +} +``` + +### `POST /query` +Query the knowledge base. + +```json +{ + "question": "How does the dedup system work?", + "file_back": false, + "user_id": "ehfaz" +} +``` + +### `POST /lint` +Run vault health checks. 
+ +```json +{ + "structural_only": false +} +``` + +### `GET /status` +Get extension status and stats. + +## Vault Structure + +``` +vault/ + Daily/ # YYYY-MM-DD.md — raw extraction logs + Sessions/ # YYYY-MM-DD.md — compiled session summaries + Projects/ # /README.md — project knowledge pages + Decisions/ # .md — decision records with rationale + Research/ # .md — investigation/research articles + index.md # Auto-maintained vault index + log.md # Chronological event log +``` + +## Event Hooks + +- `conversation_turn` — Triggers flush extraction +- `session_end` — Flushes remaining context + checks if auto-compile needed +- `compile_requested` — Triggers compilation + +## ARQ Tasks + +- `process_conversation_flush` — Async flush extraction +- `process_conversation_compile` — Async compilation +- `auto_compile_check` — Periodic cron (runs after `COMPILE_AFTER_HOUR`) diff --git a/neuralscape-service/extensions/conversation_compiler/__init__.py b/neuralscape-service/extensions/conversation_compiler/__init__.py new file mode 100644 index 000000000..963e81b22 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/__init__.py @@ -0,0 +1,189 @@ +"""Conversation Compiler — NeuralScape Extension. + +Implements Karpathy's "LLM Wiki" pattern + coleam00's "Claude Memory Compiler" +concept as a NeuralScape extension. Automatically captures facts from conversations, +writes to daily logs, and compiles into structured knowledge articles. 
+ +Hooks: + - conversation_turn: Extract facts from a conversation turn + - session_end: Flush remaining context + check if compile needed + - compile_requested: Trigger daily compilation +""" + +import json +from datetime import datetime +from pathlib import Path +from typing import Optional + +import structlog +from fastapi import APIRouter + +from extensions.base import ExtensionManifest, NeuralscapeExtension +from memory_service import MemoryService + +from .compile import compile_all_pending, compile_date +from .config import compiler_settings +from .flush import flush_conversation_turn +from .obsidian_writer import ObsidianWriter +from .routes import create_router + +logger = structlog.get_logger(__name__) + +# Load manifest from JSON file +_MANIFEST_PATH = Path(__file__).parent / "manifest.json" +_manifest_data = json.loads(_MANIFEST_PATH.read_text(encoding="utf-8")) + + +class ConversationCompilerExtension: + """NeuralScape extension for automatic memory capture and compilation. + + Extracts facts from conversation turns, stores them in NeuralScape, + writes human-readable logs to an Obsidian vault, and compiles + daily logs into structured knowledge articles. 
    """

    def __init__(self) -> None:
        # NOTE(review): manifest lives on the instance, not the class —
        # registry duck-checks that look for a class-level 'manifest'
        # attribute will miss this extension; confirm the discovery path.
        self.manifest = ExtensionManifest(**_manifest_data)
        self._service: Optional[MemoryService] = None   # lazily created, see `service`
        self._writer: Optional[ObsidianWriter] = None   # lazily created, see `writer`
        self._task_manager = None  # never assigned elsewhere in this block; presumably wired later — TODO confirm

    @property
    def service(self) -> MemoryService:
        # Lazy singleton: created on first access if startup() hasn't run.
        if self._service is None:
            self._service = MemoryService()
        return self._service

    @property
    def writer(self) -> ObsidianWriter:
        # Lazy singleton: created on first access if startup() hasn't run.
        if self._writer is None:
            self._writer = ObsidianWriter()
        return self._writer

    async def startup(self) -> None:
        """Initialize the extension: create writer, warm up service."""
        logger.info(
            "Conversation Compiler starting up",
            vault=str(compiler_settings.vault_path),
            auto_compile=compiler_settings.auto_compile,
            compile_after_hour=compiler_settings.compile_after_hour,
        )

        # Initialize writer (creates vault dirs if needed)
        self._writer = ObsidianWriter()

        # Initialize memory service
        self._service = MemoryService()

        # Try to get task manager from the main app (set by mount_routes)
        # Will be set when routes are created

        logger.info("Conversation Compiler ready")

    async def shutdown(self) -> None:
        """Clean up resources."""
        logger.info("Conversation Compiler shutting down")
        # MemoryService cleanup is handled by the main app

    async def on_event(self, event_type: str, payload: dict) -> Optional[dict]:
        """Handle events from NeuralScape core.

        Args:
            event_type: One of 'conversation_turn', 'session_end', 'compile_requested'.
            payload: Event-specific data.

        Returns:
            Optional dict with results.
+ """ + if event_type == "conversation_turn": + return await self._handle_conversation_turn(payload) + elif event_type == "session_end": + return await self._handle_session_end(payload) + elif event_type == "compile_requested": + return await self._handle_compile_requested(payload) + return None + + async def _handle_conversation_turn(self, payload: dict) -> Optional[dict]: + """Extract facts from a conversation turn.""" + messages = payload.get("messages", []) + user_id = payload.get("user_id", "") + project_id = payload.get("project_id") + + if not messages or not user_id: + return None + + # Extract user message and assistant response from messages + user_message = "" + assistant_response = "" + for msg in messages: + role = msg.get("role", "") + content = msg.get("content", "") + if role == "user": + user_message = content + elif role == "assistant": + assistant_response = content + + if not user_message: + return None + + session_id = payload.get("run_id") or payload.get("agent_id") or "event" + + result = await flush_conversation_turn( + user_message=user_message, + assistant_response=assistant_response, + session_id=session_id, + channel="event", + timestamp=None, + project_id=project_id, + user_id=user_id, + service=self.service, + writer=self.writer, + ) + + return result.model_dump() if result.facts_extracted > 0 else None + + async def _handle_session_end(self, payload: dict) -> Optional[dict]: + """Handle session end: check if auto-compile is needed.""" + user_id = payload.get("user_id", "") + if not user_id: + return None + + # Check if auto-compile should run + if not compiler_settings.auto_compile: + return None + + now = datetime.now() + if now.hour < compiler_settings.compile_after_hour: + return None + + # Check if today's log has uncompiled entries + today = now.strftime("%Y-%m-%d") + if self.writer.is_daily_log_compiled(today): + return None + + logger.info("Auto-compile triggered on session end", date=today) + try: + result = await 
compile_date(today, self.service, self.writer) + self.writer.append_log(f"Auto-compiled {today} on session end") + return result.model_dump() + except Exception: + logger.exception("Auto-compile failed") + return None + + async def _handle_compile_requested(self, payload: dict) -> Optional[dict]: + """Handle explicit compile request.""" + date = payload.get("date") + if date: + result = await compile_date(date, self.service, self.writer) + return result.model_dump() + else: + results = await compile_all_pending(self.service, self.writer) + return {"dates_compiled": len(results), "results": [r.model_dump() for r in results]} + + def get_routes(self) -> Optional[APIRouter]: + """Return the API router for this extension.""" + return create_router( + service=self.service, + writer=self.writer, + task_manager=self._task_manager, + ) diff --git a/neuralscape-service/extensions/conversation_compiler/compile.py b/neuralscape-service/extensions/conversation_compiler/compile.py new file mode 100644 index 000000000..6b4d322fa --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/compile.py @@ -0,0 +1,361 @@ +"""Daily compiler — synthesizes daily log entries into structured articles. + +Reads all daily log entries for a given date, groups them by project/topic/type, +calls Gemini to synthesize into structured articles, and writes them to the vault. +Idempotent: running twice on the same day updates rather than duplicates. 
+""" + +import json +import re +from datetime import datetime +from typing import Optional + +import structlog +from google import genai + +from config import settings as core_settings +from memory_service import MemoryService + +from .config import compiler_settings +from .obsidian_writer import ObsidianWriter +from .schemas import CompileResult, CompiledArticle + +logger = structlog.get_logger(__name__) + + +# ────────────────────────────────────────────── +# Compilation prompts +# ────────────────────────────────────────────── + +SESSION_SUMMARY_PROMPT = """\ +You are a knowledge compiler. Given the following daily log entries from a coding assistant session, write a concise session summary in Markdown. + +Structure: +1. **Overview** — 2-3 sentence summary of the day's work +2. **Key Decisions** — Bulleted list of decisions made and their rationale +3. **Facts Learned** — New information discovered +4. **Action Items** — Outstanding tasks or follow-ups +5. **Technical Notes** — Patterns, gotchas, or technical insights worth remembering + +Be concise. Use wikilinks ([[Page Name]]) where you reference topics that deserve their own page. +Omit sections that have no relevant entries. + +DAILY LOG ENTRIES: +""" + +PROJECT_SYNTHESIS_PROMPT = """\ +You are a knowledge compiler. Given the following facts about a project, synthesize them into a structured project knowledge page in Markdown. + +Structure: +1. **Overview** — What this project is and its purpose +2. **Tech Stack** — Technologies, frameworks, and tools used +3. **Architecture** — Key design decisions and patterns +4. **Conventions** — Coding conventions and standards +5. **Dependencies** — Key packages and version notes +6. **Gotchas** — Known issues, pitfalls, or warnings +7. **Recent Changes** — Notable recent work + +Use wikilinks ([[Page Name]]) where relevant. Be specific and factual. +Omit sections with no relevant entries. 
+ +PROJECT: {project} + +FACTS: +""" + +DECISION_SYNTHESIS_PROMPT = """\ +You are a knowledge compiler. Given the following decision-related entries, write a structured decision record in Markdown. + +Structure: +1. **Decision** — Clear statement of what was decided +2. **Context** — Why this decision was needed +3. **Options Considered** — What alternatives were evaluated +4. **Rationale** — Why this option was chosen +5. **Consequences** — Known trade-offs or implications +6. **Related** — Links to related decisions or topics (use [[wikilinks]]) + +ENTRIES: +""" + +RESEARCH_SYNTHESIS_PROMPT = """\ +You are a knowledge compiler. Given the following research-related entries on a topic, write a structured research article in Markdown. + +Structure: +1. **Summary** — Key findings in 2-3 sentences +2. **Details** — Full analysis and findings +3. **Comparisons** — If applicable, comparisons between options +4. **Conclusions** — What was concluded or recommended +5. **References** — Related pages (use [[wikilinks]]) + +TOPIC: {topic} + +ENTRIES: +""" + + +def _call_gemini(prompt: str) -> str: + """Call Gemini with the given prompt and return the response text.""" + model = compiler_settings.get_llm_model(core_settings.gemini_llm_model) + client = genai.Client(api_key=core_settings.google_api_key) + response = client.models.generate_content(model=model, contents=prompt) + return response.text or "" + + +def _group_entries(entries: list[dict]) -> dict[str, list[dict]]: + """Group daily log entries by type for synthesis. + + Returns: + Dict with keys: 'decisions', 'projects', 'research', 'general'. + Each value is a list of entries, and 'projects' is a dict of + project_name -> entries. 
+ """ + groups: dict[str, list[dict]] = { + "decisions": [], + "research": [], + "general": [], + } + projects: dict[str, list[dict]] = {} + + for entry in entries: + category = entry.get("category", "").lower() + content = entry.get("content", "") + + # Check for project mentions + project = _infer_project(content) + + if category == "decision": + groups["decisions"].append(entry) + elif category == "research": + groups["research"].append(entry) + elif project: + projects.setdefault(project, []).append(entry) + else: + groups["general"].append(entry) + + return {**groups, "projects": projects} + + +def _infer_project(content: str) -> Optional[str]: + """Try to infer a project name from content. + + Looks for known project patterns and explicit project mentions. + """ + content_lower = content.lower() + # Check for common project indicators + known_slugs = [ + "neuralscape", + "openclaw", + "lightpath", + "svc-utility-belt", + ] + for slug in known_slugs: + if slug in content_lower: + return slug + return None + + +def _extract_decision_slug(entries: list[dict]) -> str: + """Generate a slug for a group of decision entries.""" + # Use the first decision's content to derive a slug + if entries: + content = entries[0].get("content", "decision") + # Take first meaningful words + words = re.sub(r"[^\w\s]", "", content).split()[:6] + return "-".join(w.lower() for w in words) + return "unnamed-decision" + + +def _extract_research_topic(entries: list[dict]) -> str: + """Generate a topic name for a group of research entries.""" + if entries: + content = entries[0].get("content", "research") + words = re.sub(r"[^\w\s]", "", content).split()[:5] + return "-".join(w.lower() for w in words) + return "unnamed-research" + + +def _entries_to_text(entries: list[dict]) -> str: + """Convert entry dicts to a text block for LLM prompts.""" + lines = [] + for e in entries: + time = e.get("time", "") + category = e.get("category", "") + content = e.get("content", "") + lines.append(f"- 
[{time}] ({category}) {content}") + return "\n".join(lines) + + +async def compile_date( + date: str, + service: MemoryService, + writer: ObsidianWriter, +) -> CompileResult: + """Compile all daily log entries for a given date into structured articles. + + This is idempotent — running twice on the same date will update existing + articles rather than creating duplicates. + + Args: + date: ISO date string (YYYY-MM-DD). + service: MemoryService instance for dedup. + writer: ObsidianWriter for vault I/O. + + Returns: + CompileResult with details of what was compiled. + """ + logger.info("Starting compilation", date=date) + + # Check if already compiled + if writer.is_daily_log_compiled(date): + logger.info("Daily log already compiled, re-compiling (idempotent)", date=date) + + # Read daily log entries + entries = writer.get_daily_log_entries(date) + if not entries: + logger.info("No entries found for date", date=date) + return CompileResult(date=date) + + # Group entries + grouped = _group_entries(entries) + articles: list[CompiledArticle] = [] + entries_compiled = 0 + + # 1. Session summary (always produced if there are entries) + all_text = _entries_to_text(entries) + try: + summary_content = _call_gemini(SESSION_SUMMARY_PROMPT + all_text) + summary_path = writer.write_session_summary(date, summary_content) + articles.append( + CompiledArticle( + path=summary_path, + title=f"Session Summary — {date}", + article_type="session", + created=not writer.file_exists(summary_path), + ) + ) + entries_compiled += len(entries) + except Exception: + logger.exception("Failed to compile session summary", date=date) + + # 2. 
Project pages + projects: dict[str, list[dict]] = grouped.get("projects", {}) + for project_name, project_entries in projects.items(): + try: + prompt = PROJECT_SYNTHESIS_PROMPT.format(project=project_name) + prompt += _entries_to_text(project_entries) + project_content = _call_gemini(prompt) + project_path = writer.update_project_page(project_name, project_content) + articles.append( + CompiledArticle( + path=project_path, + title=project_name, + article_type="project", + created=not writer.file_exists(project_path), + ) + ) + except Exception: + logger.exception("Failed to compile project page", project=project_name) + + # 3. Decisions + decision_entries = grouped.get("decisions", []) + if decision_entries: + try: + slug = _extract_decision_slug(decision_entries) + prompt = DECISION_SYNTHESIS_PROMPT + _entries_to_text(decision_entries) + decision_content = _call_gemini(prompt) + decision_path = writer.write_decision(slug, decision_content) + articles.append( + CompiledArticle( + path=decision_path, + title=slug.replace("-", " ").title(), + article_type="decision", + created=not writer.file_exists(decision_path), + ) + ) + except Exception: + logger.exception("Failed to compile decisions") + + # 4. Research + research_entries = grouped.get("research", []) + if research_entries: + try: + topic = _extract_research_topic(research_entries) + prompt = RESEARCH_SYNTHESIS_PROMPT.format(topic=topic) + prompt += _entries_to_text(research_entries) + research_content = _call_gemini(prompt) + research_path = writer.write_research(topic, research_content) + articles.append( + CompiledArticle( + path=research_path, + title=topic.replace("-", " ").title(), + article_type="research", + created=not writer.file_exists(research_path), + ) + ) + except Exception: + logger.exception("Failed to compile research") + + # 5. 
Update index with all new articles + if articles: + index_entries = [ + {"path": a.path, "title": a.title, "type": a.article_type} + for a in articles + ] + writer.update_index(index_entries) + + # 6. Mark daily log as compiled + writer.mark_daily_log_compiled(date) + + # 7. Append to chronological log + writer.append_log( + f"Compiled {date}: {len(articles)} articles from {entries_compiled} entries" + ) + + # 8. Trigger dedup + dedup_triggered = False + try: + service.dedup_memories("ehfaz") + dedup_triggered = True + except Exception: + logger.warning("Post-compile dedup failed (non-critical)") + + result = CompileResult( + date=date, + articles=articles, + entries_compiled=entries_compiled, + dedup_triggered=dedup_triggered, + ) + + logger.info( + "Compilation complete", + date=date, + articles=len(articles), + entries_compiled=entries_compiled, + ) + + return result + + +async def compile_all_pending( + service: MemoryService, + writer: ObsidianWriter, +) -> list[CompileResult]: + """Compile all uncompiled daily logs. + + Returns: + List of CompileResult for each date compiled. 
+ """ + uncompiled = writer.list_uncompiled_dates() + if not uncompiled: + logger.info("No uncompiled daily logs found") + return [] + + logger.info("Compiling pending daily logs", count=len(uncompiled)) + results = [] + for date in uncompiled: + try: + result = await compile_date(date, service, writer) + results.append(result) + except Exception: + logger.exception("Failed to compile date", date=date) + return results diff --git a/neuralscape-service/extensions/conversation_compiler/config.py b/neuralscape-service/extensions/conversation_compiler/config.py new file mode 100644 index 000000000..b34bbebbc --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/config.py @@ -0,0 +1,63 @@ +"""Configuration for the conversation-compiler extension.""" + +import os +from pathlib import Path + +from pydantic import Field +from pydantic_settings import BaseSettings + + +class CompilerSettings(BaseSettings): + """Configuration for the conversation-compiler extension. + + All values are read from environment variables with sensible defaults. + """ + + obsidian_vault_path: str = Field( + default_factory=lambda: os.environ.get( + "OBSIDIAN_VAULT_PATH", + str(Path.home() / "Documents" / "Obsidian" / "KITT" / "K.I.T.T."), + ), + description="Root path to the Obsidian vault", + ) + + compiler_llm_model: str = Field( + default="", + description="LLM model for extraction/compilation. 
Empty string = use NeuralScape default.", + ) + + compile_after_hour: int = Field( + default=18, + ge=0, + le=23, + description="Hour (24h) after which auto-compilation runs (default: 18 = 6 PM)", + ) + + auto_compile: bool = Field( + default=True, + description="Whether to auto-compile daily logs after compile_after_hour", + ) + + neuralscape_url: str = Field( + default="http://localhost:8199", + description="NeuralScape API base URL (used for health checks only)", + ) + + model_config = { + "env_prefix": "", + "env_file": ".env", + "env_file_encoding": "utf-8", + "extra": "ignore", + } + + @property + def vault_path(self) -> Path: + """Return the vault path as a resolved Path object.""" + return Path(self.obsidian_vault_path).expanduser().resolve() + + def get_llm_model(self, fallback: str) -> str: + """Return the configured LLM model, or the fallback if not set.""" + return self.compiler_llm_model or fallback + + +compiler_settings = CompilerSettings() diff --git a/neuralscape-service/extensions/conversation_compiler/flush.py b/neuralscape-service/extensions/conversation_compiler/flush.py new file mode 100644 index 000000000..f817851c8 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/flush.py @@ -0,0 +1,244 @@ +"""Flush engine — extracts facts from conversation turns via Gemini. + +Takes a conversation turn (user message + assistant response + metadata) +and extracts structured facts, stores them in NeuralScape, and appends +a human-readable summary to the daily log. 
+""" + +import json +import re +from datetime import datetime +from typing import Optional + +import structlog +from google import genai + +from config import settings as core_settings +from memory_service import MemoryService + +from .config import compiler_settings +from .obsidian_writer import ObsidianWriter +from .schemas import ExtractedFact, FlushResult + +logger = structlog.get_logger(__name__) + + +# ────────────────────────────────────────────── +# Extraction prompt +# ────────────────────────────────────────────── + +CONVERSATION_EXTRACTION_PROMPT = """\ +You are an intelligent memory extraction engine. Analyze the conversation below and extract distinct, actionable pieces of knowledge. + +For each fact, assign one of these types: +- decision — A choice made, with rationale (e.g. "Chose PostgreSQL over MySQL because...") +- preference — User preference or style choice +- fact — Factual information learned (personal, technical, domain) +- pattern — Technical pattern, convention, or architecture choice +- project — Project-specific context (tech stack, dependencies, structure) +- gotcha — Warning, pitfall, or "watch out for" insight +- action_item — Something the user needs to do or follow up on +- research — Investigation, comparison, or exploration of a topic + +Rules: +1. Each fact must be a standalone sentence — useful without the conversation. +2. Be specific. "Uses Python" is too vague. "Uses Python 3.12 with FastAPI for backend services" is good. +3. Skip greetings, acknowledgments, and transient tool operations (file reads, git commands, etc.). +4. Skip information only meaningful in the current moment ("currently running tests"). +5. Deduplicate — don't extract the same insight twice with different wording. +6. For decisions, always include the rationale ("chose X because Y"). +7. If a project name is identifiable, include it. 
+ +Respond with a JSON object: +{ + "facts": [ + {"type": "decision", "content": "...", "project": "project-name-or-null", "tags": ["tag1"]}, + {"type": "preference", "content": "...", "project": null, "tags": []} + ] +} + +If no meaningful facts can be extracted, return: {"facts": []} + +CONVERSATION: +""" + + +def _build_extraction_prompt( + user_message: str, + assistant_response: str, + channel: str = "api", +) -> str: + """Build the full extraction prompt for a conversation turn.""" + return ( + CONVERSATION_EXTRACTION_PROMPT + + f"channel: {channel}\n" + + f"user: {user_message}\n" + + f"assistant: {assistant_response}\n" + ) + + +def _parse_extraction_response(response_text: str) -> list[ExtractedFact]: + """Parse the LLM extraction response into ExtractedFact objects.""" + text = response_text.strip() + + # Strip markdown code block wrapper + if text.startswith("```"): + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```$", "", text) + + try: + data = json.loads(text) + raw_facts = data.get("facts", []) + except (json.JSONDecodeError, AttributeError): + logger.warning("Failed to parse extraction response as JSON", response=text[:200]) + return [] + + facts = [] + for item in raw_facts: + if not isinstance(item, dict): + continue + content = item.get("content", "").strip() + if not content: + continue + facts.append( + ExtractedFact( + category=item.get("type", "fact"), + content=content, + project_id=item.get("project"), + tags=item.get("tags", []), + ) + ) + return facts + + +# Map our extraction types to NeuralScape memory categories +_TYPE_TO_CATEGORY = { + "decision": "decision", + "preference": "preference", + "fact": "personal_fact", + "pattern": "convention", + "project": "architecture", + "gotcha": "domain_knowledge", + "action_item": "task_context", + "research": "domain_knowledge", +} + + +def _map_category(extraction_type: str) -> str: + """Map an extraction type to a NeuralScape memory category.""" + return 
_TYPE_TO_CATEGORY.get(extraction_type, "personal_fact") + + +async def flush_conversation_turn( + user_message: str, + assistant_response: str, + session_id: str, + channel: str, + timestamp: str | None, + project_id: str | None, + user_id: str, + service: MemoryService, + writer: ObsidianWriter, +) -> FlushResult: + """Extract facts from a conversation turn and store them. + + 1. Calls Gemini to extract structured facts from the conversation. + 2. Stores each fact in NeuralScape via MemoryService. + 3. Appends a human-readable summary to the daily log. + + Args: + user_message: The user's message text. + assistant_response: The assistant's response text. + session_id: Session identifier for grouping. + channel: Channel name (e.g. 'slack', 'telegram', 'api'). + timestamp: ISO timestamp of the turn (defaults to now). + project_id: Optional project context. + user_id: The user's ID. + service: MemoryService instance for storing facts. + writer: ObsidianWriter instance for daily log writes. + + Returns: + FlushResult with extraction details. 
+ """ + ts = timestamp or datetime.now().isoformat() + date = ts[:10] # YYYY-MM-DD + time_str = ts[11:16] if len(ts) > 16 else datetime.now().strftime("%H:%M") + + logger.info( + "Flushing conversation turn", + session_id=session_id, + channel=channel, + user_id=user_id, + ) + + # Step 1: Call Gemini for extraction + prompt = _build_extraction_prompt(user_message, assistant_response, channel) + model = compiler_settings.get_llm_model(core_settings.gemini_llm_model) + + try: + client = genai.Client(api_key=core_settings.google_api_key) + response = client.models.generate_content( + model=model, + contents=prompt, + ) + response_text = response.text or "" + except Exception: + logger.exception("Gemini extraction failed") + return FlushResult(session_id=session_id, timestamp=ts) + + # Step 2: Parse extracted facts + facts = _parse_extraction_response(response_text) + if not facts: + logger.info("No facts extracted from conversation turn", session_id=session_id) + return FlushResult(session_id=session_id, timestamp=ts) + + # Step 3: Store each fact in NeuralScape + memories_stored = 0 + for fact in facts: + category = _map_category(fact.category) + fact_project = fact.project_id or project_id + scope = "project" if fact_project else "global" + try: + service.store_raw( + content=fact.content, + user_id=user_id, + category=category, + scope=scope, + project_id=fact_project, + tags=fact.tags or None, + ) + memories_stored += 1 + except Exception: + logger.exception( + "Failed to store extracted fact", + fact=fact.content[:80], + category=category, + ) + + # Step 4: Append to daily log + log_entries = [ + { + "time": time_str, + "category": f.category, + "content": f.content, + "session_id": session_id, + } + for f in facts + ] + daily_log_path = writer.append_daily_log(date, log_entries) + + logger.info( + "Flush complete", + session_id=session_id, + facts_extracted=len(facts), + memories_stored=memories_stored, + ) + + return FlushResult( + session_id=session_id, + 
timestamp=ts, + facts_extracted=len(facts), + facts=facts, + daily_log_path=daily_log_path, + memories_stored=memories_stored, + ) diff --git a/neuralscape-service/extensions/conversation_compiler/lint.py b/neuralscape-service/extensions/conversation_compiler/lint.py new file mode 100644 index 000000000..0acf26ec4 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/lint.py @@ -0,0 +1,362 @@ +"""Vault health checker — 7 checks for Obsidian vault integrity. + +Checks for broken links, orphan pages, stale content, missing cross-references, +contradictions (LLM-powered), data gaps, and index drift. +""" + +import re +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional + +import structlog +from google import genai + +from config import settings as core_settings + +from .config import compiler_settings +from .obsidian_writer import ObsidianWriter +from .schemas import LintFinding, LintResult + +logger = structlog.get_logger(__name__) + + +def _read_frontmatter_date(content: str) -> Optional[datetime]: + """Extract the 'date' or 'updated' field from frontmatter.""" + for field in ("updated", "date"): + match = re.search(rf"^{field}:\s*(.+)$", content, re.MULTILINE) + if match: + try: + return datetime.fromisoformat(match.group(1).strip()) + except ValueError: + pass + return None + + +def check_broken_links(writer: ObsidianWriter) -> list[LintFinding]: + """Check for broken internal [[wikilinks]].""" + findings = [] + all_files = writer.list_all_files() + all_stems = {Path(f).stem for f in all_files} + + for rel_path in all_files: + content = writer.read_file(rel_path) + links = writer.find_wikilinks(content) + for link in links: + # Wikilinks can be [[Page Name]] or [[path/to/page]] + target_stem = Path(link).stem + if target_stem not in all_stems: + findings.append( + LintFinding( + check="broken_links", + severity="error", + message=f"Broken wikilink [[{link}]]", + file=rel_path, + suggestion=f"Create the 
page '{link}' or fix the link", + ) + ) + return findings + + +def check_orphan_pages(writer: ObsidianWriter) -> list[LintFinding]: + """Check for pages with no inbound links.""" + findings = [] + all_files = writer.list_all_files() + + # Build inbound link map + inbound: dict[str, int] = {f: 0 for f in all_files} + all_stems_to_path = {Path(f).stem: f for f in all_files} + + for rel_path in all_files: + content = writer.read_file(rel_path) + links = writer.find_wikilinks(content) + for link in links: + target_stem = Path(link).stem + if target_stem in all_stems_to_path: + target_path = all_stems_to_path[target_stem] + inbound[target_path] = inbound.get(target_path, 0) + 1 + + # Skip index.md and log.md — they're root pages + skip = {"index.md", "log.md"} + for rel_path, count in inbound.items(): + if count == 0 and rel_path not in skip: + findings.append( + LintFinding( + check="orphan_pages", + severity="warning", + message=f"Page has no inbound links", + file=rel_path, + suggestion="Add a [[wikilink]] to this page from a relevant parent page", + ) + ) + return findings + + +def check_stale_pages( + writer: ObsidianWriter, stale_days: int = 30 +) -> list[LintFinding]: + """Check for pages not updated in >stale_days that are referenced by recent pages.""" + findings = [] + all_files = writer.list_all_files() + cutoff = datetime.now() - timedelta(days=stale_days) + + # Gather page dates + page_dates: dict[str, Optional[datetime]] = {} + for rel_path in all_files: + content = writer.read_file(rel_path) + page_dates[rel_path] = _read_frontmatter_date(content) + + # Find stale pages referenced by recent pages + all_stems_to_path = {Path(f).stem: f for f in all_files} + for rel_path in all_files: + page_date = page_dates.get(rel_path) + if not page_date or page_date >= cutoff: + continue # Not stale or no date + + # Check if any recent page links to it + stem = Path(rel_path).stem + for other_path in all_files: + if other_path == rel_path: + continue + other_date = 
page_dates.get(other_path) + if not other_date or other_date < cutoff: + continue # Other page is also stale + content = writer.read_file(other_path) + if f"[[{stem}]]" in content or f"[[{rel_path}]]" in content: + findings.append( + LintFinding( + check="stale_pages", + severity="warning", + message=f"Page last updated {page_date.strftime('%Y-%m-%d')} but referenced by recent page {other_path}", + file=rel_path, + suggestion="Review and update this page", + ) + ) + break # One finding per stale page is enough + + return findings + + +def check_missing_cross_references(writer: ObsidianWriter) -> list[LintFinding]: + """Check for page titles mentioned in content but not linked.""" + findings = [] + all_files = writer.list_all_files() + + for rel_path in all_files: + content = writer.read_file(rel_path) + mentions = writer.find_mentions(content, all_files) + for mentioned_page in mentions: + findings.append( + LintFinding( + check="missing_cross_references", + severity="info", + message=f"Mentions '{Path(mentioned_page).stem}' but doesn't link to it", + file=rel_path, + suggestion=f"Add [[{Path(mentioned_page).stem}]] link", + ) + ) + return findings + + +def check_contradictions(writer: ObsidianWriter) -> list[LintFinding]: + """LLM-powered check for contradictions between related pages.""" + findings = [] + all_files = writer.list_all_files() + + # Build clusters of related pages (by shared wikilinks) + page_links: dict[str, set[str]] = {} + for rel_path in all_files: + content = writer.read_file(rel_path) + links = set(writer.find_wikilinks(content)) + page_links[rel_path] = links + + # Find pairs of pages that link to the same targets + checked_pairs: set[tuple[str, str]] = set() + related_pairs: list[tuple[str, str]] = [] + + for p1, links1 in page_links.items(): + for p2, links2 in page_links.items(): + if p1 >= p2: + continue + pair = (p1, p2) + if pair in checked_pairs: + continue + checked_pairs.add(pair) + overlap = links1 & links2 + if len(overlap) >= 2: + 
related_pairs.append(pair) + + # LLM check on related pairs (limit to avoid excessive API calls) + model = compiler_settings.get_llm_model(core_settings.gemini_llm_model) + client = genai.Client(api_key=core_settings.google_api_key) + + for p1, p2 in related_pairs[:5]: + content1 = writer.read_file(p1) + content2 = writer.read_file(p2) + + # Truncate to avoid token limits + content1 = content1[:3000] + content2 = content2[:3000] + + prompt = ( + "Compare these two knowledge pages for factual contradictions. " + "Only report clear contradictions (not just different perspectives). " + "Respond with JSON: {\"contradictions\": [{\"claim1\": \"...\", \"claim2\": \"...\", \"description\": \"...\"}]}\n" + "If none found, return: {\"contradictions\": []}\n\n" + f"PAGE 1 ({p1}):\n{content1}\n\n" + f"PAGE 2 ({p2}):\n{content2}\n" + ) + + try: + response = client.models.generate_content(model=model, contents=prompt) + text = (response.text or "").strip() + if text.startswith("```"): + text = re.sub(r"^```(?:json)?\s*", "", text) + text = re.sub(r"\s*```$", "", text) + data = __import__("json").loads(text) + for c in data.get("contradictions", []): + findings.append( + LintFinding( + check="contradictions", + severity="error", + message=c.get("description", "Contradiction found"), + file=f"{p1} vs {p2}", + suggestion=f"Review and resolve: {c.get('claim1', '')} vs {c.get('claim2', '')}", + ) + ) + except Exception: + logger.warning("Contradiction check failed for pair", p1=p1, p2=p2) + + return findings + + +def check_data_gaps(writer: ObsidianWriter) -> list[LintFinding]: + """Check for topics mentioned frequently but without a dedicated page.""" + findings = [] + all_files = writer.list_all_files() + all_stems = {Path(f).stem.lower() for f in all_files} + + # Count wikilink targets that don't exist as pages + missing_targets: dict[str, int] = {} + for rel_path in all_files: + content = writer.read_file(rel_path) + links = writer.find_wikilinks(content) + for link in links: + 
stem = Path(link).stem.lower() + if stem not in all_stems: + missing_targets[link] = missing_targets.get(link, 0) + 1 + + # Report frequently mentioned but missing topics + for target, count in sorted(missing_targets.items(), key=lambda x: -x[1]): + if count >= 2: + findings.append( + LintFinding( + check="data_gaps", + severity="info", + message=f"Topic '[[{target}]]' referenced {count} times but has no dedicated page", + file=None, + suggestion=f"Consider creating a page for '{target}'", + ) + ) + return findings + + +def check_index_drift(writer: ObsidianWriter) -> list[LintFinding]: + """Check for pages that exist but aren't in index.md.""" + findings = [] + all_files = writer.list_all_files() + + index_content = writer.read_file("index.md") + if not index_content: + if all_files: + findings.append( + LintFinding( + check="index_drift", + severity="warning", + message="No index.md exists but vault has files", + suggestion="Run compile to generate index.md", + ) + ) + return findings + + # Extract all paths referenced in the index + indexed_paths = set(re.findall(r"\[\[([^\]]+)\]\]", index_content)) + indexed_stems = {Path(p).stem for p in indexed_paths} + + # Skip meta files + skip_stems = {"index", "log"} + + for rel_path in all_files: + stem = Path(rel_path).stem + if stem in skip_stems: + continue + if stem not in indexed_stems and rel_path not in indexed_paths: + findings.append( + LintFinding( + check="index_drift", + severity="info", + message="Page exists but is not in index.md", + file=rel_path, + suggestion="Add this page to index.md", + ) + ) + return findings + + +async def run_lint( + writer: ObsidianWriter, + structural_only: bool = False, +) -> LintResult: + """Run all lint checks on the vault. + + Args: + writer: ObsidianWriter instance. + structural_only: If True, skip LLM-powered checks. + + Returns: + LintResult with all findings. 
+ """ + logger.info("Running vault lint", structural_only=structural_only) + all_findings: list[LintFinding] = [] + checks_run = 0 + files_scanned = len(writer.list_all_files()) + + # Structural checks (always run) + structural_checks = [ + ("broken_links", check_broken_links), + ("orphan_pages", check_orphan_pages), + ("stale_pages", check_stale_pages), + ("missing_cross_references", check_missing_cross_references), + ("data_gaps", check_data_gaps), + ("index_drift", check_index_drift), + ] + + for name, check_fn in structural_checks: + try: + results = check_fn(writer) + all_findings.extend(results) + checks_run += 1 + except Exception: + logger.exception("Lint check failed", check=name) + + # LLM-powered checks + if not structural_only: + try: + results = check_contradictions(writer) + all_findings.extend(results) + checks_run += 1 + except Exception: + logger.exception("Contradiction check failed") + + logger.info( + "Lint complete", + checks_run=checks_run, + findings=len(all_findings), + files_scanned=files_scanned, + ) + + return LintResult( + findings=all_findings, + checks_run=checks_run, + files_scanned=files_scanned, + ) diff --git a/neuralscape-service/extensions/conversation_compiler/manifest.json b/neuralscape-service/extensions/conversation_compiler/manifest.json new file mode 100644 index 000000000..7e02f08af --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/manifest.json @@ -0,0 +1,7 @@ +{ + "name": "conversation-compiler", + "version": "0.1.0", + "description": "Automatic memory capture from conversations. 
Extracts facts, decisions, and patterns from conversation turns, writes to daily logs, and compiles into structured knowledge articles.", + "author": "ehfaz", + "hooks": ["conversation_turn", "session_end", "compile_requested"] +} diff --git a/neuralscape-service/extensions/conversation_compiler/obsidian_writer.py b/neuralscape-service/extensions/conversation_compiler/obsidian_writer.py new file mode 100644 index 000000000..1b40757e4 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/obsidian_writer.py @@ -0,0 +1,477 @@ +"""Obsidian vault I/O for the conversation-compiler extension. + +All file writes use a temp-file + atomic rename pattern to prevent +corruption from concurrent access. Frontmatter is managed with +standard YAML delimiters. +""" + +import fcntl +import os +import re +import tempfile +from datetime import datetime +from pathlib import Path +from typing import Optional + +import structlog + +from .config import compiler_settings + +logger = structlog.get_logger(__name__) + + +def _slugify(text: str) -> str: + """Convert text to a kebab-case slug suitable for filenames.""" + slug = text.lower().strip() + slug = re.sub(r"[^\w\s-]", "", slug) + slug = re.sub(r"[\s_]+", "-", slug) + slug = re.sub(r"-+", "-", slug) + return slug.strip("-")[:80] + + +def _ensure_dir(path: Path) -> None: + """Create directory and parents if they don't exist.""" + path.mkdir(parents=True, exist_ok=True) + + +def _atomic_write(path: Path, content: str) -> None: + """Write content to path atomically using temp file + rename. + + Uses file locking to prevent concurrent write corruption. 
+ """ + _ensure_dir(path.parent) + fd, tmp_path = tempfile.mkstemp( + dir=path.parent, prefix=f".{path.stem}.", suffix=".tmp" + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + f.write(content) + f.flush() + os.fsync(f.fileno()) + os.rename(tmp_path, path) + except Exception: + # Clean up temp file on failure + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +def _atomic_append(path: Path, content: str) -> None: + """Append content to path with file locking.""" + _ensure_dir(path.parent) + with open(path, "a", encoding="utf-8") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + f.write(content) + f.flush() + os.fsync(f.fileno()) + + +def _read_file(path: Path) -> str: + """Read file content, returning empty string if file doesn't exist.""" + if not path.exists(): + return "" + return path.read_text(encoding="utf-8") + + +def _build_frontmatter( + title: str, + tags: list[str] | None = None, + date: str | None = None, + source_count: int | None = None, + compiled: bool | None = None, + extra: dict | None = None, +) -> str: + """Build YAML frontmatter block.""" + lines = ["---"] + lines.append(f"title: {title}") + if date: + lines.append(f"date: {date}") + if tags: + lines.append(f"tags: [{', '.join(tags)}]") + if source_count is not None: + lines.append(f"source_count: {source_count}") + if compiled is not None: + lines.append(f"compiled: {str(compiled).lower()}") + if extra: + for k, v in extra.items(): + lines.append(f"{k}: {v}") + lines.append("---") + return "\n".join(lines) + "\n\n" + + +def _update_frontmatter_field(content: str, field: str, value: str) -> str: + """Update a single field in existing frontmatter, or add it.""" + fm_match = re.match(r"^---\n(.*?)\n---\n", content, re.DOTALL) + if not fm_match: + return content + + fm_body = fm_match.group(1) + field_re = re.compile(rf"^{re.escape(field)}:.*$", re.MULTILINE) + if field_re.search(fm_body): + fm_body = field_re.sub(f"{field}: 
{value}", fm_body) + else: + fm_body += f"\n{field}: {value}" + + return f"---\n{fm_body}\n---\n" + content[fm_match.end() :] + + +class ObsidianWriter: + """Handles all vault I/O for the conversation-compiler extension.""" + + def __init__(self, vault_path: Path | None = None) -> None: + self.vault = vault_path or compiler_settings.vault_path + logger.info("ObsidianWriter initialized", vault=str(self.vault)) + + # ── Daily logs ──────────────────────────────── + + def append_daily_log(self, date: str, entries: list[dict]) -> str: + """Append extracted entries to the daily log file. + + Args: + date: ISO date string (YYYY-MM-DD). + entries: List of dicts with 'category', 'content', 'time', 'session_id' keys. + + Returns: + Path to the daily log file (relative to vault). + """ + rel_path = f"Daily/{date}.md" + path = self.vault / rel_path + existing = _read_file(path) + + if not existing: + # Create with frontmatter + content = _build_frontmatter( + title=f"Daily Log — {date}", + tags=["daily-log", "auto-generated"], + date=date, + compiled=False, + ) + content += f"# Daily Log — {date}\n\n" + else: + content = "" + + for entry in entries: + time_str = entry.get("time", "") + category = entry.get("category", "uncategorized") + fact = entry.get("content", "") + session = entry.get("session_id", "") + content += f"- **[{time_str}]** `{category}` {fact}" + if session: + content += f" _(session: {session})_" + content += "\n" + + if existing: + _atomic_append(path, content) + else: + _atomic_write(path, content) + + logger.info("Daily log updated", date=date, entries=len(entries)) + return rel_path + + def is_daily_log_compiled(self, date: str) -> bool: + """Check if a daily log has already been compiled.""" + path = self.vault / f"Daily/{date}.md" + content = _read_file(path) + if not content: + return False + return "compiled: true" in content.split("---")[1] if "---" in content else False + + def mark_daily_log_compiled(self, date: str) -> None: + """Mark a daily log 
as compiled by updating its frontmatter.""" + path = self.vault / f"Daily/{date}.md" + content = _read_file(path) + if not content: + return + updated = _update_frontmatter_field(content, "compiled", "true") + updated = _update_frontmatter_field( + updated, "compiled_at", datetime.now().isoformat() + ) + _atomic_write(path, updated) + + def get_daily_log_entries(self, date: str) -> list[dict]: + """Parse entries from a daily log file. + + Returns: + List of dicts with 'time', 'category', 'content', 'session_id'. + """ + path = self.vault / f"Daily/{date}.md" + content = _read_file(path) + if not content: + return [] + + entries = [] + # Match lines like: - **[HH:MM]** `category` content _(session: xxx)_ + pattern = re.compile( + r"^- \*\*\[([^\]]*)\]\*\* `(\w+)` (.+?)(?:\s*_\(session: ([^)]+)\)_)?$", + re.MULTILINE, + ) + for m in pattern.finditer(content): + entries.append( + { + "time": m.group(1), + "category": m.group(2), + "content": m.group(3).strip(), + "session_id": m.group(4) or "", + } + ) + return entries + + def list_daily_logs(self) -> list[str]: + """List all daily log dates (YYYY-MM-DD) in the vault.""" + daily_dir = self.vault / "Daily" + if not daily_dir.exists(): + return [] + dates = [] + for f in sorted(daily_dir.glob("*.md")): + # Extract date from filename + date_str = f.stem + if re.match(r"^\d{4}-\d{2}-\d{2}$", date_str): + dates.append(date_str) + return dates + + def list_uncompiled_dates(self) -> list[str]: + """List daily log dates that haven't been compiled yet.""" + return [d for d in self.list_daily_logs() if not self.is_daily_log_compiled(d)] + + # ── Session summaries ───────────────────────── + + def write_session_summary(self, date: str, summary: str) -> str: + """Write or update a session summary for a given date. + + Returns: + Relative path within the vault. 
+ """ + rel_path = f"Sessions/{date}.md" + path = self.vault / rel_path + content = _build_frontmatter( + title=f"Session Summary — {date}", + tags=["session-summary", "auto-generated"], + date=date, + ) + content += f"# Session Summary — {date}\n\n{summary}\n" + _atomic_write(path, content) + logger.info("Session summary written", date=date) + return rel_path + + # ── Project pages ───────────────────────────── + + def update_project_page(self, project: str, content: str) -> str: + """Update a project's README page. + + Returns: + Relative path within the vault. + """ + slug = _slugify(project) + rel_path = f"Projects/{slug}/README.md" + path = self.vault / rel_path + existing = _read_file(path) + + if existing: + # Preserve frontmatter, replace body + fm_match = re.match(r"^(---\n.*?\n---\n)", existing, re.DOTALL) + if fm_match: + fm = _update_frontmatter_field( + fm_match.group(0) + "\n", + "updated", + datetime.now().strftime("%Y-%m-%d"), + ) + full_content = fm + content + "\n" + else: + full_content = content + "\n" + else: + full_content = _build_frontmatter( + title=project, + tags=["project", slug, "auto-generated"], + date=datetime.now().strftime("%Y-%m-%d"), + ) + full_content += f"# {project}\n\n{content}\n" + + _atomic_write(path, full_content) + logger.info("Project page updated", project=project) + return rel_path + + # ── Decisions ────────────────────────────────── + + def write_decision(self, slug: str, content: str) -> str: + """Create or update a decision record. + + Returns: + Relative path within the vault. 
+ """ + safe_slug = _slugify(slug) + rel_path = f"Decisions/{safe_slug}.md" + path = self.vault / rel_path + existing = _read_file(path) + + if existing: + updated = _update_frontmatter_field( + existing, "updated", datetime.now().strftime("%Y-%m-%d") + ) + # Replace body after frontmatter + fm_match = re.match(r"^(---\n.*?\n---\n)", updated, re.DOTALL) + if fm_match: + full_content = fm_match.group(0) + "\n" + content + "\n" + else: + full_content = content + "\n" + else: + full_content = _build_frontmatter( + title=slug.replace("-", " ").title(), + tags=["decision", "auto-generated"], + date=datetime.now().strftime("%Y-%m-%d"), + ) + full_content += content + "\n" + + _atomic_write(path, full_content) + logger.info("Decision written", slug=safe_slug) + return rel_path + + # ── Research ────────────────────────────────── + + def write_research(self, topic: str, content: str) -> str: + """Create or update a research article. + + Returns: + Relative path within the vault. + """ + safe_slug = _slugify(topic) + rel_path = f"Research/{safe_slug}.md" + path = self.vault / rel_path + existing = _read_file(path) + + if existing: + updated = _update_frontmatter_field( + existing, "updated", datetime.now().strftime("%Y-%m-%d") + ) + fm_match = re.match(r"^(---\n.*?\n---\n)", updated, re.DOTALL) + if fm_match: + full_content = fm_match.group(0) + "\n" + content + "\n" + else: + full_content = content + "\n" + else: + full_content = _build_frontmatter( + title=topic.replace("-", " ").title(), + tags=["research", "auto-generated"], + date=datetime.now().strftime("%Y-%m-%d"), + ) + full_content += content + "\n" + + _atomic_write(path, full_content) + logger.info("Research article written", topic=safe_slug) + return rel_path + + # ── Index ───────────────────────────────────── + + def update_index(self, entries: list[dict]) -> str: + """Update the vault index with new or changed pages. + + Args: + entries: List of dicts with 'path', 'title', 'type' keys. 
+ + Returns: + Relative path to the index file. + """ + rel_path = "index.md" + path = self.vault / rel_path + existing = _read_file(path) + + # Parse existing index entries into a dict keyed by path + existing_entries: dict[str, str] = {} + if existing: + for line in existing.split("\n"): + m = re.match(r"^- \[\[([^\]]+)\]\]\s*—\s*(.+)$", line) + if m: + existing_entries[m.group(1)] = m.group(2).strip() + + # Merge new entries + for entry in entries: + entry_path = entry.get("path", "") + title = entry.get("title", "") + etype = entry.get("type", "") + existing_entries[entry_path] = f"{title} ({etype})" + + # Rebuild index + content = _build_frontmatter( + title="Vault Index", + tags=["index", "auto-generated"], + date=datetime.now().strftime("%Y-%m-%d"), + ) + content += "# Vault Index\n\n" + + # Group by type + groups: dict[str, list[tuple[str, str]]] = {} + for p, desc in sorted(existing_entries.items()): + # Extract type from description + type_match = re.search(r"\((\w+)\)$", desc) + group = type_match.group(1) if type_match else "other" + groups.setdefault(group, []).append((p, desc)) + + for group_name in sorted(groups.keys()): + content += f"## {group_name.title()}\n\n" + for p, desc in groups[group_name]: + content += f"- [[{p}]] — {desc}\n" + content += "\n" + + _atomic_write(path, content) + logger.info("Index updated", entries=len(existing_entries)) + return rel_path + + # ── Chronological log ───────────────────────── + + def append_log(self, entry: str) -> str: + """Append a timestamped entry to the chronological log. + + Returns: + Relative path to the log file. 
+ """ + rel_path = "log.md" + path = self.vault / rel_path + + if not path.exists(): + header = _build_frontmatter( + title="Chronological Log", + tags=["log", "auto-generated"], + ) + header += "# Chronological Log\n\n" + _atomic_write(path, header) + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + _atomic_append(path, f"- **{timestamp}** — {entry}\n") + return rel_path + + # ── Utility methods ────────────────────────── + + def list_all_files(self) -> list[str]: + """List all markdown files in the vault (relative paths).""" + if not self.vault.exists(): + return [] + return [ + str(f.relative_to(self.vault)) + for f in self.vault.rglob("*.md") + if not f.name.startswith(".") + ] + + def read_file(self, rel_path: str) -> str: + """Read a file from the vault by relative path.""" + return _read_file(self.vault / rel_path) + + def file_exists(self, rel_path: str) -> bool: + """Check if a file exists in the vault.""" + return (self.vault / rel_path).exists() + + def find_wikilinks(self, content: str) -> list[str]: + """Extract all [[wikilink]] targets from content.""" + return re.findall(r"\[\[([^\]]+)\]\]", content) + + def find_mentions(self, content: str, known_pages: list[str]) -> list[str]: + """Find page titles mentioned in content but not linked.""" + mentions = [] + content_lower = content.lower() + for page in known_pages: + # Get the page title from filename + title = Path(page).stem.replace("-", " ") + if title.lower() in content_lower and f"[[{page}]]" not in content: + mentions.append(page) + return mentions diff --git a/neuralscape-service/extensions/conversation_compiler/query.py b/neuralscape-service/extensions/conversation_compiler/query.py new file mode 100644 index 000000000..a49dd2a62 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/query.py @@ -0,0 +1,150 @@ +"""Index-guided retrieval — query the knowledge base using the vault index. 
+ +Reads index.md to find relevant pages, retrieves their content, and +synthesizes an answer using Gemini. Optionally files the answer back +as a new page. +""" + +import re +from typing import Optional + +import structlog +from google import genai + +from config import settings as core_settings + +from .config import compiler_settings +from .obsidian_writer import ObsidianWriter +from .schemas import QueryResult + +logger = structlog.get_logger(__name__) + + +QUERY_PROMPT = """\ +You are a knowledge retrieval assistant. Answer the user's question using ONLY the provided vault pages as context. If the pages don't contain enough information, say so clearly. + +Use Markdown formatting. Reference source pages with [[wikilinks]] where appropriate. + +QUESTION: {question} + +VAULT PAGES: +""" + + +async def query_knowledge_base( + question: str, + writer: ObsidianWriter, + file_back: bool = False, +) -> QueryResult: + """Query the knowledge base using index-guided retrieval. + + 1. Reads index.md to identify relevant pages. + 2. Reads those pages. + 3. Sends them + the question to Gemini for synthesis. + 4. Optionally files the answer back as a new Research page. + + Args: + question: The question to answer. + writer: ObsidianWriter instance. + file_back: Whether to save the answer as a new vault page. + + Returns: + QueryResult with the answer and source pages. 
+ """ + logger.info("Querying knowledge base", question=question[:80]) + + # Step 1: Read index to find relevant pages + index_content = writer.read_file("index.md") + all_files = writer.list_all_files() + + # Find pages that might be relevant based on keyword matching + question_words = set( + w.lower() + for w in re.findall(r"\w+", question) + if len(w) > 2 # Skip tiny words + ) + + scored_files: list[tuple[str, int]] = [] + for rel_path in all_files: + if rel_path in ("index.md", "log.md"): + continue + # Score by keyword overlap with path + index description + path_words = set( + w.lower() for w in re.findall(r"\w+", rel_path) if len(w) > 2 + ) + score = len(question_words & path_words) + + # Check index for this page's description + stem = re.escape(rel_path) + idx_match = re.search(rf"\[\[{stem}\]\]\s*—\s*(.+)$", index_content, re.MULTILINE) + if idx_match: + desc_words = set( + w.lower() + for w in re.findall(r"\w+", idx_match.group(1)) + if len(w) > 2 + ) + score += len(question_words & desc_words) + + if score > 0: + scored_files.append((rel_path, score)) + + # Sort by relevance, take top pages + scored_files.sort(key=lambda x: -x[1]) + relevant_files = [f for f, _ in scored_files[:8]] + + # If no keyword matches, include all non-log files (up to a limit) + if not relevant_files: + relevant_files = [ + f for f in all_files + if f not in ("index.md", "log.md") + ][:10] + + if not relevant_files: + return QueryResult( + answer="No vault pages found to answer this question.", + sources=[], + ) + + # Step 2: Read the relevant pages + pages_context = "" + sources = [] + for rel_path in relevant_files: + content = writer.read_file(rel_path) + if content: + pages_context += f"\n--- {rel_path} ---\n{content[:4000]}\n" + sources.append(rel_path) + + # Step 3: Query Gemini + prompt = QUERY_PROMPT.format(question=question) + pages_context + model = compiler_settings.get_llm_model(core_settings.gemini_llm_model) + + try: + client = 
genai.Client(api_key=core_settings.google_api_key) + response = client.models.generate_content(model=model, contents=prompt) + answer = response.text or "No answer generated." + except Exception: + logger.exception("Knowledge base query failed") + return QueryResult( + answer="Failed to generate an answer. Check logs for details.", + sources=sources, + ) + + # Step 4: Optionally file back + filed_back = None + if file_back: + try: + # Generate a topic from the question + topic_words = re.findall(r"\w+", question)[:5] + topic = "-".join(w.lower() for w in topic_words) + filed_back = writer.write_research( + topic, f"## Question\n\n{question}\n\n## Answer\n\n{answer}" + ) + logger.info("Answer filed back to vault", path=filed_back) + except Exception: + logger.exception("Failed to file answer back to vault") + + return QueryResult( + answer=answer, + sources=sources, + filed_back=filed_back, + ) diff --git a/neuralscape-service/extensions/conversation_compiler/routes.py b/neuralscape-service/extensions/conversation_compiler/routes.py new file mode 100644 index 000000000..99499c038 --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/routes.py @@ -0,0 +1,186 @@ +"""API routes for the conversation-compiler extension. + +Mounted at /v1/extensions/conversation-compiler/ by the extension registry. 
+""" + +from datetime import datetime + +import structlog +from fastapi import APIRouter, HTTPException + +from memory_service import MemoryService +from task_manager import TaskManager + +from .compile import compile_all_pending, compile_date +from .config import compiler_settings +from .flush import flush_conversation_turn +from .lint import run_lint +from .obsidian_writer import ObsidianWriter +from .query import query_knowledge_base +from .schemas import ( + CompileRequest, + CompileResult, + FlushRequest, + FlushResult, + LintRequest, + LintResult, + QueryRequest, + QueryResult, + StatusResponse, +) + +logger = structlog.get_logger(__name__) + + +def create_router( + service: MemoryService, + writer: ObsidianWriter, + task_manager: TaskManager | None = None, +) -> APIRouter: + """Create the API router for the conversation-compiler extension. + + Args: + service: MemoryService instance for memory storage. + writer: ObsidianWriter instance for vault I/O. + task_manager: Optional TaskManager for async job enqueuing. + + Returns: + Configured APIRouter. + """ + router = APIRouter() + + @router.post("/flush", status_code=202, response_model=dict) + async def flush(request: FlushRequest) -> dict: + """Submit a conversation turn for fact extraction. + + Enqueues the extraction to the ARQ worker for async processing. + Returns 202 Accepted with a task reference. 
+ """ + timestamp = request.timestamp or datetime.now().isoformat() + + if task_manager and task_manager.pool: + try: + job = await task_manager.pool.enqueue_job( + "process_conversation_flush", + request.user_message, + request.assistant_response, + request.session_id, + request.channel, + timestamp, + request.project_id, + request.user_id, + ) + task_id = job.job_id if job else "duplicate" + return {"status": "accepted", "task_id": task_id} + except Exception: + logger.warning("ARQ enqueue failed, falling back to sync flush") + + # Fallback: run synchronously + result = await flush_conversation_turn( + user_message=request.user_message, + assistant_response=request.assistant_response, + session_id=request.session_id, + channel=request.channel, + timestamp=timestamp, + project_id=request.project_id, + user_id=request.user_id, + service=service, + writer=writer, + ) + return { + "status": "completed", + "result": result.model_dump(), + } + + @router.post("/compile", status_code=202, response_model=dict) + async def compile(request: CompileRequest) -> dict: + """Trigger compilation for a specific date or all pending. + + Enqueues compilation to the ARQ worker for async processing. 
+ """ + if task_manager and task_manager.pool: + try: + job = await task_manager.pool.enqueue_job( + "process_conversation_compile", + request.date, + request.user_id, + ) + task_id = job.job_id if job else "duplicate" + return {"status": "accepted", "task_id": task_id} + except Exception: + logger.warning("ARQ enqueue failed, falling back to sync compile") + + # Fallback: run synchronously + if request.date: + result = await compile_date(request.date, service, writer) + return {"status": "completed", "result": result.model_dump()} + else: + results = await compile_all_pending(service, writer) + return { + "status": "completed", + "results": [r.model_dump() for r in results], + } + + @router.post("/query", response_model=QueryResult) + async def query(request: QueryRequest) -> QueryResult: + """Query the knowledge base using index-guided retrieval. + + Synchronous — returns the answer directly. + """ + return await query_knowledge_base( + question=request.question, + writer=writer, + file_back=request.file_back, + ) + + @router.post("/lint", response_model=LintResult) + async def lint(request: LintRequest) -> LintResult: + """Run health checks on the Obsidian vault.""" + return await run_lint( + writer=writer, + structural_only=request.structural_only, + ) + + @router.get("/status", response_model=StatusResponse) + async def status() -> StatusResponse: + """Get extension status and stats.""" + all_files = writer.list_all_files() + daily_logs = writer.list_daily_logs() + + # Find last flush and compile times from the log + log_content = writer.read_file("log.md") + last_flush = None + last_compile = None + if log_content: + # Parse log entries for timestamps + flush_matches = list( + __import__("re").finditer( + r"\*\*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\*\* — (?:Flush|flush)", + log_content, + ) + ) + compile_matches = list( + __import__("re").finditer( + r"\*\*(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\*\* — Compiled", + log_content, + ) + ) + if flush_matches: + 
last_flush = flush_matches[-1].group(1) + if compile_matches: + last_compile = compile_matches[-1].group(1) + + # Count articles (exclude daily logs, index, log) + article_count = len( + [f for f in all_files if not f.startswith("Daily/") and f not in ("index.md", "log.md")] + ) + + return StatusResponse( + last_flush=last_flush, + last_compile=last_compile, + article_count=article_count, + daily_log_count=len(daily_logs), + vault_path=str(compiler_settings.vault_path), + ) + + return router diff --git a/neuralscape-service/extensions/conversation_compiler/schemas.py b/neuralscape-service/extensions/conversation_compiler/schemas.py new file mode 100644 index 000000000..6c23f1f5b --- /dev/null +++ b/neuralscape-service/extensions/conversation_compiler/schemas.py @@ -0,0 +1,148 @@ +"""Pydantic request/response models for the conversation-compiler extension.""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +# ────────────────────────────────────────────── +# Extraction models +# ────────────────────────────────────────────── + + +class ExtractedFact(BaseModel): + """A single fact extracted from a conversation turn.""" + + category: str = Field(description="Fact category (decision, preference, etc.)") + content: str = Field(description="The extracted fact text") + project_id: Optional[str] = Field(default=None, description="Inferred project if any") + tags: list[str] = Field(default_factory=list) + + +class FlushResult(BaseModel): + """Result of flushing (extracting from) a conversation turn.""" + + session_id: str + timestamp: str + facts_extracted: int = 0 + facts: list[ExtractedFact] = Field(default_factory=list) + daily_log_path: Optional[str] = None + memories_stored: int = 0 + + +# ────────────────────────────────────────────── +# Compilation models +# ────────────────────────────────────────────── + + +class CompiledArticle(BaseModel): + """A single article produced by compilation.""" + + path: str = 
Field(description="Relative path within the vault") + title: str + article_type: str = Field(description="session | project | decision | research") + created: bool = Field(default=False, description="True if newly created, False if updated") + + +class CompileResult(BaseModel): + """Result of a compilation run.""" + + date: str + articles: list[CompiledArticle] = Field(default_factory=list) + entries_compiled: int = 0 + dedup_triggered: bool = False + + +# ────────────────────────────────────────────── +# Lint models +# ────────────────────────────────────────────── + + +class LintFinding(BaseModel): + """A single lint finding.""" + + check: str = Field(description="Name of the check that found this issue") + severity: str = Field(default="warning", description="info | warning | error") + message: str + file: Optional[str] = None + suggestion: Optional[str] = None + + +class LintResult(BaseModel): + """Result of a lint run.""" + + findings: list[LintFinding] = Field(default_factory=list) + checks_run: int = 0 + files_scanned: int = 0 + + +# ────────────────────────────────────────────── +# Query models +# ────────────────────────────────────────────── + + +class QueryResult(BaseModel): + """Result of a knowledge base query.""" + + answer: str + sources: list[str] = Field(default_factory=list, description="Vault files used") + filed_back: Optional[str] = Field( + default=None, description="Path if the answer was filed back to the vault" + ) + + +# ────────────────────────────────────────────── +# API request models +# ────────────────────────────────────────────── + + +class FlushRequest(BaseModel): + """Request body for POST /flush.""" + + user_message: str = Field(min_length=1, max_length=50000) + assistant_response: str = Field(min_length=1, max_length=50000) + session_id: str = Field(min_length=1, max_length=200) + channel: str = Field(default="api", max_length=50) + timestamp: Optional[str] = None + project_id: Optional[str] = None + user_id: str = 
Field(min_length=1, max_length=100) + + +class CompileRequest(BaseModel): + """Request body for POST /compile.""" + + date: Optional[str] = Field( + default=None, + description="ISO date (YYYY-MM-DD) to compile. None = all pending.", + ) + user_id: str = Field(min_length=1, max_length=100) + + +class QueryRequest(BaseModel): + """Request body for POST /query.""" + + question: str = Field(min_length=1, max_length=5000) + file_back: bool = Field(default=False, description="Whether to save the answer to the vault") + user_id: str = Field(min_length=1, max_length=100) + + +class LintRequest(BaseModel): + """Request body for POST /lint.""" + + structural_only: bool = Field( + default=False, + description="If true, skip LLM-powered checks (contradictions, data gaps)", + ) + + +class StatusResponse(BaseModel): + """Response for GET /status.""" + + extension: str = "conversation-compiler" + status: str = "ok" + last_flush: Optional[str] = None + last_compile: Optional[str] = None + article_count: int = 0 + daily_log_count: int = 0 + vault_path: str = "" diff --git a/neuralscape-service/extensions/events.py b/neuralscape-service/extensions/events.py new file mode 100644 index 000000000..a38f56a4b --- /dev/null +++ b/neuralscape-service/extensions/events.py @@ -0,0 +1,92 @@ +"""Standard event types and schemas for the NeuralScape extension system. + +Defines the canonical event types that NeuralScape core emits and the +Pydantic models for their payloads. Extensions declare which event types +they listen to via their manifest.hooks list. 
+""" + +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class EventType(str, Enum): + """Standard event types emitted by NeuralScape core.""" + + CONVERSATION_TURN = "conversation_turn" + """A conversation turn to process (messages from a user/agent session).""" + + SESSION_START = "session_start" + """A new agent session began.""" + + SESSION_END = "session_end" + """An agent session ended.""" + + MEMORY_STORED = "memory_stored" + """A memory was successfully stored (useful for post-processing extensions).""" + + COMPILE_REQUESTED = "compile_requested" + """Daily compilation/summarization requested.""" + + +class ConversationTurnEvent(BaseModel): + """Payload for conversation_turn events.""" + + user_id: str + messages: list[dict] = Field(description="Conversation messages ({role, content} dicts)") + project_id: Optional[str] = None + agent_id: Optional[str] = None + run_id: Optional[str] = None + + +class SessionStartEvent(BaseModel): + """Payload for session_start events.""" + + user_id: str + session_id: str + project_id: Optional[str] = None + agent_id: Optional[str] = None + metadata: Optional[dict] = None + + +class SessionEndEvent(BaseModel): + """Payload for session_end events.""" + + user_id: str + session_id: str + project_id: Optional[str] = None + agent_id: Optional[str] = None + duration_seconds: Optional[float] = None + + +class MemoryStoredEvent(BaseModel): + """Payload for memory_stored events.""" + + user_id: str + memory_id: str + content: str + category: Optional[str] = None + scope: Optional[str] = None + project_id: Optional[str] = None + + +class CompileRequestedEvent(BaseModel): + """Payload for compile_requested events.""" + + user_id: str + project_id: Optional[str] = None + requested_by: Optional[str] = Field( + default=None, + description="Who/what triggered the compilation (e.g. 
'cron', 'manual', 'api')", + ) + + +# Map event types to their payload models for validation +EVENT_PAYLOAD_MODELS: dict[str, type[BaseModel]] = { + EventType.CONVERSATION_TURN: ConversationTurnEvent, + EventType.SESSION_START: SessionStartEvent, + EventType.SESSION_END: SessionEndEvent, + EventType.MEMORY_STORED: MemoryStoredEvent, + EventType.COMPILE_REQUESTED: CompileRequestedEvent, +} diff --git a/neuralscape-service/main.py b/neuralscape-service/main.py index b2912accc..af0ec1720 100644 --- a/neuralscape-service/main.py +++ b/neuralscape-service/main.py @@ -15,6 +15,7 @@ from pydantic import BaseModel, Field from config import settings +from extensions import ExtensionRegistry from logging_config import configure_logging from memory_service import MemoryService @@ -47,6 +48,9 @@ # Redis-backed task manager (initialized in lifespan) _task_manager = TaskManager() +# Extension registry (discovered + started in lifespan) +_extension_registry = ExtensionRegistry() + # Legacy lazy-init globals (kept for backward compat with old endpoints + tests) _memory = None _graphiti = None @@ -116,6 +120,11 @@ async def lifespan(app: FastAPI): # Connect task manager to Redis await _task_manager.connect() + # Discover and start extensions + await _extension_registry.discover() + await _extension_registry.startup_all() + _extension_registry.mount_routes(app) + # Start MCP HTTP session manager if enabled and connect its task manager if _mcp_session_manager is not None: from mcp_server import _task_manager as mcp_task_manager @@ -126,6 +135,9 @@ async def lifespan(app: FastAPI): else: yield + # Shutdown extensions + await _extension_registry.shutdown_all() + # Shutdown with timeout to prevent hanging on unresponsive backends try: await asyncio.wait_for( @@ -988,6 +1000,38 @@ async def v1_graph_communities( return {"status": "ok", "communities": communities} +# ── Extensions ───────────────────────────────── + + +class EmitEventRequest(BaseModel): + """Request body for posting 
events to the extension registry.""" + + event_type: str = Field(description="Event type (e.g. 'memory_stored', 'session_start')") + payload: dict = Field(default_factory=dict, description="Event payload data") + + +@v1_router.get("/extensions") +async def v1_list_extensions(): + """List all registered extensions with their status.""" + return {"status": "ok", "extensions": _extension_registry.list_extensions()} + + +@v1_router.post("/extensions/events") +async def v1_emit_extension_event(req: EmitEventRequest): + """Post an event to all registered extensions. + + Broadcasts the event to extensions whose manifest.hooks includes + the given event_type. Useful for external callers (e.g. OpenClaw hooks). + """ + responses = await _extension_registry.emit_event(req.event_type, req.payload) + return { + "status": "ok", + "event_type": req.event_type, + "extensions_notified": len(responses), + "responses": responses, + } + + # Mount v1 router app.include_router(v1_router) diff --git a/neuralscape-service/tests/test_conversation_compiler.py b/neuralscape-service/tests/test_conversation_compiler.py new file mode 100644 index 000000000..a860c2b02 --- /dev/null +++ b/neuralscape-service/tests/test_conversation_compiler.py @@ -0,0 +1,399 @@ +"""Tests for the conversation-compiler extension. 
+ +Tests cover: +- ObsidianWriter (vault I/O, atomicity, frontmatter) +- Flush engine (extraction prompt parsing) +- Compile (grouping, idempotency) +- Lint checks (structural checks) +- Schemas (validation) +""" + +import json +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from extensions.conversation_compiler.config import CompilerSettings +from extensions.conversation_compiler.flush import ( + _map_category, + _parse_extraction_response, +) +from extensions.conversation_compiler.obsidian_writer import ( + ObsidianWriter, + _build_frontmatter, + _slugify, + _update_frontmatter_field, +) +from extensions.conversation_compiler.schemas import ( + CompileRequest, + ExtractedFact, + FlushRequest, + FlushResult, + LintFinding, + QueryRequest, + StatusResponse, +) + + +# ────────────────────────────────────────────── +# Fixtures +# ────────────────────────────────────────────── + + +@pytest.fixture +def tmp_vault(tmp_path): + """Create a temporary vault directory.""" + vault = tmp_path / "test-vault" + vault.mkdir() + return vault + + +@pytest.fixture +def writer(tmp_vault): + """Create an ObsidianWriter with a temp vault.""" + return ObsidianWriter(vault_path=tmp_vault) + + +# ────────────────────────────────────────────── +# ObsidianWriter tests +# ────────────────────────────────────────────── + + +class TestSlugify: + def test_basic(self): + assert _slugify("Hello World") == "hello-world" + + def test_special_chars(self): + assert _slugify("Use PostgreSQL vs MySQL?") == "use-postgresql-vs-mysql" + + def test_multiple_spaces(self): + assert _slugify("too many spaces") == "too-many-spaces" + + def test_truncation(self): + long = "a" * 100 + assert len(_slugify(long)) <= 80 + + +class TestFrontmatter: + def test_basic_frontmatter(self): + fm = _build_frontmatter(title="Test Page") + assert "---" in fm + assert "title: Test Page" in fm + + def test_frontmatter_with_tags(self): + fm = 
_build_frontmatter(title="Test", tags=["foo", "bar"]) + assert "tags: [foo, bar]" in fm + + def test_frontmatter_compiled_flag(self): + fm = _build_frontmatter(title="Test", compiled=False) + assert "compiled: false" in fm + + def test_update_frontmatter_field(self): + content = "---\ntitle: Test\ncompiled: false\n---\n\nBody" + updated = _update_frontmatter_field(content, "compiled", "true") + assert "compiled: true" in updated + assert "Body" in updated + + def test_update_frontmatter_adds_field(self): + content = "---\ntitle: Test\n---\n\nBody" + updated = _update_frontmatter_field(content, "compiled", "true") + assert "compiled: true" in updated + + +class TestObsidianWriter: + def test_append_daily_log_creates_file(self, writer, tmp_vault): + entries = [ + {"time": "10:30", "category": "decision", "content": "Chose FastAPI", "session_id": "s1"} + ] + rel_path = writer.append_daily_log("2026-04-07", entries) + assert rel_path == "Daily/2026-04-07.md" + assert (tmp_vault / "Daily" / "2026-04-07.md").exists() + + def test_append_daily_log_content(self, writer, tmp_vault): + entries = [ + {"time": "10:30", "category": "decision", "content": "Chose FastAPI", "session_id": "s1"} + ] + writer.append_daily_log("2026-04-07", entries) + content = (tmp_vault / "Daily" / "2026-04-07.md").read_text() + assert "Chose FastAPI" in content + assert "`decision`" in content + assert "session: s1" in content + + def test_append_daily_log_appends(self, writer, tmp_vault): + writer.append_daily_log("2026-04-07", [{"time": "10:00", "category": "fact", "content": "First"}]) + writer.append_daily_log("2026-04-07", [{"time": "11:00", "category": "fact", "content": "Second"}]) + content = (tmp_vault / "Daily" / "2026-04-07.md").read_text() + assert "First" in content + assert "Second" in content + + def test_is_daily_log_compiled_false(self, writer): + writer.append_daily_log("2026-04-07", [{"time": "10:00", "category": "fact", "content": "Test"}]) + assert not 
writer.is_daily_log_compiled("2026-04-07") + + def test_mark_daily_log_compiled(self, writer): + writer.append_daily_log("2026-04-07", [{"time": "10:00", "category": "fact", "content": "Test"}]) + writer.mark_daily_log_compiled("2026-04-07") + assert writer.is_daily_log_compiled("2026-04-07") + + def test_get_daily_log_entries(self, writer): + entries = [ + {"time": "10:30", "category": "decision", "content": "Chose FastAPI", "session_id": "s1"}, + {"time": "11:00", "category": "preference", "content": "Prefers tabs", "session_id": "s2"}, + ] + writer.append_daily_log("2026-04-07", entries) + parsed = writer.get_daily_log_entries("2026-04-07") + assert len(parsed) == 2 + assert parsed[0]["category"] == "decision" + assert parsed[1]["content"] == "Prefers tabs" + + def test_list_daily_logs(self, writer): + writer.append_daily_log("2026-04-06", [{"time": "10:00", "category": "fact", "content": "A"}]) + writer.append_daily_log("2026-04-07", [{"time": "10:00", "category": "fact", "content": "B"}]) + dates = writer.list_daily_logs() + assert dates == ["2026-04-06", "2026-04-07"] + + def test_list_uncompiled_dates(self, writer): + writer.append_daily_log("2026-04-06", [{"time": "10:00", "category": "fact", "content": "A"}]) + writer.append_daily_log("2026-04-07", [{"time": "10:00", "category": "fact", "content": "B"}]) + writer.mark_daily_log_compiled("2026-04-06") + assert writer.list_uncompiled_dates() == ["2026-04-07"] + + def test_write_session_summary(self, writer, tmp_vault): + rel_path = writer.write_session_summary("2026-04-07", "Today was productive.") + assert rel_path == "Sessions/2026-04-07.md" + content = (tmp_vault / "Sessions" / "2026-04-07.md").read_text() + assert "Today was productive." 
in content + + def test_update_project_page_create(self, writer, tmp_vault): + rel_path = writer.update_project_page("NeuralScape", "A memory service.") + assert "Projects/" in rel_path + assert (tmp_vault / rel_path).exists() + + def test_write_decision(self, writer, tmp_vault): + rel_path = writer.write_decision("use-fastapi", "Chose FastAPI because...") + assert rel_path == "Decisions/use-fastapi.md" + content = (tmp_vault / rel_path).read_text() + assert "Chose FastAPI because..." in content + + def test_write_research(self, writer, tmp_vault): + rel_path = writer.write_research("graphiti-vs-neo4j", "Comparison notes...") + assert rel_path == "Research/graphiti-vs-neo4j.md" + + def test_update_index(self, writer, tmp_vault): + entries = [ + {"path": "Sessions/2026-04-07.md", "title": "Session", "type": "session"}, + {"path": "Decisions/use-fastapi.md", "title": "Use FastAPI", "type": "decision"}, + ] + writer.update_index(entries) + content = (tmp_vault / "index.md").read_text() + assert "[[Sessions/2026-04-07.md]]" in content + assert "[[Decisions/use-fastapi.md]]" in content + + def test_append_log(self, writer, tmp_vault): + writer.append_log("Something happened") + content = (tmp_vault / "log.md").read_text() + assert "Something happened" in content + + def test_list_all_files(self, writer, tmp_vault): + (tmp_vault / "test.md").write_text("hello") + (tmp_vault / "sub").mkdir() + (tmp_vault / "sub" / "nested.md").write_text("world") + files = writer.list_all_files() + assert "test.md" in files + assert "sub/nested.md" in files + + def test_find_wikilinks(self, writer): + content = "See [[Page One]] and [[another-page]] for details." 
+ links = writer.find_wikilinks(content) + assert "Page One" in links + assert "another-page" in links + + +# ────────────────────────────────────────────── +# Flush engine tests +# ────────────────────────────────────────────── + + +class TestParseExtractionResponse: + def test_valid_json(self): + response = json.dumps({ + "facts": [ + {"type": "decision", "content": "Chose FastAPI", "project": "neuralscape", "tags": ["web"]}, + {"type": "preference", "content": "Prefers dark mode", "project": None, "tags": []}, + ] + }) + facts = _parse_extraction_response(response) + assert len(facts) == 2 + assert facts[0].category == "decision" + assert facts[0].content == "Chose FastAPI" + assert facts[0].project_id == "neuralscape" + assert facts[1].category == "preference" + + def test_markdown_wrapped_json(self): + response = "```json\n" + json.dumps({"facts": [{"type": "fact", "content": "Test"}]}) + "\n```" + facts = _parse_extraction_response(response) + assert len(facts) == 1 + + def test_empty_facts(self): + response = json.dumps({"facts": []}) + facts = _parse_extraction_response(response) + assert len(facts) == 0 + + def test_invalid_json(self): + facts = _parse_extraction_response("this is not json") + assert len(facts) == 0 + + def test_missing_content(self): + response = json.dumps({"facts": [{"type": "fact", "content": ""}]}) + facts = _parse_extraction_response(response) + assert len(facts) == 0 + + +class TestMapCategory: + def test_decision(self): + assert _map_category("decision") == "decision" + + def test_preference(self): + assert _map_category("preference") == "preference" + + def test_unknown(self): + assert _map_category("unknown_type") == "personal_fact" + + def test_pattern(self): + assert _map_category("pattern") == "convention" + + +# ────────────────────────────────────────────── +# Schema tests +# ────────────────────────────────────────────── + + +class TestSchemas: + def test_flush_request_validation(self): + req = FlushRequest( + 
user_message="Hello", + assistant_response="Hi there", + session_id="s1", + user_id="ehfaz", + ) + assert req.channel == "api" + + def test_compile_request_optional_date(self): + req = CompileRequest(user_id="ehfaz") + assert req.date is None + + def test_query_request(self): + req = QueryRequest(question="How does X work?", user_id="ehfaz") + assert req.file_back is False + + def test_extracted_fact(self): + fact = ExtractedFact(category="decision", content="Chose X over Y") + assert fact.project_id is None + assert fact.tags == [] + + def test_flush_result(self): + result = FlushResult(session_id="s1", timestamp="2026-04-07T10:00:00") + assert result.facts_extracted == 0 + + def test_status_response(self): + status = StatusResponse() + assert status.extension == "conversation-compiler" + + +# ────────────────────────────────────────────── +# Lint tests (structural checks) +# ────────────────────────────────────────────── + + +class TestLintChecks: + def test_broken_links(self, writer, tmp_vault): + from extensions.conversation_compiler.lint import check_broken_links + + (tmp_vault / "page.md").write_text("See [[nonexistent-page]] for details.") + findings = check_broken_links(writer) + assert len(findings) == 1 + assert findings[0].check == "broken_links" + + def test_no_broken_links(self, writer, tmp_vault): + from extensions.conversation_compiler.lint import check_broken_links + + (tmp_vault / "page.md").write_text("See [[other]] for details.") + (tmp_vault / "other.md").write_text("Content here.") + findings = check_broken_links(writer) + assert len(findings) == 0 + + def test_orphan_pages(self, writer, tmp_vault): + from extensions.conversation_compiler.lint import check_orphan_pages + + (tmp_vault / "linked.md").write_text("See [[linked]] here.") + (tmp_vault / "orphan.md").write_text("No links to me.") + findings = check_orphan_pages(writer) + orphan_files = [f.file for f in findings] + assert "orphan.md" in orphan_files + + def test_index_drift(self, 
writer, tmp_vault): + from extensions.conversation_compiler.lint import check_index_drift + + (tmp_vault / "page.md").write_text("Content") + (tmp_vault / "index.md").write_text("# Index\n") + findings = check_index_drift(writer) + assert len(findings) == 1 + assert findings[0].check == "index_drift" + + def test_data_gaps(self, writer, tmp_vault): + from extensions.conversation_compiler.lint import check_data_gaps + + (tmp_vault / "a.md").write_text("See [[missing-topic]] here.") + (tmp_vault / "b.md").write_text("Also see [[missing-topic]].") + findings = check_data_gaps(writer) + assert any(f.check == "data_gaps" for f in findings) + + +# ────────────────────────────────────────────── +# Config tests +# ────────────────────────────────────────────── + + +class TestConfig: + def test_default_settings(self): + s = CompilerSettings() + assert s.compile_after_hour == 18 + assert s.auto_compile is True + + def test_get_llm_model_default(self): + s = CompilerSettings() + assert s.get_llm_model("fallback-model") == "fallback-model" + + def test_get_llm_model_override(self): + s = CompilerSettings(compiler_llm_model="custom-model") + assert s.get_llm_model("fallback-model") == "custom-model" + + def test_vault_path(self): + s = CompilerSettings(obsidian_vault_path="/tmp/test-vault") + assert s.vault_path == Path("/tmp/test-vault").resolve() + + +# ────────────────────────────────────────────── +# Extension class tests +# ────────────────────────────────────────────── + + +class TestExtensionClass: + def test_manifest_loads(self): + from extensions.conversation_compiler import ConversationCompilerExtension + + ext = ConversationCompilerExtension() + assert ext.manifest.name == "conversation-compiler" + assert ext.manifest.version == "0.1.0" + assert "conversation_turn" in ext.manifest.hooks + assert "session_end" in ext.manifest.hooks + assert "compile_requested" in ext.manifest.hooks + + def test_get_routes_returns_router(self): + from extensions.conversation_compiler 
import ConversationCompilerExtension + + ext = ConversationCompilerExtension() + router = ext.get_routes() + assert router is not None diff --git a/neuralscape-service/tests/test_extension_registry.py b/neuralscape-service/tests/test_extension_registry.py new file mode 100644 index 000000000..0474424d9 --- /dev/null +++ b/neuralscape-service/tests/test_extension_registry.py @@ -0,0 +1,500 @@ +"""Tests for the NeuralScape extension registry. + +Covers discovery, lifecycle, event dispatch, route mounting, +graceful failure handling, and the /v1/extensions listing endpoint. +""" + +import asyncio +from typing import Optional +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import APIRouter, FastAPI +from fastapi.testclient import TestClient + +from extensions import ExtensionRegistry +from extensions.base import ExtensionManifest, NeuralscapeExtension +from extensions.events import EventType + + +# ── Helpers ────────────────────────────────────── + + +class DummyExtension: + """Minimal working extension for tests.""" + + def __init__( + self, + name: str = "dummy", + version: str = "1.0.0", + hooks: list[str] | None = None, + routes: bool = False, + fail_startup: bool = False, + fail_shutdown: bool = False, + fail_event: bool = False, + ): + self.manifest = ExtensionManifest( + name=name, + version=version, + description=f"Test extension: {name}", + hooks=hooks or [], + ) + self._routes = routes + self._fail_startup = fail_startup + self._fail_shutdown = fail_shutdown + self._fail_event = fail_event + self.started = False + self.stopped = False + self.events_received: list[tuple[str, dict]] = [] + + async def startup(self) -> None: + if self._fail_startup: + raise RuntimeError("startup boom") + self.started = True + + async def shutdown(self) -> None: + if self._fail_shutdown: + raise RuntimeError("shutdown boom") + self.stopped = True + + async def on_event(self, event_type: str, payload: dict) -> Optional[dict]: + if 
self._fail_event: + raise RuntimeError("event boom") + self.events_received.append((event_type, payload)) + return {"handled_by": self.manifest.name} + + def get_routes(self) -> Optional[APIRouter]: + if not self._routes: + return None + router = APIRouter() + + @router.get("/ping") + async def ping(): + return {"pong": self.manifest.name} + + return router + + +# ── Registration ───────────────────────────────── + + +class TestRegistration: + def test_register_extension(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="test-ext") + registry.register(ext) + assert "test-ext" in registry.extensions + assert registry.extensions["test-ext"].status == "registered" + + def test_register_duplicate_skipped(self): + registry = ExtensionRegistry() + ext1 = DummyExtension(name="dup") + ext2 = DummyExtension(name="dup", version="2.0.0") + registry.register(ext1) + registry.register(ext2) + # Only the first one should be registered + assert registry.extensions["dup"].manifest.version == "1.0.0" + + def test_list_extensions(self): + registry = ExtensionRegistry() + registry.register(DummyExtension(name="ext-a", hooks=["memory_stored"])) + registry.register(DummyExtension(name="ext-b")) + listing = registry.list_extensions() + assert len(listing) == 2 + names = {e["name"] for e in listing} + assert names == {"ext-a", "ext-b"} + # Check structure + ext_a = next(e for e in listing if e["name"] == "ext-a") + assert ext_a["version"] == "1.0.0" + assert ext_a["status"] == "registered" + assert ext_a["hooks"] == ["memory_stored"] + + def test_list_extensions_empty(self): + registry = ExtensionRegistry() + assert registry.list_extensions() == [] + + +# ── Lifecycle ──────────────────────────────────── + + +class TestLifecycle: + @pytest.mark.asyncio + async def test_startup_all(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="life") + registry.register(ext) + await registry.startup_all() + assert ext.started is True + assert 
registry.extensions["life"].status == "started" + + @pytest.mark.asyncio + async def test_shutdown_all(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="life") + registry.register(ext) + await registry.startup_all() + await registry.shutdown_all() + assert ext.stopped is True + assert registry.extensions["life"].status == "stopped" + + @pytest.mark.asyncio + async def test_shutdown_skips_not_started(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="never-started") + registry.register(ext) + # Don't call startup_all + await registry.shutdown_all() + assert ext.stopped is False + + @pytest.mark.asyncio + async def test_startup_failure_marks_failed(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="bad", fail_startup=True) + registry.register(ext) + await registry.startup_all() + assert ext.started is False + assert registry.extensions["bad"].status == "failed" + + @pytest.mark.asyncio + async def test_startup_failure_doesnt_block_others(self): + registry = ExtensionRegistry() + bad = DummyExtension(name="bad", fail_startup=True) + good = DummyExtension(name="good") + registry.register(bad) + registry.register(good) + await registry.startup_all() + assert good.started is True + assert registry.extensions["good"].status == "started" + assert registry.extensions["bad"].status == "failed" + + @pytest.mark.asyncio + async def test_shutdown_failure_doesnt_block_others(self): + registry = ExtensionRegistry() + bad = DummyExtension(name="bad-shutdown", fail_shutdown=True) + good = DummyExtension(name="good-shutdown") + registry.register(bad) + registry.register(good) + await registry.startup_all() + await registry.shutdown_all() + assert good.stopped is True + + +# ── Event Dispatch ─────────────────────────────── + + +class TestEventDispatch: + @pytest.mark.asyncio + async def test_event_dispatched_to_matching_hooks(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="listener", hooks=["memory_stored"]) 
+ registry.register(ext) + await registry.startup_all() + + responses = await registry.emit_event("memory_stored", {"memory_id": "abc"}) + assert len(responses) == 1 + assert responses[0] == {"handled_by": "listener"} + assert ext.events_received == [("memory_stored", {"memory_id": "abc"})] + + @pytest.mark.asyncio + async def test_event_not_dispatched_to_non_matching(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="listener", hooks=["session_start"]) + registry.register(ext) + await registry.startup_all() + + responses = await registry.emit_event("memory_stored", {"memory_id": "abc"}) + assert len(responses) == 0 + assert ext.events_received == [] + + @pytest.mark.asyncio + async def test_event_skips_failed_extensions(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="failed-ext", hooks=["memory_stored"], fail_startup=True) + registry.register(ext) + await registry.startup_all() + + responses = await registry.emit_event("memory_stored", {"memory_id": "abc"}) + assert len(responses) == 0 + + @pytest.mark.asyncio + async def test_event_failure_doesnt_block_others(self): + registry = ExtensionRegistry() + bad = DummyExtension(name="bad-handler", hooks=["memory_stored"], fail_event=True) + good = DummyExtension(name="good-handler", hooks=["memory_stored"]) + registry.register(bad) + registry.register(good) + await registry.startup_all() + + responses = await registry.emit_event("memory_stored", {"memory_id": "abc"}) + # Only good handler's response + assert len(responses) == 1 + assert responses[0] == {"handled_by": "good-handler"} + + @pytest.mark.asyncio + async def test_multiple_extensions_receive_event(self): + registry = ExtensionRegistry() + ext1 = DummyExtension(name="ext-1", hooks=["session_start"]) + ext2 = DummyExtension(name="ext-2", hooks=["session_start"]) + registry.register(ext1) + registry.register(ext2) + await registry.startup_all() + + responses = await registry.emit_event("session_start", {"session_id": "s1"}) 
+ assert len(responses) == 2 + assert ext1.events_received == [("session_start", {"session_id": "s1"})] + assert ext2.events_received == [("session_start", {"session_id": "s1"})] + + +# ── Route Mounting ─────────────────────────────── + + +class TestRouteMounting: + def test_mount_routes(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="routed", routes=True) + registry.register(ext) + + test_app = FastAPI() + registry.mount_routes(test_app) + + client = TestClient(test_app) + resp = client.get("/v1/extensions/routed/ping") + assert resp.status_code == 200 + assert resp.json() == {"pong": "routed"} + + def test_mount_routes_no_routes(self): + registry = ExtensionRegistry() + ext = DummyExtension(name="no-routes", routes=False) + registry.register(ext) + + test_app = FastAPI() + registry.mount_routes(test_app) + + client = TestClient(test_app) + resp = client.get("/v1/extensions/no-routes/ping") + assert resp.status_code == 404 + + def test_mount_multiple_extensions(self): + registry = ExtensionRegistry() + ext1 = DummyExtension(name="ext-a", routes=True) + ext2 = DummyExtension(name="ext-b", routes=True) + registry.register(ext1) + registry.register(ext2) + + test_app = FastAPI() + registry.mount_routes(test_app) + + client = TestClient(test_app) + assert client.get("/v1/extensions/ext-a/ping").json() == {"pong": "ext-a"} + assert client.get("/v1/extensions/ext-b/ping").json() == {"pong": "ext-b"} + + +# ── Discovery ──────────────────────────────────── + + +class TestDiscovery: + @pytest.mark.asyncio + async def test_discover_empty(self): + """Discovery with no local extensions and no env var should succeed.""" + registry = ExtensionRegistry() + with patch.dict("os.environ", {}, clear=False): + # Patch _discover_local to avoid scanning real filesystem + with patch.object(registry, "_discover_local"): + await registry.discover() + assert len(registry.extensions) == 0 + + @pytest.mark.asyncio + async def test_discover_env_var(self): + 
"""Extensions listed in NEURALSCAPE_EXTENSIONS env var are loaded.""" + registry = ExtensionRegistry() + + # Create a mock module with a valid extension class + mock_module = MagicMock() + mock_module.extension = DummyExtension(name="env-ext") + + with patch.dict("os.environ", {"NEURALSCAPE_EXTENSIONS": "fake.module"}): + with patch.object(registry, "_discover_local"): + with patch("extensions.importlib.import_module", return_value=mock_module): + await registry.discover() + + assert "env-ext" in registry.extensions + + @pytest.mark.asyncio + async def test_discover_env_var_multiple(self): + """Multiple comma-separated extensions are loaded.""" + registry = ExtensionRegistry() + + def fake_import(name): + mod = MagicMock() + mod.extension = DummyExtension(name=f"ext-{name.split('.')[-1]}") + return mod + + with patch.dict("os.environ", {"NEURALSCAPE_EXTENSIONS": "pkg.one,pkg.two"}): + with patch.object(registry, "_discover_local"): + with patch("extensions.importlib.import_module", side_effect=fake_import): + await registry.discover() + + assert "ext-one" in registry.extensions + assert "ext-two" in registry.extensions + + @pytest.mark.asyncio + async def test_discover_bad_import_handled(self): + """Failed imports are logged but don't crash discovery.""" + registry = ExtensionRegistry() + + with patch.dict("os.environ", {"NEURALSCAPE_EXTENSIONS": "nonexistent.module"}): + with patch.object(registry, "_discover_local"): + with patch( + "extensions.importlib.import_module", + side_effect=ImportError("no such module"), + ): + await registry.discover() + + assert len(registry.extensions) == 0 + + +# ── /v1/extensions Endpoint ────────────────────── + + +class TestExtensionsEndpoint: + @pytest.fixture(autouse=True) + def mock_globals(self): + """Patch main module globals to avoid real service initialization.""" + import main + + original_memory = main._memory + original_graphiti = main._graphiti + original_bridge = main._bridge + original_service = main._service + 
original_tm = main._task_manager + original_registry = main._extension_registry + + main._memory = MagicMock() + main._graphiti = MagicMock() + main._bridge = MagicMock() + main._service = MagicMock() + mock_tm = MagicMock() + mock_tm.connect = AsyncMock() + mock_tm.close = AsyncMock() + mock_tm.pool = None + main._task_manager = mock_tm + + yield + + main._memory = original_memory + main._graphiti = original_graphiti + main._bridge = original_bridge + main._service = original_service + main._task_manager = original_tm + main._extension_registry = original_registry + + @pytest.fixture + def client(self): + from main import app + return TestClient(app, raise_server_exceptions=False) + + def test_list_extensions_empty(self, client): + import main + main._extension_registry = ExtensionRegistry() + resp = client.get("/v1/extensions") + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["extensions"] == [] + + def test_list_extensions_with_entries(self, client): + import main + registry = ExtensionRegistry() + registry.register(DummyExtension(name="ext-a", hooks=["memory_stored"])) + registry.register(DummyExtension(name="ext-b")) + main._extension_registry = registry + + resp = client.get("/v1/extensions") + assert resp.status_code == 200 + data = resp.json() + assert len(data["extensions"]) == 2 + names = {e["name"] for e in data["extensions"]} + assert names == {"ext-a", "ext-b"} + + def test_emit_event_endpoint(self, client): + import main + + registry = ExtensionRegistry() + ext = DummyExtension(name="evt-listener", hooks=["memory_stored"]) + registry.register(ext) + # Manually set status to started so events are dispatched + registry._extensions["evt-listener"].status = "started" + main._extension_registry = registry + + resp = client.post( + "/v1/extensions/events", + json={"event_type": "memory_stored", "payload": {"memory_id": "m1"}}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] 
== "ok" + assert data["event_type"] == "memory_stored" + assert data["extensions_notified"] == 1 + + def test_emit_event_no_listeners(self, client): + import main + main._extension_registry = ExtensionRegistry() + + resp = client.post( + "/v1/extensions/events", + json={"event_type": "memory_stored", "payload": {}}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["extensions_notified"] == 0 + + +# ── Event Schema Models ────────────────────────── + + +class TestEventSchemas: + def test_event_type_enum_values(self): + assert EventType.CONVERSATION_TURN == "conversation_turn" + assert EventType.SESSION_START == "session_start" + assert EventType.SESSION_END == "session_end" + assert EventType.MEMORY_STORED == "memory_stored" + assert EventType.COMPILE_REQUESTED == "compile_requested" + + def test_event_payload_models(self): + from extensions.events import ( + CompileRequestedEvent, + ConversationTurnEvent, + EVENT_PAYLOAD_MODELS, + MemoryStoredEvent, + SessionEndEvent, + SessionStartEvent, + ) + + assert len(EVENT_PAYLOAD_MODELS) == 5 + + # Validate each model can be instantiated + ConversationTurnEvent(user_id="u1", messages=[{"role": "user", "content": "hi"}]) + SessionStartEvent(user_id="u1", session_id="s1") + SessionEndEvent(user_id="u1", session_id="s1") + MemoryStoredEvent(user_id="u1", memory_id="m1", content="fact") + CompileRequestedEvent(user_id="u1") + + +# ── Protocol Compliance ────────────────────────── + + +class TestProtocolCompliance: + def test_dummy_extension_satisfies_protocol(self): + ext = DummyExtension(name="proto-test") + assert isinstance(ext, NeuralscapeExtension) + + def test_manifest_validation(self): + manifest = ExtensionManifest( + name="test", + version="1.0.0", + description="Test extension", + author="Tester", + hooks=["memory_stored", "session_start"], + ) + assert manifest.name == "test" + assert manifest.hooks == ["memory_stored", "session_start"] diff --git a/neuralscape-service/uv.lock 
b/neuralscape-service/uv.lock index 2d9dac28c..98e2e3aec 100644 --- a/neuralscape-service/uv.lock +++ b/neuralscape-service/uv.lock @@ -351,15 +351,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bc/58/6b3d24e6b9bc474a2dcdee65dfd1f008867015408a271562e4b690561a4d/cryptography-46.0.5-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:8456928655f856c6e1533ff59d5be76578a7157224dbd9ce6872f25055ab9ab7", size = 3407605, upload-time = "2026-02-10T19:18:29.233Z" }, ] -[[package]] -name = "diskcache" -version = "5.6.3" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/3f/21/1c1ffc1a039ddcc459db43cc108658f32c57d271d7289a2794e401d0fdb6/diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc", size = 67916, upload-time = "2023-08-31T06:12:00.316Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/3f/27/4570e78fc0bf5ea0ca45eb1de3818a23787af9b390c0b0a0033a1b8236f9/diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19", size = 45550, upload-time = "2023-08-31T06:11:58.822Z" }, -] - [[package]] name = "distro" version = "1.9.0" @@ -439,10 +430,9 @@ wheels = [ [[package]] name = "graphiti-core" -version = "0.28.0" +version = "0.28.2" source = { editable = "../graphiti" } dependencies = [ - { name = "diskcache" }, { name = "neo4j" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "numpy", version = "2.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, @@ -465,10 +455,9 @@ requires-dist = [ { name = "boto3", marker = "extra == 'dev'", specifier = ">=1.39.16" }, { name = "boto3", marker = "extra == 'neo4j-opensearch'", specifier = ">=1.39.16" }, { name = "boto3", marker = "extra == 'neptune'", specifier = ">=1.39.16" }, - { name = "diskcache", 
specifier = ">=5.6.3" }, - { name = "diskcache-stubs", marker = "extra == 'dev'", specifier = ">=5.6.3.6.20240818" }, { name = "falkordb", marker = "extra == 'dev'", specifier = ">=1.1.2,<2.0.0" }, { name = "falkordb", marker = "extra == 'falkordb'", specifier = ">=1.1.2,<2.0.0" }, + { name = "gliner2", marker = "python_full_version >= '3.11' and extra == 'gliner2'", specifier = ">=1.2.0" }, { name = "google-genai", marker = "extra == 'dev'", specifier = ">=1.8.0" }, { name = "google-genai", marker = "extra == 'google-genai'", specifier = ">=1.62.0" }, { name = "groq", marker = "extra == 'dev'", specifier = ">=0.2.0" }, @@ -481,7 +470,7 @@ requires-dist = [ { name = "langchain-aws", marker = "extra == 'dev'", specifier = ">=0.2.29" }, { name = "langchain-aws", marker = "extra == 'neptune'", specifier = ">=0.2.29" }, { name = "langchain-openai", marker = "extra == 'dev'", specifier = ">=0.2.6" }, - { name = "langgraph", marker = "extra == 'dev'", specifier = ">=0.2.15" }, + { name = "langgraph", marker = "extra == 'dev'", specifier = ">=1.0.10" }, { name = "langsmith", marker = "extra == 'dev'", specifier = ">=0.1.108" }, { name = "neo4j", specifier = ">=5.26.0" }, { name = "numpy", specifier = ">=1.0.0" }, @@ -507,7 +496,7 @@ requires-dist = [ { name = "voyageai", marker = "extra == 'dev'", specifier = ">=0.2.3" }, { name = "voyageai", marker = "extra == 'voyageai'", specifier = ">=0.2.3" }, ] -provides-extras = ["anthropic", "groq", "google-genai", "kuzu", "falkordb", "voyageai", "neo4j-opensearch", "sentence-transformers", "neptune", "tracing", "dev"] +provides-extras = ["anthropic", "groq", "google-genai", "kuzu", "falkordb", "voyageai", "gliner2", "neo4j-opensearch", "sentence-transformers", "neptune", "tracing", "dev"] [[package]] name = "greenlet" @@ -961,7 +950,7 @@ wheels = [ [[package]] name = "mem0ai" -version = "1.0.4" +version = "1.0.10" source = { editable = "../mem0" } dependencies = [ { name = "openai" }, @@ -980,6 +969,7 @@ graphiti = [ 
[package.metadata] requires-dist = [ + { name = "apache-age-python", marker = "extra == 'graph'", specifier = ">=0.0.6" }, { name = "azure-identity", marker = "extra == 'vector-stores'", specifier = ">=1.24.0" }, { name = "azure-search-documents", marker = "extra == 'vector-stores'", specifier = ">=11.4.0b8" }, { name = "boto3", marker = "extra == 'extras'", specifier = ">=1.34.0" }, @@ -1004,14 +994,14 @@ requires-dist = [ { name = "langchain-neo4j", marker = "extra == 'graph'", specifier = ">=0.4.0" }, { name = "litellm", marker = "extra == 'llms'", specifier = ">=1.74.0" }, { name = "neo4j", marker = "extra == 'graph'", specifier = ">=5.23.1" }, - { name = "ollama", marker = "extra == 'llms'", specifier = ">=0.1.0" }, + { name = "ollama", marker = "extra == 'llms'", specifier = ">=0.3.0" }, { name = "openai", specifier = ">=1.90.0" }, { name = "openai", marker = "extra == 'llms'", specifier = ">=1.90.0" }, { name = "opensearch-py", marker = "extra == 'extras'", specifier = ">=2.0.0" }, { name = "pinecone", marker = "extra == 'vector-stores'", specifier = "<=7.3.0" }, { name = "pinecone-text", marker = "extra == 'vector-stores'", specifier = ">=0.10.0" }, { name = "posthog", specifier = ">=3.5.0" }, - { name = "protobuf", specifier = ">=5.29.0,<6.0.0" }, + { name = "protobuf", specifier = ">=5.29.6,<7.0.0" }, { name = "psycopg", marker = "extra == 'vector-stores'", specifier = ">=3.2.8" }, { name = "psycopg-pool", marker = "extra == 'vector-stores'", specifier = ">=3.2.6,<4.0.0" }, { name = "pydantic", specifier = ">=2.7.3" }, diff --git a/neuralscape-service/worker.py b/neuralscape-service/worker.py index 07d852309..d61bdf8ad 100644 --- a/neuralscape-service/worker.py +++ b/neuralscape-service/worker.py @@ -5,6 +5,7 @@ import hashlib import logging +from datetime import datetime from arq.cron import cron @@ -82,6 +83,84 @@ def _generate_job_id(content: str, user_id: str) -> str: return f"raw-{h}" +async def process_conversation_flush( + ctx: dict, + 
user_message: str, + assistant_response: str, + session_id: str, + channel: str, + timestamp: str | None, + project_id: str | None, + user_id: str, +) -> dict: + """Background task: extract facts from a conversation turn.""" + from extensions.conversation_compiler.flush import flush_conversation_turn + from extensions.conversation_compiler.obsidian_writer import ObsidianWriter + + service: MemoryService = ctx["service"] + writer: ObsidianWriter = ctx.get("compiler_writer") or ObsidianWriter() + ctx.setdefault("compiler_writer", writer) + + result = await flush_conversation_turn( + user_message=user_message, + assistant_response=assistant_response, + session_id=session_id, + channel=channel, + timestamp=timestamp, + project_id=project_id, + user_id=user_id, + service=service, + writer=writer, + ) + return result.model_dump() + + +async def process_conversation_compile( + ctx: dict, + date: str | None, + user_id: str, +) -> dict: + """Background task: compile daily logs into structured articles.""" + from extensions.conversation_compiler.compile import compile_all_pending, compile_date + from extensions.conversation_compiler.obsidian_writer import ObsidianWriter + + service: MemoryService = ctx["service"] + writer: ObsidianWriter = ctx.get("compiler_writer") or ObsidianWriter() + ctx.setdefault("compiler_writer", writer) + + if date: + result = await compile_date(date, service, writer) + return result.model_dump() + else: + results = await compile_all_pending(service, writer) + return {"dates_compiled": len(results), "results": [r.model_dump() for r in results]} + + +async def auto_compile_check(ctx: dict) -> dict | None: + """Cron job: check if auto-compilation should run after COMPILE_AFTER_HOUR.""" + from extensions.conversation_compiler.compile import compile_all_pending + from extensions.conversation_compiler.config import compiler_settings + from extensions.conversation_compiler.obsidian_writer import ObsidianWriter + + if not compiler_settings.auto_compile: + 
return None + + now = datetime.now() + if now.hour < compiler_settings.compile_after_hour: + return None + + service: MemoryService = ctx["service"] + writer: ObsidianWriter = ctx.get("compiler_writer") or ObsidianWriter() + ctx.setdefault("compiler_writer", writer) + + results = await compile_all_pending(service, writer) + if results: + writer.append_log(f"Auto-compiled {len(results)} date(s) via cron") + logger.info(f"Auto-compile cron: compiled {len(results)} dates") + return {"dates_compiled": len(results)} + return None + + async def dedup_all_memories(ctx: dict) -> dict: """Cron job: deduplicate Qdrant memories for every user.""" service: MemoryService = ctx["service"] @@ -131,7 +210,12 @@ async def shutdown(ctx: dict) -> None: class WorkerSettings: - functions = [process_memory_store, process_memory_raw] + functions = [ + process_memory_store, + process_memory_raw, + process_conversation_flush, + process_conversation_compile, + ] cron_jobs = [ cron( dedup_all_memories, @@ -142,6 +226,15 @@ class WorkerSettings: max_tries=1, run_at_startup=False, ), + cron( + auto_compile_check, + hour={18, 19, 20, 21, 22, 23}, + minute=30, + timeout=600, + unique=True, + max_tries=1, + run_at_startup=False, + ), ] on_startup = startup on_shutdown = shutdown