@@ -2061,6 +2061,34 @@ describe('ChangeStream resumability', function () {
20612061 }
20622062 ) ;
20632063 } ) ;
2064+
2065+ context ( 'when the error occurs on the aggregate command' , function ( ) {
2066+ it (
2067+ 'does not resume' ,
2068+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2069+ async function ( ) {
2070+ const resumableErrorCode = 7 ; // Host not found
2071+ await client . db ( 'admin' ) . command ( {
2072+ configureFailPoint : 'failCommand' ,
2073+ mode : { times : 2 } , // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
2074+ data : {
2075+ failCommands : [ 'aggregate' ] ,
2076+ errorCode : resumableErrorCode
2077+ }
2078+ } as FailPoint ) ;
2079+
2080+ changeStream = collection . watch ( [ ] ) ;
2081+
2082+ await collection . insertOne ( { name : 'bailey' } ) ;
2083+
2084+ const maybeError = await changeStream . next ( ) . catch ( e => e ) ;
2085+
2086+ expect ( maybeError ) . to . be . instanceof ( MongoServerError ) ;
2087+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2088+ expect ( changeStream . closed ) . to . be . true ;
2089+ }
2090+ ) ;
2091+ } ) ;
20642092 } ) ;
20652093
20662094 context ( '#hasNext' , function ( ) {
@@ -2225,6 +2253,34 @@ describe('ChangeStream resumability', function () {
22252253 }
22262254 ) ;
22272255 } ) ;
2256+
2257+ context ( 'when the error occurs on the aggregate command' , function ( ) {
2258+ it (
2259+ 'does not resume' ,
2260+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2261+ async function ( ) {
2262+ const resumableErrorCode = 7 ; // Host not found
2263+ await client . db ( 'admin' ) . command ( {
2264+ configureFailPoint : 'failCommand' ,
2265+ mode : { times : 2 } , // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
2266+ data : {
2267+ failCommands : [ 'aggregate' ] ,
2268+ errorCode : resumableErrorCode
2269+ }
2270+ } as FailPoint ) ;
2271+
2272+ changeStream = collection . watch ( [ ] ) ;
2273+
2274+ await collection . insertOne ( { name : 'bailey' } ) ;
2275+
2276+ const maybeError = await changeStream . hasNext ( ) . catch ( e => e ) ;
2277+
2278+ expect ( maybeError ) . to . be . instanceof ( MongoServerError ) ;
2279+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2280+ expect ( changeStream . closed ) . to . be . true ;
2281+ }
2282+ ) ;
2283+ } ) ;
22282284 } ) ;
22292285
22302286 context ( '#tryNext' , function ( ) {
@@ -2401,6 +2457,34 @@ describe('ChangeStream resumability', function () {
24012457 }
24022458 ) ;
24032459 } ) ;
2460+
2461+ context ( 'when the error occurs on the aggregate command' , function ( ) {
2462+ it (
2463+ 'does not resume' ,
2464+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2465+ async function ( ) {
2466+ const resumableErrorCode = 7 ; // Host not found
2467+ await client . db ( 'admin' ) . command ( {
2468+ configureFailPoint : 'failCommand' ,
2469+ mode : { times : 2 } , // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
2470+ data : {
2471+ failCommands : [ 'aggregate' ] ,
2472+ errorCode : resumableErrorCode
2473+ }
2474+ } as FailPoint ) ;
2475+
2476+ changeStream = collection . watch ( [ ] ) ;
2477+
2478+ await collection . insertOne ( { name : 'bailey' } ) ;
2479+
2480+ const maybeError = await changeStream . tryNext ( ) . catch ( e => e ) ;
2481+
2482+ expect ( maybeError ) . to . be . instanceof ( MongoServerError ) ;
2483+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2484+ expect ( changeStream . closed ) . to . be . true ;
2485+ }
2486+ ) ;
2487+ } ) ;
24042488 } ) ;
24052489
24062490 context ( '#asyncIterator' , function ( ) {
@@ -2551,6 +2635,41 @@ describe('ChangeStream resumability', function () {
25512635 }
25522636 ) ;
25532637 } ) ;
2638+
2639+ context ( 'when the error occurs on the aggregate command' , function ( ) {
2640+ it (
2641+ 'does not resume' ,
2642+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2643+ async function ( ) {
2644+ changeStream = collection . watch ( [ ] ) ;
2645+
2646+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
2647+ await collection . insertMany ( docs ) ;
2648+
2649+ const resumableErrorCode = 7 ;
2650+ await client . db ( 'admin' ) . command ( {
2651+ configureFailPoint : 'failCommand' ,
2652+ mode : { times : 2 } , // Account for retry in executeOperation which is separate from change stream's resume
2653+ data : {
2654+ failCommands : [ 'aggregate' ] ,
2655+ errorCode : resumableErrorCode
2656+ }
2657+ } as FailPoint ) ;
2658+
2659+ try {
2660+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
2661+ for await ( const change of changeStream ) {
2662+ expect . fail ( 'Change stream produced events on an unresumable error' ) ;
2663+ }
2664+ expect . fail ( 'Change stream did not iterate and did not throw an error' ) ;
2665+ } catch ( error ) {
2666+ expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2667+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2668+ expect ( changeStream . closed ) . to . be . true ;
2669+ }
2670+ }
2671+ ) ;
2672+ } ) ;
25542673 } ) ;
25552674 } ) ;
25562675
@@ -2721,6 +2840,35 @@ describe('ChangeStream resumability', function () {
27212840 }
27222841 ) ;
27232842 } ) ;
2843+
2844+ context ( 'when the error occurred on the aggregate' , function ( ) {
2845+ it (
2846+ 'does not resume' ,
2847+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
2848+ async function ( ) {
2849+ changeStream = collection . watch ( [ ] ) ;
2850+
2851+ const resumableErrorCode = 7 ;
2852+ await client . db ( 'admin' ) . command ( {
2853+ configureFailPoint : 'failCommand' ,
2854+ mode : { times : 2 } , // account for retry attempt in executeOperation which is separate from change stream's retry
2855+ data : {
2856+ failCommands : [ 'aggregate' ] ,
2857+ errorCode : resumableErrorCode
2858+ }
2859+ } as FailPoint ) ;
2860+
2861+ const willBeError = once ( changeStream , 'change' ) . catch ( error => error ) ;
2862+ await collection . insertOne ( { name : 'bailey' } ) ;
2863+
2864+ const error = await willBeError ;
2865+
2866+ expect ( error ) . to . be . instanceOf ( MongoServerError ) ;
2867+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2868+ expect ( changeStream . closed ) . to . be . true ;
2869+ }
2870+ ) ;
2871+ } ) ;
27242872 } ) ;
27252873
27262874 it (
0 commit comments