Skip to content

Commit fb6bf08

Browse files
Merge pull request #74 from mainframecomputer/feature/mcp-adapter-sse-support
feature: add mcp sse support
2 parents 360cb80 + 44bbd78 commit fb6bf08

File tree

3 files changed

+134
-46
lines changed

3 files changed

+134
-46
lines changed

packages/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "mainframe-orchestra"
3-
version = "0.0.32"
3+
version = "0.0.33"
44
description = "Mainframe-Orchestra is a lightweight, open-source agentic framework for building LLM based pipelines and self-orchestrating multi-agent teams"
55
authors = [
66
"Mainframe Computer Inc. <[email protected]>",

packages/python/src/mainframe_orchestra/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44
# Copyright 2024 Mainframe-Orchestra Contributors. Licensed under Apache License 2.0.
55

6-
__version__ = "0.0.32"
6+
__version__ = "0.0.33"
77

88
import importlib
99

packages/python/src/mainframe_orchestra/adapters/mcp_adapter.py

Lines changed: 132 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright 2025 Mainframe-Orchestra Contributors. Licensed under Apache License 2.0.
2+
13
"""
24
MCP Adapter for Orchestra - Allows Orchestra agents to use tools from MCP servers.
35
@@ -10,12 +12,15 @@
1012
from typing import Any, Callable, Dict, List, Optional, Set, Literal
1113
import subprocess
1214
import json
15+
import time
1316
from mcp import ClientSession, StdioServerParameters, stdio_client
1417
from mcp.types import (
1518
Tool as MCPTool,
1619
CallToolResult,
1720
TextContent
1821
)
22+
from mcp.client.sse import sse_client
23+
from ..utils.logging_config import logger
1924

2025
class MCPOrchestra:
2126
"""
@@ -25,25 +30,37 @@ class MCPOrchestra:
2530
into Orchestra-compatible callables that can be used in Orchestra Tasks.
2631
"""
2732

28-
def __init__(self) -> None:
29-
"""Initialize a new MCP Orchestra adapter."""
33+
def __init__(self, credentials: Optional[Dict[str, Any]] = None) -> None:
34+
"""
35+
Initialize a new MCP Orchestra adapter.
36+
37+
Args:
38+
credentials: Optional dictionary containing credentials for various MCP servers
39+
"""
3040
self.exit_stack = AsyncExitStack()
3141
self.sessions: Dict[str, ClientSession] = {}
3242
self.tools: Set[Callable] = set()
3343
self.server_tools: Dict[str, Set[Callable]] = {}
3444
self.server_processes: Dict[str, subprocess.Popen] = {}
45+
self.credentials = credentials or {}
46+
logger.debug("Initialized MCPOrchestra adapter")
3547

3648
async def connect(
3749
self,
3850
server_name: str,
3951
*,
40-
command: str,
41-
args: List[str],
52+
command: Optional[str] = None,
53+
args: Optional[List[str]] = None,
4254
env: Optional[Dict[str, str]] = None,
4355
encoding: str = "utf-8",
4456
encoding_error_handler: Literal["strict", "ignore", "replace"] = "strict",
4557
start_server: bool = False,
4658
server_startup_delay: float = 2.0,
59+
sse_url: Optional[str] = None,
60+
sse_headers: Optional[Dict[str, Any]] = None,
61+
sse_timeout: float = 5.0,
62+
sse_read_timeout: float = 300.0,
63+
credentials_key: Optional[str] = None,
4764
) -> None:
4865
"""
4966
Connect to an MCP server and load its tools.
@@ -57,48 +74,107 @@ async def connect(
5774
encoding_error_handler: How to handle encoding errors
5875
start_server: Whether to start the server process before connecting
5976
server_startup_delay: Time to wait after starting server before connecting (seconds)
77+
sse_url: URL for SSE-based MCP server (if using SSE instead of stdio)
78+
sse_headers: Optional HTTP headers for SSE connection
79+
sse_timeout: Timeout for SSE connection establishment (seconds)
80+
sse_read_timeout: Timeout for SSE event reading (seconds)
81+
credentials_key: Optional key to use for looking up credentials in self.credentials
6082
"""
61-
# Optionally start the server process
62-
if start_server:
63-
import time
64-
65-
full_command = [command] + args
66-
server_process = subprocess.Popen(
67-
full_command,
68-
stdout=subprocess.PIPE,
69-
stderr=subprocess.PIPE,
70-
text=True,
71-
env=env
83+
logger.debug(f"Connecting to MCP server: {server_name}")
84+
85+
# Apply credentials if provided
86+
if credentials_key and credentials_key in self.credentials:
87+
server_creds = self.credentials[credentials_key]
88+
logger.debug(f"Using credentials from key: {credentials_key}")
89+
90+
# Update environment variables with credentials
91+
if env is None:
92+
env = {}
93+
94+
# Merge credentials into environment variables
95+
for key, value in server_creds.items():
96+
env_key = key.upper() # Convert to uppercase for environment variables
97+
env[env_key] = value
98+
logger.debug(f"Added credential to environment: {env_key}")
99+
100+
# Check if we're using SSE or stdio
101+
if sse_url:
102+
logger.debug(f"Using SSE connection for server {server_name}: {sse_url}")
103+
# SSE connection
104+
transport = await self.exit_stack.enter_async_context(
105+
sse_client(
106+
url=sse_url,
107+
headers=sse_headers,
108+
timeout=sse_timeout,
109+
sse_read_timeout=sse_read_timeout,
110+
)
72111
)
73-
self.server_processes[server_name] = server_process
74-
75-
# Give the server time to start up
76-
time.sleep(server_startup_delay)
77-
78-
# Create server parameters
79-
server_params = StdioServerParameters(
80-
command=command,
81-
args=args,
82-
env=env,
83-
encoding=encoding,
84-
encoding_error_handler=encoding_error_handler,
85-
)
86-
87-
# Establish connection
88-
stdio_transport = await self.exit_stack.enter_async_context(
89-
stdio_client(server_params)
90-
)
91-
read, write = stdio_transport
92-
session = await self.exit_stack.enter_async_context(ClientSession(read, write))
93-
94-
# Initialize session
95-
await session.initialize()
96-
self.sessions[server_name] = session
97-
98-
# Load tools from this server
99-
server_tools = await self._load_tools(session, server_name)
100-
self.server_tools[server_name] = server_tools
101-
self.tools.update(server_tools)
112+
read, write = transport
113+
session = await self.exit_stack.enter_async_context(ClientSession(read, write))
114+
115+
# Initialize session
116+
await session.initialize()
117+
self.sessions[server_name] = session
118+
logger.debug(f"Successfully initialized session for server: {server_name}")
119+
120+
# Load tools from this server
121+
server_tools = await self._load_tools(session, server_name)
122+
self.server_tools[server_name] = server_tools
123+
self.tools.update(server_tools)
124+
logger.debug(f"Loaded {len(server_tools)} tools from server: {server_name}")
125+
126+
else:
127+
# Stdio connection
128+
# Optionally start the server process
129+
if start_server:
130+
logger.debug(f"Starting MCP server process for {server_name}: {command} {args}")
131+
if not command or not args:
132+
raise ValueError("Command and args must be provided when start_server is True")
133+
134+
full_command = [command] + args
135+
server_process = subprocess.Popen(
136+
full_command,
137+
stdout=subprocess.PIPE,
138+
stderr=subprocess.PIPE,
139+
text=True,
140+
env=env
141+
)
142+
self.server_processes[server_name] = server_process
143+
logger.debug(f"Server process started with PID: {server_process.pid}")
144+
145+
# Give the server time to start up
146+
logger.debug(f"Waiting {server_startup_delay}s for server to start")
147+
time.sleep(server_startup_delay)
148+
149+
# Create server parameters
150+
if not command or not args:
151+
raise ValueError("Command and args must be provided for stdio connection")
152+
153+
server_params = StdioServerParameters(
154+
command=command,
155+
args=args,
156+
env=env,
157+
encoding=encoding,
158+
encoding_error_handler=encoding_error_handler,
159+
)
160+
161+
# Establish connection
162+
stdio_transport = await self.exit_stack.enter_async_context(
163+
stdio_client(server_params)
164+
)
165+
read, write = stdio_transport
166+
session = await self.exit_stack.enter_async_context(ClientSession(read, write))
167+
168+
# Initialize session
169+
await session.initialize()
170+
self.sessions[server_name] = session
171+
logger.debug(f"Successfully initialized session for server: {server_name}")
172+
173+
# Load tools from this server
174+
server_tools = await self._load_tools(session, server_name)
175+
self.server_tools[server_name] = server_tools
176+
self.tools.update(server_tools)
177+
logger.debug(f"Loaded {len(server_tools)} tools from server: {server_name}")
102178

103179
async def _load_tools(self, session: ClientSession, server_name: str) -> Set[Callable]:
104180
"""
@@ -111,9 +187,12 @@ async def _load_tools(self, session: ClientSession, server_name: str) -> Set[Cal
111187
Returns:
112188
A set of callable functions that can be used with Orchestra
113189
"""
190+
logger.debug(f"Loading tools from server: {server_name}")
114191
tools_info = await session.list_tools()
115192
orchestra_tools: Set[Callable] = set()
116193

194+
logger.debug(f"Found {len(tools_info.tools)} tools on server: {server_name}")
195+
117196
for tool in tools_info.tools:
118197
# Get the tool name
119198
tool_name = tool.name
@@ -246,14 +325,18 @@ def _process_tool_result(self, result: CallToolResult) -> str:
246325
# Handle errors
247326
if result.isError:
248327
error_message = "\n".join(text_parts) if text_parts else "Unknown MCP tool error"
328+
logger.debug(f"Tool execution failed: {error_message}")
249329
raise Exception(error_message)
250330

251331
# Return combined text
252332
if not text_parts:
333+
logger.debug("Tool executed successfully (no text output)")
253334
return "Tool executed successfully (no text output)"
254335
elif len(text_parts) == 1:
336+
logger.debug("Tool executed successfully with single text output")
255337
return text_parts[0]
256338
else:
339+
logger.debug(f"Tool executed successfully with {len(text_parts)} text outputs")
257340
return "\n".join(text_parts)
258341

259342
def get_tools(self) -> Set[Callable]:
@@ -284,17 +367,22 @@ def get_server_tools(self, server_name: str) -> Set[Callable]:
284367

285368
async def close(self) -> None:
286369
"""Close all connections to MCP servers and terminate any started server processes."""
370+
logger.debug("Closing all MCP server connections")
287371
# Close all MCP sessions
288372
await self.exit_stack.aclose()
289373

290374
# Terminate any server processes we started
291375
for server_name, process in self.server_processes.items():
376+
logger.debug(f"Terminating server process for {server_name} (PID: {process.pid})")
292377
try:
293378
process.terminate()
294379
process.wait(timeout=5) # Wait up to 5 seconds for graceful termination
380+
logger.debug(f"Server process for {server_name} terminated gracefully")
295381
except subprocess.TimeoutExpired:
382+
logger.debug(f"Server process for {server_name} did not terminate gracefully, forcing kill")
296383
process.kill() # Force kill if it doesn't terminate gracefully
297384
process.wait()
385+
logger.debug(f"Server process for {server_name} killed")
298386

299387
async def __aenter__(self) -> "MCPOrchestra":
300388
"""Support for async context manager."""

0 commit comments

Comments
 (0)