sui_analytics_indexer/package_store/
cache_coordinator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::{
5    Arc,
6    atomic::{AtomicU64, Ordering},
7};
8
9use tokio::sync::watch;
10
11pub struct CacheReadyCoordinator {
12    latest: Arc<AtomicU64>,
13    tx: tokio::sync::watch::Sender<u64>,
14    rx: watch::Receiver<u64>,
15}
16
17// Signals handlers when a checkpoint's objects have been added to the package cache.
18#[allow(clippy::new_without_default)]
19impl CacheReadyCoordinator {
20    pub fn new() -> Self {
21        let (tx, rx) = watch::channel(0);
22        Self {
23            latest: Arc::new(AtomicU64::new(0)),
24            tx,
25            rx,
26        }
27    }
28
29    pub fn mark_ready(&self, checkpoint: u64) {
30        let prev = self.latest.swap(checkpoint, Ordering::SeqCst);
31        if prev == 0 || checkpoint == prev + 1 {
32            let _ = self.tx.send_replace(checkpoint);
33        } else {
34            // Should never happen since concurrency is set to 1.
35            panic!(
36                "Package cache coordinator saw checkpoints out of order. Previous: {prev}. Current: {checkpoint}"
37            );
38        }
39    }
40
41    pub async fn wait(&self, checkpoint: u64) {
42        if self.latest.load(Ordering::SeqCst) >= checkpoint {
43            return;
44        }
45        let mut rx = self.rx.clone();
46        while rx.changed().await.is_ok() && *rx.borrow() < checkpoint {}
47    }
48}