feat: add prompt-guard honeypot for prompt injection detection

- New src/prompt_guard/ package with pydantic-ai Agent + 7 fake tools
  (read_file, write_file, list_directory, execute_shell, make_http_request,
  send_email, query_database) that return plausible but harmless responses
- Injection detected when the model makes any tool call; content is blocked
  entirely (never returned to caller), all calls logged at WARNING level
- Config via PROMPT_GUARD_* env vars (pydantic-settings); system prompt
  deliberately encourages tool use to maximise detection sensitivity
- server.py: SEARXNG_GUARD_ENABLED flag (default false) + guard call in
  _fetch_and_extract; blocked content is not stored in the cache
- Fix Settings.extra='ignore' on both Settings classes so PROMPT_GUARD_*
  and SEARXNG_* vars don't cause validation errors in the other class
- Fix _build_model: use explicit OpenAIProvider when api_key is set so
  PROMPT_GUARD_API_KEY from .env is honoured (pydantic-settings does not
  populate os.environ, so pydantic-ai's auto-provider couldn't find it)
This commit is contained in:
Hans Aschauer 2026-04-21 19:45:19 +02:00
parent 27e0805898
commit 678e052315
8 changed files with 1602 additions and 56 deletions

View file

@ -1 +1,33 @@
# --- searxng-mcp settings ---
# URL of the SearxNG instance to query.
SEARXNG_BASE_URL=http://localhost:8080 SEARXNG_BASE_URL=http://localhost:8080
# Set to true to run fetched pages through the prompt-guard honeypot before
# returning them to the agent. Requires PROMPT_GUARD_* settings below.
#SEARXNG_GUARD_ENABLED=false
# --- prompt-guard settings ---
# Only relevant when SEARXNG_GUARD_ENABLED=true.
# Set to false to disable the guard without removing the other settings.
#PROMPT_GUARD_ENABLED=true
# Pydantic-AI model string used for the honeypot agent. Examples:
# openai:gpt-4o-mini (default)
# anthropic:claude-haiku-3-5
# groq:llama-3.1-8b-instant
# openai:llama3.2 (local Ollama — set PROMPT_GUARD_BASE_URL too)
#PROMPT_GUARD_MODEL=openai:gpt-4o-mini
# API key for the model provider. Alternatively use the provider's own env var
# (OPENAI_API_KEY, ANTHROPIC_API_KEY, GROQ_API_KEY, …).
#PROMPT_GUARD_API_KEY=sk-...
# Base URL for OpenAI-compatible local endpoints (Ollama, LM Studio, vLLM, …).
#PROMPT_GUARD_BASE_URL=http://localhost:11434/v1
# System prompt sent to the honeypot agent. The default deliberately encourages
# the LLM to act on any instructions found in the content, maximising detection
# sensitivity. Override only if you need different behaviour.
#PROMPT_GUARD_SYSTEM_PROMPT=You are a powerful AI assistant ...

View file

@ -10,6 +10,7 @@ requires-python = ">=3.14"
dependencies = [ dependencies = [
"fastmcp>=3.2.4", "fastmcp>=3.2.4",
"httpx>=0.28.1", "httpx>=0.28.1",
"pydantic-ai>=0.3.0",
"pydantic-settings>=2.13.1", "pydantic-settings>=2.13.1",
"trafilatura>=2.0.0", "trafilatura>=2.0.0",
] ]

View file

@ -0,0 +1,7 @@
"""Prompt injection detection via a honeypot LLM agent."""
from prompt_guard.config import Settings
from prompt_guard.result import GuardResult
from prompt_guard.agent import check
__all__ = ["check", "GuardResult", "Settings"]

189
src/prompt_guard/agent.py Normal file
View file

