-
Notifications
You must be signed in to change notification settings - Fork 241
Description
🔒 SECURITY FEATURE: Add Security Configuration Validation and Startup Checks
Summary: Implement comprehensive security configuration validation that warns about insecure settings at application startup without breaking the application. This provides visibility into security issues while allowing the application to run, making it easier to gradually improve security posture.
Scope:
- Add security-focused validators to mcpgateway/config.py
- Implement startup security checks in mcpgateway/main.py
- Add security configuration documentation
- Links to [SECURITY FEATURE]: Simple Endpoint Feature Flags (selectively enable or disable tools, resources, prompts, servers, gateways, roots) #537 and warns if roots/prompts/resources are enabled, as they have other security implications (ex: uploading data).
Not in scope
- Advanced password-checking policies; these will be implemented in [SECURITY FEATURE]: Configurable Password and Secret Policy Engine (#426).
Implementation Details:
1. Add Security Validators to config.py
Add these validators to the Settings class in mcpgateway/config.py
(after the existing field validators, around line 300):
from pydantic import field_validator, model_validator
import secrets
import re
from typing import Self
class Settings(BaseSettings):
# ... existing fields ...
# Security validation thresholds
min_secret_length: int = 32
min_password_length: int = 12
require_strong_secrets: bool = False # Default to False for backward compatibility, will be enforced in 0.8.0
@field_validator('jwt_secret_key', 'auth_encryption_secret')
@classmethod
def validate_secrets(cls, v: str, info) -> str:
    """Log warnings when a secret key looks unsafe; never reject the value.

    Three independent heuristics are applied: a case-insensitive match
    against a short list of known default/weak secrets, a minimum length
    of 32 characters, and a minimum of 10 distinct characters.

    Args:
        v: The secret value under validation.
        info: Validation context; only ``info.field_name`` is read, to label
            the log messages.

    Returns:
        ``v`` unchanged.
    """
    field_name = info.field_name
    known_weak = ('my-test-key', 'my-test-salt', 'changeme', 'secret', 'password')

    # Placeholder/default values, compared case-insensitively.
    if v.lower() in known_weak:
        logger.warning(
            f"🔓 SECURITY WARNING - {field_name}: Default/weak secret detected! "
            "Please set a strong, unique value for production."
        )

    # Length threshold is a literal because a classmethod validator has no
    # access to instance attributes.
    too_short = len(v) < 32
    if too_short:
        logger.warning(
            f"⚠️ SECURITY WARNING - {field_name}: Secret should be at least 32 characters long. "
            f"Current length: {len(v)}"
        )

    # Crude randomness proxy: count the distinct characters used.
    distinct_characters = set(v)
    if len(distinct_characters) < 10:
        logger.warning(
            f"🔑 SECURITY WARNING - {field_name}: Secret has low entropy. "
            "Consider using a more random value."
        )

    return v
@field_validator('basic_auth_password')
@classmethod
def validate_admin_password(cls, v: str) -> str:
    """Log warnings when the admin password looks weak; never reject it.

    Checks, in order: the literal default ``'changeme'``, a minimum length
    of 12 characters, and character-class variety (at least three of
    uppercase, lowercase, digit, listed special character).

    Args:
        v: The admin password under validation.

    Returns:
        ``v`` unchanged.
    """
    if v == 'changeme':
        logger.warning(
            "🔓 SECURITY WARNING: Default admin password detected! "
            "Please change the BASIC_AUTH_PASSWORD immediately."
        )

    # Length threshold is a literal, mirroring the secret validator.
    if len(v) < 12:
        logger.warning(
            f"⚠️ SECURITY WARNING: Admin password should be at least 12 characters long. "
            f"Current length: {len(v)}"
        )

    # One flag per character class present in the password.
    classes_present = (
        any(map(str.isupper, v)),
        any(map(str.islower, v)),
        any(map(str.isdigit, v)),
        re.search(r'[!@#$%^&*(),.?":{}|<>]', v) is not None,
    )
    if sum(classes_present) < 3:
        logger.warning(
            "🔐 SECURITY WARNING: Admin password has low complexity. "
            "Should contain at least 3 of: uppercase, lowercase, digits, special characters"
        )

    return v
@field_validator('allowed_origins')
@classmethod
def validate_cors_origins(cls, v: set) -> set:
    """Log warnings for risky or malformed CORS origins; never reject them.

    Args:
        v: The configured set of allowed origins.

    Returns:
        ``v`` unchanged (an empty/falsy value is returned immediately).
    """
    if not v:
        return v

    wildcard_like = ('*', 'null', '')
    for origin in v:
        if origin in wildcard_like:
            logger.warning(
                f"🌐 SECURITY WARNING: Dangerous CORS origin '{origin}' detected. "
                "Consider specifying explicit origins instead of wildcards."
            )
            # Wildcard-like entries are exempt from the scheme check below.
            continue

        if not origin.startswith(('http://', 'https://')):
            logger.warning(
                f"⚠️ SECURITY WARNING: Invalid origin format '{origin}'. "
                "Origins should start with http:// or https://"
            )

    return v
@field_validator('database_url')
@classmethod
def validate_database_url(cls, v: str) -> str:
    """Log advisory messages about the database connection string.

    For SQLite URLs an informational note suggests a server database for
    production. For every other URL the password component (if any) is
    extracted and checked for well-known weak fragments.

    The previous check was ``'password' in v and any(weak in v ...)`` with
    ``'password'`` itself in the weak list, so it reduced to "the URL
    contains the word password" and scanned host and database names too.
    Only the parsed password is inspected now.

    Args:
        v: The database URL under validation.

    Returns:
        ``v`` unchanged; this validator only logs.
    """
    # Local import keeps this snippet self-contained when pasted into config.py.
    from urllib.parse import urlsplit

    if v.startswith('sqlite'):
        logger.info(
            "Using SQLite database. Consider PostgreSQL or MySQL for production."
        )
        return v

    try:
        password = urlsplit(v).password
    except ValueError:
        # Malformed URL: nothing reliable to inspect, and this validator
        # must not turn an advisory check into a startup failure.
        password = None

    weak_fragments = ('password', '123', 'admin', 'test')
    if password and any(fragment in password.lower() for fragment in weak_fragments):
        logger.warning(
            "Potentially weak database password detected. "
            "Consider using a stronger password."
        )

    return v
@model_validator(mode='after')
def validate_security_combinations(self) -> Self:
    """Log a warning for each risky combination of settings.

    Nothing is raised here: every finding is reported through
    ``logger.warning`` and the model is returned as-is.

    Returns:
        ``self``.
    """
    unauthenticated = not self.auth_required

    # (condition, message) pairs, evaluated and reported in this order.
    risky_combinations = (
        (
            unauthenticated and self.mcpgateway_ui_enabled,
            "🔓 SECURITY WARNING: Admin UI is enabled without authentication. "
            "Consider setting AUTH_REQUIRED=true for production.",
        ),
        (
            self.skip_ssl_verify and not self.dev_mode,
            "🔓 SECURITY WARNING: SSL verification is disabled in non-dev mode. "
            "This is a security risk! Set SKIP_SSL_VERIFY=false for production.",
        ),
        (
            self.debug and not self.dev_mode,
            "🐛 SECURITY WARNING: Debug mode is enabled in non-dev mode. "
            "This may leak sensitive information! Set DEBUG=false for production.",
        ),
        (
            self.federation_enabled and unauthenticated,
            "🌐 SECURITY WARNING: Federation is enabled without authentication. "
            "This may expose your gateway to unauthorized access.",
        ),
    )

    for triggered, message in risky_combinations:
        if triggered:
            logger.warning(message)

    return self
def get_security_warnings(self) -> List[str]:
    """Return the human-readable warnings that apply to this configuration.

    Each rule is evaluated independently; messages come back in the fixed
    order authentication, SSL/TLS, debug/dev, CORS, token lifetime,
    database, rate limiting.

    Returns:
        A list of warning strings; empty when no rule fires.
    """
    rules = (
        # Authentication
        (not self.auth_required,
         "🔓 Authentication is disabled - ensure this is intentional"),
        (self.basic_auth_user == 'admin',
         "⚠️ Using default admin username - consider changing it"),
        # SSL/TLS
        (self.skip_ssl_verify,
         "🔓 SSL verification is disabled - not recommended for production"),
        # Debug / development mode
        (self.debug and not self.dev_mode,
         "🐛 Debug mode enabled - disable in production to prevent info leakage"),
        (self.dev_mode,
         "🔧 Development mode enabled - not for production use"),
        # CORS
        (self.cors_enabled and '*' in self.allowed_origins,
         "🌐 CORS allows all origins (*) - this is a security risk"),
        # Token lifetime: 10080 is the "more than 7 days" threshold
        (self.token_expiry > 10080,
         "⏱️ JWT token expiry is very long - consider shorter duration"),
        # Database
        (self.database_url.startswith('sqlite') and not self.dev_mode,
         "💾 SQLite database in use - consider PostgreSQL/MySQL for production"),
        # Rate limiting
        (self.tool_rate_limit > 1000,
         "🚦 Tool rate limit is very high - may allow abuse"),
    )
    return [message for applies, message in rules if applies]
def get_security_status(self) -> dict:
    """Return a summary of the security posture of this configuration.

    The result carries one boolean per check, the list of warnings from
    ``get_security_warnings()``, and a ``security_score``. The score was
    previously missing even though the health endpoint and the CI checker
    script both read ``status['security_score']``, which raised
    ``KeyError``.

    Returns:
        A dict with the keys ``secure_secrets``, ``auth_enabled``,
        ``ssl_verification``, ``debug_disabled``, ``cors_restricted``,
        ``ui_protected``, ``warnings`` and ``security_score``. The score is
        an int in 0..100: 100 minus 10 per warning, floored at 0.
    """
    # Collect warnings once so the list and the score cannot disagree.
    warnings = self.get_security_warnings()
    security_score = max(0, 100 - 10 * len(warnings))

    return {
        "secure_secrets": self.jwt_secret_key != 'my-test-key',
        "auth_enabled": self.auth_required,
        "ssl_verification": not self.skip_ssl_verify,
        "debug_disabled": not self.debug,
        # With CORS disabled there are no origins to restrict.
        "cors_restricted": '*' not in self.allowed_origins if self.cors_enabled else True,
        "ui_protected": not self.mcpgateway_ui_enabled or self.auth_required,
        "warnings": warnings,
        "security_score": security_score,
    }
2. Add Startup Security Validation in main.py
Add this startup event handler in mcpgateway/main.py
(after app creation, around line 450):
from fastapi import FastAPI
from contextlib import asynccontextmanager
import sys
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the security validation at startup and log at shutdown.

    Everything before ``yield`` runs during startup; the ``finally`` clause
    runs on the way out, including when startup itself fails.

    Args:
        app: The FastAPI application this lifespan is attached to (unused here).
    """
    try:
        # Startup: security validation comes before anything else.
        await validate_security_configuration()

        # Other component initialisation belongs here.
        logger.info("Starting MCP Gateway...")
        yield
    finally:
        # Shutdown
        logger.info("Shutting down MCP Gateway...")
# Update app creation to use lifespan, so the security validation in
# lifespan() runs during application startup.
app = FastAPI(
    title=settings.app_name,
    lifespan=lifespan,
    # ... other parameters
)
async def validate_security_configuration():
    """Validate the security configuration at startup.

    Steps:
      1. Log every warning reported by ``settings.get_security_status()``.
      2. Collect critical issues (default JWT secret outside dev mode,
         default admin password with the UI enabled, federation without
         authentication outside dev mode).
      3. If ``settings.require_strong_secrets`` is true and critical issues
         exist, log them as errors and terminate with ``sys.exit(1)``;
         otherwise log them as warnings and continue.
      4. Log actionable recommendations.

    The recommendations section used to be gated only on the JWT secret and
    the auth flag, so the default-password and SSL recommendations were
    never shown when those were the only problems. They are now shown
    whenever at least one applies.

    Raises:
        SystemExit: With code 1 when critical issues are found and
            ``REQUIRE_STRONG_SECRETS`` is enabled.
    """
    logger.info("🔒 Validating security configuration...")
    banner = "=" * 60

    # Get security status
    security_status = settings.get_security_status()
    warnings = security_status['warnings']

    # Log warnings
    if warnings:
        logger.warning(banner)
        logger.warning("🚨 SECURITY WARNINGS DETECTED:")
        logger.warning(banner)
        for warning in warnings:
            logger.warning(f" {warning}")
        logger.warning(banner)

    default_jwt_secret = settings.jwt_secret_key == 'my-test-key'
    default_admin_password = settings.basic_auth_password == 'changeme'

    # Critical security checks (fail startup only if REQUIRE_STRONG_SECRETS=true)
    critical_issues = []
    if default_jwt_secret and not settings.dev_mode:
        critical_issues.append(
            "Using default JWT secret in non-dev mode. "
            "Set JWT_SECRET_KEY environment variable!"
        )
    if default_admin_password and settings.mcpgateway_ui_enabled:
        critical_issues.append(
            "Admin UI enabled with default password. "
            "Set BASIC_AUTH_PASSWORD environment variable!"
        )
    if not settings.auth_required and settings.federation_enabled and not settings.dev_mode:
        critical_issues.append(
            "Federation enabled without authentication in non-dev mode. "
            "This is a critical security risk!"
        )

    # Handle critical issues based on REQUIRE_STRONG_SECRETS setting
    if critical_issues:
        if settings.require_strong_secrets:
            logger.error(banner)
            logger.error("💀 CRITICAL SECURITY ISSUES DETECTED:")
            logger.error(banner)
            for issue in critical_issues:
                logger.error(f" ❌ {issue}")
            logger.error(banner)
            logger.error("Startup aborted due to REQUIRE_STRONG_SECRETS=true")
            logger.error("To proceed anyway, set REQUIRE_STRONG_SECRETS=false")
            logger.error(banner)
            sys.exit(1)

        # Not enforcing: report the same issues as warnings and carry on.
        logger.warning(banner)
        logger.warning("⚠️ Critical security issues detected (REQUIRE_STRONG_SECRETS=false):")
        for issue in critical_issues:
            logger.warning(f" • {issue}")
        logger.warning(banner)

    # Build the recommendations first, so the section appears whenever any
    # recommendation applies.
    recommendations = []
    if default_jwt_secret:
        recommendations.append(" • Generate a strong JWT secret:")
        recommendations.append(" python -c 'import secrets; print(secrets.token_urlsafe(32))'")
    if default_admin_password:
        recommendations.append(" • Set a strong admin password in BASIC_AUTH_PASSWORD")
    if not settings.auth_required:
        recommendations.append(" • Enable authentication: AUTH_REQUIRED=true")
    if settings.skip_ssl_verify:
        recommendations.append(" • Enable SSL verification: SKIP_SSL_VERIFY=false")

    if recommendations:
        logger.info(banner)
        logger.info("📋 SECURITY RECOMMENDATIONS:")
        logger.info(banner)
        for line in recommendations:
            logger.info(line)
        logger.info(banner)

    logger.info("✅ Security validation completed")
3. Add Security Health Endpoint
Add this endpoint to mcpgateway/main.py
(with other health endpoints, around line 500):
@app.get("/health/security", tags=["health"])
async def security_health(request: Request):
    """Report the security configuration health as a JSON-serialisable dict.

    Args:
        request: Incoming request; only its ``authorization`` header is read.

    Returns:
        A dict with ``status`` ("healthy" when the score is at least 60),
        ``score``, per-check booleans under ``checks``, ``warning_count``
        and ``timestamp``; ``warnings`` is added only in dev mode.

    Raises:
        HTTPException: 401 when authentication is required and the request
            has no ``Authorization`` header starting with ``"Bearer "``.
    """
    if settings.auth_required:
        # NOTE(review): this only checks that a Bearer header is present;
        # the token itself is not validated here.
        auth_header = request.headers.get("authorization")
        has_bearer = bool(auth_header) and auth_header.startswith("Bearer ")
        if not has_bearer:
            raise HTTPException(401, "Authentication required for security health")

    security_status = settings.get_security_status()
    score = security_status['security_score']

    # The response calls the auth check "authentication"; the other check
    # names are passed through unchanged.
    checks = {"authentication": security_status['auth_enabled']}
    for check_name in (
        "secure_secrets",
        "ssl_verification",
        "debug_disabled",
        "cors_restricted",
        "ui_protected",
    ):
        checks[check_name] = security_status[check_name]

    response = {
        # 60 is the minimum acceptable score.
        "status": "healthy" if score >= 60 else "unhealthy",
        "score": score,
        "checks": checks,
        "warning_count": len(security_status['warnings']),
        "timestamp": datetime.utcnow().isoformat()
    }

    # Warning text is exposed in dev mode only; otherwise callers get just
    # the count.
    if settings.dev_mode:
        response["warnings"] = security_status['warnings']

    return response
4. Add Environment Variables to .env.example
Add these security validation settings to .env.example:
#####################################
# Security Validation Settings
#####################################
# Minimum length for secret keys (JWT, encryption)
MIN_SECRET_LENGTH=32
# Minimum length for passwords
MIN_PASSWORD_LENGTH=12
# Enforce strong secrets (set to true to fail startup on critical issues)
# Default is false to maintain backward compatibility
REQUIRE_STRONG_SECRETS=false
# With REQUIRE_STRONG_SECRETS=false the gateway still starts when critical
# security issues are found and only logs them as warnings.
# NOT RECOMMENDED for production!
# REQUIRE_STRONG_SECRETS=false
5. Add Security Monitoring Script
Create .github/tools/check_security.py
for CI/CD integration:
#!/usr/bin/env python3
"""Security configuration checker for MCP Gateway."""
import os
import sys

# This script lives in <repo>/.github/tools/, so the repository root (the
# directory that holds the mcpgateway package) is three levels above this
# file. Two dirname() calls stop at <repo>/.github, where mcpgateway cannot
# be imported.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from mcpgateway.config import get_settings
def main():
    """Print the security status and exit with a code that reflects it.

    Exit codes: 1 when the score is below 60, 0 otherwise (with a "could be
    improved" note below 80), and 2 when reading the status raises an
    exception.
    """
    try:
        status = get_settings().get_security_status()

        print(f"Security Score: {status['security_score']}/100")
        print(f"Warnings: {len(status['warnings'])}")

        if status['warnings']:
            print("\nSecurity Warnings:")
            for warning in status['warnings']:
                print(f" - {warning}")

        # Pick the verdict and exit code from the score band.
        score = status['security_score']
        if score < 60:
            verdict, exit_code = "\n❌ Security score too low for deployment", 1
        elif score < 80:
            verdict, exit_code = "\n⚠️ Security could be improved", 0
        else:
            verdict, exit_code = "\n✅ Security configuration looks good", 0

        print(verdict)
        # SystemExit is not an Exception subclass, so the handler below
        # does not intercept it.
        sys.exit(exit_code)
    except Exception as e:
        print(f"❌ Security validation failed: {e}")
        sys.exit(2)
if __name__ == "__main__":
main()
Files to Modify:
- mcpgateway/config.py - Add security validators and methods to Settings class
- mcpgateway/main.py - Add startup validation and security health endpoint
- .env.example - Add security validation environment variables
- .github/tools/check_security.py - Create new security check script
Testing Checklist:
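- Test with default secrets (should log warnings; should abort startup only in non-dev mode with REQUIRE_STRONG_SECRETS=true)
- Test with weak passwords (should log security warnings without failing validation)
- Test with strong secrets (should pass without warnings)
- Verify security warnings are logged
- Test security health endpoint
- Verify critical issues prevent startup when REQUIRE_STRONG_SECRETS=true
- Test bypass with REQUIRE_STRONG_SECRETS=false (critical issues are logged as warnings and startup continues)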
- Run security check script in CI/CD
CI/CD Integration:
# In the CI/CD pipeline (e.g., .github/workflows/security.yml)
# Workflow step: runs the checker script with the secrets exposed as the
# environment variables the gateway settings read.
- name: Check Security Configuration
  run: |
    python .github/tools/check_security.py
  env:
    JWT_SECRET_KEY: ${{ secrets.JWT_SECRET_KEY }}
    BASIC_AUTH_PASSWORD: ${{ secrets.ADMIN_PASSWORD }}
Priority: High — Security misconfigurations are a leading cause of breaches. This validation ensures:
- No default/weak credentials in production
- Security best practices are enforced
- Clear visibility into security posture
- Fail-fast on critical security issues