Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,12 @@ public void transportHeadersReceived(List<Header> headers, boolean endOfStream)
* Must be called with holding the transport lock.
*/
@GuardedBy("lock")
public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
public void transportDataReceived(okio.Buffer frame, boolean endOfStream, int paddingLen) {
// We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified
// in OkHttp's Http2 deframer. In addition, this code is after the data has been read.
int length = (int) frame.size();
window -= length;
window -= length + paddingLen;
processedWindow -= paddingLen;
if (window < 0) {
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
transport.finishStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@ public void run() {
*/
@SuppressWarnings("GuardedBy")
@Override
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
int paddedLength)
throws IOException {
logger.logData(OkHttpFrameLogger.Direction.INBOUND,
streamId, in.getBuffer(), length, inFinished);
Expand All @@ -1166,12 +1167,12 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
synchronized (lock) {
// TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
// instead found: 'OkHttpClientTransport.this.lock'
stream.transportState().transportDataReceived(buf, inFinished);
stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
}
}

// connection window update
connectionUnacknowledgedBytesRead += length;
connectionUnacknowledgedBytesRead += paddedLength;
if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
synchronized (lock) {
frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
Expand Down
6 changes: 4 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ public void runOnTransportThread(final Runnable r) {
* Must be called with holding the transport lock.
*/
@Override
public void inboundDataReceived(okio.Buffer frame, int windowConsumed, boolean endOfStream) {
public void inboundDataReceived(okio.Buffer frame, int dataLength, int paddingLength,
boolean endOfStream) {
synchronized (lock) {
PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
if (endOfStream) {
this.receivedEndOfStream = true;
}
window -= windowConsumed;
window -= dataLength + paddingLength;
processedWindow -= paddingLength;
super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
}
}
Expand Down
25 changes: 13 additions & 12 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
TimeUnit.NANOSECONDS);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
transportExecutor.execute(new FrameHandler(
variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) {
synchronized (lock) {
if (!handshakeShutdown) {
Expand Down Expand Up @@ -708,7 +708,7 @@ public void headers(boolean outFinished,
return;
}
// Ignore the trailers, but still half-close the stream
stream.inboundDataReceived(new Buffer(), 0, true);
stream.inboundDataReceived(new Buffer(), 0, 0, true);
return;
}
} else {
Expand Down Expand Up @@ -799,7 +799,7 @@ public void headers(boolean outFinished,
listener.streamCreated(streamForApp, method, metadata);
stream.onStreamAllocated();
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, inFinished);
stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
}
}
}
Expand All @@ -819,7 +819,8 @@ private int headerBlockSize(List<Header> headerBlock) {
* Handle an HTTP2 DATA frame.
*/
@Override
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
int paddedLength)
throws IOException {
frameLogger.logData(
OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
Expand Down Expand Up @@ -853,19 +854,19 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
"Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
return;
}
if (stream.inboundWindowAvailable() < length) {
if (stream.inboundWindowAvailable() < paddedLength) {
in.skip(length);
streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
"Received DATA size exceeded window size. RFC7540 section 6.9");
return;
}
Buffer buf = new Buffer();
buf.write(in.getBuffer(), length);
stream.inboundDataReceived(buf, length, inFinished);
stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
}

// connection window update
connectionUnacknowledgedBytesRead += length;
connectionUnacknowledgedBytesRead += paddedLength;
if (connectionUnacknowledgedBytesRead
>= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
synchronized (lock) {
Expand Down Expand Up @@ -1064,7 +1065,7 @@ private void respondWithHttpError(
}
streams.put(streamId, stream);
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, true);
stream.inboundDataReceived(new Buffer(), 0, 0, true);
}
frameWriter.headers(streamId, headers);
outboundFlow.data(
Expand Down Expand Up @@ -1122,7 +1123,7 @@ public void onPingTimeout() {

interface StreamState {
/** Must be holding 'lock' when calling. */
void inboundDataReceived(Buffer frame, int windowConsumed, boolean endOfStream);
void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);

/** Must be holding 'lock' when calling. */
boolean hasReceivedEndOfStream();
Expand Down Expand Up @@ -1159,12 +1160,12 @@ static class Http2ErrorStreamState implements StreamState, OutboundFlowControlle
@Override public void onSentBytes(int frameBytes) {}

@Override public void inboundDataReceived(
Buffer frame, int windowConsumed, boolean endOfStream) {
Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
synchronized (lock) {
if (endOfStream) {
receivedEndOfStream = true;
}
window -= windowConsumed;
window -= dataLength + paddingLength;
try {
frame.skip(frame.size()); // Recycle segments
} catch (IOException ex) {
Expand Down
76 changes: 50 additions & 26 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,16 @@ public void close() throws SecurityException {

final String message = "Hello Client";
Buffer buffer = createMessageFrame(message);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());
assertThat(logs).hasSize(1);
log = logs.remove(0);
assertThat(log.getMessage()).startsWith(Direction.INBOUND + " DATA: streamId=" + 3);
assertThat(log.getLevel()).isEqualTo(Level.FINE);

// At most 64 bytes of data frame will be logged.
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])),
1000, 1000);
assertThat(logs).hasSize(1);
log = logs.remove(0);
String data = log.getMessage();
Expand Down Expand Up @@ -377,7 +379,8 @@ public void maxMessageSizeShouldBeEnforced() throws Exception {
// Receive the message.
final String message = "Hello Client";
Buffer buffer = createMessageFrame(message);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());

listener.waitUntilStreamClosed();
assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode());
Expand Down Expand Up @@ -500,7 +503,8 @@ public void readMessages() throws Exception {
assertNotNull(listener.headers);
for (int i = 0; i < numMessages; i++) {
Buffer buffer = createMessageFrame(message + i);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());
}
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
listener.waitUntilStreamClosed();
Expand Down Expand Up @@ -529,7 +533,8 @@ public void receivedHeadersForInvalidStreamShouldKillConnection() throws Excepti
@Test
public void receivedDataForInvalidStreamShouldKillConnection() throws Exception {
initTransport();
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])),
1000, 1000);
verify(frameWriter, timeout(TIME_OUT_MS))
.goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
verify(transportListener).transportShutdown(isA(Status.class));
Expand All @@ -551,7 +556,8 @@ public void invalidInboundHeadersCancelStream() throws Exception {
HeadersMode.HTTP_20_HEADERS);
// Now wait to receive 1000 bytes of data so we can have a better error message before
// cancelling the streaam.
frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
frameHandler().data(false, 3,
createMessageFrame(new String(new char[1000])), 1000, 1000);
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
assertNull(listener.headers);
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
Expand Down Expand Up @@ -622,7 +628,8 @@ public void receiveResetNoError() throws Exception {
assertContainStream(3);
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
Buffer buffer = createMessageFrame("a message");
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
frameHandler().rstStream(3, ErrorCode.NO_ERROR);
stream.request(1);
Expand Down Expand Up @@ -762,33 +769,37 @@ public void windowUpdate() throws Exception {

int messageLength = INITIAL_WINDOW_SIZE / 4;
byte[] fakeMessage = new byte[messageLength];
int paddingLength = 2;

// Stream 1 receives a message
Buffer buffer = createMessageFrame(fakeMessage);
Buffer buffer = createMessageFrame(fakeMessage, paddingLength);
int messageFrameLength = (int) buffer.size();
frameHandler().data(false, 3, buffer, messageFrameLength);
frameHandler().data(false, 3, buffer, messageFrameLength - paddingLength,
messageFrameLength);

// Stream 2 receives a message
buffer = createMessageFrame(fakeMessage);
frameHandler().data(false, 5, buffer, messageFrameLength);
buffer = createMessageFrame(fakeMessage, paddingLength);
frameHandler().data(false, 5, buffer, messageFrameLength - paddingLength,
messageFrameLength);

verify(frameWriter, timeout(TIME_OUT_MS))
.windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
reset(frameWriter);

// Stream 1 receives another message
buffer = createMessageFrame(fakeMessage);
frameHandler().data(false, 3, buffer, messageFrameLength);
messageFrameLength = (int) buffer.size();
frameHandler().data(false, 3, buffer, messageFrameLength, messageFrameLength);

verify(frameWriter, timeout(TIME_OUT_MS))
.windowUpdate(eq(3), eq((long) 2 * messageFrameLength));
.windowUpdate(eq(3), eq((long) 2 * messageFrameLength + paddingLength));

// Stream 2 receives another message
buffer = createMessageFrame(fakeMessage);
frameHandler().data(false, 5, buffer, messageFrameLength);
frameHandler().data(false, 5, buffer, messageFrameLength, messageFrameLength);

verify(frameWriter, timeout(TIME_OUT_MS))
.windowUpdate(eq(5), eq((long) 2 * messageFrameLength));
.windowUpdate(eq(5), eq((long) 2 * messageFrameLength + paddingLength));
verify(frameWriter, timeout(TIME_OUT_MS))
.windowUpdate(eq(0), eq((long) 2 * messageFrameLength));

Expand Down Expand Up @@ -819,7 +830,8 @@ public void windowUpdateWithInboundFlowControl() throws Exception {
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
Buffer buffer = createMessageFrame(fakeMessage);
long messageFrameLength = buffer.size();
frameHandler().data(false, 3, buffer, (int) messageFrameLength);
frameHandler().data(false, 3, buffer, (int) messageFrameLength,
(int) messageFrameLength);
ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class);
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(
idCaptor.capture(), eq(messageFrameLength));
Expand Down Expand Up @@ -1123,7 +1135,8 @@ public void receiveGoAway() throws Exception {
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
final String receivedMessage = "No, you are fine.";
Buffer buffer = createMessageFrame(receivedMessage);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
listener1.waitUntilStreamClosed();
assertEquals(1, listener1.messages.size());
Expand Down Expand Up @@ -1154,12 +1167,12 @@ public void streamIdExhausted() throws Exception {
assertNotNull(listener.headers);
String message = "hello";
Buffer buffer = createMessageFrame(message);
frameHandler().data(false, startId, buffer, (int) buffer.size());
frameHandler().data(false, startId, buffer, (int) buffer.size(), (int) buffer.size());

getStream(startId).cancel(Status.CANCELLED);
// Receives the second message after be cancelled.
buffer = createMessageFrame(message);
frameHandler().data(false, startId, buffer, (int) buffer.size());
frameHandler().data(false, startId, buffer, (int) buffer.size(), (int) buffer.size());

listener.waitUntilStreamClosed();
// Should only have the first message delivered.
Expand Down Expand Up @@ -1329,7 +1342,7 @@ public void receivingWindowExceeded() throws Exception {
byte[] fakeMessage = new byte[messageLength];
Buffer buffer = createMessageFrame(fakeMessage);
int messageFrameLength = (int) buffer.size();
frameHandler().data(false, 3, buffer, messageFrameLength);
frameHandler().data(false, 3, buffer, messageFrameLength, messageFrameLength);

listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
Expand Down Expand Up @@ -1392,7 +1405,8 @@ public void receiveDataWithoutHeader() throws Exception {
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());

// Trigger the failure by a trailer.
frameHandler().headers(
Expand All @@ -1414,11 +1428,13 @@ public void receiveDataWithoutHeaderAndTrailer() throws Exception {
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());

// Trigger the failure by a data frame.
buffer = createMessageFrame(new byte[1]);
frameHandler().data(true, 3, buffer, (int) buffer.size());
frameHandler().data(true, 3, buffer, (int) buffer.size(),
(int) buffer.size());

listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
Expand All @@ -1436,7 +1452,8 @@ public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1000]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());

// Once we receive enough detail, we cancel the stream. so we should have sent cancel.
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
Expand All @@ -1459,15 +1476,17 @@ public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception

Buffer buffer = createMessageFrame(
new byte[INITIAL_WINDOW_SIZE / 2 + 1]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().data(false, 3, buffer, (int) buffer.size(),
(int) buffer.size());
// Should still update the connection window even stream 3 is gone.
verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0,
HEADER_LENGTH + INITIAL_WINDOW_SIZE / 2 + 1);
buffer = createMessageFrame(
new byte[INITIAL_WINDOW_SIZE / 2 + 1]);

// This should kill the connection, since we never created stream 5.
frameHandler().data(false, 5, buffer, (int) buffer.size());
frameHandler().data(false, 5, buffer, (int) buffer.size(),
(int) buffer.size());
verify(frameWriter, timeout(TIME_OUT_MS))
.goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
verify(transportListener).transportShutdown(isA(Status.class));
Expand Down Expand Up @@ -2114,10 +2133,15 @@ private static Buffer createMessageFrame(String message) {
}

private static Buffer createMessageFrame(byte[] message) {
return createMessageFrame(message,0);
}

private static Buffer createMessageFrame(byte[] message, int paddingLength) {
Buffer buffer = new Buffer();
buffer.writeByte(0 /* UNCOMPRESSED */);
buffer.writeInt(message.length);
buffer.write(message);
buffer.write(new byte[paddingLength]);
return buffer;
}

Expand Down
Loading