searxng-mcp/scripts/ingest_fastmcp_docs.py
2026-04-20 11:42:25 +02:00

221 lines
7.1 KiB
Python

"""
Ingest FastMCP documentation into rag-mcp.
Walks fast_mcp_docs/, reads each .mdx/.md file, and adds it as a record
in a rag-mcp document. Runs directly against the rag-mcp HTTP MCP endpoint.
Usage:
    uv run scripts/ingest_fastmcp_docs.py [--dry-run] [--rag-url URL] [--limit N]
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
import httpx
# Directory that is walked for docs: the parent of this script's directory, then "fast_mcp_docs".
DOCS_DIR = Path(__file__).parent.parent / "fast_mcp_docs"
# Default value of the --rag-url command-line option.
DEFAULT_RAG_URL = "http://localhost:8006/mcp"
# "source" value used both to look up an existing rag-mcp document and to create a new one.
DOC_SOURCE = "fastmcp-docs"
# Description and tags sent with the "add_document" tool call when a new document is created.
DOC_DESCRIPTION = "FastMCP Python library documentation (prefecthq/fastmcp)"
DOC_TAGS = ["fastmcp", "mcp", "python", "docs"]
class RagMcpClient:
    """Minimal synchronous client for the rag-mcp HTTP MCP endpoint.

    Every request is a JSON-RPC 2.0 message sent with HTTP POST to ``url``.
    The ``Accept`` header advertises both ``application/json`` and
    ``text/event-stream``, so responses in either encoding are understood.

    The instance can be used as a context manager; leaving the ``with``
    block closes the underlying HTTP client.
    """

    def __init__(self, url: str):
        """Create a client for the MCP endpoint at *url* (no request is sent yet)."""
        self.url = url
        # Filled in by initialize() from the "mcp-session-id" response header.
        self.session_id: str | None = None
        # Last JSON-RPC request id handed out by _next_id().
        self._id = 0
        self.client = httpx.Client(timeout=60.0)

    def __enter__(self) -> "RagMcpClient":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def _next_id(self) -> int:
        """Return the next JSON-RPC request id (1, 2, 3, ...)."""
        self._id += 1
        return self._id

    def _headers(self) -> dict[str, str]:
        """Build the request headers, adding the session id once one is known."""
        h = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        }
        if self.session_id:
            h["Mcp-Session-Id"] = self.session_id
        return h

    def _parse_sse(self, text: str) -> dict:
        """Extract the JSON-RPC payload from a response body.

        Accepts either a plain JSON body or an SSE stream. In an SSE stream
        the first ``data:`` event that carries ``result`` or ``error`` is
        returned, so notifications emitted before the response are skipped;
        if no event looks like a response, the first payload is returned.

        Raises:
            ValueError: if the body holds no JSON payload at all
                (``json.JSONDecodeError`` for malformed JSON is a subclass).
        """
        body = text.strip()
        if body.startswith("{"):
            # The server answered with application/json instead of an SSE stream.
            return json.loads(body)

        first_payload = None
        for line in text.splitlines():
            if not line.startswith("data:"):
                continue
            # The space after "data:" is optional in the SSE format.
            raw = line[5:].strip()
            if not raw:
                continue
            payload = json.loads(raw)
            if isinstance(payload, dict) and ("result" in payload or "error" in payload):
                return payload
            if first_payload is None:
                first_payload = payload
        if first_payload is not None:
            return first_payload
        raise ValueError(f"No data line in SSE response: {text[:200]}")

    def _post(self, payload: dict) -> "httpx.Response":
        """POST one JSON-RPC message and raise on a non-success HTTP status."""
        resp = self.client.post(self.url, json=payload, headers=self._headers())
        resp.raise_for_status()
        return resp

    def initialize(self) -> None:
        """Perform the MCP initialize handshake and remember the session id.

        Raises:
            RuntimeError: if the server answers the initialize request with
                a JSON-RPC error.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "ingest-fastmcp-docs", "version": "1.0"},
            },
            "id": self._next_id(),
        }
        resp = self._post(payload)
        self.session_id = resp.headers.get("mcp-session-id")
        result = self._parse_sse(resp.text)
        if "error" in result:
            raise RuntimeError(f"initialize failed: {result['error']}")
        # The MCP lifecycle requires the client to confirm initialization with
        # this notification before issuing normal requests. It has no "id"
        # because notifications get no JSON-RPC response.
        self._post({"jsonrpc": "2.0", "method": "notifications/initialized"})
        print(f"[rag-mcp] Session: {self.session_id}")

    def call_tool(self, name: str, arguments: dict) -> dict:
        """Call the MCP tool *name* with *arguments* and return its decoded result.

        When the first content item of the result is of type ``text``, its
        text is parsed as JSON and returned; otherwise the raw ``result``
        object is returned.

        Raises:
            RuntimeError: on a JSON-RPC error or a tool result flagged
                with ``isError``.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
            "id": self._next_id(),
        }
        resp = self._post(payload)
        result = self._parse_sse(resp.text)
        if "error" in result:
            raise RuntimeError(f"tools/call {name} failed: {result['error']}")
        # Unwrap MCP content envelope
        tool_result = result.get("result", {})
        content = tool_result.get("content", [])
        has_text = bool(content) and content[0].get("type") == "text"
        if tool_result.get("isError"):
            # Tool-level failures arrive as a normal result flagged isError;
            # surface them instead of trying to decode the message as JSON.
            detail = content[0].get("text") if has_text else tool_result
            raise RuntimeError(f"tools/call {name} failed: {detail}")
        if has_text:
            return json.loads(content[0]["text"])
        return tool_result

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()
def find_doc_files(docs_dir: Path) -> list[Path]:
    """Return every ``.mdx``/``.md`` file found under *docs_dir*, sorted by path.

    The tree is walked recursively with ``os.walk``; a directory that does
    not exist simply yields an empty list.
    """
    doc_suffixes = (".mdx", ".md")
    matches = [
        Path(folder) / filename
        for folder, _subdirs, filenames in os.walk(docs_dir)
        for filename in filenames
        if filename.endswith(doc_suffixes)
    ]
    matches.sort()
    return matches
def derive_title(rel_path: Path, content: str) -> str:
    """Extract a title from the document text, falling back to the filename.

    The first usable candidate wins, scanning top to bottom:

    * a ``# `` heading line, or
    * a ``title:`` line (surrounding quotes are stripped).

    Lines inside fenced code blocks are ignored so that shell or Python
    comments in examples are not mistaken for headings. Inside a leading
    ``---`` front-matter block a ``# `` line is a YAML comment and is
    skipped as well. Empty candidates (for example a bare ``title:``) are
    skipped rather than returned.

    If nothing is found, the title is built from the file stem with ``-``
    and ``_`` replaced by spaces and title-cased.
    """
    lines = [raw.strip() for raw in content.splitlines()]

    # Front matter only counts when the file opens with "---" AND a closing
    # "---" exists; otherwise a leading "---" is just a horizontal rule.
    front_matter_end = 0
    if lines and lines[0] == "---":
        try:
            front_matter_end = lines.index("---", 1)
        except ValueError:
            front_matter_end = 0

    in_fence = False
    for index, line in enumerate(lines):
        in_front_matter = index < front_matter_end
        if not in_front_matter and line.startswith(("```", "~~~")):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        if line.startswith("# "):
            if in_front_matter:
                continue
            candidate = line[2:].strip()
        elif line.startswith("title:"):
            candidate = line[6:].strip().strip('"').strip("'")
        else:
            continue
        if candidate:
            return candidate
    return rel_path.stem.replace("-", " ").replace("_", " ").title()
def derive_section(rel_path: Path) -> str:
    """Return the top-level directory of *rel_path*, or ``"root"`` when it has none."""
    components = rel_path.parts
    if len(components) <= 1:
        # A bare filename (or an empty path) has no containing directory.
        return "root"
    return components[0]
def _documents_in(payload: object) -> list[dict]:
    """Return the dict entries carried by a ``browse_documents`` payload.

    The payload may be a bare list of documents or a dict that wraps one or
    more lists; in the latter case the items of every list value are
    collected. NOTE(review): the wrapper key name is not known from this
    script — confirm against the rag-mcp tool schema.
    """
    if isinstance(payload, list):
        candidates = payload
    elif isinstance(payload, dict):
        candidates = []
        for value in payload.values():
            if isinstance(value, list):
                candidates.extend(value)
    else:
        candidates = []
    return [item for item in candidates if isinstance(item, dict)]


def _find_existing_document(
    client: "RagMcpClient", page_size: int = 50, max_pages: int = 20
) -> "dict | None":
    """Page through ``browse_documents`` looking for the DOC_SOURCE document.

    Stops at the first match, at the first page that is not full, or after
    *max_pages* pages (a guard against a server that ignores ``page``).
    """
    for page in range(1, max_pages + 1):
        listing = client.call_tool("browse_documents", {"page": page, "page_size": page_size})
        documents = _documents_in(listing)
        for doc in documents:
            if doc.get("source") == DOC_SOURCE:
                return doc
        if len(documents) < page_size:
            break
    return None


def _ingest_files(client: "RagMcpClient", doc_id: object, files: list[Path]) -> tuple[int, int]:
    """Add each file as a record of document *doc_id*; return ``(ok, errors)``."""
    ok = 0
    errors = 0
    total = len(files)
    for index, fpath in enumerate(files, start=1):
        rel = fpath.relative_to(DOCS_DIR)
        try:
            content = fpath.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            print(f" [SKIP] {rel}: read error: {e}")
            errors += 1
            continue
        title = derive_title(rel, content)
        section = derive_section(rel)
        try:
            result = client.call_tool(
                "add_record_fields",
                {
                    "document_id": doc_id,
                    "fields": {
                        "title": title,
                        "path": str(rel),
                        "content": content,
                    },
                    "metadata": {
                        "section": section,
                        "path": str(rel),
                        "title": title,
                    },
                    "config": {
                        "chunk_size": 800,
                        "overlap": 80,
                        "embed_full_field": True,
                        "generate_snippets": True,
                    },
                },
            )
        except Exception as e:
            # Deliberately broad: ingestion is best-effort per file, so any
            # transport, protocol or decoding failure is reported and skipped.
            print(f" [ERROR] {rel}: {e}")
            errors += 1
            time.sleep(1)  # back off on error
            continue
        # Counting happens outside the try so that a problem while printing
        # progress can never mark an ingested file as an error as well.
        ok += 1
        if index % 10 == 0:
            record_id = result.get("record_id") if isinstance(result, dict) else None
            print(f" [{index}/{total}] {rel} -> record_id={record_id}")
    return ok, errors


def main() -> None:
    """Command-line entry point: ingest every doc file under DOCS_DIR into rag-mcp.

    Options:
        --dry-run   list the discovered files and exit without contacting rag-mcp
        --rag-url   MCP endpoint to talk to (defaults to DEFAULT_RAG_URL)
        --limit     ingest at most this many files; 0 (or a negative value) means all

    The files are added as records of the document whose ``source`` equals
    DOC_SOURCE; that document is created when it does not exist yet. A
    summary with the number of ingested files and errors is printed at the
    end. The HTTP client is closed even when ingestion aborts with an error.
    """
    parser = argparse.ArgumentParser(description="Ingest FastMCP docs into rag-mcp")
    parser.add_argument("--dry-run", action="store_true", help="List files only, no ingestion")
    parser.add_argument("--rag-url", default=DEFAULT_RAG_URL, help="rag-mcp MCP endpoint")
    parser.add_argument("--limit", type=int, default=0, help="Max files to ingest (0=all)")
    args = parser.parse_args()

    files = find_doc_files(DOCS_DIR)
    print(f"Found {len(files)} doc files in {DOCS_DIR}")
    if args.dry_run:
        for f in files:
            print(f" {f.relative_to(DOCS_DIR)}")
        return
    if args.limit > 0:
        # A negative limit would otherwise silently drop files from the end.
        files = files[: args.limit]
        print(f"Limiting to {args.limit} files")
    if not files:
        # Avoid creating an empty document when there is nothing to ingest.
        print("No doc files to ingest.")
        return

    client = RagMcpClient(args.rag_url)
    try:
        client.initialize()

        # Find or create the document
        print("Looking for existing fastmcp-docs document...")
        existing_doc = _find_existing_document(client)
        if existing_doc is not None:
            doc_id = existing_doc["id"]
            print(f"Using existing document id={doc_id}")
        else:
            print("Creating new document...")
            new_doc = client.call_tool(
                "add_document",
                {
                    "source": DOC_SOURCE,
                    "tags": DOC_TAGS,
                    "description": DOC_DESCRIPTION,
                    "meta": {"repo": "prefecthq/fastmcp", "local_path": str(DOCS_DIR)},
                },
            )
            doc_id = new_doc["id"]
            print(f"Created document id={doc_id}")

        # Ingest each file
        ok, errors = _ingest_files(client, doc_id, files)
    finally:
        client.close()
    print(f"\nDone: {ok} ingested, {errors} errors (document id={doc_id})")
# Run the ingestion only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()