""" Ingest FastMCP documentation into rag-mcp. Walks fast_mcp_docs/, reads each .mdx/.md file, and adds it as a record in a rag-mcp document. Runs directly against the rag-mcp HTTP MCP endpoint. Usage: uv run scripts/ingest_fastmcp_docs.py [--dry-run] [--rag-url URL] """ import argparse import json import os import sys import time from pathlib import Path import httpx DOCS_DIR = Path(__file__).parent.parent / "fast_mcp_docs" DEFAULT_RAG_URL = "http://localhost:8006/mcp" DOC_SOURCE = "fastmcp-docs" DOC_DESCRIPTION = "FastMCP Python library documentation (prefecthq/fastmcp)" DOC_TAGS = ["fastmcp", "mcp", "python", "docs"] class RagMcpClient: """Minimal synchronous client for rag-mcp HTTP MCP endpoint.""" def __init__(self, url: str): self.url = url self.session_id: str | None = None self._id = 0 self.client = httpx.Client(timeout=60.0) def _next_id(self) -> int: self._id += 1 return self._id def _headers(self) -> dict: h = { "Content-Type": "application/json", "Accept": "application/json, text/event-stream", } if self.session_id: h["Mcp-Session-Id"] = self.session_id return h def _parse_sse(self, text: str) -> dict: """Extract the JSON payload from an SSE response.""" for line in text.splitlines(): if line.startswith("data: "): return json.loads(line[6:]) raise ValueError(f"No data line in SSE response: {text[:200]}") def initialize(self) -> None: payload = { "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "ingest-fastmcp-docs", "version": "1.0"}, }, "id": self._next_id(), } resp = self.client.post(self.url, json=payload, headers=self._headers()) resp.raise_for_status() self.session_id = resp.headers.get("mcp-session-id") result = self._parse_sse(resp.text) if "error" in result: raise RuntimeError(f"initialize failed: {result['error']}") print(f"[rag-mcp] Session: {self.session_id}") def call_tool(self, name: str, arguments: dict) -> dict: payload = { "jsonrpc": "2.0", "method": "tools/call", "params": {"name": name, "arguments": arguments}, "id": self._next_id(), } resp = self.client.post(self.url, json=payload, headers=self._headers()) resp.raise_for_status() result = self._parse_sse(resp.text) if "error" in result: raise RuntimeError(f"tools/call {name} failed: {result['error']}") # Unwrap MCP content envelope content = result.get("result", {}).get("content", []) if content and content[0].get("type") == "text": return json.loads(content[0]["text"]) return result.get("result", {}) def close(self) -> None: self.client.close() def find_doc_files(docs_dir: Path) -> list[Path]: files = [] for root, _dirs, filenames in os.walk(docs_dir): for fname in sorted(filenames): if fname.endswith((".mdx", ".md")): files.append(Path(root) / fname) return sorted(files) def derive_title(rel_path: Path, content: str) -> str: """Extract title from first heading or fall back to filename.""" for line in content.splitlines(): line = line.strip() if line.startswith("# "): return line[2:].strip() if line.startswith("title:"): return line[6:].strip().strip('"').strip("'") return rel_path.stem.replace("-", " ").replace("_", " ").title() def derive_section(rel_path: Path) -> str: parts = rel_path.parts return parts[0] if len(parts) > 1 else "root" def main() -> None: parser = argparse.ArgumentParser(description="Ingest FastMCP docs into rag-mcp") parser.add_argument("--dry-run", action="store_true", help="List files only, no ingestion") parser.add_argument("--rag-url", default=DEFAULT_RAG_URL, help="rag-mcp MCP endpoint") parser.add_argument("--limit", type=int, default=0, help="Max files to ingest (0=all)") args = parser.parse_args() files = find_doc_files(DOCS_DIR) print(f"Found {len(files)} doc files in {DOCS_DIR}") if args.dry_run: for f in files: print(f" {f.relative_to(DOCS_DIR)}") return if args.limit: files = files[: args.limit] print(f"Limiting to {args.limit} files") client = RagMcpClient(args.rag_url) client.initialize() # Find or create the document print("Looking for existing fastmcp-docs document...") docs_list = client.call_tool("browse_documents", {"page": 1, "page_size": 50}) existing_doc = None for doc in docs_list: if isinstance(doc, dict) and doc.get("source") == DOC_SOURCE: existing_doc = doc break if existing_doc: doc_id = existing_doc["id"] print(f"Using existing document id={doc_id}") else: print("Creating new document...") new_doc = client.call_tool( "add_document", { "source": DOC_SOURCE, "tags": DOC_TAGS, "description": DOC_DESCRIPTION, "meta": {"repo": "prefecthq/fastmcp", "local_path": str(DOCS_DIR)}, }, ) doc_id = new_doc["id"] print(f"Created document id={doc_id}") # Ingest each file ok = 0 errors = 0 for i, fpath in enumerate(files): rel = fpath.relative_to(DOCS_DIR) try: content = fpath.read_text(encoding="utf-8") except Exception as e: print(f" [SKIP] {rel}: read error: {e}") errors += 1 continue title = derive_title(rel, content) section = derive_section(rel) try: result = client.call_tool( "add_record_fields", { "document_id": doc_id, "fields": { "title": title, "path": str(rel), "content": content, }, "metadata": { "section": section, "path": str(rel), "title": title, }, "config": { "chunk_size": 800, "overlap": 80, "embed_full_field": True, "generate_snippets": True, }, }, ) ok += 1 if (i + 1) % 10 == 0: print(f" [{i+1}/{len(files)}] {rel} -> record_id={result.get('record_id')}") except Exception as e: print(f" [ERROR] {rel}: {e}") errors += 1 time.sleep(1) # back off on error client.close() print(f"\nDone: {ok} ingested, {errors} errors (document id={doc_id})") if __name__ == "__main__": main()