mysten_common/sync/notify_once.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use parking_lot::Mutex;
5use std::sync::Arc;
6use tokio::sync::Notify;
7use tokio::sync::futures::Notified;
8
9/// Notify once allows waiter to register for certain conditions and unblocks waiter
10/// when condition is signalled with `notify` method.
11///
12/// The functionality is somewhat similar to a tokio watch channel with subscribe method,
13/// however it is much less error prone to use NotifyOnce rather then tokio watch.
14///
15/// Specifically with tokio watch you may miss notification,
16/// if you subscribe to it after the value was changed
17/// (Note that this is not a bug in tokio watch, but rather a mis-use of it).
18///
19/// NotifyOnce guarantees that wait() will return once notify() is called,
20/// regardless of whether wait() was called before or after notify().
21#[derive(Debug)]
22pub struct NotifyOnce {
23    notify: Mutex<Option<Arc<Notify>>>,
24}
25
26impl NotifyOnce {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Notify all waiters, present and future about event
32    ///
33    /// After this method all pending and future calls to .wait() will return
34    ///
35    /// This method returns errors if called more then once
36    #[allow(clippy::result_unit_err)]
37    pub fn notify(&self) -> Result<(), ()> {
38        let Some(notify) = self.notify.lock().take() else {
39            return Err(());
40        };
41        // At this point all `register` either registered with current notify,
42        // or will be returning immediately
43        notify.notify_waiters();
44        Ok(())
45    }
46
47    /// Awaits for `notify` method to be called.
48    ///
49    /// This future is cancellation safe.
50    pub async fn wait(&self) {
51        // Note that we only hold lock briefly when registering for notification
52        // There is a bit of a trickery here with lock - we take a lock and if it is not empty,
53        // we register .notified() first and then release lock
54        //
55        // This is to make sure no notification is lost because Notify::notify_waiters do not
56        // notify waiters that register **after** notify_waiters was called
57        let mut notify = None;
58        let notified = self.make_notified(&mut notify);
59
60        if let Some(notified) = notified {
61            notified.await;
62        }
63    }
64
65    // This made into separate function as it is only way to make compiler
66    // not to hold `lock` in a generated async future.
67    fn make_notified<'a>(&self, notify: &'a mut Option<Arc<Notify>>) -> Option<Notified<'a>> {
68        let lock = self.notify.lock();
69        *notify = lock.as_ref().cloned();
70        notify.as_ref().map(|n| n.notified())
71    }
72}
73
74impl Default for NotifyOnce {
75    fn default() -> Self {
76        let notify = Arc::new(Notify::new());
77        let notify = Mutex::new(Some(notify));
78        Self { notify }
79    }
80}
81
82#[tokio::test]
83async fn notify_once_test() {
84    let notify_once = NotifyOnce::new();
85    // Before notify() is called .wait() is not ready
86    assert!(
87        futures::future::poll_immediate(notify_once.wait())
88            .await
89            .is_none()
90    );
91    let wait = notify_once.wait();
92    notify_once.notify().unwrap();
93    // Pending wait() call is ready now
94    assert!(futures::future::poll_immediate(wait).await.is_some());
95    // Take wait future and don't resolve it.
96    // This makes sure lock is dropped properly and wait futures resolve independently of each other
97    let _dangle_wait = notify_once.wait();
98    // Any new wait() is immediately ready
99    assert!(
100        futures::future::poll_immediate(notify_once.wait())
101            .await
102            .is_some()
103    );
104}