Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,61 +27,130 @@

def upgrade() -> None:
"""Add comprehensive metadata columns to all entity tables for audit tracking."""
bind = op.get_bind()
inspector = sa.inspect(bind)

# Check if this is a fresh database without existing tables
if not inspector.has_table("gateways"):
print("Fresh database detected. Skipping metadata migration.")
return

tables = ["tools", "resources", "prompts", "servers", "gateways"]

# Define metadata columns to add
metadata_columns = [
("created_by", sa.String(), True),
("created_from_ip", sa.String(), True),
("created_via", sa.String(), True),
("created_user_agent", sa.Text(), True),
("modified_by", sa.String(), True),
("modified_from_ip", sa.String(), True),
("modified_via", sa.String(), True),
("modified_user_agent", sa.Text(), True),
("import_batch_id", sa.String(), True),
("federation_source", sa.String(), True),
("version", sa.Integer(), False, "1"), # Not nullable, with default
]

# Add columns to each table if they don't exist
for table in tables:
if inspector.has_table(table):
columns = [col["name"] for col in inspector.get_columns(table)]

for col_name, col_type, nullable, *default in metadata_columns:
if col_name not in columns:
try:
if default:
op.add_column(table, sa.Column(col_name, col_type, nullable=nullable, server_default=default[0]))
else:
op.add_column(table, sa.Column(col_name, col_type, nullable=nullable))
print(f"Added column {col_name} to {table}")
except Exception as e:
print(f"Warning: Could not add column {col_name} to {table}: {e}")

# Create indexes for query performance (safe B-tree indexes)
# Note: modified_at column doesn't exist in schema, so we skip it
index_definitions = [
("created_by", ["created_by"]),
("created_at", ["created_at"]),
("created_via", ["created_via"]),
]

for table in tables:
# Creation metadata (nullable=True for backwards compatibility)
op.add_column(table, sa.Column("created_by", sa.String(), nullable=True))
op.add_column(table, sa.Column("created_from_ip", sa.String(), nullable=True))
op.add_column(table, sa.Column("created_via", sa.String(), nullable=True))
op.add_column(table, sa.Column("created_user_agent", sa.Text(), nullable=True))

# Modification metadata (nullable=True for backwards compatibility)
op.add_column(table, sa.Column("modified_by", sa.String(), nullable=True))
op.add_column(table, sa.Column("modified_from_ip", sa.String(), nullable=True))
op.add_column(table, sa.Column("modified_via", sa.String(), nullable=True))
op.add_column(table, sa.Column("modified_user_agent", sa.Text(), nullable=True))

# Source tracking (nullable=True for backwards compatibility)
op.add_column(table, sa.Column("import_batch_id", sa.String(), nullable=True))
op.add_column(table, sa.Column("federation_source", sa.String(), nullable=True))
op.add_column(table, sa.Column("version", sa.Integer(), nullable=False, server_default="1"))

# Create indexes for query performance (PostgreSQL compatible, SQLite ignores)
try:
op.create_index(f"idx_{table}_created_by", table, ["created_by"])
op.create_index(f"idx_{table}_created_at", table, ["created_at"])
op.create_index(f"idx_{table}_modified_at", table, ["modified_at"])
op.create_index(f"idx_{table}_created_via", table, ["created_via"])
except Exception: # nosec B110 - database compatibility
# SQLite doesn't support all index types, skip silently
pass
if inspector.has_table(table):
try:
existing_indexes = [idx["name"] for idx in inspector.get_indexes(table)]
except Exception as e:
print(f"Warning: Could not get indexes for {table}: {e}")
continue

for index_suffix, columns in index_definitions:
index_name = f"idx_{table}_{index_suffix}"
if index_name not in existing_indexes:
# Check if the column exists before creating index
table_columns = [col["name"] for col in inspector.get_columns(table)]
if all(col in table_columns for col in columns):
try:
op.create_index(index_name, table, columns)
print(f"Created index {index_name}")
except Exception as e:
print(f"Warning: Could not create index {index_name}: {e}")
else:
print(f"Skipping index {index_name} - required columns {columns} not found in {table}")


def downgrade() -> None:
"""Remove comprehensive metadata columns from all entity tables."""
bind = op.get_bind()
inspector = sa.inspect(bind)

tables = ["tools", "resources", "prompts", "servers", "gateways"]

# Index names to drop (modified_at doesn't exist, so skip it)
index_suffixes = ["created_by", "created_at", "created_via"]

# Drop indexes first (if they exist)
for table in tables:
# Drop indexes first (if they exist)
try:
op.drop_index(f"idx_{table}_created_by", table)
op.drop_index(f"idx_{table}_created_at", table)
op.drop_index(f"idx_{table}_modified_at", table)
op.drop_index(f"idx_{table}_created_via", table)
except Exception: # nosec B110 - database compatibility
# Indexes might not exist on SQLite
pass

