-
Notifications
You must be signed in to change notification settings - Fork 244
Description
🧭 Epic
Title: AI Middleware Integration / Plugin Framework
Goal: Create a robust, extensible plugin framework that enables both self-contained plugins and external middleware service integrations for AI safety, security, and business logic processing.
Why now: Foundation for 0.8.0 guardrails (#229, #271) and 0.6.0 security features (#221, #257, #426). We need this infrastructure before implementing PII masking, input validation, and policy engines.
Note: Framework supports two plugin types: self-contained plugins (run in-process) and middleware plugins (integrate with external microservices via configured endpoints + auth). All plugins use the same interface and configuration system.
Notes:
- Just like SELinux, we should also be able to set a plugin to: enforcing vs permissive vs disabled (with fine-grained controls: per-user, per virtual server, per tool, per tenant)
🧭 Type of Feature
- New functionality (core feature)
- Architecture enhancement
- Security hardening (foundation)
- Developer experience improvement
🙋♂️ User Story 1 - Plugin Discovery & Registration
As a: Gateway administrator
I want: to dynamically discover, load, and configure plugins through API and UI
So that: I can extend gateway capabilities without code changes or restarts.
✅ Acceptance Criteria
Scenario: Register new AI middleware plugin
Given the plugin framework is enabled
When I POST a plugin configuration to /admin/plugins
Then the plugin is loaded and appears in the Admin UI
And the plugin processes requests according to its configuration
🙋♂️ User Story 2 - Request Pipeline Integration
As a: Security engineer
I want: plugins to intercept and modify requests/responses in a configurable order
So that: I can implement layered security (LlamaGuard → PII filter → custom validators).
✅ Acceptance Criteria
Scenario: Multi-stage plugin processing
Given plugins A, B, C are configured with priority 1, 2, 3
When a request flows through the gateway
Then plugins execute in order: A → B → C → tool → C → B → A
And each plugin can modify or reject the payload
🙋♂️ User Story 3 - AI Middleware Integration
As a: AI safety engineer
I want: to integrate LlamaGuard, OpenAI Moderation, and custom LLM-based filters
So that: harmful content is blocked before reaching tools or being returned to clients.
✅ Acceptance Criteria
Scenario: LlamaGuard content filtering
Given LlamaGuard plugin is configured with "block-harmful" policy
When a request contains potentially harmful content
Then LlamaGuard analyzes the content via API/local model
And the request is blocked with appropriate error message
🙋♂️ User Story 4 - Configuration Management
As a: DevOps engineer
I want: plugin configurations to be exportable, versionable, and environment-specific
So that: I can manage plugin deployments across dev/staging/production consistently.
✅ Acceptance Criteria
Scenario: Export plugin configuration
Given multiple plugins are configured with custom rules
When I GET /admin/plugins/export
Then I receive a JSON/YAML file with all plugin configurations
And I can import this configuration in another environment
📐 Design Sketch
flowchart TD
Request[Incoming Request] --> PM[Plugin Manager]
PM --> PI1[Input Plugin 1]
PI1 --> PI2[Input Plugin 2]
PI2 --> Core[Core Gateway Logic]
Core --> PO1[Output Plugin 1]
PO1 --> PO2[Output Plugin 2]
PO2 --> Response[Outgoing Response]
subgraph Plugin Types
AI[AI Middleware<br/>LlamaGuard, OpenAI Mod]
PII[PII Scanners<br/>Custom Regex, NER]
Auth[Custom Auth<br/>Enterprise SSO]
Metrics[Custom Metrics<br/>Business Logic]
end
subgraph Plugin Registry
Discovery[Plugin Discovery]
Config[Configuration Store]
Health[Health Monitoring]
end
Component | Change | Detail |
---|---|---|
plugin_framework/ |
NEW | Core plugin infrastructure |
plugin_manager.py |
NEW | Plugin lifecycle management |
middleware_pipeline.py |
NEW | Request/response processing chain |
plugin_registry.py |
NEW | Plugin discovery and configuration |
builtin_plugins/ |
NEW | Included plugins (PII, validation) |
config.py |
UPDATE | Plugin configuration settings |
main.py |
UPDATE | Plugin middleware integration |
Admin UI | UPDATE | Plugin management interface |
🔄 Roll-out Plan
-
Phase 0 (v0.6.0 - August 19, 2025): Core plugin framework infrastructure
- Plugin interface definitions and base classes
- Plugin manager with lifecycle management
- Configuration schema and validation
- Enables: [SECURITY FEATURE]: Gateway-Level Input Validation & Output Sanitization (prevent traversal) #221 Gateway-Level Input Validation & Output Sanitization
-
Phase 1 (v0.7.0 - September 2, 2025): Pipeline integration + built-in plugins
- Middleware pipeline implementation
- Self-contained validation and PII plugins
- Admin UI for plugin management
- External service integration framework
-
Phase 2 (v0.8.0 - September 16, 2025): AI middleware integrations
- Delivers: [SECURITY FEATURE]: Guardrails - Input/Output Sanitization & PII Masking #229 Guardrails - Input/Output Sanitization & PII Masking
- Delivers: [SECURITY FEATURE]: Policy-as-Code Engine - Rego Prototype #271 Policy-as-Code Engine - Rego Prototype
- LlamaGuard, OpenAI Moderation, and custom filter plugins
- External microservice authentication and configuration
-
Phase 3 (v0.9.0+): Advanced features
- Plugin marketplace/registry integration
- Performance monitoring and caching for external services
- Multi-tenant plugin isolation and scoping
📝 Technical Specification
Plugin Interface
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Union
from enum import Enum
class PluginType(Enum):
INPUT_FILTER = "input_filter"
OUTPUT_FILTER = "output_filter"
AI_MIDDLEWARE = "ai_middleware"
AUTHENTICATOR = "authenticator"
METRICS_COLLECTOR = "metrics_collector"
class PluginExecutionMode(Enum):
SELF_CONTAINED = "self_contained" # Runs in-process
EXTERNAL_SERVICE = "external_service" # Calls external microservice
class PluginResult:
def __init__(self,
continue_processing: bool = True,
modified_payload: Optional[Any] = None,
error_message: Optional[str] = None,
metadata: Optional[Dict] = None):
self.continue_processing = continue_processing
self.modified_payload = modified_payload
self.error_message = error_message
self.metadata = metadata or {}
class BasePlugin(ABC):
def __init__(self, config: Dict[str, Any]):
self.config = config
self.enabled = config.get("enabled", True)
self.execution_mode = PluginExecutionMode(config.get("execution_mode", "self_contained"))
@abstractmethod
async def process(self, payload: Any, context: Dict[str, Any]) -> PluginResult:
"""Process the payload and return result"""
pass
@abstractmethod
def get_info(self) -> Dict[str, Any]:
"""Return plugin metadata"""
pass
async def health_check(self) -> bool:
"""Check if plugin is healthy"""
return True
class ExternalServicePlugin(BasePlugin):
"""Base class for plugins that call external microservices"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.service_url = config.get("service_url")
self.auth_config = config.get("auth", {})
self.timeout = config.get("timeout", 30)
async def call_external_service(self, payload: Any) -> Any:
"""Make authenticated call to external service"""
# Implementation for HTTP calls with auth
pass
Plugin Configuration Schema
class PluginConfig(BaseModel):
name: str
type: PluginType
execution_mode: PluginExecutionMode = PluginExecutionMode.SELF_CONTAINED
enabled: bool = True
priority: int = 100 # Lower = higher priority
module_path: Optional[str] = None # For self-contained plugins
class_name: Optional[str] = None # For self-contained plugins
service_url: Optional[str] = None # For external service plugins
auth: Optional[Dict[str, Any]] = None # Auth config for external services
timeout: int = 30 # Timeout for external service calls
config: Dict[str, Any] = {}
conditions: Optional[Dict[str, Any]] = None # When to apply
class PluginCondition(BaseModel):
"""Conditions for when plugin should execute"""
server_ids: Optional[List[str]] = None
tool_names: Optional[List[str]] = None
user_patterns: Optional[List[str]] = None
content_types: Optional[List[str]] = None
class ExternalServiceAuth(BaseModel):
"""Authentication configuration for external services"""
type: str # "bearer", "basic", "api_key", "custom_header"
token: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
api_key: Optional[str] = None
header_name: Optional[str] = None
header_value: Optional[str] = None
Built-in Plugin Examples
# Self-contained PII Scanner Plugin
class PIIFilterPlugin(BasePlugin):
"""Self-contained plugin that runs regex-based PII detection in-process"""
async def process(self, payload: Any, context: Dict) -> PluginResult:
pii_found = self._scan_for_pii(payload)
if pii_found and self.config.get("block_pii"):
return PluginResult(
continue_processing=False,
error_message="PII detected in request"
)
cleaned_payload = self._mask_pii(payload) if pii_found else payload
return PluginResult(modified_payload=cleaned_payload)
# External Service LlamaGuard Plugin
class LlamaGuardPlugin(ExternalServicePlugin):
"""Middleware plugin that calls external LlamaGuard microservice"""
async def process(self, payload: Any, context: Dict) -> PluginResult:
# Call external LlamaGuard service
safety_result = await self.call_external_service({
"text": payload,
"policy": self.config.get("policy", "default")
})
safety_score = safety_result.get("safety_score", 1.0)
if safety_score < self.config.get("safety_threshold", 0.8):
return PluginResult(
continue_processing=False,
error_message="Content blocked by safety filter",
metadata={"safety_score": safety_score}
)
return PluginResult()
# External OpenAI Moderation Plugin
class OpenAIModerationPlugin(ExternalServicePlugin):
"""Middleware plugin that calls OpenAI Moderation API"""
async def process(self, payload: Any, context: Dict) -> PluginResult:
moderation_result = await self.call_external_service({
"input": payload,
"model": self.config.get("model", "text-moderation-stable")
})
if moderation_result.get("flagged", False):
categories = [cat for cat, flagged in moderation_result.get("categories", {}).items() if flagged]
return PluginResult(
continue_processing=False,
error_message=f"Content flagged for: {', '.join(categories)}",
metadata={"categories": categories}
)
return PluginResult()
🏗️ Implementation Architecture
Plugin Manager
- Discovery: Scan plugin directories, load from configuration
- Lifecycle: Load, initialize, configure, health-check, unload plugins
- Registry: Maintain plugin metadata, dependencies, and status
- Error Handling: Graceful failure, plugin isolation, fallback behavior
Middleware Pipeline
- Execution Order: Priority-based plugin ordering with dependency resolution
- Context Passing: Rich context object with request metadata, user info, etc.
- Performance: Async processing, optional caching, timeout handling
- Monitoring: Plugin execution metrics, latency tracking, error rates
Configuration Management
- Storage: Database-backed with environment overrides
- Validation: JSON Schema validation for plugin configs
- Hot Reload: Runtime configuration changes without restart
- Import/Export: YAML/JSON format for configuration portability
📊 Performance Considerations
- Async Processing: All plugins must be async-compatible
- Caching Layer: Optional Redis-backed caching for expensive AI calls
- Circuit Breaker: Auto-disable failing plugins to maintain system stability
- Resource Limits: Memory and execution time limits per plugin
- Parallel Execution: Independent plugins can run concurrently where possible
🔒 Security Features
- Plugin Isolation: Sandbox plugins to prevent system access
- Configuration Validation: Strict schema validation for all plugin configs
- Audit Logging: Track all plugin executions and configuration changes
- Permission Model: Role-based access to plugin management
- Secret Management: Secure handling of API keys and credentials
📋 Roadmap Dependencies
This plugin framework is foundational infrastructure that enables multiple upcoming roadmap items:
Direct Dependencies (Framework Required)
- [SECURITY FEATURE]: Gateway-Level Input Validation & Output Sanitization (prevent traversal) #221 (v0.6.0) - Gateway-Level Input Validation & Output Sanitization
- [SECURITY FEATURE]: Guardrails - Input/Output Sanitization & PII Masking #229 (v0.8.0) - Guardrails - Input/Output Sanitization & PII Masking
- [SECURITY FEATURE]: Policy-as-Code Engine - Rego Prototype #271 (v0.8.0) - Policy-as-Code Engine - Rego Prototype
- [SECURITY FEATURE]: Gateway-Level Rate Limiting, DDoS Protection & Abuse Detection #257 (v0.6.0) - Gateway-Level Rate Limiting, DDoS Protection & Abuse Detection
Enhanced by Framework
- [AUTH FEATURE]: Authentication & Authorization - SSO + Identity-Provider Integration #220 (v0.5.0) - SSO + Identity-Provider Integration (custom auth plugins)
- [SECURITY FEATURE]: Role-Based Access Control (RBAC) - User/Team/Global Scopes for full multi-tenancy support #283 (v0.7.0) - Role-Based Access Control (authorization plugins)
- [Feature Request]: Prometheus Metrics Instrumentation using prometheus-fastapi-instrumentator #218 (v0.5.0) - Prometheus Metrics (custom metrics collector plugins)
- [Feature Request]: Add OpenLLMetry Integration for Observability #175 (v0.7.0) - OpenLLMetry Integration (observability plugins)
Timeline Alignment
- Target Release: v0.6.0 (August 19, 2025) - "Security, Scale & Smart Automation"
- Completion Required Before: v0.8.0 guardrails implementation
- Dependencies: Requires completion of v0.5.0 enterprise auth/config features
📝 Example Use Cases
- Guardrails Implementation: PII filtering + LlamaGuard + custom business rules
- Enterprise Auth: Custom SSO integration + role-based tool access
- Compliance Logging: Audit trail plugin for SOX/GDPR compliance
- Performance Optimization: Caching plugin for expensive tool calls
- A/B Testing: Traffic splitting plugin for testing new AI models
📣 Dependencies & Next Steps
Foundational Requirements (v0.6.0):
- Plugin interface standardization with self-contained + external service support
- Configuration schema design with auth/service integration
- Admin UI wireframes for plugin management (extends existing HTMX interface)
- Security model for plugin execution and external service calls
Integration Points:
- FastAPI middleware integration (builds on existing pipeline)
- Database schema for plugin configs (extends current SQLAlchemy models)
- Redis integration for external service caching (uses existing cache layer)
- Observability hooks for monitoring (integrates with [Feature Request]: Prometheus Metrics Instrumentation using prometheus-fastapi-instrumentator #218 Prometheus work)
Roadmap Enablement:
This framework is the critical path for v0.8.0 enterprise security features and must be completed in v0.6.0 to avoid roadmap delays. The self-contained plugin architecture supports immediate security needs, while external service integration enables enterprise AI safety tool integration.
Future Epics Enabled:
- ✅ [SECURITY FEATURE]: Guardrails - Input/Output Sanitization & PII Masking #229: Guardrails - Input/Output Sanitization & PII Masking
- ✅ [SECURITY FEATURE]: Policy-as-Code Engine - Rego Prototype #271: Policy-as-Code Engine - Rego Prototype
- ✅ [SECURITY FEATURE]: Gateway-Level Input Validation & Output Sanitization (prevent traversal) #221: Gateway-Level Input Validation & Output Sanitization
- 🔄 Enterprise SSO Integration ([AUTH FEATURE]: Authentication & Authorization - SSO + Identity-Provider Integration #220, [SECURITY FEATURE]: Role-Based Access Control (RBAC) - User/Team/Global Scopes for full multi-tenancy support #283)
- 🔄 Advanced Rate Limiting & Quotas ([SECURITY FEATURE]: Gateway-Level Rate Limiting, DDoS Protection & Abuse Detection #257)
- 🔄 Custom Business Logic Hooks