[SECURITY FEATURE]: Add Security Configuration Validation and Startup Checks #534

@crivetimihai


🔒 SECURITY FEATURE: Add Security Configuration Validation and Startup Checks

Summary: Implement comprehensive security configuration validation that warns about insecure settings at application startup without breaking the application. This provides visibility into security issues while allowing the application to run, making it easier to gradually improve security posture.

Scope:

Not in scope

Implementation Details:

1. Add Security Validators to config.py

Add these validators to the Settings class in mcpgateway/config.py (after the existing field validators, around line 300):

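import logging
import re
from typing import List, Self  # Self requires Python 3.11+

from pydantic import field_validator, model_validator
from pydantic_settings import BaseSettings

# Assumes config.py does not already define a module-level logger; reuse the existing one if it does.
logger = logging.getLogger(__name__)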

class Settings(BaseSettings):
    # ... existing fields ...

    # Security validation thresholds
    min_secret_length: int = 32
    min_password_length: int = 12
    require_strong_secrets: bool = False  # Default to False for backward compatibility, will be enforced in 0.8.0
    
    @field_validator('jwt_secret_key', 'auth_encryption_secret')
    @classmethod
    def validate_secrets(cls, v: str, info) -> str:
        """Validate secret keys meet security requirements."""
        field_name = info.field_name
        
        # Check for default/weak secrets
        weak_secrets = ['my-test-key', 'my-test-salt', 'changeme', 'secret', 'password']
        if v.lower() in weak_secrets:
            logger.warning(
                f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! "
                "Please set a strong, unique value for production."
            )
        
        # Check minimum length
        if len(v) < 32:  # Using hardcoded value since we can't access instance attributes
            logger.warning(
                f"⚠️  SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. "
                f"Current length: {len(v)}"
            )
        
        # Check entropy (basic check for randomness)
        if len(set(v)) < 10:  # Less than 10 unique characters
            logger.warning(
                f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. "
                "Consider using a more random value."
            )
        
        return v
    
    @field_validator('basic_auth_password')
    @classmethod
    def validate_admin_password(cls, v: str) -> str:
        """Validate admin password meets security requirements."""
        if v == 'changeme':
            logger.warning(
                "🔓 SECURITY WARNING: Default admin password detected! "
                "Please change the BASIC_AUTH_PASSWORD immediately."
            )
        
        if len(v) < 12:  # Using hardcoded value
            logger.warning(
                f"⚠️  SECURITY WARNING: Admin password should be at least 12 characters long. "
                f"Current length: {len(v)}"
            )
        
        # Check password complexity
        has_upper = any(c.isupper() for c in v)
        has_lower = any(c.islower() for c in v)
        has_digit = any(c.isdigit() for c in v)
        has_special = bool(re.search(r'[!@#$%^&*(),.?":{}|<>]', v))
        
        complexity_score = sum([has_upper, has_lower, has_digit, has_special])
        if complexity_score < 3:
            logger.warning(
                "🔐 SECURITY WARNING: Admin password has low complexity. "
                "Should contain at least 3 of: uppercase, lowercase, digits, special characters"
            )
        
        return v
    
    @field_validator('allowed_origins')
    @classmethod
    def validate_cors_origins(cls, v: set) -> set:
        """Validate CORS allowed origins."""
        if not v:
            return v
        
        dangerous_origins = ['*', 'null', '']
        for origin in v:
            if origin in dangerous_origins:
                logger.warning(
                    f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. "
                    "Consider specifying explicit origins instead of wildcards."
                )
            
            # Validate URL format
            if not origin.startswith(('http://', 'https://')) and origin not in dangerous_origins:
                logger.warning(
                    f"⚠️  SECURITY WARNING: Invalid origin format '{origin}'. "
                    "Origins should start with http:// or https://"
                )
        
        return v
    
    @field_validator('database_url')
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        """Validate database connection string security."""
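        # Check for weak passwords embedded in non-SQLite connection URLs.
        # Only the password component is inspected, so a host or database
        # name containing "test" or "admin" does not trigger the warning.
        if not v.startswith('sqlite'):
            from urllib.parse import urlsplit  # local import keeps this snippet self-contained

            db_password = (urlsplit(v).password or '').lower()
            if any(weak in db_password for weak in ['password', '123', 'admin', 'test']):
                logger.warning(
                    "Potentially weak database password detected. "
                    "Consider using a stronger password."
                )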
        
        # Warn about SQLite in production
        if v.startswith('sqlite'):
            logger.info(
                "Using SQLite database. Consider PostgreSQL or MySQL for production."
            )
        
        return v
    
    @model_validator(mode='after')
    def validate_security_combinations(self) -> Self:
        """Validate security setting combinations."""
        # Check for dangerous combinations - only log warnings, don't raise errors
        if not self.auth_required and self.mcpgateway_ui_enabled:
            logger.warning(
                "🔓 SECURITY WARNING: Admin UI is enabled without authentication. "
                "Consider setting AUTH_REQUIRED=true for production."
            )
        
        if self.skip_ssl_verify and not self.dev_mode:
            logger.warning(
                "🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. "
                "This is a security risk! Set SKIP_SSL_VERIFY=false for production."
            )
        
        if self.debug and not self.dev_mode:
            logger.warning(
                "🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. "
                "This may leak sensitive information! Set DEBUG=false for production."
            )
        
        # Warn about federation without auth
        if self.federation_enabled and not self.auth_required:
            logger.warning(
                "🌐 SECURITY WARNING: Federation is enabled without authentication. "
                "This may expose your gateway to unauthorized access."
            )
        
        return self
    
    def get_security_warnings(self) -> List[str]:
        """Get list of security warnings for current configuration."""
        warnings = []
        
        # Authentication warnings
        if not self.auth_required:
            warnings.append("🔓 Authentication is disabled - ensure this is intentional")
        
        if self.basic_auth_user == 'admin':
            warnings.append("⚠️  Using default admin username - consider changing it")
        
        # SSL/TLS warnings
        if self.skip_ssl_verify:
            warnings.append("🔓 SSL verification is disabled - not recommended for production")
        
        # Debug/Dev warnings
        if self.debug and not self.dev_mode:
            warnings.append("🐛 Debug mode enabled - disable in production to prevent info leakage")
        
        if self.dev_mode:
            warnings.append("🔧 Development mode enabled - not for production use")
        
        # CORS warnings
        if self.cors_enabled and '*' in self.allowed_origins:
            warnings.append("🌐 CORS allows all origins (*) - this is a security risk")
        
        # Token warnings
        if self.token_expiry > 10080:  # More than 7 days
            warnings.append("⏱️  JWT token expiry is very long - consider shorter duration")
        
        # Database warnings
        if self.database_url.startswith('sqlite') and not self.dev_mode:
            warnings.append("💾 SQLite database in use - consider PostgreSQL/MySQL for production")
        
        # Rate limiting warnings
        if self.tool_rate_limit > 1000:
            warnings.append("🚦 Tool rate limit is very high - may allow abuse")
        
        return warnings
    
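    def get_security_status(self) -> dict:
        """Get comprehensive security status."""
        checks = {
            "secure_secrets": self.jwt_secret_key != 'my-test-key',
            "auth_enabled": self.auth_required,
            "ssl_verification": not self.skip_ssl_verify,
            "debug_disabled": not self.debug,
            "cors_restricted": '*' not in self.allowed_origins if self.cors_enabled else True,
            "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required,
        }
        # The health endpoint and check_security.py read 'security_score', but this
        # issue does not define how it is computed. Assumption: use the percentage of
        # passing checks until a real scoring scheme is agreed on.
        security_score = round(100 * sum(checks.values()) / len(checks))
        return {
            **checks,
            "security_score": security_score,
            "warnings": self.get_security_warnings(),
        }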
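
The secret checks in validate_secrets can also be written as a pure function, which makes them easy to unit test without constructing a Settings instance. The following is only a minimal sketch: the helper name `secret_warnings`, its parameters, and its return type are assumptions made for illustration and are not part of the proposed config.py changes. The weak-secret list and thresholds are copied from the validator above.

```python
from typing import List

# Values copied from the validate_secrets validator in this issue.
WEAK_SECRETS = {"my-test-key", "my-test-salt", "changeme", "secret", "password"}


def secret_warnings(value: str, min_length: int = 32, min_unique: int = 10) -> List[str]:
    """Return warning messages for a secret (hypothetical helper, never raises)."""
    found: List[str] = []
    # Known default or placeholder values
    if value.lower() in WEAK_SECRETS:
        found.append("default/weak secret detected")
    # Minimum length
    if len(value) < min_length:
        found.append(f"secret should be at least {min_length} characters (got {len(value)})")
    # Crude entropy proxy: number of distinct characters
    if len(set(value)) < min_unique:
        found.append("secret has low entropy")
    return found


if __name__ == "__main__":
    for message in secret_warnings("my-test-key"):
        print(message)
```

A field validator could then log each returned message and return the value unchanged, which keeps the warn-only behaviour described in the summary.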

2. Add Startup Security Validation in main.py

Add this startup event handler in mcpgateway/main.py (after app creation, around line 450):

from fastapi import FastAPI
from contextlib import asynccontextmanager
import sys

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with security validation."""
    # Startup
    try:
        # Validate security configuration
        await validate_security_configuration()
        
        # Initialize other components
        logger.info("Starting MCP Gateway...")
        
        yield
        
    finally:
        # Shutdown
        logger.info("Shutting down MCP Gateway...")

# Update app creation to use lifespan
app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
    # ... other parameters
)

async def validate_security_configuration():
    """Validate security configuration on startup."""
    logger.info("🔒 Validating security configuration...")
    
    # Get security status
    security_status = settings.get_security_status()
    warnings = security_status['warnings']
    
    # Log warnings
    if warnings:
        logger.warning("=" * 60)
        logger.warning("🚨 SECURITY WARNINGS DETECTED:")
        logger.warning("=" * 60)
        for warning in warnings:
            logger.warning(f"  {warning}")
        logger.warning("=" * 60)
    
    # Critical security checks (fail startup only if REQUIRE_STRONG_SECRETS=true)
    critical_issues = []
    
    if settings.jwt_secret_key == 'my-test-key' and not settings.dev_mode:
        critical_issues.append(
            "Using default JWT secret in non-dev mode. "
            "Set JWT_SECRET_KEY environment variable!"
        )
    
    if settings.basic_auth_password == 'changeme' and settings.mcpgateway_ui_enabled:
        critical_issues.append(
            "Admin UI enabled with default password. "
            "Set BASIC_AUTH_PASSWORD environment variable!"
        )
    
    if not settings.auth_required and settings.federation_enabled and not settings.dev_mode:
        critical_issues.append(
            "Federation enabled without authentication in non-dev mode. "
            "This is a critical security risk!"
        )
    
    # Handle critical issues based on REQUIRE_STRONG_SECRETS setting
    if critical_issues:
        if settings.require_strong_secrets:
            logger.error("=" * 60)
            logger.error("💀 CRITICAL SECURITY ISSUES DETECTED:")
            logger.error("=" * 60)
            for issue in critical_issues:
                logger.error(f"  ❌ {issue}")
            logger.error("=" * 60)
            logger.error("Startup aborted due to REQUIRE_STRONG_SECRETS=true")
            logger.error("To proceed anyway, set REQUIRE_STRONG_SECRETS=false")
            logger.error("=" * 60)
            sys.exit(1)
        else:
            # Log as warnings if not enforcing
            logger.warning("=" * 60)
            logger.warning("⚠️  Critical security issues detected (REQUIRE_STRONG_SECRETS=false):")
            for issue in critical_issues:
                logger.warning(f"  • {issue}")
            logger.warning("=" * 60)
    
    # Log security recommendations
    if not security_status['secure_secrets'] or not security_status['auth_enabled']:
        logger.info("=" * 60)
        logger.info("📋 SECURITY RECOMMENDATIONS:")
        logger.info("=" * 60)
        
        if settings.jwt_secret_key == 'my-test-key':
            logger.info("  • Generate a strong JWT secret:")
            logger.info("    python -c 'import secrets; print(secrets.token_urlsafe(32))'")
        
        if settings.basic_auth_password == 'changeme':
            logger.info("  • Set a strong admin password in BASIC_AUTH_PASSWORD")
        
        if not settings.auth_required:
            logger.info("  • Enable authentication: AUTH_REQUIRED=true")
        
        if settings.skip_ssl_verify:
            logger.info("  • Enable SSL verification: SKIP_SSL_VERIFY=false")
        
        logger.info("=" * 60)
    
    logger.info("✅ Security validation completed")
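
The warn-or-abort decision in validate_security_configuration can be separated from logging and from sys.exit, so that it can be tested without terminating the test process. The sketch below assumes a hypothetical exception class `InsecureConfigurationError` and a hypothetical helper `enforce_critical_issues`; neither exists in the gateway today.

```python
from typing import List


class InsecureConfigurationError(RuntimeError):
    """Raised when enforcement is on and critical issues exist (hypothetical)."""


def enforce_critical_issues(critical_issues: List[str], require_strong_secrets: bool) -> List[str]:
    """Return the issues to log as warnings, or raise if enforcement is enabled."""
    if critical_issues and require_strong_secrets:
        # Mirrors the REQUIRE_STRONG_SECRETS=true branch: startup must not continue
        raise InsecureConfigurationError("; ".join(critical_issues))
    # Mirrors the REQUIRE_STRONG_SECRETS=false branch: the caller logs and carries on
    return list(critical_issues)
```

With this split, the startup code would catch `InsecureConfigurationError`, log the details, and call `sys.exit(1)` in one place.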

3. Add Security Health Endpoint

Add this endpoint to mcpgateway/main.py (with other health endpoints, around line 500):

from datetime import datetime, timezone

from fastapi import HTTPException, Request

@app.get("/health/security", tags=["health"])
async def security_health(request: Request):
    """Get security configuration health status."""
    # Check authentication
    authenticated = False
    if settings.auth_required:
        # NOTE: this only checks that a Bearer header is present; it does not
        # verify the token. Token verification should be delegated to the
        # gateway's existing authentication dependency.
        auth_header = request.headers.get("authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            raise HTTPException(401, "Authentication required for security health")
        authenticated = True
    
    security_status = settings.get_security_status()
    
    # Determine overall health
    score = security_status['security_score']
    is_healthy = score >= 60  # Minimum acceptable score
    
    # Build response
    response = {
        "status": "healthy" if is_healthy else "unhealthy",
        "score": score,
        "checks": {
            "authentication": security_status['auth_enabled'],
            "secure_secrets": security_status['secure_secrets'],
            "ssl_verification": security_status['ssl_verification'],
            "debug_disabled": security_status['debug_disabled'],
            "cors_restricted": security_status['cors_restricted'],
            "ui_protected": security_status['ui_protected']
        },
        "warning_count": len(security_status['warnings']),
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    
    # Include warnings only if authenticated or in dev mode
    if authenticated or settings.dev_mode:
        response["warnings"] = security_status['warnings']
    
    return response
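
The response assembly in the endpoint can be pulled into a plain function so it can be tested without a running FastAPI app. This is a sketch under assumptions: `build_security_health` and `CHECK_SOURCES` are hypothetical names, and the input is assumed to be a dict shaped like the return value of get_security_status with a `security_score` key present.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Response check name -> key in the status dict (names taken from the endpoint above)
CHECK_SOURCES = {
    "authentication": "auth_enabled",
    "secure_secrets": "secure_secrets",
    "ssl_verification": "ssl_verification",
    "debug_disabled": "debug_disabled",
    "cors_restricted": "cors_restricted",
    "ui_protected": "ui_protected",
}


def build_security_health(status: Dict[str, Any], include_warnings: bool,
                          min_score: int = 60) -> Dict[str, Any]:
    """Build the /health/security payload from a status dict (hypothetical helper)."""
    score = status["security_score"]
    response: Dict[str, Any] = {
        "status": "healthy" if score >= min_score else "unhealthy",
        "score": score,
        "checks": {name: status[key] for name, key in CHECK_SOURCES.items()},
        "warning_count": len(status["warnings"]),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Warning text is only exposed when the caller says it is safe to do so
    if include_warnings:
        response["warnings"] = list(status["warnings"])
    return response
```

The endpoint would then decide `include_warnings` from the authentication result and dev mode, and return the helper's output.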

4. Add Environment Variables to .env.example

Add these security validation settings to .env.example:

#####################################
# Security Validation Settings
#####################################

# Minimum length for secret keys (JWT, encryption)
MIN_SECRET_LENGTH=32

# Minimum length for passwords
MIN_PASSWORD_LENGTH=12

# Enforce strong secrets (set to true to fail startup on critical issues)
# Default is false to maintain backward compatibility; startup then continues
# with security warnings only, which is NOT RECOMMENDED for production!
REQUIRE_STRONG_SECRETS=false
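
Values that satisfy the length and complexity checks in this issue can be generated with the standard library. The sketch below is illustrative only: the function names `generate_secret` and `generate_password` are assumptions, and the special-character set is a subset of the one used by the password validator, chosen to avoid `#` and `$`, which some dotenv parsers treat specially.

```python
import secrets
import string

# Subset of the validator's special characters that is safe in most .env files
SPECIALS = "!@%^&*"


def generate_secret(num_bytes: int = 32) -> str:
    """Return a URL-safe random secret (43 characters for 32 bytes)."""
    return secrets.token_urlsafe(num_bytes)


def generate_password(length: int = 16) -> str:
    """Return a random password containing all four character classes."""
    if length < 12:
        raise ValueError("length must be at least 12")
    alphabet = string.ascii_letters + string.digits + SPECIALS
    while True:
        candidate = "".join(secrets.choice(alphabet) for _ in range(length))
        # Retry until uppercase, lowercase, digit and special are all present
        if (any(c.isupper() for c in candidate)
                and any(c.islower() for c in candidate)
                and any(c.isdigit() for c in candidate)
                and any(c in SPECIALS for c in candidate)):
            return candidate


if __name__ == "__main__":
    print(f"JWT_SECRET_KEY={generate_secret()}")
    print(f"BASIC_AUTH_PASSWORD={generate_password()}")
```

The printed lines can be pasted into a local .env file or stored in a secret manager; they should not be committed.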

5. Add Security Monitoring Script

Create .github/tools/check_security.py for CI/CD integration:

#!/usr/bin/env python3
"""Security configuration checker for MCP Gateway."""

import sys
from pathlib import Path

# This file lives in .github/tools/, so the repository root is parents[2]
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

from mcpgateway.config import get_settings

def main():
    """Check security configuration and exit with appropriate code."""
    try:
        settings = get_settings()
        status = settings.get_security_status()
        
        print(f"Security Score: {status['security_score']}/100")
        print(f"Warnings: {len(status['warnings'])}")
        
        if status['warnings']:
            print("\nSecurity Warnings:")
            for warning in status['warnings']:
                print(f"  - {warning}")
        
        # Exit with error if score is too low
        if status['security_score'] < 60:
            print("\n❌ Security score too low for deployment")
            sys.exit(1)
        elif status['security_score'] < 80:
            print("\n⚠️  Security could be improved")
            sys.exit(0)
        else:
            print("\n✅ Security configuration looks good")
            sys.exit(0)
            
    except Exception as e:
        print(f"❌ Security validation failed: {e}")
        sys.exit(2)

if __name__ == "__main__":
    main()

Files to Modify:

  1. mcpgateway/config.py - Add security validators and methods to Settings class
  2. mcpgateway/main.py - Add startup validation and security health endpoint
  3. .env.example - Add security validation environment variables
  4. .github/tools/check_security.py - Create new security check script

Testing Checklist:

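  • Test with default secrets (should log warnings; should abort startup in non-dev mode only when REQUIRE_STRONG_SECRETS=true)
  • Test with weak passwords (should log warnings; the validators do not reject values)
  • Test with strong secrets (should pass)
  • Verify security warnings are logged
  • Test security health endpoint
  • Verify critical issues prevent startup when REQUIRE_STRONG_SECRETS=true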
  • Test bypass with REQUIRE_STRONG_SECRETS=false
  • Run security check script in CI/CD

CI/CD Integration:

# In the CI/CD pipeline (e.g., .github/workflows/security.yml)
- name: Check Security Configuration
  run: |
    python .github/tools/check_security.py
  env:
    JWT_SECRET_KEY: ${{ secrets.JWT_SECRET_KEY }}
    BASIC_AUTH_PASSWORD: ${{ secrets.ADMIN_PASSWORD }}

Priority: High — Security misconfigurations are a leading cause of breaches. This validation ensures:

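  • Default/weak credentials in production are flagged at startup
  • Security best practices are surfaced as warnings, and enforced when REQUIRE_STRONG_SECRETS=true
  • Clear visibility into security posture
  • Fail-fast on critical security issues when REQUIRE_STRONG_SECRETS=true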

Labels: enhancement, python, security, triage
