@@ -27,6 +27,7 @@ import * as mock from '../../tools/mongodb-mock/index';
2727import { TestBuilder , UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder' ;
2828import { type FailCommandFailPoint , sleep } from '../../tools/utils' ;
2929import { delay , filterForCommands } from '../shared' ;
30+ import { UUID } from 'bson' ;
3031
3132const initIteratorMode = async ( cs : ChangeStream ) => {
3233 const initEvent = once ( cs . cursor , 'init' ) ;
@@ -1824,6 +1825,7 @@ describe.only('ChangeStream resumability', function () {
18241825 let collection : Collection ;
18251826 let changeStream : ChangeStream ;
18261827 let aggregateEvents : CommandStartedEvent [ ] = [ ] ;
1828+ let appName : string ;
18271829
18281830 const changeStreamResumeOptions : ChangeStreamOptions = {
18291831 fullDocument : 'updateLookup' ,
@@ -1882,7 +1884,15 @@ describe.only('ChangeStream resumability', function () {
18821884 await utilClient . db ( dbName ) . createCollection ( collectionName ) ;
18831885 await utilClient . close ( ) ;
18841886
1885- client = this . configuration . newClient ( { monitorCommands : true } ) ;
1887+ // we are going to switch primary in tests and cleanup of failpoints is difficult,
1888+ // so generating unique appname instead of cleaning for each test is an easier solution
1889+ appName = new UUID ( ) . toString ( ) ;
1890+
1891+ client = this . configuration . newClient ( {
1892+ monitorCommands : true ,
1893+ serverSelectionTimeoutMS : 5_000 ,
1894+ appName : appName
1895+ } ) ;
18861896 client . on ( 'commandStarted' , filterForCommands ( [ 'aggregate' ] , aggregateEvents ) ) ;
18871897 collection = client . db ( dbName ) . collection ( collectionName ) ;
18881898 } ) ;
@@ -2047,61 +2057,42 @@ describe.only('ChangeStream resumability', function () {
20472057 } ) ;
20482058 } ) ;
20492059
2050- context . only ( 'when the error is not a server error' , function ( ) {
2051- let client1 : MongoClient ;
2052- let client2 : MongoClient ;
2053-
2054- beforeEach ( async function ( ) {
2055- client1 = this . configuration . newClient (
2056- { } ,
2057- { serverSelectionTimeoutMS : 1000 , appName : 'client-errors' }
2058- ) ;
2059- client2 = this . configuration . newClient ( ) ;
2060-
2061- collection = client1 . db ( 'client-errors' ) . collection ( 'test' ) ;
2062- } ) ;
2063-
2064- afterEach ( async function ( ) {
2065- await client2 . db ( 'admin' ) . command ( {
2066- configureFailPoint : 'failCommand' ,
2067- mode : 'off' ,
2068- data : { appName : 'client-errors' }
2069- } as FailCommandFailPoint ) ;
2070-
2071- await client1 ?. close ( ) ;
2072- await client2 ?. close ( ) ;
2073- } ) ;
2074-
2060+ context ( 'when the error is not a server error' , function ( ) {
2061+ // This test requires a replica set to call replSetFreeze command
20752062 it (
20762063 'should resume on ServerSelectionError' ,
2077- { requires : { topology : '!single' } } ,
2064+ { requires : { topology : [ 'replicaset' ] } } ,
20782065 async function ( ) {
20792066 changeStream = collection . watch ( [ ] ) ;
20802067 await initIteratorMode ( changeStream ) ;
20812068
20822069 await collection . insertOne ( { a : 1 } ) ;
20832070
2084- await client2 . db ( 'admin' ) . command ( {
2071+ // mimic the node termination by closing the connection and failing on heartbeat
2072+ await client . db ( 'admin' ) . command ( {
20852073 configureFailPoint : 'failCommand' ,
20862074 mode : 'alwaysOn' ,
20872075 data : {
20882076 failCommands : [ 'ping' , 'hello' , LEGACY_HELLO_COMMAND ] ,
20892077 closeConnection : true ,
20902078 handshakeCommands : true ,
20912079 failInternalCommands : true ,
2092- appName : 'client-errors'
2080+ appName : appName
20932081 }
20942082 } as FailCommandFailPoint ) ;
2095- await client2
2096- . db ( 'admin' )
2097- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . secondary } ) ;
2098- await client2
2083+ // force new election in the cluster
2084+ await client
20992085 . db ( 'admin' )
2100- . command ( { replSetStepDown : 15 , secondaryCatchUpPeriodSecs : 10 , force : true } ) ;
2101- // await sleep(15_000);
2086+ . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2087+ await client . db ( 'admin' ) . command ( { replSetStepDown : 30 , force : true } ) ;
2088+ await sleep ( 1500 ) ;
21022089
21032090 const change = await changeStream . next ( ) ;
21042091 expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
2092+
2093+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2094+ const [ e1 , e2 ] = aggregateEvents ;
2095+ expect ( e1 . address ) . to . not . equal ( e2 . address ) ;
21052096 }
21062097 ) ;
21072098 } ) ;
@@ -2418,6 +2409,46 @@ describe.only('ChangeStream resumability', function () {
24182409 expect ( changeStream . closed ) . to . be . true ;
24192410 } ) ;
24202411 } ) ;
2412+
2413+ context ( 'when the error is not a server error' , function ( ) {
2414+ // This test requires a replica set to call replSetFreeze command
2415+ it (
2416+ 'should resume on ServerSelectionError' ,
2417+ { requires : { topology : [ 'replicaset' ] } } ,
2418+ async function ( ) {
2419+ changeStream = collection . watch ( [ ] ) ;
2420+ await initIteratorMode ( changeStream ) ;
2421+
2422+ await collection . insertOne ( { a : 1 } ) ;
2423+
2424+ // mimic the node termination by closing the connection and failing on heartbeat
2425+ await client . db ( 'admin' ) . command ( {
2426+ configureFailPoint : 'failCommand' ,
2427+ mode : 'alwaysOn' ,
2428+ data : {
2429+ failCommands : [ 'ping' , 'hello' , LEGACY_HELLO_COMMAND ] ,
2430+ closeConnection : true ,
2431+ handshakeCommands : true ,
2432+ failInternalCommands : true ,
2433+ appName : appName
2434+ }
2435+ } as FailCommandFailPoint ) ;
2436+ // force new election in the cluster
2437+ await client
2438+ . db ( 'admin' )
2439+ . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2440+ await client . db ( 'admin' ) . command ( { replSetStepDown : 30 , force : true } ) ;
2441+ await sleep ( 1500 ) ;
2442+
2443+ const change = await changeStream . tryNext ( ) ;
2444+ expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
2445+
2446+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2447+ const [ e1 , e2 ] = aggregateEvents ;
2448+ expect ( e1 . address ) . to . not . equal ( e2 . address ) ;
2449+ }
2450+ ) ;
2451+ } ) ;
24212452 } ) ;
24222453
24232454 context ( '#asyncIterator' , function ( ) {
@@ -2554,6 +2585,50 @@ describe.only('ChangeStream resumability', function () {
25542585 }
25552586 } ) ;
25562587 } ) ;
2588+
2589+ context ( 'when the error is not a server error' , function ( ) {
2590+ // This test requires a replica set to call replSetFreeze command
2591+ it (
2592+ 'should resume on ServerSelectionError' ,
2593+ { requires : { topology : [ 'replicaset' ] } } ,
2594+ async function ( ) {
2595+ changeStream = collection . watch ( [ ] ) ;
2596+ await initIteratorMode ( changeStream ) ;
2597+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
2598+
2599+ await collection . insertOne ( { a : 1 } ) ;
2600+
2601+ // mimic the node termination by closing the connection and failing on heartbeat
2602+ await client . db ( 'admin' ) . command ( {
2603+ configureFailPoint : 'failCommand' ,
2604+ mode : 'alwaysOn' ,
2605+ data : {
2606+ failCommands : [ 'ping' , 'hello' , LEGACY_HELLO_COMMAND ] ,
2607+ closeConnection : true ,
2608+ handshakeCommands : true ,
2609+ failInternalCommands : true ,
2610+ appName : appName
2611+ }
2612+ } as FailCommandFailPoint ) ;
2613+ // force new election in the cluster
2614+ await client
2615+ . db ( 'admin' )
2616+ . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2617+ await client . db ( 'admin' ) . command ( { replSetStepDown : 30 , force : true } ) ;
2618+ await sleep ( 1500 ) ;
2619+
2620+ const change = await changeStreamIterator . next ( ) ;
2621+ expect ( change . value ) . to . containSubset ( {
2622+ operationType : 'insert' ,
2623+ fullDocument : { a : 1 }
2624+ } ) ;
2625+
2626+ expect ( aggregateEvents ) . to . have . lengthOf ( 2 ) ;
2627+ const [ e1 , e2 ] = aggregateEvents ;
2628+ expect ( e1 . address ) . to . not . equal ( e2 . address ) ;
2629+ }
2630+ ) ;
2631+ } ) ;
25572632 } ) ;
25582633 } ) ;
25592634
0 commit comments