Initial commit

This commit is contained in:
Hans Aschauer 2026-04-20 11:42:25 +02:00
commit 8885c1872f
14 changed files with 1990 additions and 0 deletions

1
.env.example Normal file
View file

@ -0,0 +1 @@
SEARXNG_BASE_URL=http://localhost:8080

13
.gitignore vendored Normal file
View file

@ -0,0 +1,13 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# Environment variables
.env

View file

@ -0,0 +1,76 @@
---
name: mcp-forge-conventions
description: How to call MCP tools from within mcp-forge execute_python scripts, including tool naming and injection syntax
---
# mcp-forge Conventions
## Tool naming
mcp-forge injects tools using their **bare function name**, not the namespaced name visible to the agent.
| Agent-side name | mcp-forge `mcp_tools` value | In-script call |
|---|---|---|
| `searxng_search` | `"search"` | `search(...)` |
| `searxng_fetch` | `"fetch"` | `fetch(...)` |
| `rag-mcp_browse_documents` | `"browse_documents"` | `browse_documents(...)` |
| `rag-mcp_search_records` | `"search_records"` | `search_records(...)` |
The pattern: strip any server prefix (e.g. `searxng_`, `rag-mcp_`) and use only the function name.
## Injection syntax
Pass a JSON array of bare tool names to `mcp_tools`:
```python
mcp-forge_execute_python(
code='results = search(query="foo"); print(results)',
mcp_tools=["search", "fetch"]
)
```
## Listing all available tools
Use the agent-side `mcp-forge_list_injectable_tools` tool to get the full catalogue before writing scripts:
```
mcp-forge_list_injectable_tools(include_schemas=false)
```
Returns each tool's `tool_name` (injected name), `qualified_name` (`provider.tool`), and provider metadata (name, transport, url). Only tools whose providers are registered in mcp-forge's own config appear here — tools available to the OpenCode agent from other MCP servers (e.g. GitHub) are NOT automatically available inside mcp-forge.
## Verifying a single tool name
To confirm a specific tool name resolves before using it, pass it in `mcp_tools` and check the `available_tools` list in the response. Only successfully resolved tools appear there.
```python
mcp-forge_execute_python(
code='print("ok")',
mcp_tools=["search"]
)
# response includes: "available_tools": ["search"]
# if the name is wrong, the whole call errors with "Tool '<name>' not found"
```
## Return values
Injected tools return Python objects (lists, dicts). Handle both a direct value and a dict wrapper:
```python
data = search(query="foo")
records = data.get("result", []) if isinstance(data, dict) else data
```
## Combining searxng + mcp-forge
```python
mcp-forge_execute_python(
code='''
results = search(query="uv python", language="en")
top = results[0]
page = fetch(url=top["url"], max_chars=2000)
print(page["content"])
''',
mcp_tools=["search", "fetch"]
)
```

1
.python-version Normal file
View file

@ -0,0 +1 @@
3.14

0
README.md Normal file
View file

1
fast_mcp_docs Symbolic link
View file

@ -0,0 +1 @@
/home/hans/software/fastmcp/docs/

22
pyproject.toml Normal file
View file

@ -0,0 +1,22 @@
[project]
name = "searxng-mcp"
version = "0.1.0"
description = "MCP server exposing SearxNG web search as a tool"
readme = "README.md"
authors = [
{ name = "Hans Aschauer", email = "hans.git@ch23.de" }
]
requires-python = ">=3.14"
dependencies = [
"fastmcp>=3.2.4",
"httpx>=0.28.1",
"pydantic-settings>=2.13.1",
"trafilatura>=2.0.0",
]
[project.scripts]
searxng-mcp = "searxng_mcp.__main__:main"
[build-system]
requires = ["uv_build>=0.10.8,<0.11.0"]
build-backend = "uv_build"

View file