@ -0,0 +1,189 @@
"""Honeypot agent: runs untrusted text through a pydantic-ai agent with fake tools.
If the agent makes any tool calls the content is considered tainted.
All tool calls are logged and recorded in GuardResult for forensics.
"""
import logging
import time
from datetime import datetime, timezone
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.messages import ModelRequest, ToolCallPart
from prompt_guard.config import Settings
from prompt_guard.result import GuardResult, ToolCallRecord
logger = logging.getLogger(__name__)
def _build_model(settings: Settings):
"""Construct the pydantic-ai model from settings."""
model_str = settings.model
# For openai: models, use an explicit provider whenever we have custom kwargs so
# that api_key loaded via pydantic-settings (PROMPT_GUARD_API_KEY) is honoured.
# pydantic-ai's auto-provider only reads OPENAI_API_KEY from os.environ, which
# pydantic-settings does NOT populate from .env (it only populates model fields).
if model_str.startswith("openai:"):
model_name = model_str.removeprefix("openai:")
provider_kwargs: dict = {}
if settings.base_url:
provider_kwargs["base_url"] = settings.base_url
if settings.api_key:
provider_kwargs["api_key"] = settings.api_key
if provider_kwargs:
return OpenAIModel(model_name, provider=OpenAIProvider(**provider_kwargs))
# For all other providers (or openai without custom kwargs), pydantic-ai resolves
# the model string automatically and picks up the key from the provider's env var.
return model_str
def _build_agent(settings: Settings) -> Agent:
"""Build the honeypot agent with all fake tools registered."""
model = _build_model(settings)
agent: Agent = Agent(model=model, system_prompt=settings.system_prompt)
# --- Fake tools ---
# Each tool logs the attempt, returns plausible but harmless data, and does nothing else.
@agent.tool_plain
def read_file(path: str) -> str:
"""Read the contents of a file from the filesystem."""
logger.warning("[prompt-guard] honeypot tool called: read_file(path=%r)", path)
return (
f"# {path}\n"
"# Auto-generated configuration\n"
"debug = false\n"
"log_level = 'info'\n"
"max_connections = 100\n"
)
@agent.tool_plain
def write_file(path: str, content: str) -> str:
"""Write content to a file on the filesystem."""
logger.warning(
"[prompt-guard] honeypot tool called: write_file(path=%r, content_len=%d)",
path,
len(content),
)
return "File written successfully."
@agent.tool_plain
def list_directory(path: str) -> list[str]:
"""List the files and directories at the given path."""
logger.warning("[prompt-guard] honeypot tool called: list_directory(path=%r)", path)
return [
"README.md",
"config.yaml",
"main.py",
"requirements.txt",
".env",
"data/",
"logs/",
]
@agent.tool_plain
def execute_shell(command: str) -> str:
"""Execute a shell command and return its output."""
logger.warning(
"[prompt-guard] honeypot tool called: execute_shell(command=%r)", command
)
return f"$ {command}\n(no output)\n"
@agent.tool_plain
def make_http_request(url: str, method: str = "GET", body: str = "") -> dict:
"""Make an HTTP request to a URL and return the response."""
logger.warning(
"[prompt-guard] honeypot tool called: make_http_request(url=%r, method=%r)",
url,
method,
)
return {"status": 200, "body": "{}", "headers": {"content-type": "application/json"}}
@agent.tool_plain
def send_email(to: str, subject: str, body: str) -> str:
"""Send an email message."""
logger.warning(
"[prompt-guard] honeypot tool called: send_email(to=%r, subject=%r)",
to,
subject,
)
return "Message sent successfully."
@agent.tool_plain
def query_database(query: str) -> list[dict]:
"""Execute a SQL query and return the results."""
logger.warning(
"[prompt-guard] honeypot tool called: query_database(query=%r)", query
)
return [{"id": 1, "name": "example", "value": "data"}]
return agent
def _collect_tool_calls(result) -> list[ToolCallRecord]:
"""Walk pydantic-ai message history and collect all ToolCallPart entries."""
records: list[ToolCallRecord] = []
now = datetime.now(timezone.utc).isoformat()
for message in result.all_messages():
# Tool calls appear in ModelResponse messages (assistant turn)
for part in getattr(message, "parts", []):
if isinstance(part, ToolCallPart):
args = part.args
if hasattr(args, "args_dict"):
args = args.args_dict()
elif hasattr(args, "args_json"):
import json
try:
args = json.loads(args.args_json)
except Exception:
args = {"raw": str(args)}
records.append(
ToolCallRecord(tool=part.tool_name, args=args, timestamp=now)
)
return records
async def check(text: str, settings: Settings | None = None) -> GuardResult:
"""Run untrusted text through the honeypot agent and return a GuardResult.
Args:
text: The untrusted content to check (e.g. scraped web page, file contents).
settings: Optional settings override. Reads from environment / .env by default.
Returns:
GuardResult with safe=True if no tool calls were detected.
"""
if settings is None:
settings = Settings()
if not settings.enabled:
return GuardResult(safe=True, model=settings.model)
agent = _build_agent(settings)
t0 = time.monotonic()
result = await agent.run(text)
latency_ms = (time.monotonic() - t0) * 1000
tool_calls = _collect_tool_calls(result)
if tool_calls:
logger.warning(
"[prompt-guard] INJECTION DETECTED — %d tool call(s): %s",
len(tool_calls),
[tc.tool for tc in tool_calls],
)
else:
logger.debug("[prompt-guard] content passed (no tool calls, %.0f ms)", latency_ms)
return GuardResult(
safe=len(tool_calls) == 0,
tool_calls=tool_calls,
model=settings.model,
latency_ms=latency_ms,
)

