// mysten_common/sync/async_once_cell.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use parking_lot::Mutex;
use std::sync::Arc;
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
/// A cell that holds a single value.
///
/// The cell can be written only once and can be read many times.
/// Readers use an async API that waits until the write has happened.
///
/// This is similar to `tokio::sync::watch`, with one difference:
/// * `tokio::sync::watch` requires an existing receiver to work. If no subscriber is
///   registered and a value is sent to the channel, the value is dropped.
/// * Unlike `tokio::sync::watch`, it is possible to write to `AsyncOnceCell` when no
///   readers are registered; the value will be available later when
///   `AsyncOnceCell::get` is called.
pub struct AsyncOnceCell<T> {
/// Shared storage for the value; holds `None` until `set` stores a value.
value: Arc<RwLock<Option<T>>>,
/// Write guard on `value`, acquired in `new` and taken out by the first `set`.
/// Once it has been taken this is `None`, which is how a second `set` is detected.
writer: Mutex<Option<OwnedRwLockWriteGuard<Option<T>>>>,
}
impl<T: Send + Clone> AsyncOnceCell<T> {
pub fn new() -> Self {
let value = Arc::new(RwLock::new(None));
let writer = value
.clone()
.try_write_owned()
.expect("Write lock can not fail here");
let writer = Mutex::new(Some(writer));
Self { value, writer }
}
pub async fn get(&self) -> T {
self.value
.read()
.await
.as_ref()
.cloned()
.expect("Value is available when writer is dropped")
}
/// Sets the value and notifies waiters. Return error if called twice
#[allow(clippy::result_unit_err)]
pub fn set(&self, value: T) -> Result<(), ()> {
let mut writer = self.writer.lock();
match writer.take() {
None => Err(()),
Some(mut writer) => {
*writer = Some(value);
Ok(())
}
}
}
}
impl<T: Send + Clone> Default for AsyncOnceCell<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A reader that starts waiting before the write still receives the value,
    /// a second write is rejected, and later reads observe the first value.
    #[tokio::test]
    async fn async_once_cell_test() {
        let cell = Arc::new(AsyncOnceCell::<u64>::new());

        // Start a reader before any value has been written.
        let pending_reader = {
            let reader_cell = Arc::clone(&cell);
            tokio::spawn(async move { reader_cell.get().await })
        };
        // Yield so the spawned reader gets a chance to run before the write.
        tokio::task::yield_now().await;

        // The first write succeeds, the second one is rejected.
        assert!(cell.set(15).is_ok());
        assert!(cell.set(16).is_err());

        // Both a fresh read and the earlier reader see the first value.
        assert_eq!(15, cell.get().await);
        assert_eq!(15, pending_reader.await.unwrap());
    }
}