@@ -70,17 +70,18 @@ export type ExportsManagerConfig = Pick<UserConfig, "exportsPath" | "exportTimeo
7070} ;
7171
7272type ExportsManagerEvents = {
73+ closed : [ ] ;
7374 "export-expired" : [ string ] ;
7475 "export-available" : [ string ] ;
7576} ;
7677
78+ class OperationAbortedError extends Error { }
79+
7780export class ExportsManager extends EventEmitter < ExportsManagerEvents > {
7881 private storedExports : Record < StoredExport [ "exportName" ] , StoredExport > = { } ;
7982 private exportsCleanupInProgress : boolean = false ;
8083 private exportsCleanupInterval ?: NodeJS . Timeout ;
8184 private readonly shutdownController : AbortController = new AbortController ( ) ;
82- private readonly activeOperations : Set < Promise < unknown > > = new Set ( ) ;
83- private readonly activeOpsDrainTimeoutMs : number ;
8485 private readonly readTimeoutMs : number ;
8586 private readonly writeTimeoutMs : number ;
8687 private readonly exportLocks : Map < string , RWLock > = new Map ( ) ;
@@ -91,7 +92,6 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
9192 private readonly logger : LoggerBase
9293 ) {
9394 super ( ) ;
94- this . activeOpsDrainTimeoutMs = this . config . activeOpsDrainTimeoutMs ?? 10_000 ;
9595 this . readTimeoutMs = this . config . readTimeout ?? 30_0000 ; // 30 seconds is the default timeout for an MCP request
9696 this . writeTimeoutMs = this . config . writeTimeout ?? 120_000 ; // considering that writes can take time
9797 }
@@ -116,7 +116,7 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
116116 protected init ( ) : void {
117117 if ( ! this . exportsCleanupInterval ) {
118118 this . exportsCleanupInterval = setInterval (
119- ( ) => void this . trackOperation ( this . cleanupExpiredExports ( ) ) ,
119+ ( ) => void this . cleanupExpiredExports ( ) ,
120120 this . config . exportCleanupIntervalMs
121121 ) ;
122122 }
@@ -128,8 +128,8 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
128128 try {
129129 clearInterval ( this . exportsCleanupInterval ) ;
130130 this . shutdownController . abort ( ) ;
131- await this . waitForActiveOperationsToSettle ( this . activeOpsDrainTimeoutMs ) ;
132131 await fs . rm ( this . exportsDirectoryPath , { force : true , recursive : true } ) ;
132+ this . emit ( "closed" ) ;
133133 } catch ( error ) {
134134 this . logger . error ( {
135135 id : LogId . exportCloseError ,
@@ -143,33 +143,35 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
143143 try {
144144 this . assertIsNotShuttingDown ( ) ;
145145 exportName = decodeURIComponent ( exportName ) ;
146- return await this . withLock ( exportName , "read" , false , async ( ) : Promise < string > => {
147- const exportHandle = this . storedExports [ exportName ] ;
148- if ( ! exportHandle ) {
149- throw new Error ( "Requested export has either expired or does not exist!" ) ;
150- }
146+ return await this . withLock (
147+ {
148+ exportName,
149+ mode : "read" ,
150+ callbackName : "readExport" ,
151+ } ,
152+ async ( ) : Promise < string > => {
153+ const exportHandle = this . storedExports [ exportName ] ;
154+ if ( ! exportHandle ) {
155+ throw new Error ( "Requested export has either expired or does not exist!" ) ;
156+ }
151157
152- // This won't happen anymore because of lock synchronization but
153- // keeping it here to make TS happy.
154- if ( exportHandle . exportStatus === "in-progress" ) {
155- throw new Error ( "Requested export is still being generated!" ) ;
156- }
158+ // This won't happen because of lock synchronization but
159+ // keeping it here to make TS happy.
160+ if ( exportHandle . exportStatus === "in-progress" ) {
161+ throw new Error ( "Requested export is still being generated!" ) ;
162+ }
157163
158- const { exportPath } = exportHandle ;
164+ const { exportPath } = exportHandle ;
159165
160- return await this . trackOperation (
161- fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } )
162- ) ;
163- } ) ;
166+ return fs . readFile ( exportPath , { encoding : "utf8" , signal : this . shutdownController . signal } ) ;
167+ }
168+ ) ;
164169 } catch ( error ) {
165170 this . logger . error ( {
166171 id : LogId . exportReadError ,
167172 context : `Error when reading export - ${ exportName } ` ,
168173 message : error instanceof Error ? error . message : String ( error ) ,
169174 } ) ;
170- if ( ( error as NodeJS . ErrnoException ) . code === "ENOENT" ) {
171- throw new Error ( "Requested export does not exist!" ) ;
172- }
173175 throw error ;
174176 }
175177 }
@@ -188,23 +190,32 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
188190 try {
189191 this . assertIsNotShuttingDown ( ) ;
190192 const exportNameWithExtension = validateExportName ( ensureExtension ( exportName , "json" ) ) ;
191- return await this . withLock ( exportNameWithExtension , "write" , false , ( ) : AvailableExport => {
192- if ( this . storedExports [ exportNameWithExtension ] ) {
193- throw new Error ( "Export with same name is either already available or being generated." ) ;
194- }
195- const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
196- const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
197- const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
193+ return await this . withLock (
194+ {
198195 exportName : exportNameWithExtension ,
199- exportTitle,
200- exportPath : exportFilePath ,
201- exportURI : exportURI ,
202- exportStatus : "in-progress" ,
203- } ) ;
204-
205- void this . trackOperation ( this . startExport ( { input, jsonExportFormat, inProgressExport } ) ) ;
206- return inProgressExport ;
207- } ) ;
196+ mode : "write" ,
197+ callbackName : "createJSONExport" ,
198+ } ,
199+ ( ) : Promise < AvailableExport > => {
200+ if ( this . storedExports [ exportNameWithExtension ] ) {
201+ return Promise . reject (
202+ new Error ( "Export with same name is either already available or being generated." )
203+ ) ;
204+ }
205+ const exportURI = `exported-data://${ encodeURIComponent ( exportNameWithExtension ) } ` ;
206+ const exportFilePath = path . join ( this . exportsDirectoryPath , exportNameWithExtension ) ;
207+ const inProgressExport : InProgressExport = ( this . storedExports [ exportNameWithExtension ] = {
208+ exportName : exportNameWithExtension ,
209+ exportTitle,
210+ exportPath : exportFilePath ,
211+ exportURI : exportURI ,
212+ exportStatus : "in-progress" ,
213+ } ) ;
214+
215+ void this . startExport ( { input, jsonExportFormat, inProgressExport } ) ;
216+ return Promise . resolve ( inProgressExport ) ;
217+ }
218+ ) ;
208219 } catch ( error ) {
209220 this . logger . error ( {
210221 id : LogId . exportCreationError ,
@@ -224,47 +235,57 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
224235 jsonExportFormat : JSONExportFormat ;
225236 inProgressExport : InProgressExport ;
226237 } ) : Promise < void > {
227- let pipeSuccessful = false ;
228- await this . withLock ( inProgressExport . exportName , "write" , false , async ( ) : Promise < void > => {
229- try {
230- await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
231- const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
232- await pipeline (
233- [
234- input . stream ( ) ,
235- this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
236- outputStream ,
237- ] ,
238- { signal : this . shutdownController . signal }
239- ) ;
240- pipeSuccessful = true ;
241- } catch ( error ) {
242- this . logger . error ( {
243- id : LogId . exportCreationError ,
244- context : `Error when generating JSON export for ${ inProgressExport . exportName } ` ,
245- message : error instanceof Error ? error . message : String ( error ) ,
246- } ) ;
247-
248- // If the pipeline errors out then we might end up with
249- // partial and incorrect export so we remove it entirely.
250- await this . silentlyRemoveExport (
251- inProgressExport . exportPath ,
252- LogId . exportCreationCleanupError ,
253- `Error when removing incomplete export ${ inProgressExport . exportName } `
254- ) ;
255- delete this . storedExports [ inProgressExport . exportName ] ;
256- } finally {
257- if ( pipeSuccessful ) {
258- this . storedExports [ inProgressExport . exportName ] = {
259- ...inProgressExport ,
260- exportCreatedAt : Date . now ( ) ,
261- exportStatus : "ready" ,
262- } ;
263- this . emit ( "export-available" , inProgressExport . exportURI ) ;
238+ try {
239+ await this . withLock (
240+ {
241+ exportName : inProgressExport . exportName ,
242+ mode : "write" ,
243+ callbackName : "startExport" ,
244+ } ,
245+ async ( ) : Promise < void > => {
246+ let pipeSuccessful = false ;
247+ try {
248+ await fs . mkdir ( this . exportsDirectoryPath , { recursive : true } ) ;
249+ const outputStream = createWriteStream ( inProgressExport . exportPath ) ;
250+ await pipeline (
251+ [
252+ input . stream ( ) ,
253+ this . docToEJSONStream ( this . getEJSONOptionsForFormat ( jsonExportFormat ) ) ,
254+ outputStream ,
255+ ] ,
256+ { signal : this . shutdownController . signal }
257+ ) ;
258+ pipeSuccessful = true ;
259+ } catch ( error ) {
260+ // If the pipeline errors out then we might end up with
261+ // partial and incorrect export so we remove it entirely.
262+ await this . silentlyRemoveExport (
263+ inProgressExport . exportPath ,
264+ LogId . exportCreationCleanupError ,
265+ `Error when removing incomplete export ${ inProgressExport . exportName } `
266+ ) ;
267+ delete this . storedExports [ inProgressExport . exportName ] ;
268+ throw error ;
269+ } finally {
270+ if ( pipeSuccessful ) {
271+ this . storedExports [ inProgressExport . exportName ] = {
272+ ...inProgressExport ,
273+ exportCreatedAt : Date . now ( ) ,
274+ exportStatus : "ready" ,
275+ } ;
276+ this . emit ( "export-available" , inProgressExport . exportURI ) ;
277+ }
278+ void input . close ( ) ;
279+ }
264280 }
265- void input . close ( ) ;
266- }
267- } ) ;
281+ ) ;
282+ } catch ( error ) {
283+ this . logger . error ( {
284+ id : LogId . exportCreationError ,
285+ context : `Error when generating JSON export for ${ inProgressExport . exportName } ` ,
286+ message : error instanceof Error ? error . message : String ( error ) ,
287+ } ) ;
288+ }
268289 }
269290
270291 private getEJSONOptionsForFormat ( format : JSONExportFormat ) : EJSONOptions | undefined {
@@ -321,15 +342,23 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
321342 await Promise . allSettled (
322343 exportsForCleanup . map ( async ( { exportPath, exportCreatedAt, exportURI, exportName } ) => {
323344 if ( isExportExpired ( exportCreatedAt , this . config . exportTimeoutMs ) ) {
324- await this . withLock ( exportName , "write" , true , async ( ) : Promise < void > => {
325- delete this . storedExports [ exportName ] ;
326- await this . silentlyRemoveExport (
327- exportPath ,
328- LogId . exportCleanupError ,
329- `Considerable error when removing export ${ exportName } `
330- ) ;
331- this . emit ( "export-expired" , exportURI ) ;
332- } ) ;
345+ await this . withLock (
346+ {
347+ exportName,
348+ mode : "write" ,
349+ finalize : true ,
350+ callbackName : "cleanupExpiredExport" ,
351+ } ,
352+ async ( ) : Promise < void > => {
353+ delete this . storedExports [ exportName ] ;
354+ await this . silentlyRemoveExport (
355+ exportPath ,
356+ LogId . exportCleanupError ,
357+ `Considerable error when removing export ${ exportName } `
358+ ) ;
359+ this . emit ( "export-expired" , exportURI ) ;
360+ }
361+ ) ;
333362 }
334363 } )
335364 ) ;
@@ -367,62 +396,67 @@ export class ExportsManager extends EventEmitter<ExportsManagerEvents> {
367396 }
368397 }
369398
370- private async withLock < T > (
371- exportName : string ,
372- mode : "read" | "write" ,
373- finalize : boolean ,
374- fn : ( ) => T | Promise < T >
375- ) : Promise < T > {
399+ private async withLock < CallbackResult extends Promise < unknown > > (
400+ lockConfig : {
401+ exportName : string ;
402+ mode : "read" | "write" ;
403+ finalize ?: boolean ;
404+ callbackName ?: string ;
405+ } ,
406+ callback : ( ) => CallbackResult
407+ ) : Promise < Awaited < CallbackResult > > {
408+ const { exportName, mode, finalize = false , callbackName } = lockConfig ;
409+ const operationName = callbackName ? `${ callbackName } - ${ exportName } ` : exportName ;
376410 let lock = this . exportLocks . get ( exportName ) ;
377411 if ( ! lock ) {
378412 lock = new RWLock ( ) ;
379413 this . exportLocks . set ( exportName , lock ) ;
380414 }
381415
382- try {
416+ let lockAcquired : boolean = false ;
417+ const acquireLock = async ( ) : Promise < void > => {
383418 if ( mode === "read" ) {
384419 await lock . readLock ( this . readTimeoutMs ) ;
385420 } else {
386421 await lock . writeLock ( this . writeTimeoutMs ) ;
387422 }
388- return await fn ( ) ;
423+ lockAcquired = true ;
424+ } ;
425+
426+ try {
427+ await Promise . race ( [
428+ this . operationAbortedPromise ( `Acquire ${ mode } lock for ${ operationName } ` ) ,
429+ acquireLock ( ) ,
430+ ] ) ;
431+ return await Promise . race ( [ this . operationAbortedPromise ( operationName ) , callback ( ) ] ) ;
389432 } finally {
390- lock . unlock ( ) ;
433+ if ( lockAcquired ) {
434+ lock . unlock ( ) ;
435+ }
391436 if ( finalize ) {
392437 this . exportLocks . delete ( exportName ) ;
393438 }
394439 }
395440 }
396441
397- private async trackOperation < T > ( promise : Promise < T > ) : Promise < T > {
398- this . activeOperations . add ( promise ) ;
399- try {
400- return await promise ;
401- } finally {
402- this . activeOperations . delete ( promise ) ;
403- }
404- }
442+ private operationAbortedPromise ( operationName ?: string ) : Promise < never > {
443+ return new Promise ( ( _ , reject ) => {
444+ const rejectIfAborted = ( ) : void => {
445+ if ( this . shutdownController . signal . aborted ) {
446+ // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
447+ const abortReason = this . shutdownController . signal . reason ;
448+ const abortMessage =
449+ typeof abortReason === "string"
450+ ? abortReason
451+ : `${ operationName ?? "Operation" } aborted - ExportsManager shutting down!` ;
452+ reject ( new OperationAbortedError ( abortMessage ) ) ;
453+ this . shutdownController . signal . removeEventListener ( "abort" , rejectIfAborted ) ;
454+ }
455+ } ;
405456
406- private async waitForActiveOperationsToSettle ( timeoutMs : number ) : Promise < void > {
407- const pendingPromises = Array . from ( this . activeOperations ) ;
408- if ( pendingPromises . length === 0 ) {
409- return ;
410- }
411- let timedOut = false ;
412- const timeoutPromise = new Promise < void > ( ( resolve ) =>
413- setTimeout ( ( ) => {
414- timedOut = true ;
415- resolve ( ) ;
416- } , timeoutMs )
417- ) ;
418- await Promise . race ( [ Promise . allSettled ( pendingPromises ) , timeoutPromise ] ) ;
419- if ( timedOut && this . activeOperations . size > 0 ) {
420- this . logger . error ( {
421- id : LogId . exportCloseError ,
422- context : `Close timed out waiting for ${ this . activeOperations . size } operation(s) to settle` ,
423- message : "Proceeding to force cleanup after timeout" ,
424- } ) ;
425- }
457+ rejectIfAborted ( ) ;
458+ this . shutdownController . signal . addEventListener ( "abort" , rejectIfAborted ) ;
459+ } ) ;
426460 }
427461
428462 static init (
0 commit comments