mysten_common/sync/
async_once_cell.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use parking_lot::Mutex;
5use std::sync::Arc;
6use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
7
8/// This structure contains a cell for a single value.
9/// The cell can be written only once, and can be read many times.
10/// Readers are provided with async API, that waits for write to happen.
11/// This is similar to tokio::sync::watch, except one difference:
12/// * tokio::sync::watch requires existing receiver to work. If no subscriber is registered, and the value is sent to channel, the value is dropped
13/// * Unlike with tokio::sync::watch, it is possible to write to AsyncOnceCell when no readers are registered, and value will be available later when AsyncOnceCell::get is called
14pub struct AsyncOnceCell<T> {
15    value: Arc<RwLock<Option<T>>>,
16    writer: Mutex<Option<OwnedRwLockWriteGuard<Option<T>>>>,
17}
18
19impl<T: Send + Clone> AsyncOnceCell<T> {
20    pub fn new() -> Self {
21        let value = Arc::new(RwLock::new(None));
22        let writer = value
23            .clone()
24            .try_write_owned()
25            .expect("Write lock can not fail here");
26        let writer = Mutex::new(Some(writer));
27        Self { value, writer }
28    }
29
30    pub async fn get(&self) -> T {
31        self.value
32            .read()
33            .await
34            .as_ref()
35            .cloned()
36            .expect("Value is available when writer is dropped")
37    }
38
39    /// Sets the value and notifies waiters. Return error if called twice
40    #[allow(clippy::result_unit_err)]
41    pub fn set(&self, value: T) -> Result<(), ()> {
42        let mut writer = self.writer.lock();
43        match writer.take() {
44            None => Err(()),
45            Some(mut writer) => {
46                *writer = Some(value);
47                Ok(())
48            }
49        }
50    }
51}
52
53impl<T: Send + Clone> Default for AsyncOnceCell<T> {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59#[cfg(test)]
60mod tests {
61    use super::*;
62
63    #[tokio::test]
64    async fn async_once_cell_test() {
65        let cell = Arc::new(AsyncOnceCell::<u64>::new());
66        let cell2 = cell.clone();
67        let wait = tokio::spawn(async move { cell2.get().await });
68        tokio::task::yield_now().await;
69        cell.set(15).unwrap();
70        assert!(cell.set(16).is_err());
71        assert_eq!(15, cell.get().await);
72        assert_eq!(15, wait.await.unwrap());
73    }
74}