11import { BSON , type Document } from '../../bson' ;
22import { DocumentSequence } from '../../cmap/commands' ;
3+ import { MongoAPIError , MongoInvalidArgumentError } from '../../error' ;
34import { type PkFactory } from '../../mongo_client' ;
45import type { Filter , OptionalId , UpdateFilter , WithoutId } from '../../mongo_types' ;
5- import { DEFAULT_PK_FACTORY } from '../../utils' ;
6+ import { DEFAULT_PK_FACTORY , hasAtomicOperators } from '../../utils' ;
67import { type CollationOptions } from '../command' ;
78import { type Hint } from '../operation' ;
89import type {
@@ -38,8 +39,14 @@ export class ClientBulkWriteCommandBuilder {
3839 models : AnyClientBulkWriteModel [ ] ;
3940 options : ClientBulkWriteOptions ;
4041 pkFactory : PkFactory ;
42+ /** The current index in the models array that is being processed. */
4143 currentModelIndex : number ;
44+ /** The model index that the builder was on when it finished the previous batch. Used for resets when retrying. */
45+ previousModelIndex : number ;
46+ /** The last array of operations that were created. Used by the results merger for indexing results. */
4247 lastOperations : Document [ ] ;
48+ /** Returns true if the current batch being created has no multi-updates. */
49+ isBatchRetryable : boolean ;
4350
4451 /**
4552 * Create the command builder.
@@ -54,7 +61,9 @@ export class ClientBulkWriteCommandBuilder {
5461 this . options = options ;
5562 this . pkFactory = pkFactory ?? DEFAULT_PK_FACTORY ;
5663 this . currentModelIndex = 0 ;
64+ this . previousModelIndex = 0 ;
5765 this . lastOperations = [ ] ;
66+ this . isBatchRetryable = true ;
5867 }
5968
6069 /**
@@ -76,27 +85,57 @@ export class ClientBulkWriteCommandBuilder {
7685 return this . currentModelIndex < this . models . length ;
7786 }
7887
88+ /**
89+ * When we need to retry a command we need to set the current
90+ * model index back to its previous value.
91+ */
92+ resetBatch ( ) : boolean {
93+ this . currentModelIndex = this . previousModelIndex ;
94+ return true ;
95+ }
96+
7997 /**
8098 * Build a single batch of a client bulk write command.
8199 * @param maxMessageSizeBytes - The max message size in bytes.
82100 * @param maxWriteBatchSize - The max write batch size.
83101 * @returns The client bulk write command.
84102 */
85- buildBatch ( maxMessageSizeBytes : number , maxWriteBatchSize : number ) : ClientBulkWriteCommand {
103+ buildBatch (
104+ maxMessageSizeBytes : number ,
105+ maxWriteBatchSize : number ,
106+ maxBsonObjectSize : number
107+ ) : ClientBulkWriteCommand {
108+ // We start by assuming the batch has no multi-updates, so it is retryable
109+ // until we find them.
110+ this . isBatchRetryable = true ;
86111 let commandLength = 0 ;
87112 let currentNamespaceIndex = 0 ;
88113 const command : ClientBulkWriteCommand = this . baseCommand ( ) ;
89114 const namespaces = new Map < string , number > ( ) ;
115+ // In the case of retries we need to mark where we started this batch.
116+ this . previousModelIndex = this . currentModelIndex ;
90117
91118 while ( this . currentModelIndex < this . models . length ) {
92119 const model = this . models [ this . currentModelIndex ] ;
93120 const ns = model . namespace ;
94121 const nsIndex = namespaces . get ( ns ) ;
95122
123+ // Multi updates are not retryable.
124+ if ( model . name === 'deleteMany' || model . name === 'updateMany' ) {
125+ this . isBatchRetryable = false ;
126+ }
127+
96128 if ( nsIndex != null ) {
97129 // Build the operation and serialize it to get the bytes buffer.
98130 const operation = buildOperation ( model , nsIndex , this . pkFactory ) ;
99- const operationBuffer = BSON . serialize ( operation ) ;
131+ let operationBuffer ;
132+ try {
133+ operationBuffer = BSON . serialize ( operation ) ;
134+ } catch ( cause ) {
135+ throw new MongoInvalidArgumentError ( `Could not serialize operation to BSON` , { cause } ) ;
136+ }
137+
138+ validateBufferSize ( 'ops' , operationBuffer , maxBsonObjectSize ) ;
100139
101140 // Check if the operation buffer can fit in the command. If it can,
102141 // then add the operation to the document sequence and increment the
@@ -119,9 +158,18 @@ export class ClientBulkWriteCommandBuilder {
119158 // construct our nsInfo and ops documents and buffers.
120159 namespaces . set ( ns , currentNamespaceIndex ) ;
121160 const nsInfo = { ns : ns } ;
122- const nsInfoBuffer = BSON . serialize ( nsInfo ) ;
123161 const operation = buildOperation ( model , currentNamespaceIndex , this . pkFactory ) ;
124- const operationBuffer = BSON . serialize ( operation ) ;
162+ let nsInfoBuffer ;
163+ let operationBuffer ;
164+ try {
165+ nsInfoBuffer = BSON . serialize ( nsInfo ) ;
166+ operationBuffer = BSON . serialize ( operation ) ;
167+ } catch ( cause ) {
168+ throw new MongoInvalidArgumentError ( `Could not serialize ns info to BSON` , { cause } ) ;
169+ }
170+
171+ validateBufferSize ( 'nsInfo' , nsInfoBuffer , maxBsonObjectSize ) ;
172+ validateBufferSize ( 'ops' , operationBuffer , maxBsonObjectSize ) ;
125173
126174 // Check if the operation and nsInfo buffers can fit in the command. If they
127175 // can, then add the operation and nsInfo to their respective document
@@ -179,6 +227,14 @@ export class ClientBulkWriteCommandBuilder {
179227 }
180228}
181229
230+ function validateBufferSize ( name : string , buffer : Uint8Array , maxBsonObjectSize : number ) {
231+ if ( buffer . length > maxBsonObjectSize ) {
232+ throw new MongoInvalidArgumentError (
233+ `Client bulk write operation ${ name } of length ${ buffer . length } exceeds the max bson object size of ${ maxBsonObjectSize } `
234+ ) ;
235+ }
236+ }
237+
182238/** @internal */
183239interface ClientInsertOperation {
184240 insert : number ;
@@ -293,6 +349,18 @@ export const buildUpdateManyOperation = (
293349 return createUpdateOperation ( model , index , true ) ;
294350} ;
295351
352+ /**
353+ * Validate the update document.
354+ * @param update - The update document.
355+ */
356+ function validateUpdate ( update : Document ) {
357+ if ( ! hasAtomicOperators ( update ) ) {
358+ throw new MongoAPIError (
359+ 'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.'
360+ ) ;
361+ }
362+ }
363+
296364/**
297365 * Creates a delete operation based on the parameters.
298366 */
@@ -301,6 +369,11 @@ function createUpdateOperation(
301369 index : number ,
302370 multi : boolean
303371) : ClientUpdateOperation {
372+ // Update documents provided in UpdateOne and UpdateMany write models are
373+ // required only to contain atomic modifiers (i.e. keys that start with "$").
374+ // Drivers MUST throw an error if an update document is empty or if the
375+ // document's first key does not start with "$".
376+ validateUpdate ( model . update ) ;
304377 const document : ClientUpdateOperation = {
305378 update : index ,
306379 multi : multi ,
@@ -343,6 +416,12 @@ export const buildReplaceOneOperation = (
343416 model : ClientReplaceOneModel ,
344417 index : number
345418) : ClientReplaceOneOperation => {
419+ if ( hasAtomicOperators ( model . replacement ) ) {
420+ throw new MongoAPIError (
421+ 'Client bulk write replace models must not contain atomic modifiers (start with $) and must not be empty.'
422+ ) ;
423+ }
424+
346425 const document : ClientReplaceOneOperation = {
347426 update : index ,
348427 multi : false ,
0 commit comments