# Drop metadata columns
op.drop_column(table, "version")
op.drop_column(table, "federation_source")
op.drop_column(table, "import_batch_id")
op.drop_column(table, "modified_user_agent")
op.drop_column(table, "modified_via")
op.drop_column(table, "modified_from_ip")
op.drop_column(table, "modified_by")
op.drop_column(table, "created_user_agent")
op.drop_column(table, "created_via")
op.drop_column(table, "created_from_ip")
op.drop_column(table, "created_by")
if inspector.has_table(table):
try:
existing_indexes = [idx["name"] for idx in inspector.get_indexes(table)]
except Exception as e:
print(f"Warning: Could not get indexes for {table}: {e}")
continue

for suffix in index_suffixes:
index_name = f"idx_{table}_{suffix}"
if index_name in existing_indexes:
try:
op.drop_index(index_name, table)
print(f"Dropped index {index_name}")
except Exception as e:
print(f"Warning: Could not drop index {index_name}: {e}")

# Metadata columns to drop (in reverse order for safety)
metadata_columns = [
"version",
"federation_source",
"import_batch_id",
"modified_user_agent",
"modified_via",
"modified_from_ip",
"modified_by",
"created_user_agent",
"created_via",
"created_from_ip",
"created_by",
]

# Drop metadata columns (if they exist)
for table in reversed(tables): # Reverse order for safety
if inspector.has_table(table):
columns = [col["name"] for col in inspector.get_columns(table)]

for col_name in metadata_columns:
if col_name in columns:
try:
op.drop_column(table, col_name)
print(f"Dropped column {col_name} from {table}")
except Exception as e:
print(f"Warning: Could not drop column {col_name} from {table}: {e}")
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,36 @@

def upgrade() -> None:
"""Upgrade schema."""
# Create global_config table
op.create_table("global_config", sa.Column("id", sa.Integer(), nullable=False), sa.Column("passthrough_headers", sa.JSON(), nullable=True), sa.PrimaryKeyConstraint("id"))
bind = op.get_bind()
inspector = sa.inspect(bind)

# Add passthrough_headers column to gateways table
op.add_column("gateways", sa.Column("passthrough_headers", sa.JSON(), nullable=True))
# Check if this is a fresh database without existing tables
if not inspector.has_table("gateways"):
print("Fresh database detected. Skipping passthrough headers migration.")
return

# Create global_config table if it doesn't exist
if not inspector.has_table("global_config"):
op.create_table("global_config", sa.Column("id", sa.Integer(), nullable=False), sa.Column("passthrough_headers", sa.JSON(), nullable=True), sa.PrimaryKeyConstraint("id"))

# Add passthrough_headers column to gateways table if it doesn't exist
if inspector.has_table("gateways"):
columns = [col["name"] for col in inspector.get_columns("gateways")]
if "passthrough_headers" not in columns:
op.add_column("gateways", sa.Column("passthrough_headers", sa.JSON(), nullable=True))


def downgrade() -> None:
"""Downgrade schema."""
# Remove passthrough_headers column from gateways table
op.drop_column("gateways", "passthrough_headers")

# Drop global_config table
op.drop_table("global_config")
bind = op.get_bind()
inspector = sa.inspect(bind)

# Remove passthrough_headers column from gateways table if it exists
if inspector.has_table("gateways"):
columns = [col["name"] for col in inspector.get_columns("gateways")]
if "passthrough_headers" in columns:
op.drop_column("gateways", "passthrough_headers")

# Drop global_config table if it exists
if inspector.has_table("global_config"):
op.drop_table("global_config")
127 changes: 79 additions & 48 deletions mcpgateway/alembic/versions/add_a2a_agents_and_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def upgrade() -> None:
existing_tables = inspector.get_table_names()

if "a2a_agents" not in existing_tables:
# Create a2a_agents table
# Create a2a_agents table with unique constraints included (SQLite compatible)
op.create_table(
"a2a_agents",
sa.Column("id", sa.String(36), primary_key=True),
Expand Down Expand Up @@ -65,12 +65,10 @@ def upgrade() -> None:
sa.Column("import_batch_id", sa.String()),
sa.Column("federation_source", sa.String()),
sa.Column("version", sa.Integer(), nullable=False, server_default="1"),
sa.UniqueConstraint("name", name="uq_a2a_agents_name"),
sa.UniqueConstraint("slug", name="uq_a2a_agents_slug"),
)

# Create unique constraints
op.create_unique_constraint("uq_a2a_agents_name", "a2a_agents", ["name"])
op.create_unique_constraint("uq_a2a_agents_slug", "a2a_agents", ["slug"])

if "a2a_agent_metrics" not in existing_tables:
# Create a2a_agent_metrics table
op.create_table(
Expand All @@ -93,57 +91,90 @@ def upgrade() -> None:
)

# Create indexes for performance (check if they exist first)
existing_indexes = []
try:
existing_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agents")]
except Exception:
pass

if "idx_a2a_agents_enabled" not in existing_indexes:
# Only create indexes if tables were actually created
if "a2a_agents" in existing_tables:
try:
op.create_index("idx_a2a_agents_enabled", "a2a_agents", ["enabled"])
existing_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agents")]
except Exception:
pass
existing_indexes = []

