Skip to content

Commit d545aa2

Browse files
authored
sync: add sync::Notify::notified_owned() (#7465)
1 parent 911ab21 commit d545aa2

File tree

4 files changed

+484
-22
lines changed

4 files changed

+484
-22
lines changed

tokio/src/sync/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@
449449
cfg_sync! {
450450
/// Named future types.
451451
pub mod futures {
452-
pub use super::notify::Notified;
452+
pub use super::notify::{Notified, OwnedNotified};
453453
}
454454

455455
mod barrier;

tokio/src/sync/notify.rs

Lines changed: 178 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
1717
use std::pin::Pin;
1818
use std::ptr::NonNull;
1919
use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20+
use std::sync::Arc;
2021
use std::task::{Context, Poll, Waker};
2122

2223
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
@@ -397,6 +398,38 @@ pub struct Notified<'a> {
397398
unsafe impl<'a> Send for Notified<'a> {}
398399
unsafe impl<'a> Sync for Notified<'a> {}
399400

401+
/// Future returned from [`Notify::notified_owned()`].
402+
///
403+
/// This future is fused, so once it has completed, any future calls to poll
404+
/// will immediately return `Poll::Ready`.
405+
#[derive(Debug)]
406+
#[must_use = "futures do nothing unless you `.await` or poll them"]
407+
pub struct OwnedNotified {
408+
/// The `Notify` being received on.
409+
notify: Arc<Notify>,
410+
411+
/// The current state of the receiving process.
412+
state: State,
413+
414+
/// Number of calls to `notify_waiters` at the time of creation.
415+
notify_waiters_calls: usize,
416+
417+
/// Entry in the waiter `LinkedList`.
418+
waiter: Waiter,
419+
}
420+
421+
unsafe impl Sync for OwnedNotified {}
422+
423+
/// A custom `project` implementation is used in place of `pin-project-lite`
424+
/// as a custom drop for [`Notified`] and [`OwnedNotified`] implementation
425+
/// is needed.
426+
struct NotifiedProject<'a> {
427+
notify: &'a Notify,
428+
state: &'a mut State,
429+
notify_waiters_calls: &'a usize,
430+
waiter: &'a Waiter,
431+
}
432+
400433
#[derive(Debug)]
401434
enum State {
402435
Init,
@@ -541,6 +574,53 @@ impl Notify {
541574
}
542575
}
543576

577+
/// Wait for a notification with an owned `Future`.
578+
///
579+
/// Unlike [`Self::notified`] which returns a future tied to the `Notify`'s
580+
/// lifetime, `notified_owned` creates a self-contained future that owns its
581+
/// notification state, making it safe to move between threads.
582+
///
583+
/// See [`Self::notified`] for more details.
584+
///
585+
/// # Cancel safety
586+
///
587+
/// This method uses a queue to fairly distribute notifications in the order
588+
/// they were requested. Cancelling a call to `notified_owned` makes you lose your
589+
/// place in the queue.
590+
///
591+
/// # Examples
592+
///
593+
/// ```
594+
/// use std::sync::Arc;
595+
/// use tokio::sync::Notify;
596+
///
597+
/// #[tokio::main]
598+
/// async fn main() {
599+
/// let notify = Arc::new(Notify::new());
600+
///
601+
/// for _ in 0..10 {
602+
/// let notified = notify.clone().notified_owned();
603+
/// tokio::spawn(async move {
604+
/// notified.await;
605+
/// println!("received notification");
606+
/// });
607+
/// }
608+
///
609+
/// println!("sending notification");
610+
/// notify.notify_waiters();
611+
/// }
612+
/// ```
613+
pub fn notified_owned(self: Arc<Self>) -> OwnedNotified {
614+
// we load the number of times notify_waiters
615+
// was called and store that in the future.
616+
let state = self.state.load(SeqCst);
617+
OwnedNotified {
618+
notify: self,
619+
state: State::Init,
620+
notify_waiters_calls: get_num_notify_waiters_calls(state),
621+
waiter: Waiter::new(),
622+
}
623+
}
544624
/// Notifies the first waiting task.
545625
///
546626
/// If a task is currently waiting, that task is notified. Otherwise, a
@@ -911,9 +991,62 @@ impl Notified<'_> {
911991
self.poll_notified(None).is_ready()
912992
}
913993