@ -0,0 +1,221 @@
"""
Ingest FastMCP documentation into rag-mcp.
Walks fast_mcp_docs/, reads each .mdx/.md file, and adds it as a record
in a rag-mcp document. Runs directly against the rag-mcp HTTP MCP endpoint.
Usage:
uv run scripts/ingest_fastmcp_docs.py [--dry-run] [--rag-url URL] [--limit N]
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
import httpx
# Root of the docs tree to ingest: the `fast_mcp_docs` entry two levels above this file.
DOCS_DIR = Path(__file__).parent.parent / "fast_mcp_docs"
# Default rag-mcp HTTP MCP endpoint; overridable on the command line with --rag-url.
DEFAULT_RAG_URL = "http://localhost:8006/mcp"
# `source` value used both to look up an existing rag-mcp document and to create a new one.
DOC_SOURCE = "fastmcp-docs"
# Description and tags sent to rag-mcp when the document is created.
DOC_DESCRIPTION = "FastMCP Python library documentation (prefecthq/fastmcp)"
DOC_TAGS = ["fastmcp", "mcp", "python", "docs"]
class RagMcpClient:
    """Minimal synchronous client for the rag-mcp HTTP MCP endpoint.

    The client speaks JSON-RPC 2.0 over HTTP POST. It remembers the session id
    returned by ``initialize`` and sends it back on every later request.

    It can be used as a context manager so the underlying HTTP connection pool
    is released even when a call raises::

        with RagMcpClient(url) as client:
            client.initialize()
            client.call_tool("browse_documents", {"page": 1})
    """

    def __init__(self, url: str):
        self.url = url
        self.session_id: str | None = None
        self._id = 0  # last JSON-RPC request id handed out
        self.client = httpx.Client(timeout=60.0)

    def __enter__(self) -> "RagMcpClient":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def _next_id(self) -> int:
        """Return the next JSON-RPC request id (1, 2, 3, ...)."""
        self._id += 1
        return self._id

    def _headers(self) -> dict:
        """Build request headers, including the session id once one is known."""
        h = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        }
        if self.session_id:
            h["Mcp-Session-Id"] = self.session_id
        return h

    def _parse_sse(self, text: str) -> dict:
        """Extract the JSON payload from a response body.

        The body is normally a server-sent-event stream, in which case the
        first non-empty ``data:`` line is decoded. Because the ``Accept``
        header also allows ``application/json``, a body that is a bare JSON
        object is accepted as well.

        Raises:
            ValueError: if the body contains neither a ``data:`` line nor a
                JSON object (``json.JSONDecodeError`` is a ``ValueError`` too).
        """
        for line in text.splitlines():
            # SSE permits both "data: {...}" and "data:{...}".
            if line.startswith("data:"):
                data = line[5:].strip()
                if data:
                    return json.loads(data)
        stripped = text.strip()
        if stripped.startswith("{"):
            # Plain JSON response rather than an event stream.
            return json.loads(stripped)
        raise ValueError(f"No data line in SSE response: {text[:200]}")

    def initialize(self) -> None:
        """Perform the MCP ``initialize`` handshake and store the session id.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP response.
            RuntimeError: if the JSON-RPC response carries an ``error`` member.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "ingest-fastmcp-docs", "version": "1.0"},
            },
            "id": self._next_id(),
        }
        resp = self.client.post(self.url, json=payload, headers=self._headers())
        resp.raise_for_status()
        self.session_id = resp.headers.get("mcp-session-id")
        result = self._parse_sse(resp.text)
        if "error" in result:
            raise RuntimeError(f"initialize failed: {result['error']}")
        # NOTE(review): the MCP lifecycle also defines a follow-up
        # "notifications/initialized" message; it is not sent here. Confirm
        # whether rag-mcp requires it before accepting tool calls.
        print(f"[rag-mcp] Session: {self.session_id}")

    def call_tool(self, name: str, arguments: dict) -> dict:
        """Invoke an MCP tool and return its decoded result.

        When the first content item of the result is a text block, its text is
        decoded as JSON and returned (this may be any JSON value, not only an
        object). Otherwise the raw ``result`` member is returned.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP response.
            RuntimeError: if the JSON-RPC response carries an ``error`` member
                or the tool result is flagged with ``isError``.
            json.JSONDecodeError: if the text block is not valid JSON.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
            "id": self._next_id(),
        }
        resp = self.client.post(self.url, json=payload, headers=self._headers())
        resp.raise_for_status()
        result = self._parse_sse(resp.text)
        if "error" in result:
            raise RuntimeError(f"tools/call {name} failed: {result['error']}")
        # Unwrap MCP content envelope
        tool_result = result.get("result", {})
        content = tool_result.get("content", [])
        text = None
        if content and content[0].get("type") == "text":
            text = content[0]["text"]
        if tool_result.get("isError"):
            # Tool-level failures carry a human-readable message, not JSON;
            # surface it instead of failing inside json.loads.
            detail = text if text is not None else tool_result
            raise RuntimeError(f"tools/call {name} failed: {detail}")
        if text is not None:
            return json.loads(text)
        return tool_result

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()
def find_doc_files(docs_dir: Path) -> list[Path]:
    """Return every ``.mdx``/``.md`` file below *docs_dir*, sorted by path.

    The tree is walked recursively; a missing directory yields an empty list.
    """
    doc_suffixes = (".mdx", ".md")
    matches = [
        Path(folder) / entry
        for folder, _subdirs, entries in os.walk(docs_dir)
        for entry in entries
        if entry.endswith(doc_suffixes)
    ]
    # A single final sort fixes the order regardless of walk order.
    matches.sort()
    return matches
def derive_title(rel_path: Path, content: str) -> str:
    """Derive a human-readable title for a documentation file.

    Lookup order:
      1. A non-empty ``title:`` entry inside leading YAML front matter
         (a block opened by ``---`` on the first non-blank line).
      2. The first level-1 Markdown heading (``# ...``) outside front matter
         and outside fenced code blocks.
      3. The file stem with ``-``/``_`` replaced by spaces, title-cased.

    Front matter and code fences are tracked so that a YAML comment or a
    ``# comment`` line in a code sample is not mistaken for the page heading.

    Args:
        rel_path: Path of the file; only its stem is used, for the fallback.
        content: Full text of the file.
    """
    in_front_matter = False
    in_code_fence = False
    seen_text = False
    for raw in content.splitlines():
        line = raw.strip()
        if not seen_text:
            # Tolerate a UTF-8 BOM and blank lines before the first content.
            line = line.lstrip("\ufeff")
            if not line:
                continue
            seen_text = True
            if line == "---":
                in_front_matter = True
                continue
        if in_front_matter:
            if line in ("---", "..."):
                in_front_matter = False
            elif line.startswith("title:"):
                candidate = line[6:].strip().strip('"').strip("'")
                if candidate:
                    return candidate
            continue
        if line.startswith(("```", "~~~")):
            in_code_fence = not in_code_fence
            continue
        if not in_code_fence and line.startswith("# "):
            return line[2:].strip()
    return rel_path.stem.replace("-", " ").replace("_", " ").title()
def derive_section(rel_path: Path) -> str:
    """Return the top-level directory of *rel_path*, or ``"root"`` for a bare filename."""
    segments = rel_path.parts
    if len(segments) <= 1:
        return "root"
    return segments[0]
def main() -> None:
    """Command-line entry point: ingest the FastMCP docs tree into rag-mcp.

    Flow:
      1. Collect the ``.md``/``.mdx`` files under ``DOCS_DIR``.
      2. With ``--dry-run``, print their relative paths and stop.
      3. Otherwise open a rag-mcp session, reuse the document whose ``source``
         equals ``DOC_SOURCE`` (or create it), and add one record per file.

    Per-file read or ingestion failures are printed and counted; they do not
    abort the run. The HTTP client is always closed, even if the session
    setup or document lookup raises.
    """
    parser = argparse.ArgumentParser(description="Ingest FastMCP docs into rag-mcp")
    parser.add_argument("--dry-run", action="store_true", help="List files only, no ingestion")
    parser.add_argument("--rag-url", default=DEFAULT_RAG_URL, help="rag-mcp MCP endpoint")
    parser.add_argument("--limit", type=int, default=0, help="Max files to ingest (0=all)")
    args = parser.parse_args()

    files = find_doc_files(DOCS_DIR)
    print(f"Found {len(files)} doc files in {DOCS_DIR}")

    if args.dry_run:
        for f in files:
            print(f"  {f.relative_to(DOCS_DIR)}")
        return

    # Only a positive limit is meaningful; a negative value would otherwise
    # silently drop files from the end of the list.
    if args.limit > 0:
        files = files[: args.limit]
        print(f"Limiting to {args.limit} files")

    client = RagMcpClient(args.rag_url)
    try:
        client.initialize()

        # Find or create the document
        print("Looking for existing fastmcp-docs document...")
        docs_list = client.call_tool("browse_documents", {"page": 1, "page_size": 50})
        # The tool result may arrive as the list itself or wrapped in a dict
        # under "result"; iterating a dict directly would only yield its keys
        # and the existing document would never be found.
        # NOTE(review): only the first page of 50 documents is inspected.
        if isinstance(docs_list, dict):
            docs_list = docs_list.get("result", [])
        existing_doc = None
        for doc in docs_list:
            if isinstance(doc, dict) and doc.get("source") == DOC_SOURCE:
                existing_doc = doc
                break

        if existing_doc:
            doc_id = existing_doc["id"]
            print(f"Using existing document id={doc_id}")
        else:
            print("Creating new document...")
            new_doc = client.call_tool(
                "add_document",
                {
                    "source": DOC_SOURCE,
                    "tags": DOC_TAGS,
                    "description": DOC_DESCRIPTION,
                    "meta": {"repo": "prefecthq/fastmcp", "local_path": str(DOCS_DIR)},
                },
            )
            doc_id = new_doc["id"]
            print(f"Created document id={doc_id}")

        # Ingest each file
        ok = 0
        errors = 0
        for i, fpath in enumerate(files):
            rel = fpath.relative_to(DOCS_DIR)
            try:
                content = fpath.read_text(encoding="utf-8")
            except Exception as e:
                print(f"  [SKIP] {rel}: read error: {e}")
                errors += 1
                continue

            title = derive_title(rel, content)
            section = derive_section(rel)

            try:
                result = client.call_tool(
                    "add_record_fields",
                    {
                        "document_id": doc_id,
                        "fields": {
                            "title": title,
                            "path": str(rel),
                            "content": content,
                        },
                        "metadata": {
                            "section": section,
                            "path": str(rel),
                            "title": title,
                        },
                        "config": {
                            "chunk_size": 800,
                            "overlap": 80,
                            "embed_full_field": True,
                            "generate_snippets": True,
                        },
                    },
                )
            except Exception as e:
                print(f"  [ERROR] {rel}: {e}")
                errors += 1
                time.sleep(1)  # back off on error
                continue

            ok += 1
            # Progress line every tenth file; tolerate a non-dict result so a
            # successful ingest is not also counted as an error.
            if (i + 1) % 10 == 0:
                record_id = result.get("record_id") if isinstance(result, dict) else None
                print(f"  [{i+1}/{len(files)}] {rel} -> record_id={record_id}")
    finally:
        client.close()

    print(f"\nDone: {ok} ingested, {errors} errors (document id={doc_id})")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,5 @@
"""SearxNG MCP — package entry point."""
from searxng_mcp.server import mcp
# The server instance imported above is the package's only exported name.
__all__ = ["mcp"]

View file

@ -0,0 +1,41 @@
"""CLI entry point for the SearxNG MCP server."""
import argparse
from searxng_mcp.server import mcp
def main() -> None:
    """Parse command-line options and start the SearxNG MCP server.

    ``--host`` and ``--port`` are forwarded to ``mcp.run`` only for the
    ``http`` and ``sse`` transports; for ``stdio`` only the transport name
    is passed.
    """
    cli = argparse.ArgumentParser(prog="searxng-mcp", description="SearxNG MCP server")
    cli.add_argument(
        "--transport",
        default="stdio",
        choices=["stdio", "http", "sse"],
        help="Transport protocol (default: stdio)",
    )
    cli.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host to bind when using http/sse transport (default: 127.0.0.1)",
    )
    cli.add_argument(
        "--port",
        default=8000,
        type=int,
        help="Port to bind when using http/sse transport (default: 8000)",
    )
    options = cli.parse_args()

    run_options = {"transport": options.transport}
    needs_bind_address = options.transport in ("http", "sse")
    if needs_bind_address:
        run_options.update(host=options.host, port=options.port)

    mcp.run(**run_options)


if __name__ == "__main__":
    main()

0
src/searxng_mcp/py.typed Normal file
View file

View file

@ -0,0 +1,36 @@
"""HTTP client for the SearxNG search API."""
from typing import Any
import httpx
async def search(
    base_url: str,
    query: str,
    categories: str | None = None,
    engines: str | None = None,
    language: str | None = None,
    pageno: int = 1,
    time_range: str | None = None,
    safesearch: int = 0,
) -> dict[str, Any]:
    """Query the ``/search`` endpoint of a SearxNG instance and return its JSON body.

    ``q``, ``format=json``, ``pageno`` and ``safesearch`` are always sent.
    ``categories``, ``engines``, ``language`` and ``time_range`` are added only
    when they are truthy. A non-2xx response raises ``httpx.HTTPStatusError``.
    """
    optional_filters = {
        "categories": categories,
        "engines": engines,
        "language": language,
        "time_range": time_range,
    }
    request_params: dict[str, Any] = {
        "q": query,
        "format": "json",
        "pageno": pageno,
        "safesearch": safesearch,
    }
    # Leave out filters the caller did not set.
    request_params.update({key: value for key, value in optional_filters.items() if value})

    endpoint = f"{base_url.rstrip('/')}/search"
    async with httpx.AsyncClient() as http:
        reply = await http.get(endpoint, params=request_params)
        reply.raise_for_status()
        return reply.json()

222
src/searxng_mcp/server.py Normal file
View file

@ -0,0 +1,222 @@
"""SearxNG MCP server."""
from typing import Annotated, Literal
from fastmcp import FastMCP
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
import asyncio
import trafilatura
from searxng_mcp.searxng import search as _search
class Settings(BaseSettings):
    """Server configuration loaded from the environment.

    Values come from variables carrying the ``SEARXNG_`` prefix (for example
    ``SEARXNG_BASE_URL``) and from a UTF-8 encoded ``.env`` file.
    """

    # Base URL of the SearxNG instance that searches are sent to.
    base_url: str = "http://localhost:8080"

    model_config = SettingsConfigDict(
        env_file_encoding="utf-8",
        env_file=".env",
        env_prefix="SEARXNG_",
    )
# Settings instance created once, when this module is imported.
settings = Settings()
# The FastMCP server object. The tools and the resource defined later in this
# module register on it through the @mcp.tool / @mcp.resource decorators; the
# `instructions` text tells the client how to use them.
mcp = FastMCP(
    "SearxNG Search",
    instructions=(
        "Use the search tool to query the web via a SearxNG instance. "
        "Prefer specific queries and use categories/time_range to narrow results. "
        "Use the fetch tool to retrieve a page preview (first N chars). "
        "If the page is truncated and you need more, read the resource "
        "web://fetch?url=<url>&start=<n>&end=<m> to get a specific character slice. "
        "Pages are cached after the first fetch; pass use_cache=false to force a refresh."
    ),
)
# In-memory cache: (url, output_format, include_tables, include_images, include_links) -> content
# Read and written by _fetch_and_extract.
_cache: dict[tuple, str] = {}
# Upper bound on cached extractions. Without a bound the module-level cache
# grows for as long as the server process runs.
_CACHE_MAX_ENTRIES = 256


async def _fetch_and_extract(
    url: str,
    output_format: str = "markdown",
    include_tables: bool = True,
    include_images: bool = False,
    include_links: bool = False,
    use_cache: bool = True,
) -> str:
    """Shared fetch+extract logic used by both the tool and resource.

    Downloads *url* with trafilatura and extracts its main content in the
    requested format. Both blocking trafilatura calls run in worker threads so
    the event loop stays responsive.

    Results are cached per (url, output_format, include_tables,
    include_images, include_links). With ``use_cache=False`` the cache is
    bypassed for reading but still updated with the fresh result. When the
    cache exceeds ``_CACHE_MAX_ENTRIES``, the oldest entries are evicted.

    Raises:
        ValueError: if the download or the extraction yields nothing.
    """
    cache_key = (url, output_format, include_tables, include_images, include_links)
    if use_cache and cache_key in _cache:
        return _cache[cache_key]

    downloaded = await asyncio.to_thread(trafilatura.fetch_url, url)
    if not downloaded:
        raise ValueError(f"Failed to fetch URL: {url}")

    result = await asyncio.to_thread(
        trafilatura.extract,
        downloaded,
        url=url,
        output_format=output_format,
        include_tables=include_tables,
        include_images=include_images,
        include_links=include_links,
        # Metadata is only requested for the JSON output format.
        with_metadata=output_format == "json",
    )
    if not result:
        raise ValueError(f"Failed to extract content from URL: {url}")

    # Re-insert so a refreshed entry counts as the newest one; dicts keep
    # insertion order, so the first key is always the oldest.
    _cache.pop(cache_key, None)
    _cache[cache_key] = result
    while len(_cache) > _CACHE_MAX_ENTRIES:
        _cache.pop(next(iter(_cache)))
    return result
@mcp.tool
async def search(
    query: Annotated[str, Field(description="Search query string.")],
    categories: Annotated[
        str | None,
        Field(description="Comma-separated categories: general, images, news, science, files, social_media, it, map."),
    ] = None,
    engines: Annotated[
        str | None,
        Field(description="Comma-separated engines to use, e.g. 'google,bing'. Overrides categories."),
    ] = None,
    language: Annotated[
        str | None,
        Field(description="BCP 47 language code for results, e.g. 'en', 'de'."),
    ] = None,
    pageno: Annotated[
        int,
        Field(description="Result page number (1-based).", ge=1),
    ] = 1,
    time_range: Annotated[
        Literal["day", "week", "month", "year"] | None,
        Field(description="Restrict results to a time range."),
    ] = None,
    safesearch: Annotated[
        Literal[0, 1, 2],
        Field(description="Safe search level: 0=off, 1=moderate, 2=strict."),
    ] = 0,
) -> list[dict]:
    """Search the web via SearxNG and return a list of results.
    Each result contains: title, url, content (snippet), engine, category.
    Returns at most the results provided by the SearxNG instance (typically 10 per page).
    """
    payload = await _search(
        base_url=settings.base_url,
        query=query,
        categories=categories,
        engines=engines,
        language=language,
        pageno=pageno,
        time_range=time_range,
        safesearch=safesearch,
    )
    # Keep only these keys from each hit; a missing key becomes "".
    exposed_keys = ("title", "url", "content", "engine", "category")
    trimmed = []
    for hit in payload.get("results", []):
        trimmed.append({key: hit.get(key, "") for key in exposed_keys})
    return trimmed
@mcp.tool
async def fetch(
    url: Annotated[str, Field(description="URL of the page to fetch and extract.")],
    output_format: Annotated[
        Literal["markdown", "txt", "json"],
        Field(description="Output format for extracted content: markdown, txt, or json (includes metadata)."),
    ] = "markdown",
    include_tables: Annotated[
        bool,
        Field(description="Include tables in extracted content."),
    ] = True,
    include_images: Annotated[
        bool,
        Field(description="Include image descriptions in extracted content."),
    ] = False,
    include_links: Annotated[
        bool,
        Field(description="Include hyperlinks in extracted content."),
    ] = False,
    max_chars: Annotated[
        int,
        Field(description="Maximum characters to return. 0 means no limit.", ge=0),
    ] = 2000,
    start: Annotated[
        int,
        Field(description="Start character offset for slicing extracted content.", ge=0),
    ] = 0,
    end: Annotated[
        int,
        Field(description="End character offset for slicing extracted content. 0 means read to end of content.", ge=0),
    ] = 0,
    use_cache: Annotated[
        bool,
        Field(description="Return cached content if available. Set to false to force a fresh download."),
    ] = True,
) -> dict:
    """Fetch a URL and extract its main content, stripping navigation, ads, and boilerplate.
    Returns a preview of the content (up to max_chars) plus total_chars and truncated flag.
    If truncated, use start/end to page through the full content, or read the resource
    web://fetch?url=<url>&start=<n>&end=<m> for specific slices.

    The returned dict has three keys: ``content`` (the text returned),
    ``total_chars`` (length of the full extracted content) and ``truncated``
    (True when extracted content remains after the returned text).
    """
    content = await _fetch_and_extract(url, output_format, include_tables, include_images, include_links, use_cache)
    total_chars = len(content)

    # Apply explicit start/end slice first (takes priority over max_chars windowing)
    if start > 0 or end > 0:
        stop = min(end, total_chars) if end > 0 else total_chars
        return {
            "content": content[start:stop],
            "total_chars": total_chars,
            # Report remaining content so a caller paging with start/end can
            # tell when it has reached the end of the document.
            "truncated": stop < total_chars,
        }

    truncated = max_chars > 0 and total_chars > max_chars
    return {
        "content": content[:max_chars] if truncated else content,
        "total_chars": total_chars,
        "truncated": truncated,
    }
@mcp.resource(
    "web://fetch{?url,start,end,output_format,include_links,include_tables,include_images,use_cache}",
    mime_type="text/markdown",
)
async def fetch_slice(
    url: str = "",
    start: int = 0,
    end: int = 0,
    output_format: str = "markdown",
    include_links: bool = False,
    include_tables: bool = True,
    include_images: bool = False,
    use_cache: bool = True,
) -> str:
    """Fetch a URL and return a character slice of the extracted content.
    Use start/end to page through large documents (end=0 means read to end of content).
    Example: web://fetch?url=https://example.com/page&start=2000&end=4000
    """
    if url == "":
        raise ValueError("url parameter is required")

    extracted = await _fetch_and_extract(
        url,
        output_format,
        include_tables,
        include_images,
        include_links,
        use_cache,
    )
    # A non-positive end means "through the end of the content".
    stop = end if end > 0 else None
    return extracted[start:stop]

1351
uv.lock generated Normal file

File diff suppressed because it is too large Load diff