@@ -76,27 +76,29 @@ export class CsvParserStream<I extends Row, O extends Row> extends Transform {
7676 if ( this . hasHitRowLimit ) {
7777 return done ( ) ;
7878 }
79+ const wrappedCallback = CsvParserStream . wrapDoneCallback ( done ) ;
7980 try {
8081 const { lines } = this ;
8182 const newLine = lines + this . decoder . write ( data ) ;
8283 const rows = this . parse ( newLine , true ) ;
83- return this . processRows ( rows , done ) ;
84+ return this . processRows ( rows , wrappedCallback ) ;
8485 } catch ( e ) {
85- return done ( e ) ;
86+ return wrappedCallback ( e ) ;
8687 }
8788 }
8889
8990 public _flush ( done : TransformCallback ) : void {
91+ const wrappedCallback = CsvParserStream . wrapDoneCallback ( done ) ;
9092 // if we have hit our maxRows parsing limit then skip parsing
9193 if ( this . hasHitRowLimit ) {
92- return done ( ) ;
94+ return wrappedCallback ( ) ;
9395 }
9496 try {
9597 const newLine = this . lines + this . decoder . end ( ) ;
9698 const rows = this . parse ( newLine , false ) ;
97- return this . processRows ( rows , done ) ;
99+ return this . processRows ( rows , wrappedCallback ) ;
98100 } catch ( e ) {
99- return done ( e ) ;
101+ return wrappedCallback ( e ) ;
100102 }
101103 }
102104
@@ -214,4 +216,20 @@ export class CsvParserStream<I extends Row, O extends Row> extends Transform {
214216 cb ( e ) ;
215217 }
216218 }
219+
220+ private static wrapDoneCallback ( done : TransformCallback ) : TransformCallback {
221+ let errorCalled = false ;
222+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
223+ return ( err : Error | null | undefined , ...args : any [ ] ) : void => {
224+ if ( err ) {
225+ if ( errorCalled ) {
226+ throw err ;
227+ }
228+ errorCalled = true ;
229+ done ( err ) ;
230+ return ;
231+ }
232+ done ( ...args ) ;
233+ } ;
234+ }
217235}
0 commit comments