Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,4 +592,84 @@ describe("StreamableHTTPClientTransport", () => {
await expect(transport.send(message)).rejects.toThrow(UnauthorizedError);
expect(mockAuthProvider.redirectToAuthorization.mock.calls).toHaveLength(1);
});


describe('Reconnection Logic', () => {
// Use fake timers to control setTimeout and make the test instant.
beforeEach(() => jest.useFakeTimers());
afterEach(() => jest.useRealTimers());

it('should reconnect on stream failure even without a lastEventId', async () => {
// ARRANGE

// 1. Configure a transport that will retry quickly and at least once.
transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), {
reconnectionOptions: {
initialReconnectionDelay: 10, // Reconnect almost instantly for the test
maxReconnectionDelay: 100,
reconnectionDelayGrowFactor: 1,
maxRetries: 1, // We only need to see one successful retry attempt
}
});

const errorSpy = jest.fn();
transport.onerror = errorSpy;

// 2. Mock the initial GET request. It will connect, but the stream will die immediately.
// This simulates the GCloud proxy killing the connection.
const failingStream = new ReadableStream({
start(controller) {
// Simulate an abrupt network error.
controller.error(new Error("Network connection terminated"));
}
});

const fetchMock = global.fetch as jest.Mock;
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: failingStream,
});

// 3. Mock the SECOND GET request (the reconnection attempt). This one can succeed.
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: new ReadableStream(), // A stable, empty stream
});

// ACT

// 4. Start the transport and initiate the SSE connection.
await transport.start();
// We call the internal method directly to trigger the GET request.
// This is cleaner than sending a full 'initialize' message for this test.
await transport["_startOrAuthSse"]({});

// 5. Advance timers to trigger the setTimeout in _scheduleReconnection.
await jest.advanceTimersByTimeAsync(20); // More than the 10ms delay

// ASSERT

// 6. Verify the initial disconnect error was caught.
expect(errorSpy).toHaveBeenCalledTimes(1);
expect(errorSpy).toHaveBeenCalledWith(
expect.objectContaining({
message: expect.stringContaining('SSE stream disconnected: Error: Network connection terminated'),
})
);

// 7. THIS IS THE KEY ASSERTION: Verify that a second fetch call was made.
// This proves the reconnection logic was triggered.
expect(fetchMock).toHaveBeenCalledTimes(2);

// 8. Verify the second call was a GET request without a last-event-id header.
const secondCall = fetchMock.mock.calls[1];
const secondRequest = secondCall[1];
expect(secondRequest.method).toBe('GET');
expect(secondRequest.headers.has('last-event-id')).toBe(false);
});
});
});
20 changes: 9 additions & 11 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,16 @@ const response = await (this._fetch ?? fetch)(this._url, {
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
if (this._abortController && !this._abortController.signal.aborted) {
// Use the exponential backoff reconnection strategy
if (lastEventId !== undefined) {
try {
this._scheduleReconnection({
resumptionToken: lastEventId,
onresumptiontoken,
replayMessageId
}, 0);
}
catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
try {
this._scheduleReconnection({
resumptionToken: lastEventId,
onresumptiontoken,
replayMessageId
}, 0);
}
catch (error) {
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));

}
}
}
}
Expand Down