-
Notifications
You must be signed in to change notification settings - Fork 243
Description
Title: A2A Initial Support - Add A2A Servers as Tools
Goal: Enable the MCP Gateway to discover, register, and interact with A2A (Agent2Agent) servers by automatically converting A2A skills into MCP tools, providing seamless interoperability between MCP and A2A protocols
Why now: The A2A protocol provides a standard for AI agent collaboration. By supporting A2A servers, the MCP Gateway becomes a bridge between MCP and A2A ecosystems, enabling MCP clients to access A2A agent capabilities through familiar tool interfaces.
🎯 Feature Overview
This feature transforms the MCP Gateway into an A2A client that can discover, register, and interact with A2A servers. It automatically converts A2A skills into MCP tools, enabling seamless integration between the two protocols:
- A2A Server Discovery - Discover A2A servers via Agent Cards (well-known URIs, registries, manual configuration)
- Automatic Tool Generation - Convert A2A skills into MCP-compliant tools with proper schemas
- Task Management - Handle A2A task lifecycle including async operations, streaming, and notifications
- Authentication Bridge - Support A2A authentication schemes (OAuth2, Bearer tokens, API keys)
- Protocol Translation - Translate between MCP tool calls and A2A task messages
- Lifecycle Management - Monitor A2A server health, handle reconnections, and manage task states
- Admin UI Integration - Manage A2A servers through existing admin interface
🙋♂️ User Stories
Story 1 — A2A Server Discovery & Registration
As an Administrator
I want to register A2A servers and automatically discover their capabilities
So that their skills become available as MCP tools in my gateway.
✅ Acceptance Criteria
Scenario: Register A2A server via Agent Card URL
Given I have an A2A server running at "https://agent.example.com"
When I register the server with URL "https://agent.example.com/.well-known/agent.json"
Then the system fetches the Agent Card
And discovers the server's skills and capabilities
And creates MCP tools for each A2A skill
And the tools appear in the global tools catalog
And the A2A server appears in the servers list
Scenario: Automatic skill-to-tool conversion
Given an A2A server with skills:
- "route-optimizer": Route planning and optimization
- "traffic-analyzer": Real-time traffic analysis
When the server is registered
Then two MCP tools are created:
- "route-optimizer" tool with appropriate input schema
- "traffic-analyzer" tool with appropriate input schema
And both tools have proper descriptions from the Agent Card
And tools inherit input/output modes from the A2A skills
Story 2 — MCP Tool to A2A Task Translation
As a Developer
I want to invoke A2A skills through standard MCP tool calls
So that I can use A2A agents without changing my MCP client code.
✅ Acceptance Criteria
Scenario: Invoke A2A skill via MCP tool call
Given an A2A server registered with skill "route-optimizer"
When I invoke the MCP tool with parameters:
```json
{
"origin": "New York, NY",
"destination": "Boston, MA",
"optimize_for": "time"
}
```
Then the gateway creates an A2A task with the parameters
And sends a tasks/send request to the A2A server
And waits for the task completion
And returns the A2A artifacts as MCP tool results
And records metrics for the tool invocation
Scenario: Handle A2A streaming responses
Given an A2A server with streaming capability enabled
When I invoke a tool that generates streaming output
Then the gateway subscribes to A2A SSE updates via tasks/sendSubscribe
And streams incremental updates back to the MCP client
And properly handles streaming artifacts and status updates
And completes when the A2A task reaches terminal state
Story 3 — A2A Authentication Integration
As an Administrator
I want to configure authentication for A2A servers
So that the gateway can securely access protected A2A agents.
✅ Acceptance Criteria
Scenario: Configure OAuth2 authentication for A2A server
Given an A2A server requiring OAuth2 authentication
When I configure the server with OAuth2 credentials:
- Client ID and secret
- Authorization URL and token URL
- Required scopes
Then the gateway obtains access tokens via OAuth2 flow
And includes Bearer tokens in A2A requests
And automatically refreshes expired tokens
And handles authentication errors gracefully
Scenario: Support multiple authentication schemes
Given A2A servers with different auth requirements
When I register servers with:
- OAuth2 authentication
- Bearer token authentication
- API key authentication
- No authentication required
Then the gateway handles each authentication method correctly
And stores credentials securely
And applies appropriate auth headers to A2A requests
Story 4 — Async Task Management
As a System
I want to handle long-running A2A tasks efficiently
So that MCP clients can access time-intensive A2A agent capabilities.
✅ Acceptance Criteria
Scenario: Handle long-running A2A task
Given an A2A skill that takes 5+ minutes to complete
When I invoke the corresponding MCP tool
Then the gateway creates an A2A task
And polls the task status via tasks/get requests
And updates task progress in the background
And returns results when the A2A task completes
And provides appropriate timeout handling
Scenario: Support A2A push notifications
Given an A2A server supporting push notifications
When I invoke a long-running tool
Then the gateway configures a webhook for task updates
And receives push notifications as the task progresses
And updates internal task state based on notifications
And completes the MCP tool call when A2A task finishes
And validates notification authenticity
Story 5 — A2A Server Health & Lifecycle
As an Administrator
I want automatic monitoring of A2A server health and capabilities
So that I can ensure reliable service and handle server changes.
✅ Acceptance Criteria
Scenario: Monitor A2A server health
Given registered A2A servers
When the health monitoring runs
Then the system periodically fetches updated Agent Cards
And detects changes in server capabilities
And updates tool definitions when skills change
And marks servers as unhealthy if Agent Card fetch fails
And disables tools for unhealthy servers
And re-enables tools when servers recover
Scenario: Handle A2A server capability changes
Given an A2A server with existing registered skills
When the server adds new skills or removes existing ones
Then the gateway detects the changes during health checks
And creates new MCP tools for added skills
And marks tools as inactive for removed skills
And updates tool schemas if skill definitions change
And logs all capability changes for audit purposes
Story 6 — Error Handling & Resilience
As a System
I want robust error handling for A2A protocol interactions
So that MCP clients receive meaningful errors and the system remains stable.
✅ Acceptance Criteria
Scenario: Handle A2A task failures gracefully
Given an A2A task that fails during execution
When the A2A server returns a task with state "failed"
Then the gateway extracts error information from the task status
And converts A2A error details to MCP tool error format
And includes original A2A error codes and messages
And returns appropriate HTTP status codes to MCP clients
And logs detailed error information for debugging
Scenario: Handle A2A server connectivity issues
Given an A2A server that becomes unreachable
When MCP tools try to invoke skills on that server
Then the gateway detects connection failures
And retries requests with exponential backoff
And eventually returns timeout errors to MCP clients
And marks the server as unhealthy
And provides clear error messages about server unavailability
🏗️ Architecture
A2A Integration Architecture
flowchart TD
subgraph "MCP Gateway Core"
UI[Admin UI] --> AS[A2A Service]
API[MCP Tool API] --> AS
AS --> TM[Task Manager]
AS --> AM[Auth Manager]
AS --> SM[Session Manager]
AS --> DB[(Gateway Database)]
end
subgraph "A2A Client Layer"
AS --> AC[A2A Client]
AC --> DM[Discovery Manager]
AC --> PM[Protocol Manager]
AC --> HM[Health Monitor]
end
subgraph "A2A Protocol Handlers"
PM --> SH[Sync Handler]
PM --> AH[Async Handler]
PM --> ST[Streaming Handler]
PM --> NH[Notification Handler]
end
subgraph "External A2A Servers"
AS1[A2A Server 1<br/>Route Planner]
AS2[A2A Server 2<br/>Data Analyzer]
AS3[A2A Server 3<br/>Content Generator]
end
subgraph "Integration Layer"
AS --> TG[Tool Generator]
AS --> PT[Protocol Translator]
AS --> ER[Error Resolver]
end
DM --> |Agent Card Discovery| AS1
DM --> |Agent Card Discovery| AS2
DM --> |Agent Card Discovery| AS3
SH --> |tasks/send| AS1
AH --> |tasks/get polling| AS2
ST --> |tasks/sendSubscribe| AS3
NH --> |webhook notifications| AS1
TG --> |Generate MCP Tools| DB
PT --> |MCP ↔ A2A Translation| PM
ER --> |Error Mapping| API
classDef gateway fill:#e3f2fd,stroke:#1976d2;
classDef a2a fill:#f3e5f5,stroke:#7b1fa2;
classDef protocol fill:#e8f5e8,stroke:#388e3c;
classDef external fill:#fff3e0,stroke:#f57c00;
class AS,TM,AM,SM gateway
class AC,DM,PM,HM a2a
class SH,AH,ST,NH protocol
class AS1,AS2,AS3 external
A2A Task Execution Flow
sequenceDiagram
participant MCP as MCP Client
participant GW as MCP Gateway
participant A2A as A2A Server
participant WH as Webhook Handler
MCP ->> GW: invoke_tool("route-optimizer", params)
GW ->> GW: translate_mcp_to_a2a_task()
GW ->> A2A: POST /a2a/v1 (tasks/send)
alt Quick Response
A2A -->> GW: Task completed
GW ->> GW: translate_a2a_to_mcp_result()
GW -->> MCP: tool_result
else Long Running Task
A2A -->> GW: Task submitted (working state)
GW ->> A2A: Configure push notification
GW ->> GW: start_background_polling()
loop Background Monitoring
A2A ->> WH: POST webhook (task update)
WH ->> GW: update_task_status()
GW ->> GW: check_task_completion()
end
A2A ->> WH: POST webhook (task completed)
WH ->> GW: task_completed_notification()
GW ->> GW: finalize_mcp_result()
GW -->> MCP: tool_result
end
💻 Implementation Design
Core A2A Integration Components
class A2AService:
"""Main service for A2A server integration and management"""
def __init__(self):
self.a2a_client = A2AClient()
self.task_manager = A2ATaskManager()
self.auth_manager = A2AAuthManager()
self.tool_generator = A2AToolGenerator()
self.discovery_manager = A2ADiscoveryManager()
self.health_monitor = A2AHealthMonitor()
async def register_a2a_server(
self,
server_config: A2AServerConfig
) -> A2AServerRegistration:
"""Register a new A2A server and create corresponding MCP tools"""
# Discover server capabilities
agent_card = await self.discovery_manager.fetch_agent_card(
server_config.agent_card_url
)
# Validate A2A server compatibility
await self._validate_a2a_compatibility(agent_card)
# Configure authentication
auth_config = await self.auth_manager.configure_authentication(
agent_card.authentication,
server_config.credentials
)
# Create A2A server record
a2a_server = A2AServer(
id=self._generate_server_id(),
name=agent_card.name,
description=agent_card.description,
url=agent_card.url,
version=agent_card.version,
capabilities=agent_card.capabilities,
authentication=auth_config,
skills=agent_card.skills,
status="active",
created_at=datetime.utcnow()
)
# Generate MCP tools for each A2A skill
mcp_tools = []
for skill in agent_card.skills:
tool = await self.tool_generator.create_tool_from_skill(
a2a_server,
skill
)
mcp_tools.append(tool)
# Store server and tools
await self._store_a2a_server(a2a_server, mcp_tools)
# Start health monitoring
await self.health_monitor.start_monitoring(a2a_server)
return A2AServerRegistration(
server=a2a_server,
tools_created=len(mcp_tools),
tools=mcp_tools
)
async def invoke_a2a_skill(
self,
tool_name: str,
parameters: Dict[str, Any],
user_context: UserContext
) -> ToolResult:
"""Invoke an A2A skill through MCP tool interface"""
# Find A2A server and skill for this tool
tool_mapping = await self._get_tool_mapping(tool_name)
if not tool_mapping:
raise ToolNotFoundError(f"Tool {tool_name} not found")
a2a_server = tool_mapping.a2a_server
skill = tool_mapping.skill
# Create A2A task
task = await self._create_a2a_task(
a2a_server,
skill,
parameters,
user_context
)
# Execute task based on server capabilities
if a2a_server.capabilities.streaming:
return await self._execute_streaming_task(task)
elif a2a_server.capabilities.push_notifications:
return await self._execute_async_task_with_notifications(task)
else:
return await self._execute_sync_task(task)
class A2AClient:
"""A2A protocol client implementation"""
def __init__(self):
self.http_client = httpx.AsyncClient()
self.session_manager = A2ASessionManager()
async def send_task(
self,
server: A2AServer,
task_params: A2ATaskParams
) -> A2ATaskResponse:
"""Send a task using A2A tasks/send method"""
# Prepare JSON-RPC request
request = {
"jsonrpc": "2.0",
"id": self._generate_request_id(),
"method": "tasks/send",
"params": task_params.model_dump()
}
# Add authentication
headers = await self._get_auth_headers(server)
# Send request
response = await self.http_client.post(
server.url,
json=request,
headers=headers,
timeout=30.0
)
response.raise_for_status()
# Parse JSON-RPC response
rpc_response = response.json()
if "error" in rpc_response:
raise A2ATaskError(rpc_response["error"])
return A2ATaskResponse.model_validate(rpc_response["result"])
async def send_task_subscribe(
self,
server: A2AServer,
task_params: A2ATaskParams
) -> AsyncGenerator[A2ATaskUpdate, None]:
"""Send task and subscribe to SSE updates"""
request = {
"jsonrpc": "2.0",
"id": self._generate_request_id(),
"method": "tasks/sendSubscribe",
"params": task_params.model_dump()
}
headers = await self._get_auth_headers(server)
headers["Accept"] = "text/event-stream"
async with self.http_client.stream(
"POST",
server.url,
json=request,
headers=headers
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
try:
data = json.loads(line[6:]) # Remove "data: " prefix
if "result" in data:
yield A2ATaskUpdate.model_validate(data["result"])
except json.JSONDecodeError:
continue
async def get_task(
self,
server: A2AServer,
task_id: str
) -> A2ATaskResponse:
"""Get current task state using A2A tasks/get method"""
request = {
"jsonrpc": "2.0",
"id": self._generate_request_id(),
"method": "tasks/get",
"params": {"id": task_id}
}
headers = await self._get_auth_headers(server)
response = await self.http_client.post(
server.url,
json=request,
headers=headers
)
response.raise_for_status()
rpc_response = response.json()
if "error" in rpc_response:
raise A2ATaskError(rpc_response["error"])
return A2ATaskResponse.model_validate(rpc_response["result"])
class A2AToolGenerator:
"""Generates MCP tools from A2A skills"""
async def create_tool_from_skill(
self,
a2a_server: A2AServer,
skill: A2ASkill
) -> ToolCreate:
"""Convert an A2A skill into an MCP tool definition"""
# Generate tool name (ensure uniqueness)
tool_name = f"a2a_{a2a_server.name}_{skill.id}".lower().replace(" ", "_")
tool_name = re.sub(r"[^a-z0-9_]", "", tool_name)
# Build input schema from skill information
input_schema = await self._build_input_schema(skill)
# Create tool configuration
tool_config = ToolCreate(
name=tool_name,
description=skill.description or f"A2A skill: {skill.name}",
url=a2a_server.url,
integration_type="A2A", # New integration type for A2A
request_type="A2A_TASK", # New request type for A2A tasks
input_schema=input_schema,
annotations={
"a2a_server_id": a2a_server.id,
"a2a_skill_id": skill.id,
"a2a_skill_name": skill.name,
"supports_streaming": a2a_server.capabilities.streaming,
"supports_push_notifications": a2a_server.capabilities.push_notifications
},
auth=await self._convert_a2a_auth_to_mcp_auth(a2a_server.authentication)
)
return tool_config
async def _build_input_schema(self, skill: A2ASkill) -> Dict[str, Any]:
"""Build JSON schema for A2A skill input"""
# Start with basic schema
schema = {
"type": "object",
"properties": {},
"required": []
}
# Add common A2A task fields
schema["properties"]["message"] = {
"type": "string",
"description": "Main instruction or query for the A2A agent"
}
schema["required"].append("message")
# Add skill-specific properties if available
if skill.input_modes:
schema["properties"]["input_mode"] = {
"type": "string",
"enum": skill.input_modes,
"description": "Preferred input content type"
}
# Add session grouping support
schema["properties"]["session_id"] = {
"type": "string",
"description": "Optional session ID to group related tasks"
}
# Add metadata support
schema["properties"]["metadata"] = {
"type": "object",
"description": "Additional metadata for the task"
}
return schema
class A2ATaskManager:
"""Manages A2A task lifecycle and state tracking"""
def __init__(self):
self.active_tasks: Dict[str, A2ATask] = {}
self.task_results: Dict[str, Any] = {}
async def execute_sync_task(
self,
server: A2AServer,
skill: A2ASkill,
parameters: Dict[str, Any]
) -> ToolResult:
"""Execute A2A task synchronously"""
# Create A2A task parameters
task_params = A2ATaskParams(
id=self._generate_task_id(),
session_id=parameters.get("session_id"),
message=A2AMessage(
role="user",
parts=[A2ATextPart(
type="text",
text=parameters.get("message", "")
)]
),
metadata=parameters.get("metadata")
)
# Send task to A2A server
task_response = await self.a2a_client.send_task(server, task_params)
# Handle immediate completion
if task_response.status.state in ["completed", "failed", "canceled"]:
return await self._convert_a2a_response_to_tool_result(task_response)
# Poll for completion
return await self._poll_task_completion(server, task_params.id)
async def execute_streaming_task(
self,
server: A2AServer,
skill: A2ASkill,
parameters: Dict[str, Any]
) -> ToolResult:
"""Execute A2A task with streaming support"""
task_params = A2ATaskParams(
id=self._generate_task_id(),
message=A2AMessage(
role="user",
parts=[A2ATextPart(
type="text",
text=parameters.get("message", "")
)]
)
)
# Collect streaming updates
collected_artifacts = []
final_status = None
async for update in self.a2a_client.send_task_subscribe(server, task_params):
if hasattr(update, 'artifact') and update.artifact:
collected_artifacts.append(update.artifact)
if hasattr(update, 'status') and update.status:
final_status = update.status
if getattr(update, 'final', False):
break
# Convert collected results to tool result
return await self._convert_streaming_results_to_tool_result(
collected_artifacts,
final_status
)
async def _convert_a2a_response_to_tool_result(
self,
task_response: A2ATaskResponse
) -> ToolResult:
"""Convert A2A task response to MCP tool result"""
if task_response.status.state == "failed":
error_message = "A2A task failed"
if task_response.status.message:
error_message = f"A2A task failed: {task_response.status.message.parts[0].text}"
return ToolResult(
content=[TextContent(
type="text",
text=error_message
)],
isError=True
)
# Extract content from artifacts
content_parts = []
if task_response.artifacts:
for artifact in task_response.artifacts:
for part in artifact.parts:
if part.type == "text":
content_parts.append(TextContent(
type="text",
text=part.text
))
elif part.type == "data":
content_parts.append(TextContent(
type="text",
text=json.dumps(part.data, indent=2)
))
# Fallback to status message if no artifacts
if not content_parts and task_response.status.message:
for part in task_response.status.message.parts:
if part.type == "text":
content_parts.append(TextContent(
type="text",
text=part.text
))
# Default empty result if nothing found
if not content_parts:
content_parts.append(TextContent(
type="text",
text="Task completed successfully (no output)"
))
return ToolResult(
content=content_parts,
isError=False
)
Database Schema Extensions
class A2AServer(Base):
"""A2A server registration and configuration"""
__tablename__ = "a2a_servers"
id: Mapped[str] = mapped_column(String, primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text)
url: Mapped[str] = mapped_column(String, nullable=False)
agent_card_url: Mapped[str] = mapped_column(String, nullable=False)
version: Mapped[str] = mapped_column(String, nullable=False)
# Capabilities
capabilities: Mapped[Dict[str, Any]] = mapped_column(JSON)
# Authentication configuration
authentication_scheme: Mapped[str] = mapped_column(String) # oauth2, bearer, apikey, none
authentication_config: Mapped[Dict[str, Any]] = mapped_column(JSON)
# Skills
skills: Mapped[List[Dict[str, Any]]] = mapped_column(JSON)
# Status and health
status: Mapped[str] = mapped_column(String, default="active") # active, inactive, unhealthy
last_health_check: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
health_check_error: Mapped[Optional[str]] = mapped_column(Text)
# Timestamps
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utc_now)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utc_now, onupdate=utc_now)
# Relationships
tools: Mapped[List["Tool"]] = relationship("Tool", back_populates="a2a_server")
tasks: Mapped[List["A2ATask"]] = relationship("A2ATask", back_populates="server")
class A2ATask(Base):
"""A2A task tracking and state management"""
__tablename__ = "a2a_tasks"
id: Mapped[str] = mapped_column(String, primary_key=True) # A2A task ID
server_id: Mapped[str] = mapped_column(String, ForeignKey("a2a_servers.id"), nullable=False)
tool_id: Mapped[str] = mapped_column(String, ForeignKey("tools.id"), nullable=False)
user_id: Mapped[str] = mapped_column(String, nullable=False)
# A2A task details
session_id: Mapped[Optional[str]] = mapped_column(String)
skill_id: Mapped[str] = mapped_column(String, nullable=False)
# Task state
state: Mapped[str] = mapped_column(String, default="submitted") # A2A task states
status_message: Mapped[Optional[str]] = mapped_column(Text)
# Input/Output
input_parameters: Mapped[Dict[str, Any]] = mapped_column(JSON)
output_artifacts: Mapped[Optional[List[Dict[str, Any]]]] = mapped_column(JSON)
# Execution tracking
started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utc_now)
completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
execution_time: Mapped[Optional[float]] = mapped_column(Float) # seconds
# Error tracking
error_code: Mapped[Optional[str]] = mapped_column(String)
error_message: Mapped[Optional[str]] = mapped_column(Text)
# Metadata
metadata: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON)
# Relationships
server: Mapped["A2AServer"] = relationship("A2AServer", back_populates="tasks")
tool: Mapped["Tool"] = relationship("Tool")
# Indexes
__table_args__ = (
Index('idx_a2a_tasks_server', 'server_id'),
Index('idx_a2a_tasks_user', 'user_id'),
Index('idx_a2a_tasks_state', 'state'),
Index('idx_a2a_tasks_started', 'started_at'),
)
# Extend existing Tool model with A2A support
class Tool(Base):
# ... existing fields ...
# A2A integration fields
a2a_server_id: Mapped[Optional[str]] = mapped_column(String, ForeignKey("a2a_servers.id"))
a2a_skill_id: Mapped[Optional[str]] = mapped_column(String)
# Relationships
a2a_server: Mapped[Optional["A2AServer"]] = relationship("A2AServer", back_populates="tools")
🔧 Configuration
class A2AConfig:
# === Core A2A Support ===
a2a_enabled: bool = True # Enable A2A protocol support
a2a_client_timeout: int = 30 # Default A2A request timeout (seconds)
a2a_task_polling_interval: int = 5 # Task status polling interval (seconds)
a2a_max_task_duration: int = 3600 # Maximum task duration (1 hour)
# === Discovery ===
a2a_discovery_enabled: bool = True # Enable automatic A2A server discovery
a2a_well_known_path: str = "/.well-known/agent.json" # Standard Agent Card path
a2a_discovery_timeout: int = 10 # Agent Card fetch timeout (seconds)
# === Task Management ===
a2a_async_task_support: bool = True # Support async task execution
a2a_streaming_support: bool = True # Support SSE streaming tasks
a2a_push_notifications_support: bool = True # Support webhook notifications
a2a_webhook_base_url: str = "" # Base URL for webhook endpoints
# === Authentication ===
a2a_auth_cache_ttl: int = 3600 # Cache auth tokens for 1 hour
a2a_oauth2_enabled: bool = True # Support OAuth2 authentication
a2a_bearer_token_enabled: bool = True # Support Bearer token auth
a2a_api_key_enabled: bool = True # Support API key auth
# === Health Monitoring ===
a2a_health_check_enabled: bool = True # Enable A2A server health monitoring
a2a_health_check_interval: int = 300 # Health check interval (5 minutes)
a2a_health_check_timeout: int = 10 # Health check timeout (seconds)
a2a_unhealthy_threshold: int = 3 # Consecutive failures before unhealthy
# === Tool Generation ===
a2a_tool_prefix: str = "a2a" # Prefix for generated tool names
a2a_auto_tool_generation: bool = True # Auto-generate tools from skills
a2a_skill_update_detection: bool = True # Detect skill changes and update tools
# === Error Handling ===
a2a_retry_enabled: bool = True # Enable request retries
a2a_max_retries: int = 3 # Maximum retry attempts
a2a_retry_delay: int = 2 # Base retry delay (seconds)
a2a_exponential_backoff: bool = True # Use exponential backoff for retries
# === Performance ===
a2a_connection_pool_size: int = 20 # HTTP connection pool size
a2a_max_concurrent_tasks: int = 100 # Max concurrent A2A tasks
a2a_task_result_cache_ttl: int = 300 # Cache task results for 5 minutes
# === Security ===
a2a_webhook_token_validation: bool = True # Validate webhook tokens
a2a_tls_verification: bool = True # Verify TLS certificates
a2a_allowed_schemes: List[str] = [ # Allowed authentication schemes
"oauth2", "bearer", "apikey", "none"
]
📱 Admin UI Integration
A2A Server Management Interface
<!-- A2A Servers Tab Panel -->
<div id="a2a-servers-panel" class="tab-panel hidden">
<div class="a2a-servers-header">
<h2 class="text-2xl font-bold">A2A Servers</h2>
<p class="text-gray-600">Manage Agent2Agent servers and their automatically generated tools</p>
<!-- Quick Stats -->
<div class="a2a-stats grid grid-cols-4 gap-4 my-4">
<div class="stat-card">
<h3>A2A Servers</h3>
<span class="stat-number">5</span>
<span class="stat-note">3 healthy, 2 inactive</span>
</div>
<div class="stat-card">
<h3>Generated Tools</h3>
<span class="stat-number">18</span>
<span class="stat-note">from A2A skills</span>
</div>
<div class="stat-card">
<h3>Active Tasks</h3>
<span class="stat-number">7</span>
<span class="stat-note">currently running</span>
</div>
<div class="stat-card">
<h3>Avg Response Time</h3>
<span class="stat-number">2.3s</span>
<span class="stat-change">-0.5s from last week</span>
</div>
</div>
</div>
<!-- A2A Server Registration Form -->
<div class="registration-section">
<h3>Register New A2A Server</h3>
<form id="a2a-server-form" class="a2a-registration-form">
<div class="form-group">
<label>Agent Card URL</label>
<input type="url" name="agent_card_url" placeholder="https://agent.example.com/.well-known/agent.json" required />
<button type="button" onclick="discoverA2AServer()">Discover</button>
</div>
<div id="discovery-results" class="discovery-results hidden">
<h4>Discovered Server Information</h4>
<div class="server-info">
<div class="info-item">
<label>Server Name:</label>
<span id="discovered-name"></span>
</div>
<div class="info-item">
<label>Description:</label>
<span id="discovered-description"></span>
</div>
<div class="info-item">
<label>Version:</label>
<span id="discovered-version"></span>
</div>
<div class="info-item">
<label>Capabilities:</label>
<div id="discovered-capabilities"></div>
</div>
<div class="info-item">
<label>Skills Found:</label>
<div id="discovered-skills"></div>
</div>
</div>
<!-- Authentication Configuration -->
<div id="auth-config" class="auth-config">
<h4>Authentication Configuration</h4>
<div id="auth-fields">
<!-- Populated based on discovered auth schemes -->
</div>
</div>
<div class="registration-actions">
<button type="submit" class="btn btn-primary">Register A2A Server</button>
<button type="button" onclick="cancelRegistration()" class="btn btn-secondary">Cancel</button>
</div>
</div>
</form>
</div>
<!-- A2A Servers List -->
<div class="a2a-servers-list">
<div class="servers-grid" id="a2a-servers-grid">
<!-- Server cards populated dynamically -->
</div>
</div>
</div>
<!-- A2A Server Detail Modal -->
<div id="a2a-server-detail-modal" class="modal">
<div class="modal-content a2a-server-detail">
<div class="server-header">
<div class="server-info">
<h3 class="server-name"></h3>
<p class="server-description"></p>
<div class="server-status">
<span class="status-indicator"></span>
<span class="status-text"></span>
<span class="last-check"></span>
</div>
</div>
<div class="server-actions">
<button class="btn btn-primary" onclick="refreshA2AServer()">Refresh</button>
<button class="btn btn-secondary" onclick="testA2AConnection()">Test Connection</button>
<button class="btn btn-warning" onclick="toggleA2AServer()">Disable</button>
</div>
</div>
<div class="server-tabs">
<button class="tab-btn active" onclick="showA2ATab('overview')">Overview</button>
<button class="tab-btn" onclick="showA2ATab('skills')">Skills & Tools</button>
<button class="tab-btn" onclick="showA2ATab('tasks')">Tasks</button>
<button class="tab-btn" onclick="showA2ATab('auth')">Authentication</button>
<button class="tab-btn" onclick="showA2ATab('logs')">Logs</button>
</div>
<div class="server-tab-content">
<!-- Tab content populated dynamically -->
</div>
</div>
</div>
A2A Integration JavaScript
class A2AManager {
async discoverA2AServer(agentCardUrl) {
try {
const response = await fetch('/admin/a2a/discover', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ agent_card_url: agentCardUrl })
});
if (!response.ok) {
throw new Error(`Discovery failed: ${response.statusText}`);
}
const discovery = await response.json();
this.displayDiscoveryResults(discovery);
} catch (error) {
this.showError('Failed to discover A2A server: ' + error.message);
}
}
displayDiscoveryResults(discovery) {
// Populate discovery results
document.getElementById('discovered-name').textContent = discovery.name;
document.getElementById('discovered-description').textContent = discovery.description || 'No description';
document.getElementById('discovered-version').textContent = discovery.version;
// Display capabilities
const capabilitiesDiv = document.getElementById('discovered-capabilities');
const capabilities = [];
if (discovery.capabilities.streaming) capabilities.push('Streaming');
if (discovery.capabilities.pushNotifications) capabilities.push('Push Notifications');
capabilitiesDiv.innerHTML = capabilities.join(', ') || 'None';
// Display skills
const skillsDiv = document.getElementById('discovered-skills');
skillsDiv.innerHTML = discovery.skills.map(skill =>
`<div class="skill-item">
<strong>${skill.name}</strong> (${skill.id})
<p>${skill.description || 'No description'}</p>
</div>`
).join('');
// Configure authentication fields
this.configureAuthenticationFields(discovery.authentication);
// Show discovery results
document.getElementById('discovery-results').classList.remove('hidden');
}
configureAuthenticationFields(authentication) {
const authFields = document.getElementById('auth-fields');
if (!authentication || !authentication.schemes || authentication.schemes.includes('none')) {
authFields.innerHTML = '<p>No authentication required</p>';
return;
}
let fieldsHTML = '';
if (authentication.schemes.includes('oauth2')) {
fieldsHTML += `
<div class="auth-section">
<h5>OAuth2 Configuration</h5>
<div class="form-group">
<label>Client ID</label>
<input type="text" name="oauth2_client_id" required />
</div>
<div class="form-group">
<label>Client Secret</label>
<input type="password" name="oauth2_client_secret" required />
</div>
<div class="form-group">
<label>Scopes (comma-separated)</label>
<input type="text" name="oauth2_scopes" placeholder="read,write" />
</div>
</div>
`;
}
if (authentication.schemes.includes('bearer')) {
fieldsHTML += `
<div class="auth-section">
<h5>Bearer Token</h5>
<div class="form-group">
<label>Token</label>
<input type="password" name="bearer_token" required />
</div>
</div>
`;
}
if (authentication.schemes.includes('apikey')) {
fieldsHTML += `
<div class="auth-section">
<h5>API Key</h5>
<div class="form-group">
<label>API Key</label>
<input type="password" name="api_key" required />
</div>
<div class="form-group">
<label>Header Name</label>
<input type="text" name="api_key_header" value="X-API-Key" />
</div>
</div>
`;
}
authFields.innerHTML = fieldsHTML;
}
async registerA2AServer(formData) {
try {
const response = await fetch('/admin/a2a/servers', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(formData)
});
if (!response.ok) {
throw new Error(`Registration failed: ${response.statusText}`);
}
const result = await response.json();
this.showSuccess(
`A2A server registered successfully! Created ${result.tools_created} tools.`
);
this.refreshA2AServersList();
this.resetRegistrationForm();
} catch (error) {
this.showError('Failed to register A2A server: ' + error.message);
}
}
async testA2AConnection(serverId) {
try {
const response = await fetch(`/admin/a2a/servers/${serverId}/test`);
const result = await response.json();
if (result.healthy) {
this.showSuccess('A2A server connection test successful');
} else {
this.showWarning(`Connection test failed: ${result.error}`);
}
} catch (error) {
this.showError('Connection test failed: ' + error.message);
}
}
async viewA2ATasks(serverId) {
try {
const response = await fetch(`/admin/a2a/servers/${serverId}/tasks`);
const tasks = await response.json();
this.displayA2ATasks(tasks);
} catch (error) {
this.showError('Failed to load A2A tasks: ' + error.message);
}
}
displayA2ATasks(tasks) {
const tasksContainer = document.getElementById('a2a-tasks-container');
if (tasks.length === 0) {
tasksContainer.innerHTML = '<p>No tasks found</p>';
return;
}
tasksContainer.innerHTML = tasks.map(task => `
<div class="task-item" data-task-id="${task.id}">
<div class="task-header">
<h4>${task.skill_id}</h4>
<span class="task-state state-${task.state}">${task.state}</span>
</div>
<div class="task-details">
<p><strong>Started:</strong> ${new Date(task.started_at).toLocaleString()}</p>
${task.completed_at ? `<p><strong>Completed:</strong> ${new Date(task.completed_at).toLocaleString()}</p>` : ''}
${task.execution_time ? `<p><strong>Duration:</strong> ${task.execution_time.toFixed(2)}s</p>` : ''}
${task.error_message ? `<p class="error"><strong>Error:</strong> ${task.error_message}</p>` : ''}
</div>
</div>
`).join('');
}
}
// Initialize A2A management
const a2aManager = new A2AManager();
// Form handlers
document.getElementById('a2a-server-form').addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData(e.target);
const data = Object.fromEntries(formData);
await a2aManager.registerA2AServer(data);
});
function discoverA2AServer() {
const agentCardUrl = document.querySelector('input[name="agent_card_url"]').value;
if (agentCardUrl) {
a2aManager.discoverA2AServer(agentCardUrl);
}
}
🚀 Performance Expectations
A2A Integration Performance Metrics
A2A Operations Performance:
┌─────────────────────────┬──────────────┬────────────────┐
│ Operation │ Target Time │ Max Acceptable │
├─────────────────────────┼──────────────┼────────────────┤
│ Agent Card Discovery │ <3 seconds │ 10 seconds │
│ A2A Server Registration │ <10 seconds │ 30 seconds │
│ Sync Task Execution │ <5 seconds │ 30 seconds │
│ Async Task Submission │ <2 seconds │ 5 seconds │
│ Task Status Polling │ <1 second │ 3 seconds │
│ Health Check │ <5 seconds │ 15 seconds │
│ Tool Invocation │ <3 seconds │ 10 seconds │
└─────────────────────────┴──────────────┴────────────────┘
A2A Protocol Efficiency:
- Sync task success rate: >95%
- Async task completion rate: >90%
- Streaming task reliability: >85%
- Authentication success rate: >98%
- Health check accuracy: >99%
A2A Scalability Targets:
- Support for 50+ A2A servers
- 500+ generated A2A tools
- 100+ concurrent A2A tasks
- 1,000+ daily A2A interactions
🧭 Implementation Roadmap
**Phase 1: Core A2A Infrastructure
- Design and implement A2A database schema
- Create A2AService and A2AClient core components
- Implement Agent Card discovery and parsing
- Build basic A2A server registration functionality
- Create A2A tool generator for skill-to-tool conversion
Phase 2: Task Management & Protocol
- Implement A2A task lifecycle management
- Build sync task execution (tasks/send)
- Add async task polling (tasks/get)
- Create protocol translation layer (MCP ↔ A2A)
- Implement error handling and status mapping
Phase 3: Advanced Features
- Add streaming support (tasks/sendSubscribe)
- Implement push notification handling
- Build authentication bridge for A2A schemes
- Create A2A server health monitoring
- Add task cancellation and cleanup
Phase 4: Admin UI & Management
- Integrate A2A management into Admin UI
- Build A2A server registration interface
- Create A2A task monitoring dashboard
- Add A2A authentication configuration UI
- Implement A2A health status visualization
Phase 5: Performance & Production
- Optimize A2A task execution performance
- Add comprehensive A2A metrics and monitoring
- Implement A2A server load balancing (future)
- Create A2A integration testing suite
- Add production-ready error handling and logging
🎯 Success Criteria
- A2A server registration success rate >95% for valid Agent Cards
- Tool generation creates 100% coverage of A2A skills
- Sync task execution <5 seconds average response time
- Async task handling with proper lifecycle management
- Authentication success rate >98% across supported schemes
- Health monitoring detects server issues within 5 minutes
- MCP-A2A protocol translation accuracy >99% for standard use cases
- Admin UI provides complete A2A management capabilities
- Support for 50+ A2A servers with 500+ generated tools
🔗 Integration Points
Existing Gateway Integration
- Tool Management: A2A tools integrate with existing tool catalog and metrics
- Authentication: Leverages existing auth framework with A2A-specific extensions
- Health Monitoring: Extends current health check system for A2A servers
- Admin UI: Seamlessly integrates into existing admin interface
- Metrics: A2A metrics integrate with current metrics collection
Future Integration Opportunities
- Marketplace Integration: A2A servers could be discoverable through marketplace
- Load Balancing: A2A servers could participate in intelligent load balancing
- Review System: A2A tools could be rated and reviewed like other tools
- Certification: A2A servers could undergo automated testing and certification
🧩 Additional Notes
- 🌉 Protocol Bridge: Creates seamless interoperability between MCP and A2A ecosystems
- 🔄 Automatic Conversion: A2A skills automatically become MCP tools with no manual configuration
- ⚡ Performance Focused: Optimized for both sync and async A2A task patterns
- 🛡️ Enterprise Ready: Supports all A2A authentication schemes and security requirements
- 📊 Observable: Comprehensive monitoring and metrics for A2A operations
- 🎛️ Manageable: Full admin UI integration for A2A server lifecycle management
- 🔧 Extensible: Foundation for advanced A2A features like task chaining and collaboration
- 📈 Scalable: Designed to handle large numbers of A2A servers and concurrent tasks