if "idx_a2a_agents_enabled" not in existing_indexes:
try:
op.create_index("idx_a2a_agents_enabled", "a2a_agents", ["enabled"])
except Exception as e:
print(f"Warning: Could not create index idx_a2a_agents_enabled: {e}")

if "idx_a2a_agents_agent_type" not in existing_indexes:
try:
op.create_index("idx_a2a_agents_agent_type", "a2a_agents", ["agent_type"])
except Exception as e:
print(f"Warning: Could not create index idx_a2a_agents_agent_type: {e}")

if "idx_a2a_agents_agent_type" not in existing_indexes:
# Create B-tree index for tags (safer than GIN, works on both PostgreSQL and SQLite)
if "idx_a2a_agents_tags" not in existing_indexes:
try:
op.create_index("idx_a2a_agents_tags", "a2a_agents", ["tags"])
except Exception as e:
print(f"Warning: Could not create index idx_a2a_agents_tags: {e}")

# Metrics table indexes
if "a2a_agent_metrics" in existing_tables:
try:
op.create_index("idx_a2a_agents_agent_type", "a2a_agents", ["agent_type"])
existing_metrics_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agent_metrics")]
except Exception:
pass
existing_metrics_indexes = []

# Metrics table indexes
try:
existing_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agent_metrics")]
if "idx_a2a_agent_metrics_agent_id" not in existing_indexes:
op.create_index("idx_a2a_agent_metrics_agent_id", "a2a_agent_metrics", ["a2a_agent_id"])
if "idx_a2a_agent_metrics_timestamp" not in existing_indexes:
op.create_index("idx_a2a_agent_metrics_timestamp", "a2a_agent_metrics", ["timestamp"])
except Exception:
pass

# Create GIN indexes for tags on PostgreSQL (ignored on SQLite)
try:
if "idx_a2a_agents_tags" not in existing_indexes:
op.create_index("idx_a2a_agents_tags", "a2a_agents", ["tags"], postgresql_using="gin")
except Exception: # nosec B110 - database compatibility
pass # SQLite doesn't support GIN indexes
if "idx_a2a_agent_metrics_agent_id" not in existing_metrics_indexes:
try:
op.create_index("idx_a2a_agent_metrics_agent_id", "a2a_agent_metrics", ["a2a_agent_id"])
except Exception as e:
print(f"Warning: Could not create index idx_a2a_agent_metrics_agent_id: {e}")

if "idx_a2a_agent_metrics_timestamp" not in existing_metrics_indexes:
try:
op.create_index("idx_a2a_agent_metrics_timestamp", "a2a_agent_metrics", ["timestamp"])
except Exception as e:
print(f"Warning: Could not create index idx_a2a_agent_metrics_timestamp: {e}")


def downgrade() -> None:
"""Reverse the A2A agents and metrics tables."""
# Check if tables exist before trying to drop indexes/tables
conn = op.get_bind()
inspector = sa.inspect(conn)
existing_tables = inspector.get_table_names()

# Drop indexes first
try:
op.drop_index("idx_a2a_agents_tags", "a2a_agents")
except Exception: # nosec B110 - database compatibility
pass

op.drop_index("idx_a2a_agent_metrics_timestamp", "a2a_agent_metrics")
op.drop_index("idx_a2a_agent_metrics_agent_id", "a2a_agent_metrics")
op.drop_index("idx_a2a_agents_agent_type", "a2a_agents")
op.drop_index("idx_a2a_agents_enabled", "a2a_agents")

# Drop tables
op.drop_table("server_a2a_association")
op.drop_table("a2a_agent_metrics")
op.drop_table("a2a_agents")
# Drop indexes first (if they exist)
if "a2a_agents" in existing_tables:
try:
existing_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agents")]

for index_name in ["idx_a2a_agents_tags", "idx_a2a_agents_agent_type", "idx_a2a_agents_enabled"]:
if index_name in existing_indexes:
try:
op.drop_index(index_name, "a2a_agents")
except Exception as e:
print(f"Warning: Could not drop index {index_name}: {e}")
except Exception as e:
print(f"Warning: Could not get indexes for a2a_agents: {e}")

if "a2a_agent_metrics" in existing_tables:
try:
existing_metrics_indexes = [idx["name"] for idx in inspector.get_indexes("a2a_agent_metrics")]

for index_name in ["idx_a2a_agent_metrics_timestamp", "idx_a2a_agent_metrics_agent_id"]:
if index_name in existing_metrics_indexes:
try:
op.drop_index(index_name, "a2a_agent_metrics")
except Exception as e:
print(f"Warning: Could not drop index {index_name}: {e}")
except Exception as e:
print(f"Warning: Could not get indexes for a2a_agent_metrics: {e}")

# Drop tables (if they exist)
for table_name in ["server_a2a_association", "a2a_agent_metrics", "a2a_agents"]:
if table_name in existing_tables:
try:
op.drop_table(table_name)
except Exception as e:
print(f"Warning: Could not drop table {table_name}: {e}")
Loading
Loading