@@ -7,10 +7,11 @@ import { FindCursor } from "mongodb";
77import { EJSON , EJSONOptions , ObjectId } from "bson" ;
88import { Transform } from "stream" ;
99import { pipeline } from "stream/promises" ;
10+ import { MongoLogId } from "mongodb-log-writer" ;
11+ import { RWLock } from "async-rwlock" ;
1012
1113import { UserConfig } from "./config.js" ;
1214import { LoggerBase , LogId } from "./logger.js" ;
13- import { MongoLogId } from "mongodb-log-writer" ;
1415
1516export const jsonExportFormat = z . enum ( [ "relaxed" , "canonical" ] ) ;
1617export type JSONExportFormat = z . infer < typeof jsonExportFormat > ;
@@ -60,6 +61,12 @@ export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeo
6061 // The maximum number of milliseconds to wait for in-flight operations to
6162 // settle before shutting down ExportsManager.
6263 activeOpsDrainTimeoutMs ?: number ;
64+
65+ // The maximum number of milliseconds to wait before timing out queued reads
66+ readTimeout ?: number ;
67+
68+ // The maximum number of milliseconds to wait before timing out queued writes
69+ writeTimeout ?: number ;
6370} ;
6471
6572type ExportsManagerEvents = {
@@ -74,6 +81,9 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
7481 private readonly shutdownController : AbortController = new AbortController ( ) ;
7582 private readonly activeOperations : Set < Promise < unknown > > = new Set ( ) ;
7683 private readonly activeOpsDrainTimeoutMs : number ;
84+ private readonly readTimeoutMs : number ;
85+ private readonly writeTimeoutMs : number ;
86+ private readonly exportLocks : Map < string , RWLock > = new Map ( ) ;
7787
7888 private constructor (
7989 private readonly exportsDirectoryPath : string ,
@@ -82,6 +92,8 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
8292 ) {
8393 super ( ) ;
8494 this . activeOpsDrainTimeoutMs = this . config . activeOpsDrainTimeoutMs ?? 10_000 ;
95+ this . readTimeoutMs = this . config . readTimeout ?? 30_0000 ; // 30 seconds is the default timeout for an MCP request
96+ this . writeTimeoutMs = this . config . writeTimeout ?? 120_000 ; // considering that writes can take time
8597 }
8698
8799 public get availableExports ( ) : AvailableExport [ ] {
@@ -131,24 +143,24 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
131143 try {
132144 this . assertIsNotShuttingDown ( ) ;
133145 exportName = decodeURIComponent ( exportName ) ;
134- const exportHandle = this . storedExports [ exportName ] ;
135- if ( ! exportHandle ) {
136- throw new Error ( "Requested export has either expired or does not exist!" ) ;
137- }
138-
139- if ( exportHandle . exportStatus === "in-progress" ) {
140- throw new Error ( "Requested export is still being generated!" ) ;
141- }
146+ return await this . withLock ( exportName , "read" , false , async ( ) : Promise < string > => {
147+ const exportHandle = this . storedExports [ exportName ] ;
148+ if ( ! exportHandle ) {
149+ throw new Error ( "Requested export has either expired or does not exist!" ) ;
150+ }
142151
143- const { exportPath, exportCreatedAt } = exportHandle ;
152+ // This won't happen anymore because of lock synchronization but
153+ // keeping it here to make TS happy.
154+ if ( exportHandle . exportStatus === "in-progress" ) {
155+ throw new Error ( "Requested export is still being generated!" ) ;
156+ }
144157
145- if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
146- throw new Error ( "Requested export has expired!" ) ;
147- }
158+ const { exportPath } = exportHandle ;
148159
149- return await this . trackOperation (
150- fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } )
151- ) ;
160+ return await this . trackOperation (
161+ fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } )
162+ ) ;
163+ } ) ;
152164 } catch ( error ) {
153165 this . logger . error ( {
154166 id : LogId . exportReadError ,
@@ -162,7 +174,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
162174 }
163175 }
164176
165- public createJSONExport ( {
177+ public async createJSONExport ( {
166178 input,
167179 exportName,
168180 exportTitle,
@@ -172,25 +184,27 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
172184 exportName : string ;
173185 exportTitle : string ;
174186 jsonExportFormat : JSONExportFormat ;
175- } ) : AvailableExport {
187+ } ) : Promise < AvailableExport > {
176188 try {
177189 this . assertIsNotShuttingDown ( ) ;
178190 const exportNameWithExtension = validateExportName ( ensureExtension ( exportName , "json" ) ) ;
179- if ( this . storedExports [ exportNameWithExtension ] ) {
180- throw new Error ( "Export with same name is either already available or being generated." ) ;
181- }
182- const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
183- const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
184- const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
185- exportName : exportNameWithExtension ,
186- exportTitle,
187- exportPath : exportFilePath ,
188- exportURI : exportURI ,
189- exportStatus : "in-progress" ,
190- } ) ;
191+ return await this . withLock ( exportNameWithExtension , "write" , false , ( ) : AvailableExport => {
192+ if ( this . storedExports [ exportNameWithExtension ] ) {
193+ throw new Error ( "Export with same name is either already available or being generated." ) ;
194+ }
195+ const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
196+ const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
197+ const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
198+ exportName : exportNameWithExtension ,
199+ exportTitle,
200+ exportPath : exportFilePath ,
201+ exportURI : exportURI ,
202+ exportStatus : "in-progress" ,
203+ } ) ;
191204
192- void this . trackOperation ( this . startExport ( { input, jsonExportFormat, inProgressExport } ) ) ;
193- return inProgressExport ;
205+ void this . trackOperation ( this . startExport ( { input, jsonExportFormat, inProgressExport } ) ) ;
206+ return inProgressExport ;
207+ } ) ;
194208 } catch ( error ) {
195209 this . logger . error ( {
196210 id : LogId . exportCreationError ,
@@ -211,40 +225,46 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
211225 inProgressExport : InProgressExport ;
212226 } ) : Promise < void > {
213227 let pipeSuccessful = false ;
214- try {
215- await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
216- const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
217- await pipeline (
218- [ input . stream ( ) , this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) , outputStream ] ,
219- { signal : this . shutdownController . signal }
220- ) ;
221- pipeSuccessful = true ;
222- } catch ( error ) {
223- this . logger . error ( {
224- id : LogId . exportCreationError ,
225- context : `Error when generating JSON export for ${ inProgressExport . exportName } ` ,
226- message : error instanceof Error ? error . message : String ( error ) ,
227- } ) ;
228+ await this . withLock ( inProgressExport . exportName , "write" , false , async ( ) : Promise < void > => {
229+ try {
230+ await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
231+ const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
232+ await pipeline (
233+ [
234+ input . stream ( ) ,
235+ this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
236+ outputStream ,
237+ ] ,
238+ { signal : this . shutdownController . signal }
239+ ) ;
240+ pipeSuccessful = true ;
241+ } catch ( error ) {
242+ this . logger . error ( {
243+ id : LogId . exportCreationError ,
244+ context : `Error when generating JSON export for ${ inProgressExport . exportName } ` ,
245+ message : error instanceof Error ? error . message : String ( error ) ,
246+ } ) ;
228247
229- // If the pipeline errors out then we might end up with
230- // partial and incorrect export so we remove it entirely.
231- await this . silentlyRemoveExport (
232- inProgressExport . exportPath ,
233- LogId . exportCreationCleanupError ,
234- `Error when removing incomplete export ${ inProgressExport . exportName } `
235- ) ;
236- delete this . storedExports [ inProgressExport . exportName ] ;
237- } finally {
238- if ( pipeSuccessful ) {
239- this . storedExports [ inProgressExport . exportName ] = {
240- ...inProgressExport ,
241- exportCreatedAt : Date . now ( ) ,
242- exportStatus : "ready" ,
243- } ;
244- this . emit ( "export-available" , inProgressExport . exportURI ) ;
248+ // If the pipeline errors out then we might end up with
249+ // partial and incorrect export so we remove it entirely.
250+ await this . silentlyRemoveExport (
251+ inProgressExport . exportPath ,
252+ LogId . exportCreationCleanupError ,
253+ `Error when removing incomplete export ${ inProgressExport . exportName } `
254+ ) ;
255+ delete this . storedExports [ inProgressExport . exportName ] ;
256+ } finally {
257+ if ( pipeSuccessful ) {
258+ this . storedExports [ inProgressExport . exportName ] = {
259+ ...inProgressExport ,
260+ exportCreatedAt : Date . now ( ) ,
261+ exportStatus : "ready" ,
262+ } ;
263+ this . emit ( "export-available" , inProgressExport . exportURI ) ;
264+ }
265+ void input . close ( ) ;
245266 }
246- void input . close ( ) ;
247- }
267+ } ) ;
248268 }
249269
250270 private getEJSONOptionsForFormat ( format : JSONExportFormat ) : EJSONOptions | undefined {
@@ -293,24 +313,26 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
293313 }
294314
295315 this . exportsCleanupInProgress = true ;
296- const exportsForCleanup = Object . values ( { ...this . storedExports } ) . filter (
297- ( storedExport ) : storedExport is ReadyExport => storedExport . exportStatus === "ready"
298- ) ;
299316 try {
300- for ( const { exportPath, exportCreatedAt, exportURI, exportName } of exportsForCleanup ) {
301- if ( this . shutdownController . signal . aborted ) {
302- break ;
303- }
304- if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
305- delete this . storedExports [ exportName ] ;
306- await this . silentlyRemoveExport (
307- exportPath ,
308- LogId . exportCleanupError ,
309- `Considerable error when removing export ${ exportName } `
310- ) ;
311- this . emit ( "export-expired" , exportURI ) ;
312- }
313- }
317+ const exportsForCleanup = Object . values ( { ...this . storedExports } ) . filter (
318+ ( storedExport ) : storedExport is ReadyExport => storedExport . exportStatus === "ready"
319+ ) ;
320+
321+ await Promise . allSettled (
322+ exportsForCleanup . map ( async ( { exportPath, exportCreatedAt, exportURI, exportName } ) => {
323+ if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
324+ await this . withLock ( exportName , "write" , true , async ( ) : Promise < void > => {
325+ delete this . storedExports [ exportName ] ;
326+ await this . silentlyRemoveExport (
327+ exportPath ,
328+ LogId . exportCleanupError ,
329+ `Considerable error when removing export ${ exportName } `
330+ ) ;
331+ this . emit ( "export-expired" , exportURI ) ;
332+ } ) ;
333+ }
334+ } )
335+ ) ;
314336 } catch ( error ) {
315337 this . logger . error ( {
316338 id : LogId . exportCleanupError ,
@@ -345,6 +367,33 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
345367 }
346368 }
347369
370+ private async withLock < T > (
371+ exportName : string ,
372+ mode : "read" | "write" ,
373+ finalize : boolean ,
374+ fn : ( ) => T | Promise < T >
375+ ) : Promise < T > {
376+ let lock = this . exportLocks . get ( exportName ) ;
377+ if ( ! lock ) {
378+ lock = new RWLock ( ) ;
379+ this . exportLocks . set ( exportName , lock ) ;
380+ }
381+
382+ try {
383+ if ( mode === "read" ) {
384+ await lock . readLock ( this . readTimeoutMs ) ;
385+ } else {
386+ await lock . writeLock ( this . writeTimeoutMs ) ;
387+ }
388+ return await fn ( ) ;
389+ } finally {
390+ lock . unlock ( ) ;
391+ if ( finalize ) {
392+ this . exportLocks . delete ( exportName ) ;
393+ }
394+ }
395+ }
396+
348397 private async trackOperation < T > ( promise : Promise < T > ) : Promise < T > {
349398 this . activeOperations . add ( promise ) ;
350399 try {
0 commit comments