@@ -291,14 +291,16 @@ public void close() throws SecurityException {
291
291
292
292
final String message = "Hello Client" ;
293
293
Buffer buffer = createMessageFrame (message );
294
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
294
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
295
+ (int ) buffer .size ());
295
296
assertThat (logs ).hasSize (1 );
296
297
log = logs .remove (0 );
297
298
assertThat (log .getMessage ()).startsWith (Direction .INBOUND + " DATA: streamId=" + 3 );
298
299
assertThat (log .getLevel ()).isEqualTo (Level .FINE );
299
300
300
301
// At most 64 bytes of data frame will be logged.
301
- frameHandler ().data (false , 3 , createMessageFrame (new String (new char [1000 ])), 1000 );
302
+ frameHandler ().data (false , 3 , createMessageFrame (new String (new char [1000 ])),
303
+ 1000 , 1000 );
302
304
assertThat (logs ).hasSize (1 );
303
305
log = logs .remove (0 );
304
306
String data = log .getMessage ();
@@ -377,7 +379,8 @@ public void maxMessageSizeShouldBeEnforced() throws Exception {
377
379
// Receive the message.
378
380
final String message = "Hello Client" ;
379
381
Buffer buffer = createMessageFrame (message );
380
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
382
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
383
+ (int ) buffer .size ());
381
384
382
385
listener .waitUntilStreamClosed ();
383
386
assertEquals (Code .RESOURCE_EXHAUSTED , listener .status .getCode ());
@@ -500,7 +503,8 @@ public void readMessages() throws Exception {
500
503
assertNotNull (listener .headers );
501
504
for (int i = 0 ; i < numMessages ; i ++) {
502
505
Buffer buffer = createMessageFrame (message + i );
503
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
506
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
507
+ (int ) buffer .size ());
504
508
}
505
509
frameHandler ().headers (true , true , 3 , 0 , grpcResponseTrailers (), HeadersMode .HTTP_20_HEADERS );
506
510
listener .waitUntilStreamClosed ();
@@ -529,7 +533,8 @@ public void receivedHeadersForInvalidStreamShouldKillConnection() throws Excepti
529
533
@ Test
530
534
public void receivedDataForInvalidStreamShouldKillConnection () throws Exception {
531
535
initTransport ();
532
- frameHandler ().data (false , 3 , createMessageFrame (new String (new char [1000 ])), 1000 );
536
+ frameHandler ().data (false , 3 , createMessageFrame (new String (new char [1000 ])),
537
+ 1000 , 1000 );
533
538
verify (frameWriter , timeout (TIME_OUT_MS ))
534
539
.goAway (eq (0 ), eq (ErrorCode .PROTOCOL_ERROR ), any (byte [].class ));
535
540
verify (transportListener ).transportShutdown (isA (Status .class ));
@@ -551,7 +556,8 @@ public void invalidInboundHeadersCancelStream() throws Exception {
551
556
HeadersMode .HTTP_20_HEADERS );
552
557
// Now wait to receive 1000 bytes of data so we can have a better error message before
553
558
// cancelling the streaam.
554
- frameHandler ().data (false , 3 , createMessageFrame (new String (new char [1000 ])), 1000 );
559
+ frameHandler ().data (false , 3 ,
560
+ createMessageFrame (new String (new char [1000 ])), 1000 , 1000 );
555
561
verify (frameWriter , timeout (TIME_OUT_MS )).rstStream (eq (3 ), eq (ErrorCode .CANCEL ));
556
562
assertNull (listener .headers );
557
563
assertEquals (Status .INTERNAL .getCode (), listener .status .getCode ());
@@ -622,7 +628,8 @@ public void receiveResetNoError() throws Exception {
622
628
assertContainStream (3 );
623
629
frameHandler ().headers (false , false , 3 , 0 , grpcResponseHeaders (), HeadersMode .HTTP_20_HEADERS );
624
630
Buffer buffer = createMessageFrame ("a message" );
625
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
631
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
632
+ (int ) buffer .size ());
626
633
frameHandler ().headers (true , true , 3 , 0 , grpcResponseTrailers (), HeadersMode .HTTP_20_HEADERS );
627
634
frameHandler ().rstStream (3 , ErrorCode .NO_ERROR );
628
635
stream .request (1 );
@@ -762,33 +769,37 @@ public void windowUpdate() throws Exception {
762
769
763
770
int messageLength = INITIAL_WINDOW_SIZE / 4 ;
764
771
byte [] fakeMessage = new byte [messageLength ];
772
+ int paddingLength = 2 ;
765
773
766
774
// Stream 1 receives a message
767
- Buffer buffer = createMessageFrame (fakeMessage );
775
+ Buffer buffer = createMessageFrame (fakeMessage , paddingLength );
768
776
int messageFrameLength = (int ) buffer .size ();
769
- frameHandler ().data (false , 3 , buffer , messageFrameLength );
777
+ frameHandler ().data (false , 3 , buffer , messageFrameLength - paddingLength ,
778
+ messageFrameLength );
770
779
771
780
// Stream 2 receives a message
772
- buffer = createMessageFrame (fakeMessage );
773
- frameHandler ().data (false , 5 , buffer , messageFrameLength );
781
+ buffer = createMessageFrame (fakeMessage , paddingLength );
782
+ frameHandler ().data (false , 5 , buffer , messageFrameLength - paddingLength ,
783
+ messageFrameLength );
774
784
775
785
verify (frameWriter , timeout (TIME_OUT_MS ))
776
786
.windowUpdate (eq (0 ), eq ((long ) 2 * messageFrameLength ));
777
787
reset (frameWriter );
778
788
779
789
// Stream 1 receives another message
780
790
buffer = createMessageFrame (fakeMessage );
781
- frameHandler ().data (false , 3 , buffer , messageFrameLength );
791
+ messageFrameLength = (int ) buffer .size ();
792
+ frameHandler ().data (false , 3 , buffer , messageFrameLength , messageFrameLength );
782
793
783
794
verify (frameWriter , timeout (TIME_OUT_MS ))
784
- .windowUpdate (eq (3 ), eq ((long ) 2 * messageFrameLength ));
795
+ .windowUpdate (eq (3 ), eq ((long ) 2 * messageFrameLength + paddingLength ));
785
796
786
797
// Stream 2 receives another message
787
798
buffer = createMessageFrame (fakeMessage );
788
- frameHandler ().data (false , 5 , buffer , messageFrameLength );
799
+ frameHandler ().data (false , 5 , buffer , messageFrameLength , messageFrameLength );
789
800
790
801
verify (frameWriter , timeout (TIME_OUT_MS ))
791
- .windowUpdate (eq (5 ), eq ((long ) 2 * messageFrameLength ));
802
+ .windowUpdate (eq (5 ), eq ((long ) 2 * messageFrameLength + paddingLength ));
792
803
verify (frameWriter , timeout (TIME_OUT_MS ))
793
804
.windowUpdate (eq (0 ), eq ((long ) 2 * messageFrameLength ));
794
805
@@ -819,7 +830,8 @@ public void windowUpdateWithInboundFlowControl() throws Exception {
819
830
frameHandler ().headers (false , false , 3 , 0 , grpcResponseHeaders (), HeadersMode .HTTP_20_HEADERS );
820
831
Buffer buffer = createMessageFrame (fakeMessage );
821
832
long messageFrameLength = buffer .size ();
822
- frameHandler ().data (false , 3 , buffer , (int ) messageFrameLength );
833
+ frameHandler ().data (false , 3 , buffer , (int ) messageFrameLength ,
834
+ (int ) messageFrameLength );
823
835
ArgumentCaptor <Integer > idCaptor = ArgumentCaptor .forClass (Integer .class );
824
836
verify (frameWriter , timeout (TIME_OUT_MS )).windowUpdate (
825
837
idCaptor .capture (), eq (messageFrameLength ));
@@ -1123,7 +1135,8 @@ public void receiveGoAway() throws Exception {
1123
1135
frameHandler ().headers (false , false , 3 , 0 , grpcResponseHeaders (), HeadersMode .HTTP_20_HEADERS );
1124
1136
final String receivedMessage = "No, you are fine." ;
1125
1137
Buffer buffer = createMessageFrame (receivedMessage );
1126
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
1138
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
1139
+ (int ) buffer .size ());
1127
1140
frameHandler ().headers (true , true , 3 , 0 , grpcResponseTrailers (), HeadersMode .HTTP_20_HEADERS );
1128
1141
listener1 .waitUntilStreamClosed ();
1129
1142
assertEquals (1 , listener1 .messages .size ());
@@ -1154,12 +1167,12 @@ public void streamIdExhausted() throws Exception {
1154
1167
assertNotNull (listener .headers );
1155
1168
String message = "hello" ;
1156
1169
Buffer buffer = createMessageFrame (message );
1157
- frameHandler ().data (false , startId , buffer , (int ) buffer .size ());
1170
+ frameHandler ().data (false , startId , buffer , (int ) buffer .size (), ( int ) buffer . size () );
1158
1171
1159
1172
getStream (startId ).cancel (Status .CANCELLED );
1160
1173
// Receives the second message after be cancelled.
1161
1174
buffer = createMessageFrame (message );
1162
- frameHandler ().data (false , startId , buffer , (int ) buffer .size ());
1175
+ frameHandler ().data (false , startId , buffer , (int ) buffer .size (), ( int ) buffer . size () );
1163
1176
1164
1177
listener .waitUntilStreamClosed ();
1165
1178
// Should only have the first message delivered.
@@ -1329,7 +1342,7 @@ public void receivingWindowExceeded() throws Exception {
1329
1342
byte [] fakeMessage = new byte [messageLength ];
1330
1343
Buffer buffer = createMessageFrame (fakeMessage );
1331
1344
int messageFrameLength = (int ) buffer .size ();
1332
- frameHandler ().data (false , 3 , buffer , messageFrameLength );
1345
+ frameHandler ().data (false , 3 , buffer , messageFrameLength , messageFrameLength );
1333
1346
1334
1347
listener .waitUntilStreamClosed ();
1335
1348
assertEquals (Status .INTERNAL .getCode (), listener .status .getCode ());
@@ -1392,7 +1405,8 @@ public void receiveDataWithoutHeader() throws Exception {
1392
1405
stream .start (listener );
1393
1406
stream .request (1 );
1394
1407
Buffer buffer = createMessageFrame (new byte [1 ]);
1395
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
1408
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
1409
+ (int ) buffer .size ());
1396
1410
1397
1411
// Trigger the failure by a trailer.
1398
1412
frameHandler ().headers (
@@ -1414,11 +1428,13 @@ public void receiveDataWithoutHeaderAndTrailer() throws Exception {
1414
1428
stream .start (listener );
1415
1429
stream .request (1 );
1416
1430
Buffer buffer = createMessageFrame (new byte [1 ]);
1417
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
1431
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
1432
+ (int ) buffer .size ());
1418
1433
1419
1434
// Trigger the failure by a data frame.
1420
1435
buffer = createMessageFrame (new byte [1 ]);
1421
- frameHandler ().data (true , 3 , buffer , (int ) buffer .size ());
1436
+ frameHandler ().data (true , 3 , buffer , (int ) buffer .size (),
1437
+ (int ) buffer .size ());
1422
1438
1423
1439
listener .waitUntilStreamClosed ();
1424
1440
assertEquals (Status .INTERNAL .getCode (), listener .status .getCode ());
@@ -1436,7 +1452,8 @@ public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
1436
1452
stream .start (listener );
1437
1453
stream .request (1 );
1438
1454
Buffer buffer = createMessageFrame (new byte [1000 ]);
1439
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
1455
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
1456
+ (int ) buffer .size ());
1440
1457
1441
1458
// Once we receive enough detail, we cancel the stream. so we should have sent cancel.
1442
1459
verify (frameWriter , timeout (TIME_OUT_MS )).rstStream (eq (3 ), eq (ErrorCode .CANCEL ));
@@ -1459,15 +1476,17 @@ public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception
1459
1476
1460
1477
Buffer buffer = createMessageFrame (
1461
1478
new byte [INITIAL_WINDOW_SIZE / 2 + 1 ]);
1462
- frameHandler ().data (false , 3 , buffer , (int ) buffer .size ());
1479
+ frameHandler ().data (false , 3 , buffer , (int ) buffer .size (),
1480
+ (int ) buffer .size ());
1463
1481
// Should still update the connection window even stream 3 is gone.
1464
1482
verify (frameWriter , timeout (TIME_OUT_MS )).windowUpdate (0 ,
1465
1483
HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1 );
1466
1484
buffer = createMessageFrame (
1467
1485
new byte [INITIAL_WINDOW_SIZE / 2 + 1 ]);
1468
1486
1469
1487
// This should kill the connection, since we never created stream 5.
1470
- frameHandler ().data (false , 5 , buffer , (int ) buffer .size ());
1488
+ frameHandler ().data (false , 5 , buffer , (int ) buffer .size (),
1489
+ (int ) buffer .size ());
1471
1490
verify (frameWriter , timeout (TIME_OUT_MS ))
1472
1491
.goAway (eq (0 ), eq (ErrorCode .PROTOCOL_ERROR ), any (byte [].class ));
1473
1492
verify (transportListener ).transportShutdown (isA (Status .class ));
@@ -2114,10 +2133,15 @@ private static Buffer createMessageFrame(String message) {
2114
2133
}
2115
2134
2116
2135
private static Buffer createMessageFrame (byte [] message ) {
2136
+ return createMessageFrame (message ,0 );
2137
+ }
2138
+
2139
+ private static Buffer createMessageFrame (byte [] message , int paddingLength ) {
2117
2140
Buffer buffer = new Buffer ();
2118
2141
buffer .writeByte (0 /* UNCOMPRESSED */ );
2119
2142
buffer .writeInt (message .length );
2120
2143
buffer .write (message );
2144
+ buffer .write (new byte [paddingLength ]);
2121
2145
return buffer ;
2122
2146
}
2123
2147
0 commit comments