Skip to content

Commit 5f34c60

Browse files
authored
okhttp: okhttp client and server transport should use padded length for flow control (#10422)
1 parent 93118f4 commit 5f34c60

File tree

9 files changed

+288
-58
lines changed

9 files changed

+288
-58
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,11 +321,12 @@ public void transportHeadersReceived(List<Header> headers, boolean endOfStream)
321321
* Must be called with holding the transport lock.
322322
*/
323323
@GuardedBy("lock")
324-
public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
324+
public void transportDataReceived(okio.Buffer frame, boolean endOfStream, int paddingLen) {
325325
// We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified
326326
// in OkHttp's Http2 deframer. In addition, this code is after the data has been read.
327327
int length = (int) frame.size();
328-
window -= length;
328+
window -= length + paddingLen;
329+
processedWindow -= paddingLen;
329330
if (window < 0) {
330331
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
331332
transport.finishStream(

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,7 +1140,8 @@ public void run() {
11401140
*/
11411141
@SuppressWarnings("GuardedBy")
11421142
@Override
1143-
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
1143+
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1144+
int paddedLength)
11441145
throws IOException {
11451146
logger.logData(OkHttpFrameLogger.Direction.INBOUND,
11461147
streamId, in.getBuffer(), length, inFinished);
@@ -1166,12 +1167,12 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
11661167
synchronized (lock) {
11671168
// TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
11681169
// instead found: 'OkHttpClientTransport.this.lock'
1169-
stream.transportState().transportDataReceived(buf, inFinished);
1170+
stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
11701171
}
11711172
}
11721173

11731174
// connection window update
1174-
connectionUnacknowledgedBytesRead += length;
1175+
connectionUnacknowledgedBytesRead += paddedLength;
11751176
if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
11761177
synchronized (lock) {
11771178
frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,15 @@ public void runOnTransportThread(final Runnable r) {
208208
* Must be called with holding the transport lock.
209209
*/
210210
@Override
211-
public void inboundDataReceived(okio.Buffer frame, int windowConsumed, boolean endOfStream) {
211+
public void inboundDataReceived(okio.Buffer frame, int dataLength, int paddingLength,
212+
boolean endOfStream) {
212213
synchronized (lock) {
213214
PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
214215
if (endOfStream) {
215216
this.receivedEndOfStream = true;
216217
}
217-
window -= windowConsumed;
218+
window -= dataLength + paddingLength;
219+
processedWindow -= paddingLength;
218220
super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
219221
}
220222
}

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
248248
TimeUnit.NANOSECONDS);
249249
}
250250

251-
transportExecutor.execute(
252-
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
251+
transportExecutor.execute(new FrameHandler(
252+
variant.newReader(Okio.buffer(Okio.source(socket)), false)));
253253
} catch (Error | IOException | RuntimeException ex) {
254254
synchronized (lock) {
255255
if (!handshakeShutdown) {
@@ -708,7 +708,7 @@ public void headers(boolean outFinished,
708708
return;
709709
}
710710
// Ignore the trailers, but still half-close the stream
711-
stream.inboundDataReceived(new Buffer(), 0, true);
711+
stream.inboundDataReceived(new Buffer(), 0, 0, true);
712712
return;
713713
}
714714
} else {
@@ -799,7 +799,7 @@ public void headers(boolean outFinished,
799799
listener.streamCreated(streamForApp, method, metadata);
800800
stream.onStreamAllocated();
801801
if (inFinished) {
802-
stream.inboundDataReceived(new Buffer(), 0, inFinished);
802+
stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
803803
}
804804
}
805805
}
@@ -819,7 +819,8 @@ private int headerBlockSize(List<Header> headerBlock) {
819819
* Handle an HTTP2 DATA frame.
820820
*/
821821
@Override
822-
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
822+
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
823+
int paddedLength)
823824
throws IOException {
824825
frameLogger.logData(
825826
OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
@@ -853,19 +854,19 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
853854
"Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
854855
return;
855856
}
856-
if (stream.inboundWindowAvailable() < length) {
857+
if (stream.inboundWindowAvailable() < paddedLength) {
857858
in.skip(length);
858859
streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
859860
"Received DATA size exceeded window size. RFC7540 section 6.9");
860861
return;
861862
}
862863
Buffer buf = new Buffer();
863864
buf.write(in.getBuffer(), length);
864-
stream.inboundDataReceived(buf, length, inFinished);
865+
stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
865866
}
866867

867868
// connection window update
868-
connectionUnacknowledgedBytesRead += length;
869+
connectionUnacknowledgedBytesRead += paddedLength;
869870
if (connectionUnacknowledgedBytesRead
870871
>= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
871872
synchronized (lock) {
@@ -1064,7 +1065,7 @@ private void respondWithHttpError(
10641065
}
10651066
streams.put(streamId, stream);
10661067
if (inFinished) {
1067-
stream.inboundDataReceived(new Buffer(), 0, true);
1068+
stream.inboundDataReceived(new Buffer(), 0, 0, true);
10681069
}
10691070
frameWriter.headers(streamId, headers);
10701071
outboundFlow.data(
@@ -1122,7 +1123,7 @@ public void onPingTimeout() {
11221123

11231124
interface StreamState {
11241125
/** Must be holding 'lock' when calling. */
1125-
void inboundDataReceived(Buffer frame, int windowConsumed, boolean endOfStream);
1126+
void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
11261127

11271128
/** Must be holding 'lock' when calling. */
11281129
boolean hasReceivedEndOfStream();
@@ -1159,12 +1160,12 @@ static class Http2ErrorStreamState implements StreamState, OutboundFlowControlle
11591160
@Override public void onSentBytes(int frameBytes) {}
11601161

11611162
@Override public void inboundDataReceived(
1162-
Buffer frame, int windowConsumed, boolean endOfStream) {
1163+
Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
11631164
synchronized (lock) {
11641165
if (endOfStream) {
11651166
receivedEndOfStream = true;
11661167
}
1167-
window -= windowConsumed;
1168+
window -= dataLength + paddingLength;
11681169
try {
11691170
frame.skip(frame.size()); // Recycle segments
11701171
} catch (IOException ex) {

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,16 @@ public void close() throws SecurityException {
291291

292292
final String message = "Hello Client";
293293
Buffer buffer = createMessageFrame(message);
294-
frameHandler().data(false, 3, buffer, (int) buffer.size());
294+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
295+
(int) buffer.size());
295296
assertThat(logs).hasSize(1);
296297
log = logs.remove(0);
297298
assertThat(log.getMessage()).startsWith(Direction.INBOUND + " DATA: streamId=" + 3);
298299
assertThat(log.getLevel()).isEqualTo(Level.FINE);
299300

300301
// At most 64 bytes of data frame will be logged.
301-
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
302+
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])),
303+
1000, 1000);
302304
assertThat(logs).hasSize(1);
303305
log = logs.remove(0);
304306
String data = log.getMessage();
@@ -377,7 +379,8 @@ public void maxMessageSizeShouldBeEnforced() throws Exception {
377379
// Receive the message.
378380
final String message = "Hello Client";
379381
Buffer buffer = createMessageFrame(message);
380-
frameHandler().data(false, 3, buffer, (int) buffer.size());
382+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
383+
(int) buffer.size());
381384

382385
listener.waitUntilStreamClosed();
383386
assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode());
@@ -500,7 +503,8 @@ public void readMessages() throws Exception {
500503
assertNotNull(listener.headers);
501504
for (int i = 0; i < numMessages; i++) {
502505
Buffer buffer = createMessageFrame(message + i);
503-
frameHandler().data(false, 3, buffer, (int) buffer.size());
506+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
507+
(int) buffer.size());
504508
}
505509
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
506510
listener.waitUntilStreamClosed();
@@ -529,7 +533,8 @@ public void receivedHeadersForInvalidStreamShouldKillConnection() throws Excepti
529533
@Test
530534
public void receivedDataForInvalidStreamShouldKillConnection() throws Exception {
531535
initTransport();
532-
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
536+
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])),
537+
1000, 1000);
533538
verify(frameWriter, timeout(TIME_OUT_MS))
534539
.goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
535540
verify(transportListener).transportShutdown(isA(Status.class));
@@ -551,7 +556,8 @@ public void invalidInboundHeadersCancelStream() throws Exception {
551556
HeadersMode.HTTP_20_HEADERS);
552557
// Now wait to receive 1000 bytes of data so we can have a better error message before
553558
// cancelling the streaam.
554-
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
559+
frameHandler().data(false, 3,
560+
createMessageFrame(new String(new char[1000])), 1000, 1000);
555561
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
556562
assertNull(listener.headers);
557563
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
@@ -622,7 +628,8 @@ public void receiveResetNoError() throws Exception {
622628
assertContainStream(3);
623629
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
624630
Buffer buffer = createMessageFrame("a message");
625-
frameHandler().data(false, 3, buffer, (int) buffer.size());
631+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
632+
(int) buffer.size());
626633
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
627634
frameHandler().rstStream(3, ErrorCode.NO_ERROR);
628635
stream.request(1);
@@ -762,33 +769,37 @@ public void windowUpdate() throws Exception {
762769

763770
int messageLength = INITIAL_WINDOW_SIZE / 4;
764771
byte[] fakeMessage = new byte[messageLength];
772+
int paddingLength = 2;
765773

766774
// Stream 1 receives a message
767-
Buffer buffer = createMessageFrame(fakeMessage);
775+
Buffer buffer = createMessageFrame(fakeMessage, paddingLength);
768776
int messageFrameLength = (int) buffer.size();
769-
frameHandler().data(false, 3, buffer, messageFrameLength);
777+
frameHandler().data(false, 3, buffer, messageFrameLength - paddingLength,
778+
messageFrameLength);
770779

771780
// Stream 2 receives a message
772-
buffer = createMessageFrame(fakeMessage);
773-
frameHandler().data(false, 5, buffer, messageFrameLength);
781+
buffer = createMessageFrame(fakeMessage, paddingLength);
782+
frameHandler().data(false, 5, buffer, messageFrameLength - paddingLength,
783+
messageFrameLength);
774784

775785
verify(frameWriter, timeout(TIME_OUT_MS))
776786
.windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
777787
reset(frameWriter);
778788

779789
// Stream 1 receives another message
780790
buffer = createMessageFrame(fakeMessage);
781-
frameHandler().data(false, 3, buffer, messageFrameLength);
791+
messageFrameLength = (int) buffer.size();
792+
frameHandler().data(false, 3, buffer, messageFrameLength, messageFrameLength);
782793

783794
verify(frameWriter, timeout(TIME_OUT_MS))
784-
.windowUpdate(eq(3), eq((long) 2 * messageFrameLength));
795+
.windowUpdate(eq(3), eq((long) 2 * messageFrameLength + paddingLength));
785796

786797
// Stream 2 receives another message
787798
buffer = createMessageFrame(fakeMessage);
788-
frameHandler().data(false, 5, buffer, messageFrameLength);
799+
frameHandler().data(false, 5, buffer, messageFrameLength, messageFrameLength);
789800

790801
verify(frameWriter, timeout(TIME_OUT_MS))
791-
.windowUpdate(eq(5), eq((long) 2 * messageFrameLength));
802+
.windowUpdate(eq(5), eq((long) 2 * messageFrameLength + paddingLength));
792803
verify(frameWriter, timeout(TIME_OUT_MS))
793804
.windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
794805

@@ -819,7 +830,8 @@ public void windowUpdateWithInboundFlowControl() throws Exception {
819830
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
820831
Buffer buffer = createMessageFrame(fakeMessage);
821832
long messageFrameLength = buffer.size();
822-
frameHandler().data(false, 3, buffer, (int) messageFrameLength);
833+
frameHandler().data(false, 3, buffer, (int) messageFrameLength,
834+
(int) messageFrameLength);
823835
ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class);
824836
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(
825837
idCaptor.capture(), eq(messageFrameLength));
@@ -1123,7 +1135,8 @@ public void receiveGoAway() throws Exception {
11231135
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
11241136
final String receivedMessage = "No, you are fine.";
11251137
Buffer buffer = createMessageFrame(receivedMessage);
1126-
frameHandler().data(false, 3, buffer, (int) buffer.size());
1138+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
1139+
(int) buffer.size());
11271140
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
11281141
listener1.waitUntilStreamClosed();
11291142
assertEquals(1, listener1.messages.size());
@@ -1154,12 +1167,12 @@ public void streamIdExhausted() throws Exception {
11541167
assertNotNull(listener.headers);
11551168
String message = "hello";
11561169
Buffer buffer = createMessageFrame(message);
1157-
frameHandler().data(false, startId, buffer, (int) buffer.size());
1170+
frameHandler().data(false, startId, buffer, (int) buffer.size(), (int) buffer.size());
11581171

11591172
getStream(startId).cancel(Status.CANCELLED);
11601173
// Receives the second message after be cancelled.
11611174
buffer = createMessageFrame(message);
1162-
frameHandler().data(false, startId, buffer, (int) buffer.size());
1175+
frameHandler().data(false, startId, buffer, (int) buffer.size(), (int) buffer.size());
11631176

11641177
listener.waitUntilStreamClosed();
11651178
// Should only have the first message delivered.
@@ -1329,7 +1342,7 @@ public void receivingWindowExceeded() throws Exception {
13291342
byte[] fakeMessage = new byte[messageLength];
13301343
Buffer buffer = createMessageFrame(fakeMessage);
13311344
int messageFrameLength = (int) buffer.size();
1332-
frameHandler().data(false, 3, buffer, messageFrameLength);
1345+
frameHandler().data(false, 3, buffer, messageFrameLength, messageFrameLength);
13331346

13341347
listener.waitUntilStreamClosed();
13351348
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
@@ -1392,7 +1405,8 @@ public void receiveDataWithoutHeader() throws Exception {
13921405
stream.start(listener);
13931406
stream.request(1);
13941407
Buffer buffer = createMessageFrame(new byte[1]);
1395-
frameHandler().data(false, 3, buffer, (int) buffer.size());
1408+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
1409+
(int) buffer.size());
13961410

13971411
// Trigger the failure by a trailer.
13981412
frameHandler().headers(
@@ -1414,11 +1428,13 @@ public void receiveDataWithoutHeaderAndTrailer() throws Exception {
14141428
stream.start(listener);
14151429
stream.request(1);
14161430
Buffer buffer = createMessageFrame(new byte[1]);
1417-
frameHandler().data(false, 3, buffer, (int) buffer.size());
1431+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
1432+
(int) buffer.size());
14181433

14191434
// Trigger the failure by a data frame.
14201435
buffer = createMessageFrame(new byte[1]);
1421-
frameHandler().data(true, 3, buffer, (int) buffer.size());
1436+
frameHandler().data(true, 3, buffer, (int) buffer.size(),
1437+
(int) buffer.size());
14221438

14231439
listener.waitUntilStreamClosed();
14241440
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
@@ -1436,7 +1452,8 @@ public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
14361452
stream.start(listener);
14371453
stream.request(1);
14381454
Buffer buffer = createMessageFrame(new byte[1000]);
1439-
frameHandler().data(false, 3, buffer, (int) buffer.size());
1455+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
1456+
(int) buffer.size());
14401457

14411458
// Once we receive enough detail, we cancel the stream. so we should have sent cancel.
14421459
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
@@ -1459,15 +1476,17 @@ public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception
14591476

14601477
Buffer buffer = createMessageFrame(
14611478
new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
1462-
frameHandler().data(false, 3, buffer, (int) buffer.size());
1479+
frameHandler().data(false, 3, buffer, (int) buffer.size(),
1480+
(int) buffer.size());
14631481
// Should still update the connection window even stream 3 is gone.
14641482
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0,
14651483
HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1);
14661484
buffer = createMessageFrame(
14671485
new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
14681486

14691487
// This should kill the connection, since we never created stream 5.
1470-
frameHandler().data(false, 5, buffer, (int) buffer.size());
1488+
frameHandler().data(false, 5, buffer, (int) buffer.size(),
1489+
(int) buffer.size());
14711490
verify(frameWriter, timeout(TIME_OUT_MS))
14721491
.goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
14731492
verify(transportListener).transportShutdown(isA(Status.class));
@@ -2114,10 +2133,15 @@ private static Buffer createMessageFrame(String message) {
21142133
}
21152134

21162135
private static Buffer createMessageFrame(byte[] message) {
2136+
return createMessageFrame(message,0);
2137+
}
2138+
2139+
private static Buffer createMessageFrame(byte[] message, int paddingLength) {
21172140
Buffer buffer = new Buffer();
21182141
buffer.writeByte(0 /* UNCOMPRESSED */);
21192142
buffer.writeInt(message.length);
21202143
buffer.write(message);
2144+
buffer.write(new byte[paddingLength]);
21212145
return buffer;
21222146
}
21232147

0 commit comments

Comments
 (0)