@@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto';
33import { McpServer } from '../../server/mcp.js' ;
44import { EventStore , StreamableHTTPServerTransport } from '../../server/streamableHttp.js' ;
55import { z } from 'zod' ;
6- import { CallToolResult , GetPromptResult , JSONRPCMessage , ReadResourceResult } from '../../types.js' ;
6+ import { CallToolResult , GetPromptResult , isInitializeRequest , JSONRPCMessage , ReadResourceResult } from '../../types.js' ;
77
88// Create a simple in-memory EventStore for resumability
99class InMemoryEventStore implements EventStore {
@@ -36,7 +36,7 @@ class InMemoryEventStore implements EventStore {
3636 * Replays events that occurred after a specific event ID
3737 * Implements EventStore.replayEventsAfter
3838 */
39- async replayEventsAfter ( lastEventId : string ,
39+ async replayEventsAfter ( lastEventId : string ,
4040 { send } : { send : ( eventId : string , message : JSONRPCMessage ) => Promise < void > }
4141 ) : Promise < string > {
4242 if ( ! lastEventId || ! this . events . has ( lastEventId ) ) {
@@ -247,19 +247,28 @@ app.post('/mcp', async (req: Request, res: Response) => {
247247 transport = new StreamableHTTPServerTransport ( {
248248 sessionIdGenerator : ( ) => randomUUID ( ) ,
249249 eventStore, // Enable resumability
250+ onsessioninitialized : ( sessionId ) => {
251+ // Store the transport by session ID when session is initialized
252+ // This avoids race conditions where requests might come in before the session is stored
253+ console . log ( `Session initialized with ID: ${ sessionId } ` ) ;
254+ transports [ sessionId ] = transport ;
255+ }
250256 } ) ;
251257
258+ // Set up onclose handler to clean up transport when closed
259+ transport . onclose = ( ) => {
260+ const sid = transport . sessionId ;
261+ if ( sid && transports [ sid ] ) {
262+ console . log ( `Transport closed for session ${ sid } , removing from transports map` ) ;
263+ delete transports [ sid ] ;
264+ }
265+ } ;
266+
252267 // Connect the transport to the MCP server BEFORE handling the request
253268 // so responses can flow back through the same transport
254269 await server . connect ( transport ) ;
255270
256- // After handling the request, if we get a session ID back, store the transport
257271 await transport . handleRequest ( req , res , req . body ) ;
258-
259- // Store the transport by session ID for future requests
260- if ( transport . sessionId ) {
261- transports [ transport . sessionId ] = transport ;
262- }
263272 return ; // Already handled
264273 } else {
265274 // Invalid request - no session ID or not initialization request
@@ -312,13 +321,26 @@ app.get('/mcp', async (req: Request, res: Response) => {
312321 await transport . handleRequest ( req , res ) ;
313322} ) ;
314323
315- // Helper function to detect initialize requests
316- function isInitializeRequest ( body : unknown ) : boolean {
317- if ( Array . isArray ( body ) ) {
318- return body . some ( msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg . method === 'initialize' ) ;
324+ // Handle DELETE requests for session termination (according to MCP spec)
325+ app . delete ( '/mcp' , async ( req : Request , res : Response ) => {
326+ const sessionId = req . headers [ 'mcp-session-id' ] as string | undefined ;
327+ if ( ! sessionId || ! transports [ sessionId ] ) {
328+ res . status ( 400 ) . send ( 'Invalid or missing session ID' ) ;
329+ return ;
319330 }
320- return typeof body === 'object' && body !== null && 'method' in body && body . method === 'initialize' ;
321- }
331+
332+ console . log ( `Received session termination request for session ${ sessionId } ` ) ;
333+
334+ try {
335+ const transport = transports [ sessionId ] ;
336+ await transport . handleRequest ( req , res ) ;
337+ } catch ( error ) {
338+ console . error ( 'Error handling session termination:' , error ) ;
339+ if ( ! res . headersSent ) {
340+ res . status ( 500 ) . send ( 'Error processing session termination' ) ;
341+ }
342+ }
343+ } ) ;
322344
323345// Start the server
324346const PORT = 3000 ;
@@ -351,6 +373,18 @@ app.listen(PORT, () => {
351373// Handle server shutdown
352374process . on ( 'SIGINT' , async ( ) => {
353375 console . log ( 'Shutting down server...' ) ;
376+
377+ // Close all active transports to properly clean up resources
378+ for ( const sessionId in transports ) {
379+ try {
380+ console . log ( `Closing transport for session ${ sessionId } ` ) ;
381+ await transports [ sessionId ] . close ( ) ;
382+ delete transports [ sessionId ] ;
383+ } catch ( error ) {
384+ console . error ( `Error closing transport for session ${ sessionId } :` , error ) ;
385+ }
386+ }
354387 await server . close ( ) ;
388+ console . log ( 'Server shutdown complete' ) ;
355389 process . exit ( 0 ) ;
356390} ) ;
0 commit comments