Skip to content

Commit 8d1fbc9

Browse files
clmronshapiro
authored andcommitted
Move ExecutionSequencer into guava as @beta.
RELNOTES=Added `ExecutionSequencer`, a new utility to run a series of asynchronous operations serially. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=206609150
1 parent f1249c4 commit 8d1fbc9

File tree

4 files changed

+706
-0
lines changed

4 files changed

+706
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright (C) 2018 The Guava Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
15+
package com.google.common.util.concurrent;
16+
17+
import static com.google.common.truth.Truth.assertThat;
18+
import static com.google.common.util.concurrent.Futures.getDone;
19+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20+
21+
import java.util.concurrent.Callable;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.Future;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.junit.runners.JUnit4;
32+
33+
/** Tests for {@link ExecutionSequencer} */
34+
@RunWith(JUnit4.class)
35+
public class ExecutionSequencerTest {
36+
37+
ExecutorService executor;
38+
39+
private ExecutionSequencer serializer;
40+
private SettableFuture<Void> firstFuture;
41+
private TestCallable firstCallable;
42+
43+
@Before
44+
public void setUp() throws Exception {
45+
executor = Executors.newCachedThreadPool();
46+
serializer = ExecutionSequencer.create();
47+
firstFuture = SettableFuture.create();
48+
firstCallable = new TestCallable(firstFuture);
49+
}
50+
51+
@After
52+
public void tearDown() throws Exception {
53+
executor.shutdown();
54+
}
55+
56+
@Test
57+
public void testCallableStartsAfterFirstFutureCompletes() {
58+
@SuppressWarnings({"unused", "nullness"})
59+
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
60+
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
61+
@SuppressWarnings({"unused", "nullness"})
62+
Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
63+
assertThat(firstCallable.called).isTrue();
64+
assertThat(secondCallable.called).isFalse();
65+
firstFuture.set(null);
66+
assertThat(secondCallable.called).isTrue();
67+
}
68+
69+
@Test
70+
public void testCancellationNotPropagatedIfAlreadyStarted() {
71+
serializer.submitAsync(firstCallable, directExecutor()).cancel(true);
72+
assertThat(firstFuture.isCancelled()).isFalse();
73+
}
74+
75+
@Test
76+
public void testCancellationDoesNotViolateSerialization() {
77+
@SuppressWarnings({"unused", "nullness"})
78+
Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
79+
TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
80+
ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
81+
TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
82+
@SuppressWarnings({"unused", "nullness"})
83+
Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
84+
secondFuture.cancel(true);
85+
assertThat(secondCallable.called).isFalse();
86+
assertThat(thirdCallable.called).isFalse();
87+
firstFuture.set(null);
88+
assertThat(secondCallable.called).isFalse();
89+
assertThat(thirdCallable.called).isTrue();
90+
}
91+
92+
@Test
93+
public void testCancellationMultipleThreads() throws Exception {
94+
final BlockingCallable blockingCallable = new BlockingCallable();
95+
ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
96+
ListenableFuture<Boolean> future2 =
97+
serializer.submit(
98+
new Callable<Boolean>() {
99+
@Override
100+
public Boolean call() {
101+
return blockingCallable.isRunning();
102+
}
103+
},
104+
directExecutor());
105+
106+
// Wait for the first task to be started in the background. It will block until we explicitly
107+
// stop it.
108+
blockingCallable.waitForStart();
109+
110+
// Give the second task a chance to (incorrectly) start up while the first task is running.
111+
assertThat(future2.isDone()).isFalse();
112+
113+
// Stop the first task. The second task should then run.
114+
blockingCallable.stop();
115+
executor.shutdown();
116+
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
117+
assertThat(getDone(future2)).isFalse();
118+
}
119+
120+
@Test
121+
public void secondTaskWaitsForFirstEvenIfCancelled() throws Exception {
122+
final BlockingCallable blockingCallable = new BlockingCallable();
123+
ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
124+
ListenableFuture<Boolean> future2 =
125+
serializer.submit(
126+
new Callable<Boolean>() {
127+
@Override
128+
public Boolean call() {
129+
return blockingCallable.isRunning();
130+
}
131+
},
132+
directExecutor());
133+
134+
// Wait for the first task to be started in the background. It will block until we explicitly
135+
// stop it.
136+
blockingCallable.waitForStart();
137+
138+
// This time, cancel the future for the first task. The task remains running, only the future
139+
// is cancelled.
140+
future1.cancel(false);
141+
142+
// Give the second task a chance to (incorrectly) start up while the first task is running.
143+
// (This is the assertion that fails.)
144+
assertThat(future2.isDone()).isFalse();
145+
146+
// Stop the first task. The second task should then run.
147+
blockingCallable.stop();
148+
executor.shutdown();
149+
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
150+
assertThat(getDone(future2)).isFalse();
151+
}
152+
153+
private static class BlockingCallable implements Callable<Void> {
154+
private final CountDownLatch startLatch = new CountDownLatch(1);
155+
private final CountDownLatch stopLatch = new CountDownLatch(1);
156+
157+
private volatile boolean running = false;
158+
159+
@Override
160+
public Void call() throws InterruptedException {
161+
running = true;
162+
startLatch.countDown();
163+
stopLatch.await();
164+
running = false;
165+
return null;
166+
}
167+
168+
public void waitForStart() throws InterruptedException {
169+
startLatch.await();
170+
}
171+
172+
public void stop() {
173+
stopLatch.countDown();
174+
}
175+
176+
public boolean isRunning() {
177+
return running;
178+
}
179+
}
180+
181+
private static final class TestCallable implements AsyncCallable<Void> {
182+
183+
private final ListenableFuture<Void> future;
184+
private boolean called = false;
185+
186+
private TestCallable(ListenableFuture<Void> future) {
187+
this.future = future;
188+
}
189+
190+
@Override
191+
public ListenableFuture<Void> call() throws Exception {
192+
called = true;
193+
return future;
194+
}
195+
}
196+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright (C) 2018 The Guava Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*/
14+
15+
package com.google.common.util.concurrent;
16+
17+
import static com.google.common.base.Preconditions.checkNotNull;
18+
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
19+
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
20+
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
21+
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
22+
import static com.google.common.util.concurrent.Futures.immediateFuture;
23+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
24+
25+
import com.google.common.annotations.Beta;
26+
import java.util.concurrent.Callable;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
/**
31+
* Serializes execution of a set of operations. This class guarantees that a submitted callable will
32+
* not be called before previously submitted callables (and any {@code Future}s returned from them)
33+
* have completed.
34+
*
35+
* <p>This class implements a superset of the behavior of {@link
36+
* MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and
37+
* don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead.
38+
*
39+
* @since NEXT
40+
*/
41+
@Beta
42+
public final class ExecutionSequencer {
43+
44+
private ExecutionSequencer() {}
45+
46+
/** Creates a new instance. */
47+
public static ExecutionSequencer create() {
48+
return new ExecutionSequencer();
49+
}
50+
51+
enum RunningState {
52+
NOT_RUN,
53+
CANCELLED,
54+
STARTED,
55+
}
56+
57+
/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
58+
private final AtomicReference<ListenableFuture<Object>> ref =
59+
new AtomicReference<>(immediateFuture(null));
60+
61+
/**
62+
* Enqueues a task to run when the previous task (if any) completes.
63+
*
64+
* <p>Cancellation does not propagate from the output future to a callable that has begun to
65+
* execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
66+
* {@link Callable#call()} will not be invoked.
67+
*/
68+
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
69+
checkNotNull(callable);
70+
return submitAsync(
71+
new AsyncCallable<T>() {
72+
@Override
73+
public ListenableFuture<T> call() throws Exception {
74+
return immediateFuture(callable.call());
75+
}
76+
},
77+
executor);
78+
}
79+
80+
/**
81+
* Enqueues a task to run when the previous task (if any) completes.
82+
*
83+
* <p>Cancellation does not propagate from the output future to the future returned from {@code
84+
* callable} or a callable that has begun to execute, but if the output future is cancelled before
85+
* {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
86+
*/
87+
public <T> ListenableFuture<T> submitAsync(
88+
final AsyncCallable<T> callable, final Executor executor) {
89+
checkNotNull(callable);
90+
final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN);
91+
final AsyncCallable<T> task =
92+
new AsyncCallable<T>() {
93+
@Override
94+
public ListenableFuture<T> call() throws Exception {
95+
if (!runningState.compareAndSet(NOT_RUN, STARTED)) {
96+
return immediateCancelledFuture();
97+
}
98+
return callable.call();
99+
}
100+
};
101+
/*
102+
* Four futures are at play here:
103+
* taskFuture is the future tracking the result of the callable.
104+
* newFuture is a future that completes after this and all prior tasks are done.
105+
* oldFuture is the previous task's newFuture.
106+
* outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
107+
*
108+
* newFuture is guaranteed to only complete once all tasks previously submitted to this instance
109+
* have completed - namely after oldFuture is done, and taskFuture has either completed or been
110+
* cancelled before the callable started execution.
111+
*/
112+
final SettableFuture<Object> newFuture = SettableFuture.create();
113+
114+
final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);
115+
116+
// Invoke our task once the previous future completes.
117+
final ListenableFuture<T> taskFuture =
118+
Futures.submitAsync(
119+
task,
120+
new Executor() {
121+
@Override
122+
public void execute(Runnable runnable) {
123+
oldFuture.addListener(runnable, executor);
124+
}
125+
});
126+
127+
final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
128+
129+
// newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
130+
// unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
131+
// if the future we return is cancelled, we don't begin execution of the next task until after
132+
// oldFuture completes.
133+
Runnable listener =
134+
new Runnable() {
135+
@Override
136+
public void run() {
137+
if (taskFuture.isDone()
138+
// If this CAS succeeds, we know that the provided callable will never be invoked,
139+
// so when oldFuture completes it is safe to allow the next submitted task to
140+
// proceed.
141+
|| (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) {
142+
// Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
143+
// a future that eventually came from immediateFuture(null), this doesn't leak
144+
// throwables or completion values.
145+
newFuture.setFuture(oldFuture);
146+
}
147+
}
148+
};
149+
// Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
150+
// taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
151+
// propagates cancellation if the callable has not yet been invoked.
152+
outputFuture.addListener(listener, directExecutor());
153+
taskFuture.addListener(listener, directExecutor());
154+
155+
return outputFuture;
156+
}
157+
}

0 commit comments

Comments
 (0)