Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions stub/src/main/java/io/grpc/stub/BlockingClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public final class BlockingClientCall<ReqT, RespT> {
*/
public RespT read() throws InterruptedException, StatusException {
try {
return read(true, 0, TimeUnit.NANOSECONDS);
return read(true, 0);
} catch (TimeoutException e) {
throw new AssertionError("should never happen", e);
}
Expand All @@ -106,16 +106,14 @@ public RespT read() throws InterruptedException, StatusException {
*/
public RespT read(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
StatusException {
return read(false, timeout, unit);
long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
return read(false, endNanoTime);
}

private RespT read(boolean waitForever, long timeout, TimeUnit unit)
private RespT read(boolean waitForever, long endNanoTime)
throws InterruptedException, TimeoutException, StatusException {
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);

Predicate<BlockingClientCall<ReqT, RespT>> predicate = BlockingClientCall::skipWaitingForRead;
executor.waitAndDrainWithTimeout(waitForever, end, predicate, this);
executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
RespT bufferedValue = buffer.poll();

if (logger.isLoggable(Level.FINER)) {
Expand Down Expand Up @@ -182,7 +180,7 @@ public boolean hasNext() throws InterruptedException, StatusException {
*/
public boolean write(ReqT request) throws InterruptedException, StatusException {
try {
return write(true, request, Integer.MAX_VALUE, TimeUnit.DAYS);
return write(true, request, 0);
} catch (TimeoutException e) {
throw new RuntimeException(e); // should never happen
}
Expand Down Expand Up @@ -211,21 +209,20 @@ public boolean write(ReqT request) throws InterruptedException, StatusException
*/
public boolean write(ReqT request, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException, StatusException {
return write(false, request, timeout, unit);
long endNanoTime = System.nanoTime() + unit.toNanos(timeout);
return write(false, request, endNanoTime);
}

private boolean write(boolean waitForever, ReqT request, long timeout, TimeUnit unit)
private boolean write(boolean waitForever, ReqT request, long endNanoTime)
throws InterruptedException, TimeoutException, StatusException {

if (writeClosed) {
throw new IllegalStateException("Writes cannot be done after calling halfClose or cancel");
}

long end = System.nanoTime() + unit.toNanos(timeout);

Predicate<BlockingClientCall<ReqT, RespT>> predicate =
(x) -> x.call.isReady() || x.closedStatus != null;
executor.waitAndDrainWithTimeout(waitForever, end, predicate, this);
executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this);
Status savedClosedStatus = closedStatus;
if (savedClosedStatus == null) {
call.sendMessage(request);
Expand Down