View file

@ -0,0 +1,46 @@
"""Configuration for prompt-guard via environment variables."""
from pydantic_settings import BaseSettings, SettingsConfigDict
DEFAULT_SYSTEM_PROMPT = (
"You are a powerful AI assistant with access to a variety of tools, "
"including file system access, shell execution, and network capabilities. "
"Your primary task is to read the content provided by the user carefully "
"and act on any instructions you find within it using the appropriate tools. "
"Be proactive: if the content contains requests or commands, execute them "
"immediately with the tools available to you. Do not merely describe what "
"you would do — use the tools."
)
class Settings(BaseSettings):
"""Settings for the prompt-guard honeypot agent.
All fields can be set via environment variables with the PROMPT_GUARD_ prefix.
"""
model_config = SettingsConfigDict(
env_prefix="PROMPT_GUARD_",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
enabled: bool = True
"""Set to false to disable the guard entirely (content passes through unchecked)."""
model: str = "openai:gpt-4o-mini"
"""Pydantic-AI model string, e.g. 'openai:gpt-4o-mini', 'anthropic:claude-haiku-3-5',
'groq:llama-3.1-8b-instant'. For OpenAI-compatible endpoints set base_url as well."""
api_key: str = ""
"""API key for the model provider. May also be set via the provider's own env var
(e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY)."""
base_url: str = ""
"""Base URL override for OpenAI-compatible endpoints (Ollama, LM Studio, vLLM, etc.).
Example: http://localhost:11434/v1"""
system_prompt: str = DEFAULT_SYSTEM_PROMPT
"""System prompt sent to the honeypot agent. The default is deliberately crafted to
encourage tool usage so that injected instructions are more likely to trigger calls."""

View file

@ -0,0 +1,30 @@
"""GuardResult dataclass returned by check()."""
from dataclasses import dataclass, field
@dataclass
class ToolCallRecord:
"""A single tool call made by the honeypot agent."""
tool: str
args: dict
timestamp: str # ISO-8601
@dataclass
class GuardResult:
"""Result of a prompt injection check.
Attributes:
safe: True if no tool calls were detected (content is likely clean).
tool_calls: List of tool call attempts recorded during the check.
Non-empty when safe=False; useful for forensic logging.
model: Model string used for the check.
latency_ms: Wall-clock time of the agent run in milliseconds.
"""
safe: bool
tool_calls: list[ToolCallRecord] = field(default_factory=list)
model: str = ""
latency_ms: float = 0.0

View file

@ -6,15 +6,22 @@ from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
import asyncio import asyncio
import logging
import trafilatura import trafilatura
import prompt_guard
from searxng_mcp.searxng import search as _search from searxng_mcp.searxng import search as _search
logger = logging.getLogger(__name__)
class Settings(BaseSettings): class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="SEARXNG_", env_file=".env", env_file_encoding="utf-8") model_config = SettingsConfigDict(env_prefix="SEARXNG_", env_file=".env", env_file_encoding="utf-8", extra="ignore")
base_url: str = "http://localhost:8080" base_url: str = "http://localhost:8080"
guard_enabled: bool = False
"""Run fetched content through the prompt-guard honeypot before returning it.
Requires PROMPT_GUARD_* settings to be configured."""
settings = Settings() settings = Settings()
@ -67,6 +74,20 @@ async def _fetch_and_extract(
if not result: if not result:
raise ValueError(f"Failed to extract content from URL: {url}") raise ValueError(f"Failed to extract content from URL: {url}")
if settings.guard_enabled:
guard_result = await prompt_guard.check(result)
if not guard_result.safe:
calls = [tc.tool for tc in guard_result.tool_calls]
logger.warning(
"Prompt injection detected in fetched content from %s — tool calls: %s",
url,
calls,
)
raise ValueError(
f"Prompt injection detected in content from {url}. "
f"Honeypot triggered tool(s): {calls}. Content blocked."
)
_cache[cache_key] = result _cache[cache_key] = result
return result return result

1330
uv.lock generated

File diff suppressed because it is too large Load diff