Skip to content

Commit fd8fd51

Browse files
committed
Context deadline propagation should cascade. Fixes #1205
A call's timeout as specified in its metadata should be set depending on the deadline of the call's context. If a call has an explicit deadline set (through CallOptions), then the smaller deadline (from context and call options) should be used to compute the timeout. Also, a new method Contexts.statusFromCancelled(Context) was introduced that attempts to map a canceled context to a gRPC status.
1 parent 363e0f6 commit fd8fd51

File tree

12 files changed

+456
-131
lines changed

12 files changed

+456
-131
lines changed

core/src/main/java/io/grpc/Context.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -651,14 +651,20 @@ private CancellableContext(Context parent, Deadline deadline,
651651
ScheduledExecutorService scheduler) {
652652
super(parent, deriveDeadline(parent, deadline), true);
653653
if (DEADLINE_KEY.get(this) == deadline) {
654-
// The parent deadline was after the new deadline so we need to install a listener
655-
// on the new earlier deadline to trigger expiration for this context.
656-
pendingDeadline = deadline.runOnExpiration(new Runnable() {
657-
@Override
658-
public void run() {
659-
cancel(new TimeoutException("context timed out"));
660-
}
661-
}, scheduler);
654+
final TimeoutException cause = new TimeoutException("context timed out");
655+
if (!deadline.isExpired()) {
656+
// The parent deadline was after the new deadline so we need to install a listener
657+
// on the new earlier deadline to trigger expiration for this context.
658+
pendingDeadline = deadline.runOnExpiration(new Runnable() {
659+
@Override
660+
public void run() {
661+
cancel(cause);
662+
}
663+
}, scheduler);
664+
} else {
665+
// Cancel immediately if the deadline is already expired.
666+
cancel(cause);
667+
}
662668
}
663669
uncancellableSurrogate = new Context(this, EMPTY_ENTRIES);
664670
}

core/src/main/java/io/grpc/Contexts.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131

3232
package io.grpc;
3333

34+
import com.google.common.base.Preconditions;
35+
36+
import java.util.concurrent.TimeoutException;
37+
3438
/**
3539
* Utility methods for working with {@link Context}s in GRPC.
3640
*/
@@ -130,4 +134,33 @@ public void onReady() {
130134
}
131135
}
132136
}
137+
138+
/**
139+
* Returns the {@link Status} of a cancelled context or {@code null} if the context
140+
* is not cancelled.
141+
*/
142+
public static Status statusFromCancelled(Context context) {
143+
Preconditions.checkNotNull(context, "context must not be null");
144+
if (!context.isCancelled()) {
145+
return null;
146+
}
147+
148+
Throwable cancellationCause = context.cancellationCause();
149+
if (cancellationCause == null) {
150+
return Status.CANCELLED;
151+
}
152+
if (cancellationCause instanceof TimeoutException) {
153+
return Status.DEADLINE_EXCEEDED
154+
.withDescription(cancellationCause.getMessage())
155+
.withCause(cancellationCause);
156+
}
157+
Status status = Status.fromThrowable(cancellationCause);
158+
if (Status.Code.UNKNOWN.equals(status.getCode())
159+
&& status.getCause() == cancellationCause) {
160+
// If fromThrowable could not determine a status, then
161+
// just return CANCELLED.
162+
return Status.CANCELLED.withCause(cancellationCause);
163+
}
164+
return status.withCause(cancellationCause);
165+
}
133166
}

core/src/main/java/io/grpc/internal/ClientCallImpl.java

Lines changed: 85 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import static com.google.common.base.Preconditions.checkNotNull;
3636
import static com.google.common.base.Preconditions.checkState;
3737
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
38+
import static io.grpc.Contexts.statusFromCancelled;
39+
import static io.grpc.Status.DEADLINE_EXCEEDED;
3840
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_JOINER;
3941
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
4042
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
4143
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
4244
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
43-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
45+
import static java.lang.Math.max;
4446

