Skip to content

Commit cb348e0

Browse files
authored
fix: add a ping loop for http transport clients (#484)
1 parent d261208 commit cb348e0

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

src/common/logger.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ export const LogId = {
5151
streamableHttpTransportSessionCloseNotificationFailure: mongoLogId(1_006_004),
5252
streamableHttpTransportRequestFailure: mongoLogId(1_006_005),
5353
streamableHttpTransportCloseFailure: mongoLogId(1_006_006),
54+
streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007),
55+
streamableHttpTransportKeepAlive: mongoLogId(1_006_008),
5456

5557
exportCleanupError: mongoLogId(1_007_001),
5658
exportCreationError: mongoLogId(1_007_002),

src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ async function main(): Promise<void> {
9191
transportRunner.logger.info({
9292
id: LogId.serverCloseRequested,
9393
context: "server",
94-
message: "Closing server",
94+
message: `Closing server due to error: ${error as string}`,
95+
noRedaction: true,
9596
});
97+
9698
try {
9799
await transportRunner.close();
98100
transportRunner.logger.info({

src/transports/streamableHttp.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
7474
jsonrpc: "2.0",
7575
error: {
7676
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
77-
message: `session id is invalid`,
77+
message: "session id is invalid",
7878
},
7979
});
8080
return;
@@ -85,7 +85,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
8585
jsonrpc: "2.0",
8686
error: {
8787
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
88-
message: `session not found`,
88+
message: "session not found",
8989
},
9090
});
9191
return;
@@ -114,12 +114,48 @@ export class StreamableHttpRunner extends TransportRunnerBase {
114114
}
115115

116116
const server = this.setupServer();
117+
let keepAliveLoop: NodeJS.Timeout;
117118
const transport = new StreamableHTTPServerTransport({
118119
sessionIdGenerator: (): string => randomUUID().toString(),
119120
onsessioninitialized: (sessionId): void => {
120121
server.session.logger.setAttribute("sessionId", sessionId);
121122

122123
this.sessionStore.setSession(sessionId, transport, server.session.logger);
124+
125+
let failedPings = 0;
126+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
127+
keepAliveLoop = setInterval(async () => {
128+
try {
129+
this.logger.debug({
130+
id: LogId.streamableHttpTransportKeepAlive,
131+
context: "streamableHttpTransport",
132+
message: "Sending ping",
133+
});
134+
135+
await transport.send({
136+
jsonrpc: "2.0",
137+
method: "ping",
138+
});
139+
failedPings = 0;
140+
} catch (err) {
141+
try {
142+
failedPings++;
143+
this.logger.warning({
144+
id: LogId.streamableHttpTransportKeepAliveFailure,
145+
context: "streamableHttpTransport",
146+
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
147+
});
148+
149+
if (failedPings > 3) {
150+
clearInterval(keepAliveLoop);
151+
await transport.close();
152+
}
153+
} catch {
154+
// Ignore the error of the transport close as there's nothing else
155+
// we can do at this point.
156+
}
157+
}
158+
}, 30_000);
123159
},
124160
onsessionclosed: async (sessionId): Promise<void> => {
125161
try {
@@ -135,6 +171,8 @@ export class StreamableHttpRunner extends TransportRunnerBase {
135171
});
136172

137173
transport.onclose = (): void => {
174+
clearInterval(keepAliveLoop);
175+
138176
server.close().catch((error) => {
139177
this.logger.error({
140178
id: LogId.streamableHttpTransportCloseFailure,

0 commit comments

Comments
 (0)