mysten_common/sync/
notify_once.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::futures::Notified;
use tokio::sync::Notify;

/// Notify once allows waiter to register for certain conditions and unblocks waiter
/// when condition is signalled with `notify` method.
///
/// The functionality is somewhat similar to a tokio watch channel with subscribe method,
/// however it is much less error prone to use NotifyOnce rather then tokio watch.
///
/// Specifically with tokio watch you may miss notification,
/// if you subscribe to it after the value was changed
/// (Note that this is not a bug in tokio watch, but rather a mis-use of it).
///
/// NotifyOnce guarantees that wait() will return once notify() is called,
/// regardless of whether wait() was called before or after notify().
#[derive(Debug)]
pub struct NotifyOnce {
    notify: Mutex<Option<Arc<Notify>>>,
}

impl NotifyOnce {
    pub fn new() -> Self {
        Self::default()
    }

    /// Notify all waiters, present and future about event
    ///
    /// After this method all pending and future calls to .wait() will return
    ///
    /// This method returns errors if called more then once
    #[allow(clippy::result_unit_err)]
    pub fn notify(&self) -> Result<(), ()> {
        let Some(notify) = self.notify.lock().take() else {
            return Err(());
        };
        // At this point all `register` either registered with current notify,
        // or will be returning immediately
        notify.notify_waiters();
        Ok(())
    }

    /// Awaits for `notify` method to be called.
    ///
    /// This future is cancellation safe.
    pub async fn wait(&self) {
        // Note that we only hold lock briefly when registering for notification
        // There is a bit of a trickery here with lock - we take a lock and if it is not empty,
        // we register .notified() first and then release lock
        //
        // This is to make sure no notification is lost because Notify::notify_waiters do not
        // notify waiters that register **after** notify_waiters was called
        let mut notify = None;
        let notified = self.make_notified(&mut notify);

        if let Some(notified) = notified {
            notified.await;
        }
    }

    // This made into separate function as it is only way to make compiler
    // not to hold `lock` in a generated async future.
    fn make_notified<'a>(&self, notify: &'a mut Option<Arc<Notify>>) -> Option<Notified<'a>> {
        let lock = self.notify.lock();
        *notify = lock.as_ref().cloned();
        notify.as_ref().map(|n| n.notified())
    }
}

impl Default for NotifyOnce {
    fn default() -> Self {
        let notify = Arc::new(Notify::new());
        let notify = Mutex::new(Some(notify));
        Self { notify }
    }
}

#[tokio::test]
async fn notify_once_test() {
    let notify_once = NotifyOnce::new();
    // Before notify() is called .wait() is not ready
    assert!(futures::future::poll_immediate(notify_once.wait())
        .await
        .is_none());
    let wait = notify_once.wait();
    notify_once.notify().unwrap();
    // Pending wait() call is ready now
    assert!(futures::future::poll_immediate(wait).await.is_some());
    // Take wait future and don't resolve it.
    // This makes sure lock is dropped properly and wait futures resolve independently of each other
    let _dangle_wait = notify_once.wait();
    // Any new wait() is immediately ready
    assert!(futures::future::poll_immediate(notify_once.wait())
        .await
        .is_some());
}