4547
import com.google.common.annotations.VisibleForTesting;
4648
import com.google.common.base.Preconditions;
@@ -62,7 +64,9 @@
6264
import java.io.InputStream;
6365
import java.util.concurrent.Executor;
6466
import java.util.concurrent.ScheduledExecutorService;
65-
import java.util.concurrent.ScheduledFuture;
67+
import java.util.concurrent.TimeUnit;
68+
import java.util.logging.Level;
69+
import java.util.logging.Logger;
6670

6771
import javax.annotation.Nullable;
6872

@@ -71,20 +75,22 @@
7175
*/
7276
final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
7377
implements Context.CancellationListener {
78+
79+
private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
80+
7481
private final MethodDescriptor<ReqT, RespT> method;
7582
private final Executor callExecutor;
76-
private final Context context;
83+
private final Context parentContext;
84+
private volatile Context context;
7785
private final boolean unaryRequest;
7886
private final CallOptions callOptions;
7987
private ClientStream stream;
80-
private volatile ScheduledFuture<?> deadlineCancellationFuture;
81-
private volatile boolean deadlineCancellationFutureShouldBeCancelled;
88+
private volatile boolean contextListenerShouldBeRemoved;
8289
private boolean cancelCalled;
8390
private boolean halfCloseCalled;
8491
private final ClientTransportProvider clientTransportProvider;
8592
private String userAgent;
8693
private ScheduledExecutorService deadlineCancellationExecutor;
87-
private Compressor compressor;
8894
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
8995
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
9096

@@ -99,7 +105,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
99105
? new SerializeReentrantCallsDirectExecutor()
100106
: new SerializingExecutor(executor);
101107
// Propagate the context from the thread which initiated the call to all callbacks.
102-
this.context = Context.current();
108+
this.parentContext = Context.current();
103109
this.unaryRequest = method.getType() == MethodType.UNARY
104110
|| method.getType() == MethodType.SERVER_STREAMING;
105111
this.callOptions = callOptions;
@@ -109,7 +115,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
109115

110116
@Override
111117
public void cancelled(Context context) {
112-
stream.cancel(Status.CANCELLED.withCause(context.cancellationCause()));
118+
stream.cancel(statusFromCancelled(context));
113119
}
114120

115121
/**
@@ -165,19 +171,28 @@ public void start(final Listener<RespT> observer, Metadata headers) {
165171
checkNotNull(observer, "observer");
166172
checkNotNull(headers, "headers");
167173

174+
// Create the context
175+
final Deadline effectiveDeadline = min(callOptions.getDeadline(), parentContext.getDeadline());
176+
if (effectiveDeadline != parentContext.getDeadline()) {
177+
context = parentContext.withDeadline(effectiveDeadline, deadlineCancellationExecutor);
178+
} else {
179+
context = parentContext.withCancellation();
180+
}
181+
168182
if (context.isCancelled()) {
169183
// Context is already cancelled so no need to create a real stream, just notify the observer
170184
// of cancellation via callback on the executor
171185
stream = NoopClientStream.INSTANCE;
172186
callExecutor.execute(new ContextRunnable(context) {
173187
@Override
174188
public void runInContext() {
175-
observer.onClose(Status.CANCELLED.withCause(context.cancellationCause()), new Metadata());
189+
observer.onClose(statusFromCancelled(context), new Metadata());
176190
}
177191
});
178192
return;
179193
}
180194
final String compressorName = callOptions.getCompressor();
195+
Compressor compressor = null;
181196
if (compressorName != null) {
182197
compressor = compressorRegistry.lookupCompressor(compressorName);
183198
if (compressor == null) {
@@ -199,11 +214,14 @@ public void runInContext() {
199214

200215
prepareHeaders(headers, callOptions, userAgent, decompressorRegistry, compressor);
201216

202-
if (updateTimeoutHeader(callOptions.getDeadline(), headers)) {
217+
final boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
218+
if (!deadlineExceeded) {
219+
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
220+
parentContext.getDeadline(), headers);
203221
ClientTransport transport = clientTransportProvider.get(callOptions);
204222
stream = transport.newStream(method, headers);
205223
} else {
206-
stream = new FailingClientStream(Status.DEADLINE_EXCEEDED);
224+
stream = new FailingClientStream(DEADLINE_EXCEEDED);
207225
}
208226

209227
if (callOptions.getAuthority() != null) {
@@ -215,45 +233,66 @@ public void runInContext() {
215233
if (compressor != Codec.Identity.NONE) {
216234
stream.setMessageCompression(true);
217235
}
236+
218237
// Delay any sources of cancellation after start(), because most of the transports are broken if
219238
// they receive cancel before start. Issue #1343 has more details
220239

221-
// Start the deadline timer after stream creation because it will close the stream
222-
if (callOptions.getDeadline() != null) {
223-
long timeoutNanos = callOptions.getDeadline().timeRemaining(NANOSECONDS);
224-
deadlineCancellationFuture = startDeadlineTimer(timeoutNanos);
225-
if (deadlineCancellationFutureShouldBeCancelled) {
226-
// Race detected! ClientStreamListener.closed may have been called before
227-
// deadlineCancellationFuture was set, thereby preventing the future from being cancelled.
228-
// Go ahead and cancel again, just to be sure it was cancelled.
229-
deadlineCancellationFuture.cancel(false);
230-
}
231-
}
232240
// Propagate later Context cancellation to the remote side.
233-
this.context.addListener(this, directExecutor());
241+
context.addListener(this, directExecutor());
242+
if (contextListenerShouldBeRemoved) {
243+
// Race detected! ClientStreamListener.closed may have been called before
244+
// deadlineCancellationFuture was set, thereby preventing the future from being cancelled.
245+
// Go ahead and cancel again, just to be sure it was cancelled.
246+
context.removeListener(this);
247+
}
234248
}
235249

236250
/**
237251
* Based on the deadline, calculate and set the timeout to the given headers.
238-
*
239-
* @return {@code false} if deadline already exceeded
240252
*/
241-
static boolean updateTimeoutHeader(@Nullable Deadline deadline, Metadata headers) {
242-
// Fill out timeout on the headers
243-
// TODO(carl-mastrangelo): Find out if this should always remove the timeout,
244-
// even when returning false.
253+
private static void updateTimeoutHeaders(@Nullable Deadline effectiveDeadline,
254+
@Nullable Deadline callDeadline, @Nullable Deadline outerCallDeadline, Metadata headers) {
245255
headers.removeAll(TIMEOUT_KEY);
246256

247-
if (deadline != null) {
248-
// Convert the deadline to timeout. Timeout is more favorable than deadline on the wire
249-
// because timeout tolerates the clock difference between machines.
250-
long timeoutNanos = deadline.timeRemaining(NANOSECONDS);
251-
if (timeoutNanos <= 0) {
252-
return false;
253-
}
254-
headers.put(TIMEOUT_KEY, timeoutNanos);
257+
if (effectiveDeadline == null) {
258+
return;
255259
}
256-
return true;
260+
261+
long effectiveTimeout = max(0, effectiveDeadline.timeRemaining(TimeUnit.NANOSECONDS));
262+
headers.put(TIMEOUT_KEY, effectiveTimeout);
263+
264+
logIfContextNarrowedTimeout(effectiveTimeout, effectiveDeadline, outerCallDeadline,
265+
callDeadline);
266+
}
267+
268+
private static void logIfContextNarrowedTimeout(long effectiveTimeout,
269+
Deadline effectiveDeadline, @Nullable Deadline outerCallDeadline,
270+
@Nullable Deadline callDeadline) {
271+
if (!log.isLoggable(Level.INFO) || outerCallDeadline != effectiveDeadline) {
272+
return;
273+
}
274+
275+
StringBuilder builder = new StringBuilder();
276+
builder.append(String.format("Call timeout set to '%d' ns, due to context deadline.",
277+
effectiveTimeout));
278+
if (callDeadline == null) {
279+
builder.append(" Explicit call timeout was not set.");
280+
} else {
281+
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
282+
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
283+
}
284+
285+
log.info(builder.toString());
286+
}
287+
288+
private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
289+
if (deadline0 == null) {
290+
return deadline1;
291+
}
292+
if (deadline1 == null) {
293+
return deadline0;
294+
}
295+
return deadline0.minimum(deadline1);
257296
}
258297

259298
@Override
@@ -276,7 +315,9 @@ public void cancel() {
276315
stream.cancel(Status.CANCELLED);
277316
}
278317
} finally {
279-
context.removeListener(ClientCallImpl.this);
318+
if (context != null) {
319+
context.removeListener(ClientCallImpl.this);
320+
}
280321
}
281322
}
282323

