1- import { promisify } from 'util' ;
2-
31import { type BSONSerializeOptions , type Document , EJSON , resolveBSONOptions } from '../bson' ;
42import type { Collection } from '../collection' ;
53import {
64 type AnyError ,
75 MongoBatchReExecutionError ,
86 MONGODB_ERROR_CODES ,
97 MongoInvalidArgumentError ,
8+ MongoRuntimeError ,
109 MongoServerError ,
1110 MongoWriteConcernError
1211} from '../error' ;
@@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology';
2221import type { ClientSession } from '../sessions' ;
2322import {
2423 applyRetryableWrites ,
25- type Callback ,
2624 getTopology ,
2725 hasAtomicOperators ,
2826 maybeAddIdToDocuments ,
@@ -500,86 +498,46 @@ export function mergeBatchResults(
500498 }
501499}
502500
503- function executeCommands (
501+ async function executeCommands (
504502 bulkOperation : BulkOperationBase ,
505- options : BulkWriteOptions ,
506- callback : Callback < BulkWriteResult >
507- ) {
503+ options : BulkWriteOptions
504+ ) : Promise < BulkWriteResult > {
508505 if ( bulkOperation . s . batches . length === 0 ) {
509- return callback (
510- undefined ,
511- new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered )
512- ) ;
506+ return new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered ) ;
513507 }
514508
515- const batch = bulkOperation . s . batches . shift ( ) as Batch ;
509+ for ( const batch of bulkOperation . s . batches ) {
510+ const finalOptions = resolveOptions ( bulkOperation , {
511+ ...options ,
512+ ordered : bulkOperation . isOrdered
513+ } ) ;
516514
517- function resultHandler ( err ?: AnyError , result ?: Document ) {
518- // Error is a driver related error not a bulk op error, return early
519- if ( err && 'message' in err && ! ( err instanceof MongoWriteConcernError ) ) {
520- return callback (
521- new MongoBulkWriteError (
522- err ,
523- new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered )
524- )
525- ) ;
515+ if ( finalOptions . bypassDocumentValidation !== true ) {
516+ delete finalOptions . bypassDocumentValidation ;
526517 }
527518
528- if ( err instanceof MongoWriteConcernError ) {
529- return handleMongoWriteConcernError (
530- batch ,
531- bulkOperation . s . bulkResult ,
532- bulkOperation . isOrdered ,
533- err ,
534- callback
535- ) ;
519+ // Is the bypassDocumentValidation options specific
520+ if ( bulkOperation . s . bypassDocumentValidation === true ) {
521+ finalOptions . bypassDocumentValidation = true ;
536522 }
537523
538- // Merge the results together
539- mergeBatchResults ( batch , bulkOperation . s . bulkResult , err , result ) ;
540- const writeResult = new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered ) ;
541- if ( bulkOperation . handleWriteError ( callback , writeResult ) ) return ;
542-
543- // Execute the next command in line
544- executeCommands ( bulkOperation , options , callback ) ;
545- }
546-
547- const finalOptions = resolveOptions ( bulkOperation , {
548- ...options ,
549- ordered : bulkOperation . isOrdered
550- } ) ;
551-
552- if ( finalOptions . bypassDocumentValidation !== true ) {
553- delete finalOptions . bypassDocumentValidation ;
554- }
555-
556- // Set an operationIf if provided
557- if ( bulkOperation . operationId ) {
558- resultHandler . operationId = bulkOperation . operationId ;
559- }
560-
561- // Is the bypassDocumentValidation options specific
562- if ( bulkOperation . s . bypassDocumentValidation === true ) {
563- finalOptions . bypassDocumentValidation = true ;
564- }
565-
566- // Is the checkKeys option disabled
567- if ( bulkOperation . s . checkKeys === false ) {
568- finalOptions . checkKeys = false ;
569- }
570-
571- if ( finalOptions . retryWrites ) {
572- if ( isUpdateBatch ( batch ) ) {
573- finalOptions . retryWrites = finalOptions . retryWrites && ! batch . operations . some ( op => op . multi ) ;
524+ // Is the checkKeys option disabled
525+ if ( bulkOperation . s . checkKeys === false ) {
526+ finalOptions . checkKeys = false ;
574527 }
575528
576- if ( isDeleteBatch ( batch ) ) {
577- finalOptions . retryWrites =
578- finalOptions . retryWrites && ! batch . operations . some ( op => op . limit === 0 ) ;
529+ if ( finalOptions . retryWrites ) {
530+ if ( isUpdateBatch ( batch ) ) {
531+ finalOptions . retryWrites =
532+ finalOptions . retryWrites && ! batch . operations . some ( op => op . multi ) ;
533+ }
534+
535+ if ( isDeleteBatch ( batch ) ) {
536+ finalOptions . retryWrites =
537+ finalOptions . retryWrites && ! batch . operations . some ( op => op . limit === 0 ) ;
538+ }
579539 }
580- }
581540
582- try {
583541 const operation = isInsertBatch ( batch )
584542 ? new InsertOperation ( bulkOperation . s . namespace , batch . operations , finalOptions )
585543 : isUpdateBatch ( batch )
@@ -588,39 +546,50 @@ function executeCommands(
588546 ? new DeleteOperation ( bulkOperation . s . namespace , batch . operations , finalOptions )
589547 : null ;
590548
591- if ( operation != null ) {
592- executeOperation ( bulkOperation . s . collection . client , operation ) . then (
593- result => resultHandler ( undefined , result ) ,
594- error => resultHandler ( error )
595- ) ;
549+ if ( operation == null ) throw new MongoRuntimeError ( `Unknown batchType: ${ batch . batchType } ` ) ;
550+
551+ let thrownError = null ;
552+ let result ;
553+ try {
554+ result = await executeOperation ( bulkOperation . s . collection . client , operation ) ;
555+ } catch ( error ) {
556+ thrownError = error ;
557+ }
558+
559+ if ( thrownError != null ) {
560+ if ( thrownError instanceof MongoWriteConcernError ) {
561+ mergeBatchResults ( batch , bulkOperation . s . bulkResult , thrownError , result ) ;
562+ const writeResult = new BulkWriteResult (
563+ bulkOperation . s . bulkResult ,
564+ bulkOperation . isOrdered
565+ ) ;
566+
567+ throw new MongoBulkWriteError (
568+ {
569+ message : thrownError . result . writeConcernError . errmsg ,
570+ code : thrownError . result . writeConcernError . code
571+ } ,
572+ writeResult
573+ ) ;
574+ } else {
575+ // Error is a driver related error not a bulk op error, return early
576+ throw new MongoBulkWriteError (
577+ thrownError ,
578+ new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered )
579+ ) ;
580+ }
596581 }
597- } catch ( err ) {
598- // Force top level error
599- err . ok = 0 ;
600- // Merge top level error and return
601- mergeBatchResults ( batch , bulkOperation . s . bulkResult , err , undefined ) ;
602- callback ( ) ;
582+
583+ mergeBatchResults ( batch , bulkOperation . s . bulkResult , thrownError , result ) ;
584+ const writeResult = new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered ) ;
585+ bulkOperation . handleWriteError ( writeResult ) ;
603586 }
604- }
605587
606- function handleMongoWriteConcernError (
607- batch : Batch ,
608- bulkResult : BulkResult ,
609- isOrdered : boolean ,
610- err : MongoWriteConcernError ,
611- callback : Callback < BulkWriteResult >
612- ) {
613- mergeBatchResults ( batch , bulkResult , undefined , err . result ) ;
614-
615- callback (
616- new MongoBulkWriteError (
617- {
618- message : err . result . writeConcernError . errmsg ,
619- code : err . result . writeConcernError . code
620- } ,
621- new BulkWriteResult ( bulkResult , isOrdered )
622- )
623- ) ;
588+ bulkOperation . s . batches . length = 0 ;
589+
590+ const writeResult = new BulkWriteResult ( bulkOperation . s . bulkResult , bulkOperation . isOrdered ) ;
591+ bulkOperation . handleWriteError ( writeResult ) ;
592+ return writeResult ;
624593}
625594
626595/**
@@ -875,8 +844,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
875844 let ?: Document ;
876845}
877846
878- const executeCommandsAsync = promisify ( executeCommands ) ;
879-
880847/**
881848 * TODO(NODE-4063)
882849 * BulkWrites merge complexity is implemented in executeCommands
@@ -895,15 +862,15 @@ export class BulkWriteShimOperation extends AbstractOperation {
895862 return 'bulkWrite' as const ;
896863 }
897864
898- execute ( _server : Server , session : ClientSession | undefined ) : Promise < any > {
865+ async execute ( _server : Server , session : ClientSession | undefined ) : Promise < any > {
899866 if ( this . options . session == null ) {
900867 // An implicit session could have been created by 'executeOperation'
901868 // So if we stick it on finalOptions here, each bulk operation
902869 // will use this same session, it'll be passed in the same way
903870 // an explicit session would be
904871 this . options . session = session ;
905872 }
906- return executeCommandsAsync ( this . bulkOperation , this . options ) ;
873+ return await executeCommands ( this . bulkOperation , this . options ) ;
907874 }
908875}
909876
@@ -1239,33 +1206,26 @@ export abstract class BulkOperationBase {
12391206 * Handles the write error before executing commands
12401207 * @internal
12411208 */
1242- handleWriteError ( callback : Callback < BulkWriteResult > , writeResult : BulkWriteResult ) : boolean {
1209+ handleWriteError ( writeResult : BulkWriteResult ) : void {
12431210 if ( this . s . bulkResult . writeErrors . length > 0 ) {
12441211 const msg = this . s . bulkResult . writeErrors [ 0 ] . errmsg
12451212 ? this . s . bulkResult . writeErrors [ 0 ] . errmsg
12461213 : 'write operation failed' ;
12471214
1248- callback (
1249- new MongoBulkWriteError (
1250- {
1251- message : msg ,
1252- code : this . s . bulkResult . writeErrors [ 0 ] . code ,
1253- writeErrors : this . s . bulkResult . writeErrors
1254- } ,
1255- writeResult
1256- )
1215+ throw new MongoBulkWriteError (
1216+ {
1217+ message : msg ,
1218+ code : this . s . bulkResult . writeErrors [ 0 ] . code ,
1219+ writeErrors : this . s . bulkResult . writeErrors
1220+ } ,
1221+ writeResult
12571222 ) ;
1258-
1259- return true ;
12601223 }
12611224
12621225 const writeConcernError = writeResult . getWriteConcernError ( ) ;
12631226 if ( writeConcernError ) {
1264- callback ( new MongoBulkWriteError ( writeConcernError , writeResult ) ) ;
1265- return true ;
1227+ throw new MongoBulkWriteError ( writeConcernError , writeResult ) ;
12661228 }
1267-
1268- return false ;
12691229 }
12701230
12711231 abstract addToOperationsList (
0 commit comments