@@ -151,6 +151,12 @@ export class RetryingCall implements Call {
151
151
private initialMetadata : Metadata | null = null ;
152
152
private underlyingCalls : UnderlyingCall [ ] = [ ] ;
153
153
private writeBuffer : WriteBufferEntry [ ] = [ ] ;
154
+ /**
155
+ * The offset of message indices in the writeBuffer. For example, if
156
+ * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
157
+ * is in writeBuffer[5].
158
+ */
159
+ private writeBufferOffset = 0 ;
154
160
/**
155
161
* Tracks whether a read has been started, so that we know whether to start
156
162
* reads on new child calls. This only matters for the first read, because
@@ -203,14 +209,8 @@ export class RetryingCall implements Call {
203
209
private reportStatus ( statusObject : StatusObject ) {
204
210
this . trace ( 'ended with status: code=' + statusObject . code + ' details="' + statusObject . details + '"' ) ;
205
211
this . bufferTracker . freeAll ( this . callNumber ) ;
206
- for ( let i = 0 ; i < this . writeBuffer . length ; i ++ ) {
207
- if ( this . writeBuffer [ i ] . entryType === 'MESSAGE' ) {
208
- this . writeBuffer [ i ] = {
209
- entryType : 'FREED' ,
210
- allocated : false
211
- } ;
212
- }
213
- }
212
+ this . writeBufferOffset = this . writeBufferOffset + this . writeBuffer . length ;
213
+ this . writeBuffer = [ ] ;
214
214
process . nextTick ( ( ) => {
215
215
// Explicitly construct status object to remove progress field
216
216
this . listener ?. onReceiveStatus ( {
@@ -236,20 +236,27 @@ export class RetryingCall implements Call {
236
236
}
237
237
}
238
238
239
- private maybefreeMessageBufferEntry ( messageIndex : number ) {
239
+ private getBufferEntry ( messageIndex : number ) : WriteBufferEntry {
240
+ return this . writeBuffer [ messageIndex - this . writeBufferOffset ] ?? { entryType : 'FREED' , allocated : false } ;
241
+ }
242
+
243
+ private getNextBufferIndex ( ) {
244
+ return this . writeBufferOffset + this . writeBuffer . length ;
245
+ }
246
+
247
+ private clearSentMessages ( ) {
240
248
if ( this . state !== 'COMMITTED' ) {
241
249
return ;
242
250
}
243
- const bufferEntry = this . writeBuffer [ messageIndex ] ;
244
- if ( bufferEntry . entryType === 'MESSAGE' ) {
251
+ const earliestNeededMessageIndex = this . underlyingCalls [ this . committedCallIndex ! ] . nextMessageToSend ;
252
+ for ( let messageIndex = this . writeBufferOffset ; messageIndex < earliestNeededMessageIndex ; messageIndex ++ ) {
253
+ const bufferEntry = this . getBufferEntry ( messageIndex ) ;
245
254
if ( bufferEntry . allocated ) {
246
255
this . bufferTracker . free ( bufferEntry . message ! . message . length , this . callNumber ) ;
247
256
}
248
- this . writeBuffer [ messageIndex ] = {
249
- entryType : 'FREED' ,
250
- allocated : false
251
- } ;
252
257
}
258
+ this . writeBuffer = this . writeBuffer . slice ( earliestNeededMessageIndex - this . writeBufferOffset ) ;
259
+ this . writeBufferOffset = earliestNeededMessageIndex ;
253
260
}
254
261
255
262
private commitCall ( index : number ) {
@@ -272,9 +279,7 @@ export class RetryingCall implements Call {
272
279
this . underlyingCalls [ i ] . state = 'COMPLETED' ;
273
280
this . underlyingCalls [ i ] . call . cancelWithStatus ( Status . CANCELLED , 'Discarded in favor of other hedged attempt' ) ;
274
281
}
275
- for ( let messageIndex = 0 ; messageIndex < this . underlyingCalls [ index ] . nextMessageToSend - 1 ; messageIndex += 1 ) {
276
- this . maybefreeMessageBufferEntry ( messageIndex ) ;
277
- }
282
+ this . clearSentMessages ( ) ;
278
283
}
279
284
280
285
private commitCallWithMostMessages ( ) {
@@ -555,8 +560,8 @@ export class RetryingCall implements Call {
555
560
private handleChildWriteCompleted ( childIndex : number ) {
556
561
const childCall = this . underlyingCalls [ childIndex ] ;
557
562
const messageIndex = childCall . nextMessageToSend ;
558
- this . writeBuffer [ messageIndex ] . callback ?.( ) ;
559
- this . maybefreeMessageBufferEntry ( messageIndex ) ;
563
+ this . getBufferEntry ( messageIndex ) . callback ?.( ) ;
564
+ this . clearSentMessages ( ) ;
560
565
childCall . nextMessageToSend += 1 ;
561
566
this . sendNextChildMessage ( childIndex ) ;
562
567
}
@@ -566,10 +571,10 @@ export class RetryingCall implements Call {
566
571
if ( childCall . state === 'COMPLETED' ) {
567
572
return ;
568
573
}
569
- if ( this . writeBuffer [ childCall . nextMessageToSend ] ) {
570
- const bufferEntry = this . writeBuffer [ childCall . nextMessageToSend ] ;
574
+ if ( this . getBufferEntry ( childCall . nextMessageToSend ) ) {
575
+ const bufferEntry = this . getBufferEntry ( childCall . nextMessageToSend ) ;
571
576
switch ( bufferEntry . entryType ) {
572
- case 'MESSAGE' :
577
+ case 'MESSAGE' :
573
578
childCall . call . sendMessageWithContext ( {
574
579
callback : ( error ) => {
575
580
// Ignore error
@@ -594,13 +599,13 @@ export class RetryingCall implements Call {
594
599
message,
595
600
flags : context . flags ,
596
601
} ;
597
- const messageIndex = this . writeBuffer . length ;
602
+ const messageIndex = this . getNextBufferIndex ( ) ;
598
603
const bufferEntry : WriteBufferEntry = {
599
604
entryType : 'MESSAGE' ,
600
605
message : writeObj ,
601
606
allocated : this . bufferTracker . allocate ( message . length , this . callNumber )
602
607
} ;
603
- this . writeBuffer [ messageIndex ] = bufferEntry ;
608
+ this . writeBuffer . push ( bufferEntry ) ;
604
609
if ( bufferEntry . allocated ) {
605
610
context . callback ?.( ) ;
606
611
for ( const [ callIndex , call ] of this . underlyingCalls . entries ( ) ) {
@@ -642,11 +647,11 @@ export class RetryingCall implements Call {
642
647
}
643
648
halfClose ( ) : void {
644
649
this . trace ( 'halfClose called' ) ;
645
- const halfCloseIndex = this . writeBuffer . length ;
646
- this . writeBuffer [ halfCloseIndex ] = {
650
+ const halfCloseIndex = this . getNextBufferIndex ( ) ;
651
+ this . writeBuffer . push ( {
647
652
entryType : 'HALF_CLOSE' ,
648
653
allocated : false
649
- } ;
654
+ } ) ;
650
655
for ( const call of this . underlyingCalls ) {
651
656
if ( call ?. state === 'ACTIVE' && call . nextMessageToSend === halfCloseIndex ) {
652
657
call . nextMessageToSend += 1 ;
0 commit comments