@@ -321,15 +362,6 @@ public boolean isReady() {
321362
return stream.isReady();
322363
}
323364

324-
private ScheduledFuture<?> startDeadlineTimer(long timeoutNanos) {
325-
return deadlineCancellationExecutor.schedule(new Runnable() {
326-
@Override
327-
public void run() {
328-
stream.cancel(Status.DEADLINE_EXCEEDED);
329-
}
330-
}, timeoutNanos, NANOSECONDS);
331-
}
332-
333365
private class ClientStreamListenerImpl implements ClientStreamListener {
334366
private final Listener<RespT> observer;
335367
private boolean closed;
@@ -394,13 +426,13 @@ public final void runInContext() {
394426

395427
@Override
396428
public void closed(Status status, Metadata trailers) {
397-
if (status.getCode() == Status.Code.CANCELLED && callOptions.getDeadline() != null) {
429+
Deadline deadline = context.getDeadline();
430+
if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
398431
// When the server's deadline expires, it can only reset the stream with CANCEL and no
399432
// description. Since our timer may be delayed in firing, we double-check the deadline and
400433
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
401-
long timeoutNanos = callOptions.getDeadline().timeRemaining(NANOSECONDS);
402-
if (timeoutNanos <= 0) {
403-
status = Status.DEADLINE_EXCEEDED;
434+
if (deadline.isExpired()) {
435+
status = DEADLINE_EXCEEDED;
404436
// Replace trailers to prevent mixing sources of status and trailers.
405437
trailers = new Metadata();
406438
}
@@ -412,12 +444,7 @@ public void closed(Status status, Metadata trailers) {
412444
public final void runInContext() {
413445
try {
414446
closed = true;
415-
deadlineCancellationFutureShouldBeCancelled = true;
416-
// manually optimize the volatile read
417-
ScheduledFuture<?> future = deadlineCancellationFuture;
418-
if (future != null) {
419-
future.cancel(false);
420-
}
447+
contextListenerShouldBeRemoved = true;
421448
observer.onClose(savedStatus, savedTrailers);
422449
} finally {
423450
context.removeListener(ClientCallImpl.this);

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.io.InputStream;
5959
import java.util.List;
6060
import java.util.Set;
61-
import java.util.concurrent.Future;
6261

6362
final class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
6463
private final ServerStream stream;
@@ -198,9 +197,8 @@ public boolean isCancelled() {
198197
return cancelled;
199198
}
200199

201-
ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener,
202-
Future<?> timeout) {
203-
return new ServerStreamListenerImpl<ReqT>(this, listener, timeout, context);
200+
ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) {
201+
return new ServerStreamListenerImpl<ReqT>(this, listener, context);
204202
}
205203

206204
@Override
@@ -216,16 +214,14 @@ public Attributes attributes() {
216214
static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener {
217215
private final ServerCallImpl<ReqT, ?> call;
218216
private final ServerCall.Listener<ReqT> listener;
219-
private final Future<?> timeout;
220217
private final Context.CancellableContext context;
221218
private boolean messageReceived;
222219

223220
public ServerStreamListenerImpl(
224-
ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener, Future<?> timeout,
221+
ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
225222
Context.CancellableContext context) {
226223
this.call = checkNotNull(call, "call");
227224
this.listener = checkNotNull(listener, "listener must not be null");
228-
this.timeout = checkNotNull(timeout, "timeout");
229225
this.context = checkNotNull(context, "context");
230226
}
231227

@@ -265,7 +261,6 @@ public void halfClosed() {
265261

266262
@Override
267263
public void closed(Status status) {
268-
timeout.cancel(true);
269264
try {
270265
if (status.isOk()) {
271266
listener.onComplete();

0 commit comments

Comments
 (0)