Skip to content

Commit 32005e3

Browse files
Markus Westerlindcramertj
authored andcommitted
fix: Shared must relinquish control to the executor if repolled
If the wrapped future requires that another future runs before it stops waking itself while returning pending the current Shared will loop forever. This removes the `REPOLL` case and returns pending immediately, since the current task is recorded and therefore woken through `Notifier`'s `wake_by_ref` the `Shared` future will still be polled again Fixes #2130
1 parent dc06eda commit 32005e3

File tree

2 files changed

+39
-21
lines changed

2 files changed

+39
-21
lines changed

futures-util/src/future/future/shared.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,8 @@ where
6666

6767
const IDLE: usize = 0;
6868
const POLLING: usize = 1;
69-
const REPOLL: usize = 2;
70-
const COMPLETE: usize = 3;
71-
const POISONED: usize = 4;
69+
const COMPLETE: usize = 2;
70+
const POISONED: usize = 3;
7271

7372
const NULL_WAKER_KEY: usize = usize::max_value();
7473

@@ -196,7 +195,7 @@ where
196195
IDLE => {
197196
// Lock acquired, fall through
198197
}
199-
POLLING | REPOLL => {
198+
POLLING => {
200199
// Another task is currently polling, at this point we just want
201200
// to ensure that the waker for this task is registered
202201
this.inner = Some(inner);
@@ -227,35 +226,27 @@ where
227226

228227
let _reset = Reset(&inner.notifier.state);
229228

230-
let output = loop {
229+
let output = {
231230
let future = unsafe {
232231
match &mut *inner.future_or_output.get() {
233232
FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
234233
_ => unreachable!(),
235234
}
236235
};
237236

238-
let poll = future.poll(&mut cx);
239-
240-
match poll {
237+
match future.poll(&mut cx) {
241238
Poll::Pending => {
242-
let state = &inner.notifier.state;
243-
match state.compare_and_swap(POLLING, IDLE, SeqCst) {
239+
match inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
244240
POLLING => {
245241
// Success
246242
drop(_reset);
247243
this.inner = Some(inner);
248244
return Poll::Pending;
249245
}
250-
REPOLL => {
251-
// Was woken since: Gotta poll again!
252-
let prev = state.swap(POLLING, SeqCst);
253-
assert_eq!(prev, REPOLL);
254-
}
255246
_ => unreachable!(),
256247
}
257248
}
258-
Poll::Ready(output) => break output,
249+
Poll::Ready(output) => output,
259250
}
260251
};
261252

@@ -313,8 +304,6 @@ where
313304

314305
impl ArcWake for Notifier {
315306
fn wake_by_ref(arc_self: &Arc<Self>) {
316-
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);
317-
318307
let wakers = &mut *arc_self.wakers.lock().unwrap();
319308
if let Some(wakers) = wakers.as_mut() {
320309
for (_key, opt_waker) in wakers {

futures/tests/shared.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ fn drop_on_one_task_ok() {
8989
t2.join().unwrap();
9090
}
9191

92-
9392
#[cfg(feature = "executor")] // executor::
9493
#[test]
9594
fn drop_in_poll() {
@@ -104,7 +103,8 @@ fn drop_in_poll() {
104103
let future1 = future::lazy(move |_| {
105104
slot2.replace(None); // Drop future
106105
1
107-
}).shared();
106+
})
107+
.shared();
108108

109109
let future2 = LocalFutureObj::new(Box::new(future1.clone()));
110110
slot1.replace(Some(future2));
@@ -141,7 +141,9 @@ fn peek() {
141141
}
142142

143143
// Once the Shared has been polled, the value is peekable on the clone.
144-
spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
144+
spawn
145+
.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ()))))
146+
.unwrap();
145147
local_pool.run();
146148
for _ in 0..2 {
147149
assert_eq!(*f2.peek().unwrap(), Ok(42));
@@ -191,3 +193,30 @@ fn dont_do_unnecessary_clones_on_output() {
191193
assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
192194
assert_eq!(block_on(rx).unwrap().0.get(), 2);
193195
}
196+
197+
#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
198+
#[test]
199+
fn shared_future_that_wakes_itself_until_pending_is_returned() {
200+
use futures::executor::block_on;
201+
use futures::future::FutureExt;
202+
use std::cell::Cell;
203+
use std::task::Poll;
204+
205+
let proceed = Cell::new(false);
206+
let fut = futures::future::poll_fn(|cx| {
207+
if proceed.get() {
208+
Poll::Ready(())
209+
} else {
210+
cx.waker().wake_by_ref();
211+
Poll::Pending
212+
}
213+
})
214+
.shared();
215+
216+
// The join future can only complete if the second future gets a chance to run after the first
217+
// has returned pending
218+
assert_eq!(
219+
block_on(futures::future::join(fut, async { proceed.set(true) })),
220+
((), ())
221+
);
222+
}

0 commit comments

Comments
 (0)