994+
fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
995+
unsafe {
996+
// Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
997+
998+
is_unpin::<&Notify>();
999+
is_unpin::<State>();
1000+
is_unpin::<usize>();
1001+
1002+
let me = self.get_unchecked_mut();
1003+
NotifiedProject {
1004+
notify: me.notify,
1005+
state: &mut me.state,
1006+
notify_waiters_calls: &me.notify_waiters_calls,
1007+
waiter: &me.waiter,
1008+
}
1009+
}
1010+
}
1011+
1012+
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1013+
self.project().poll_notified(waker)
1014+
}
1015+
}
1016+
1017+
impl Future for Notified<'_> {
1018+
type Output = ();
1019+
1020+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1021+
self.poll_notified(Some(cx.waker()))
1022+
}
1023+
}
1024+
1025+
impl Drop for Notified<'_> {
1026+
fn drop(&mut self) {
1027+
// Safety: The type only transitions to a "Waiting" state when pinned.
1028+
unsafe { Pin::new_unchecked(self) }
1029+
.project()
1030+
.drop_notified();
1031+
}
1032+
}
1033+
1034+
// ===== impl OwnedNotified =====
1035+
1036+
impl OwnedNotified {
1037+
/// Adds this future to the list of futures that are ready to receive
1038+
/// wakeups from calls to [`notify_one`].
1039+
///
1040+
/// See [`Notified::enable`] for more details.
1041+
///
1042+
/// [`notify_one`]: Notify::notify_one()
1043+
pub fn enable(self: Pin<&mut Self>) -> bool {
1044+
self.poll_notified(None).is_ready()
1045+
}
1046+
9141047
/// A custom `project` implementation is used in place of `pin-project-lite`
9151048
/// as a custom drop implementation is needed.
916-
fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
1049+
fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
9171050
unsafe {
9181051
// Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
9191052

@@ -922,17 +1055,47 @@ impl Notified<'_> {
9221055
is_unpin::<usize>();
9231056

9241057
let me = self.get_unchecked_mut();
925-
(
926-
me.notify,
927-
&mut me.state,
928-
&me.notify_waiters_calls,
929-
&me.waiter,
930-
)
1058+
NotifiedProject {
1059+
notify: &me.notify,
1060+
state: &mut me.state,
1061+
notify_waiters_calls: &me.notify_waiters_calls,
1062+
waiter: &me.waiter,
1063+
}
9311064
}
9321065
}
9331066

9341067
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
935-
let (notify, state, notify_waiters_calls, waiter) = self.project();
1068+
self.project().poll_notified(waker)
1069+
}
1070+
}
1071+
1072+
impl Future for OwnedNotified {
1073+
type Output = ();
1074+
1075+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1076+
self.poll_notified(Some(cx.waker()))
1077+
}
1078+
}
1079+
1080+
impl Drop for OwnedNotified {
1081+
fn drop(&mut self) {
1082+
// Safety: The type only transitions to a "Waiting" state when pinned.
1083+
unsafe { Pin::new_unchecked(self) }
1084+
.project()
1085+
.drop_notified();
1086+
}
1087+
}
1088+
1089+
// ===== impl NotifiedProject =====
1090+
1091+
impl NotifiedProject<'_> {
1092+
fn poll_notified(self, waker: Option<&Waker>) -> Poll<()> {
1093+
let NotifiedProject {
1094+
notify,
1095+
state,
1096+
notify_waiters_calls,
1097+
waiter,
1098+
} = self;
9361099

9371100
'outer_loop: loop {
9381101
match *state {
@@ -1143,20 +1306,14 @@ impl Notified<'_> {
11431306
}
11441307
}
11451308
}
1146-
}
1147-
1148-
impl Future for Notified<'_> {
1149-
type Output = ();
11501309

1151-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1152-
self.poll_notified(Some(cx.waker()))
1153-
}
1154-
}
1155-
1156-
impl Drop for Notified<'_> {
1157-
fn drop(&mut self) {
1158-
// Safety: The type only transitions to a "Waiting" state when pinned.
1159-
let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
1310+
fn drop_notified(self) {
1311+
let NotifiedProject {
1312+
notify,
1313+
state,
1314+
waiter,
1315+
..
1316+
} = self;
11601317

11611318
// This is where we ensure safety. The `Notified` value is being
11621319
// dropped, which means we must ensure that the waiter entry is no

tokio/tests/async_send_sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ assert_value!(tokio::sync::broadcast::WeakSender<NN>: !Send & !Sync & Unpin);
401401
assert_value!(tokio::sync::broadcast::WeakSender<YN>: Send & Sync & Unpin);
402402
assert_value!(tokio::sync::broadcast::WeakSender<YY>: Send & Sync & Unpin);
403403
assert_value!(tokio::sync::futures::Notified<'_>: Send & Sync & !Unpin);
404+
assert_value!(tokio::sync::futures::OwnedNotified: Send & Sync & !Unpin);
404405
assert_value!(tokio::sync::mpsc::OwnedPermit<NN>: !Send & !Sync & Unpin);
405406
assert_value!(tokio::sync::mpsc::OwnedPermit<YN>: Send & Sync & Unpin);
406407
assert_value!(tokio::sync::mpsc::OwnedPermit<YY>: Send & Sync & Unpin);

0 commit comments

Comments
 (0)