[Feature Request]: AI Middleware Integration / Plugin Framework for extensible gateway capabilities #319

@crivetimihai

🧭 Epic

Title: AI Middleware Integration / Plugin Framework
Goal: Create a robust, extensible plugin framework that enables both self-contained plugins and external middleware service integrations for AI safety, security, and business logic processing.
Why now: This is the foundation for the v0.8.0 guardrails (#229, #271) and the v0.6.0 security features (#221, #257, #426). The infrastructure must be in place before we implement PII masking, input validation, and policy engines.
Note: The framework supports two plugin types: self-contained plugins (run in-process) and middleware plugins (integrate with external microservices via configured endpoints + auth). All plugins use the same interface and configuration system.

Notes:

  • As with SELinux, each plugin should be settable to one of three modes: enforcing, permissive, or disabled, with fine-grained scoping (per user, per virtual server, per tool, per tenant)
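
To make the mode idea concrete, here is a minimal sketch of how an effective mode could be resolved for one request. The `PluginMode` enum, the scope precedence order, and the `resolve_mode` / `apply_mode` helpers are all assumptions for illustration; the issue does not specify how scopes override one another.

```python
from enum import Enum
from typing import Dict, Optional


class PluginMode(Enum):
    ENFORCING = "enforcing"    # violations block the request
    PERMISSIVE = "permissive"  # violations are only logged
    DISABLED = "disabled"      # plugin is skipped entirely


# Hypothetical precedence: the most specific scope wins.
SCOPE_PRECEDENCE = ("user", "tool", "server", "tenant")


def resolve_mode(
    default: PluginMode,
    overrides: Dict[str, Dict[str, PluginMode]],
    context: Dict[str, Optional[str]],
) -> PluginMode:
    """Return the effective mode for one request.

    `overrides` maps a scope name to {scope_id: mode}; `context` maps a
    scope name to the id seen on the current request.
    """
    for scope in SCOPE_PRECEDENCE:
        scope_id = context.get(scope)
        if scope_id is not None and scope_id in overrides.get(scope, {}):
            return overrides[scope][scope_id]
    return default


def apply_mode(mode: PluginMode, violation: bool) -> str:
    """Translate a plugin verdict into a gateway action."""
    if mode is PluginMode.DISABLED or not violation:
        return "allow"
    return "block" if mode is PluginMode.ENFORCING else "log_only"
```

In permissive mode a violation yields `log_only`, which lets an administrator trial a new plugin on production traffic before switching it to enforcing.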

🧭 Type of Feature

  • New functionality (core feature)
  • Architecture enhancement
  • Security hardening (foundation)
  • Developer experience improvement

🙋‍♂️ User Story 1 - Plugin Discovery & Registration

As a: Gateway administrator
I want: to dynamically discover, load, and configure plugins through API and UI
So that: I can extend gateway capabilities without code changes or restarts.

✅ Acceptance Criteria

Scenario: Register new AI middleware plugin
Given the plugin framework is enabled
When I POST a plugin configuration to /admin/plugins
Then the plugin is loaded and appears in the Admin UI
And the plugin processes requests according to its configuration
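
A registration call for this scenario might look like the sketch below. The field names follow the PluginConfig schema in the Technical Specification; the gateway URL, the bearer-token auth, and the `builtin_plugins.pii` module path are assumptions, since the issue does not define them. The request is built but not sent, so the sketch runs offline.

```python
import json
import urllib.request

GATEWAY_URL = "http://localhost:4444"  # hypothetical address

plugin_config = {
    "name": "pii-filter",
    "type": "input_filter",
    "execution_mode": "self_contained",
    "enabled": True,
    "priority": 10,
    "module_path": "builtin_plugins.pii",  # hypothetical module
    "class_name": "PIIFilterPlugin",
    "config": {"block_pii": False},
}


def build_register_request(base_url: str, config: dict, token: str) -> urllib.request.Request:
    """Build (but do not send) the POST /admin/plugins request."""
    return urllib.request.Request(
        url=f"{base_url}/admin/plugins",
        data=json.dumps(config).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # auth scheme is an assumption
        },
        method="POST",
    )


request = build_register_request(GATEWAY_URL, plugin_config, token="<admin-token>")
# urllib.request.urlopen(request) would submit it to a running gateway.
```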

🙋‍♂️ User Story 2 - Request Pipeline Integration

As a: Security engineer
I want: plugins to intercept and modify requests/responses in a configurable order
So that: I can implement layered security (LlamaGuard → PII filter → custom validators).

✅ Acceptance Criteria

Scenario: Multi-stage plugin processing
Given plugins A, B, C are configured with priority 1, 2, 3
When a request flows through the gateway
Then plugins execute in order: A → B → C → tool → C → B → A
And each plugin can modify or reject the payload
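
The A → B → C → tool → C → B → A ordering can be sketched as a simple "onion" loop. This is an illustration only: `run_pipeline`, the `(priority, name)` stage tuples, and the `trace` list are hypothetical, and payload modification and rejection are left out to keep the ordering visible.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

# A stage is (priority, name). Lower priority values run first on the
# request path, matching the "Lower = higher priority" convention.
Stage = Tuple[int, str]


async def run_pipeline(
    stages: List[Stage],
    payload: Any,
    tool: Callable[[Any], Awaitable[Any]],
    trace: List[str],
) -> Any:
    ordered = sorted(stages)
    for _, name in ordered:            # request path: A, B, C
        trace.append(name)             # a real plugin would inspect the payload here
    trace.append("tool")
    result = await tool(payload)
    for _, name in reversed(ordered):  # response path: C, B, A
        trace.append(name)
    return result


async def echo_tool(payload: Any) -> Any:
    """Stand-in for the real tool invocation."""
    return payload
```

Running the response path in reverse means the plugin that saw the request first also sees the response last, which is the usual middleware nesting.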

🙋‍♂️ User Story 3 - AI Middleware Integration

As a: AI safety engineer
I want: to integrate LlamaGuard, OpenAI Moderation, and custom LLM-based filters
So that: harmful content is blocked before reaching tools or being returned to clients.

✅ Acceptance Criteria

Scenario: LlamaGuard content filtering
Given LlamaGuard plugin is configured with "block-harmful" policy
When a request contains potentially harmful content
Then LlamaGuard analyzes the content via API/local model
And the request is blocked with appropriate error message

🙋‍♂️ User Story 4 - Configuration Management

As a: DevOps engineer
I want: plugin configurations to be exportable, versionable, and environment-specific
So that: I can manage plugin deployments across dev/staging/production consistently.

✅ Acceptance Criteria

Scenario: Export plugin configuration
Given multiple plugins are configured with custom rules
When I GET /admin/plugins/export
Then I receive a JSON/YAML file with all plugin configurations
And I can import this configuration in another environment
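
One way to keep exports versionable is to serialize them deterministically, so that two exports of the same state produce identical files and diff cleanly. The sketch below assumes a JSON envelope with a `version` field; neither the envelope nor the function names come from the issue.

```python
import json
from typing import Any, Dict, List


def export_plugins(configs: List[Dict[str, Any]]) -> str:
    """Serialize plugin configs with stable ordering for clean diffs."""
    ordered = sorted(configs, key=lambda c: c["name"])
    return json.dumps({"version": 1, "plugins": ordered}, indent=2, sort_keys=True)


def import_plugins(document: str) -> List[Dict[str, Any]]:
    """Parse an export produced by export_plugins."""
    data = json.loads(document)
    if data.get("version") != 1:
        raise ValueError(f"unsupported export version: {data.get('version')!r}")
    return data["plugins"]
```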

📐 Design Sketch

flowchart TD
    Request[Incoming Request] --> PM[Plugin Manager]
    PM --> PI1[Input Plugin 1]
    PI1 --> PI2[Input Plugin 2]
    PI2 --> Core[Core Gateway Logic]
    Core --> PO1[Output Plugin 1]
    PO1 --> PO2[Output Plugin 2]
    PO2 --> Response[Outgoing Response]
    
    subgraph Plugin Types
        AI[AI Middleware<br/>LlamaGuard, OpenAI Mod]
        PII[PII Scanners<br/>Custom Regex, NER]
        Auth[Custom Auth<br/>Enterprise SSO]
        Metrics[Custom Metrics<br/>Business Logic]
    end
    
    subgraph Plugin Registry
        Discovery[Plugin Discovery]
        Config[Configuration Store]
        Health[Health Monitoring]
    end
| Component | Change | Detail |
|---|---|---|
| plugin_framework/ | NEW | Core plugin infrastructure |
| plugin_manager.py | NEW | Plugin lifecycle management |
| middleware_pipeline.py | NEW | Request/response processing chain |
| plugin_registry.py | NEW | Plugin discovery and configuration |
| builtin_plugins/ | NEW | Included plugins (PII, validation) |
| config.py | UPDATE | Plugin configuration settings |
| main.py | UPDATE | Plugin middleware integration |
| Admin UI | UPDATE | Plugin management interface |

🔄 Roll-out Plan

  1. Phase 0 (v0.6.0 - August 19, 2025): Core plugin framework infrastructure

  2. Phase 1 (v0.7.0 - September 2, 2025): Pipeline integration + built-in plugins

    • Middleware pipeline implementation
    • Self-contained validation and PII plugins
    • Admin UI for plugin management
    • External service integration framework
  3. Phase 2 (v0.8.0 - September 16, 2025): AI middleware integrations

  4. Phase 3 (v0.9.0+): Advanced features

    • Plugin marketplace/registry integration
    • Performance monitoring and caching for external services
    • Multi-tenant plugin isolation and scoping

📝 Technical Specification

Plugin Interface

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from enum import Enum

class PluginType(Enum):
    INPUT_FILTER = "input_filter"
    OUTPUT_FILTER = "output_filter"
    AI_MIDDLEWARE = "ai_middleware"
    AUTHENTICATOR = "authenticator"
    METRICS_COLLECTOR = "metrics_collector"

class PluginExecutionMode(Enum):
    SELF_CONTAINED = "self_contained"  # Runs in-process
    EXTERNAL_SERVICE = "external_service"  # Calls external microservice

class PluginResult:
    def __init__(self, 
                 continue_processing: bool = True,
                 modified_payload: Optional[Any] = None,
                 error_message: Optional[str] = None,
                 metadata: Optional[Dict] = None):
        self.continue_processing = continue_processing
        self.modified_payload = modified_payload
        self.error_message = error_message
        self.metadata = metadata or {}

class BasePlugin(ABC):
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.enabled = config.get("enabled", True)
        self.execution_mode = PluginExecutionMode(config.get("execution_mode", "self_contained"))
        
    @abstractmethod
    async def process(self, payload: Any, context: Dict[str, Any]) -> PluginResult:
        """Process the payload and return result"""
        pass
        
    @abstractmethod
    def get_info(self) -> Dict[str, Any]:
        """Return plugin metadata"""
        pass
        
    async def health_check(self) -> bool:
        """Check if plugin is healthy"""
        return True

class ExternalServicePlugin(BasePlugin):
    """Base class for plugins that call external microservices"""
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.service_url = config.get("service_url")
        self.auth_config = config.get("auth", {})
        self.timeout = config.get("timeout", 30)
        
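    async def call_external_service(self, payload: Any) -> Any:
        """Make authenticated call to external service"""
        # Placeholder: the HTTP call with auth is not specified in this sketch.
        # Raising makes the gap explicit instead of silently returning None,
        # which would break subclasses that call .get() on the result.
        raise NotImplementedError("HTTP client integration is not implemented yet")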
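
As a sketch of the auth half of `call_external_service`, the helper below turns an `auth` config dict into HTTP headers. The four type values ("bearer", "basic", "api_key", "custom_header") come from the ExternalServiceAuth schema in this issue; the function name and the default `X-API-Key` header are assumptions, since services differ in how they expect API keys.

```python
import base64
from typing import Any, Dict


def build_auth_headers(auth: Dict[str, Any]) -> Dict[str, str]:
    """Translate a plugin's auth config into request headers."""
    auth_type = auth.get("type")
    if not auth_type:
        return {}  # unauthenticated service
    if auth_type == "bearer":
        return {"Authorization": f"Bearer {auth['token']}"}
    if auth_type == "basic":
        raw = f"{auth['username']}:{auth['password']}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if auth_type == "api_key":
        # Default header name is an assumption, overridable via header_name.
        return {auth.get("header_name") or "X-API-Key": auth["api_key"]}
    if auth_type == "custom_header":
        return {auth["header_name"]: auth["header_value"]}
    raise ValueError(f"unsupported auth type: {auth_type!r}")
```

Failing loudly on an unknown type avoids sending an unauthenticated request to a service that was meant to be protected.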

Plugin Configuration Schema

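from typing import Any, Dict, List, Optional

from pydantic import BaseModel

# PluginType and PluginExecutionMode are the enums from the Plugin Interface section.
class PluginConfig(BaseModel):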
    name: str
    type: PluginType
    execution_mode: PluginExecutionMode = PluginExecutionMode.SELF_CONTAINED
    enabled: bool = True
    priority: int = 100  # Lower = higher priority
    module_path: Optional[str] = None  # For self-contained plugins
    class_name: Optional[str] = None   # For self-contained plugins
    service_url: Optional[str] = None  # For external service plugins
    auth: Optional[Dict[str, Any]] = None  # Auth config for external services
    timeout: int = 30  # Timeout for external service calls
    config: Dict[str, Any] = {}
    conditions: Optional[Dict[str, Any]] = None  # When to apply
    
class PluginCondition(BaseModel):
    """Conditions for when plugin should execute"""
    server_ids: Optional[List[str]] = None
    tool_names: Optional[List[str]] = None
    user_patterns: Optional[List[str]] = None
    content_types: Optional[List[str]] = None

class ExternalServiceAuth(BaseModel):
    """Authentication configuration for external services"""
    type: str  # "bearer", "basic", "api_key", "custom_header"
    token: Optional[str] = None
    username: Optional[str] = None
    password: Optional[str] = None
    api_key: Optional[str] = None
    header_name: Optional[str] = None
    header_value: Optional[str] = None
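
The PluginCondition fields suggest a per-request check along the following lines. This sketch makes several assumptions the issue does not state: an unset field matches everything, all set fields must match (logical AND), `user_patterns` are shell-style globs rather than regexes, and the context keys (`server_id`, `tool_name`, `user`, `content_type`) are hypothetical.

```python
from fnmatch import fnmatch
from typing import Any, Dict, List, Optional


def _allowed(value: Any, allowed: Optional[List[str]]) -> bool:
    """An unset (None) condition places no restriction on the value."""
    return allowed is None or value in allowed


def should_run(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
    """Decide whether a plugin applies to the current request."""
    if not conditions:
        return True
    user = context.get("user", "")
    patterns = conditions.get("user_patterns")
    return (
        _allowed(context.get("server_id"), conditions.get("server_ids"))
        and _allowed(context.get("tool_name"), conditions.get("tool_names"))
        and _allowed(context.get("content_type"), conditions.get("content_types"))
        and (patterns is None or any(fnmatch(user, p) for p in patterns))
    )
```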

Built-in Plugin Examples

# Self-contained PII Scanner Plugin
class PIIFilterPlugin(BasePlugin):
    """Self-contained plugin that runs regex-based PII detection in-process"""
    
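    async def process(self, payload: Any, context: Dict) -> PluginResult:
        # _scan_for_pii and _mask_pii are private helpers that are not shown in this sketch.
        pii_found = self._scan_for_pii(payload)
        if pii_found and self.config.get("block_pii"):
            # Blocking mode: reject the request outright.
            return PluginResult(
                continue_processing=False,
                error_message="PII detected in request"
            )

        # Masking mode: redact any detected PII and let the request continue.
        cleaned_payload = self._mask_pii(payload) if pii_found else payload
        return PluginResult(modified_payload=cleaned_payload)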

# External Service LlamaGuard Plugin  
class LlamaGuardPlugin(ExternalServicePlugin):
    """Middleware plugin that calls external LlamaGuard microservice"""
    
    async def process(self, payload: Any, context: Dict) -> PluginResult:
        # Call external LlamaGuard service
        safety_result = await self.call_external_service({
            "text": payload,
            "policy": self.config.get("policy", "default")
        })
        
        safety_score = safety_result.get("safety_score", 1.0)
        if safety_score < self.config.get("safety_threshold", 0.8):
            return PluginResult(
                continue_processing=False,
                error_message="Content blocked by safety filter",
                metadata={"safety_score": safety_score}
            )
        return PluginResult()

# External OpenAI Moderation Plugin
class OpenAIModerationPlugin(ExternalServicePlugin):
    """Middleware plugin that calls OpenAI Moderation API"""
    
    async def process(self, payload: Any, context: Dict) -> PluginResult:
        moderation_result = await self.call_external_service({
            "input": payload,
            "model": self.config.get("model", "text-moderation-stable")
        })
        
        if moderation_result.get("flagged", False):
            categories = [cat for cat, flagged in moderation_result.get("categories", {}).items() if flagged]
            return PluginResult(
                continue_processing=False,
                error_message=f"Content flagged for: {', '.join(categories)}",
                metadata={"categories": categories}
            )
        return PluginResult()
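
The PII example relies on scan and mask helpers that the issue leaves undefined. A minimal regex-based version might look like this; the two patterns and the redaction format are illustrative assumptions and are far from complete PII coverage.

```python
import re
from typing import Dict, List

# Illustrative patterns only; a production scanner needs a much broader set.
PII_PATTERNS: Dict[str, re.Pattern] = {
    "email": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "us_ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def scan_for_pii(text: str) -> List[str]:
    """Return the labels of every PII pattern found in the text."""
    return [label for label, pattern in PII_PATTERNS.items() if pattern.search(text)]


def mask_pii(text: str) -> str:
    """Replace each PII match with a labeled redaction marker."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label.upper()} REDACTED]", text)
    return text
```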

🏗️ Implementation Architecture

Plugin Manager

  • Discovery: Scan plugin directories, load from configuration
  • Lifecycle: Load, initialize, configure, health-check, unload plugins
  • Registry: Maintain plugin metadata, dependencies, and status
  • Error Handling: Graceful failure, plugin isolation, fallback behavior
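
Discovery and graceful failure could be sketched with `importlib`, using the `module_path` and `class_name` fields from the configuration schema. The `load_plugin` and `load_all` names are hypothetical, and a real manager would presumably also verify that the class subclasses BasePlugin.

```python
import importlib
from typing import Any, Dict, List, Tuple


def load_plugin(module_path: str, class_name: str, config: Dict[str, Any]) -> Any:
    """Import a plugin class by dotted path and instantiate it."""
    module = importlib.import_module(module_path)
    plugin_cls = getattr(module, class_name, None)
    if not isinstance(plugin_cls, type):
        raise ImportError(f"{module_path}.{class_name} is not a class")
    return plugin_cls(config)


def load_all(configs: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Load every enabled plugin; one failure must not stop the others."""
    loaded: Dict[str, Any] = {}
    failed: Dict[str, str] = {}
    for cfg in configs:
        if not cfg.get("enabled", True):
            continue
        try:
            loaded[cfg["name"]] = load_plugin(
                cfg["module_path"], cfg["class_name"], cfg.get("config", {})
            )
        except Exception as exc:  # isolate the faulty plugin
            failed[cfg["name"]] = repr(exc)
    return loaded, failed
```

Collecting failures instead of raising lets the gateway start with the healthy plugins and surface the broken ones in the Admin UI.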

Middleware Pipeline

  • Execution Order: Priority-based plugin ordering with dependency resolution
  • Context Passing: Rich context object with request metadata, user info, etc.
  • Performance: Async processing, optional caching, timeout handling
  • Monitoring: Plugin execution metrics, latency tracking, error rates
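
Timeout handling and latency tracking can be combined in one wrapper around each plugin call. The sketch below uses `asyncio.wait_for`; the `run_with_timeout` name and the shape of the `metrics` dict are assumptions for illustration.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def run_with_timeout(
    name: str,
    process: Callable[[Any], Awaitable[Any]],
    payload: Any,
    timeout_s: float,
    metrics: Dict[str, Dict[str, float]],
) -> Any:
    """Run one plugin call with a deadline and record basic metrics."""
    stats = metrics.setdefault(name, {"calls": 0, "timeouts": 0, "total_seconds": 0.0})
    stats["calls"] += 1
    started = time.perf_counter()
    try:
        return await asyncio.wait_for(process(payload), timeout=timeout_s)
    except asyncio.TimeoutError:
        stats["timeouts"] += 1
        raise  # the pipeline decides whether a timeout blocks the request
    finally:
        stats["total_seconds"] += time.perf_counter() - started
```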

Configuration Management

  • Storage: Database-backed with environment overrides
  • Validation: JSON Schema validation for plugin configs
  • Hot Reload: Runtime configuration changes without restart
  • Import/Export: YAML/JSON format for configuration portability
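
Environment overrides could be applied as a recursive merge over the stored base configuration. This is a sketch under the assumption that overrides are plain nested dicts; the `apply_overrides` name is hypothetical.

```python
import copy
from typing import Any, Dict


def apply_overrides(base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new config with environment overrides merged over base."""
    merged = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = apply_overrides(merged[key], value)  # merge nested dicts
        else:
            merged[key] = copy.deepcopy(value)                 # replace scalars and lists
    return merged
```

Because the base is copied rather than mutated, the same stored configuration can be specialized for dev, staging, and production independently.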

📊 Performance Considerations

  • Async Processing: All plugins must be async-compatible
  • Caching Layer: Optional Redis-backed caching for expensive AI calls
  • Circuit Breaker: Auto-disable failing plugins to maintain system stability
  • Resource Limits: Memory and execution time limits per plugin
  • Parallel Execution: Independent plugins can run concurrently where possible
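
The circuit-breaker bullet could be realized with a small state holder per plugin. The thresholds, the half-open retry after a cool-down, and the class shape below are assumptions; the issue only states that failing plugins should be auto-disabled.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stops calling a plugin after repeated failures, then retries later."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if the plugin may be called right now."""
        if self._opened_at is None:
            return True
        # After the cool-down, let a trial call through (half-open state).
        return self._clock() - self._opened_at >= self.reset_after_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed trial
```

The injectable `clock` exists so the behavior can be tested without sleeping.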

🔒 Security Features

  • Plugin Isolation: Sandbox plugins to prevent system access
  • Configuration Validation: Strict schema validation for all plugin configs
  • Audit Logging: Track all plugin executions and configuration changes
  • Permission Model: Role-based access to plugin management
  • Secret Management: Secure handling of API keys and credentials
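
Audit logging and secret management interact: configuration changes should be logged without leaking credentials. A minimal redaction sketch, assuming the secret-bearing field names from the ExternalServiceAuth schema (the `SECRET_KEYS` set and `redact` helper are hypothetical):

```python
from typing import Any

# Field names taken from the ExternalServiceAuth schema; extend as needed.
SECRET_KEYS = {"token", "password", "api_key", "header_value"}


def redact(value: Any) -> Any:
    """Return a copy of a config structure with secret values masked."""
    if isinstance(value, dict):
        return {
            key: "***" if key in SECRET_KEYS and item is not None else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value
```

The same helper could be applied to configuration exports, so that exported files never carry live credentials.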

📋 Roadmap Dependencies

This plugin framework is foundational infrastructure that enables multiple upcoming roadmap items:

Direct Dependencies (Framework Required)

Enhanced by Framework

Timeline Alignment

  • Target Release: v0.6.0 (August 19, 2025) - "Security, Scale & Smart Automation"
  • Completion Required Before: v0.8.0 guardrails implementation
  • Dependencies: Requires completion of v0.5.0 enterprise auth/config features

📝 Example Use Cases

  1. Guardrails Implementation: PII filtering + LlamaGuard + custom business rules
  2. Enterprise Auth: Custom SSO integration + role-based tool access
  3. Compliance Logging: Audit trail plugin for SOX/GDPR compliance
  4. Performance Optimization: Caching plugin for expensive tool calls
  5. A/B Testing: Traffic splitting plugin for testing new AI models

📣 Dependencies & Next Steps

Foundational Requirements (v0.6.0):

  • Plugin interface standardization with self-contained + external service support
  • Configuration schema design with auth/service integration
  • Admin UI wireframes for plugin management (extends existing HTMX interface)
  • Security model for plugin execution and external service calls

Integration Points:

Roadmap Enablement:
This framework is on the critical path for the v0.8.0 enterprise security features and must be completed in v0.6.0 to avoid roadmap delays. The self-contained plugin architecture covers immediate security needs, while external service integration makes it possible to plug in enterprise AI safety tools.

Future Epics Enabled:

Labels

enhancement: New feature or request; python: Python / backend development (FastAPI); triage: Issues / Features awaiting triage
