sui_analytics_indexer/package_store/
cache_coordinator.rs1use std::sync::{
5 Arc,
6 atomic::{AtomicU64, Ordering},
7};
8
9use tokio::sync::watch;
10
11pub struct CacheReadyCoordinator {
12 latest: Arc<AtomicU64>,
13 tx: tokio::sync::watch::Sender<u64>,
14 rx: watch::Receiver<u64>,
15}
16
17#[allow(clippy::new_without_default)]
19impl CacheReadyCoordinator {
20 pub fn new() -> Self {
21 let (tx, rx) = watch::channel(0);
22 Self {
23 latest: Arc::new(AtomicU64::new(0)),
24 tx,
25 rx,
26 }
27 }
28
29 pub fn mark_ready(&self, checkpoint: u64) {
30 let prev = self.latest.swap(checkpoint, Ordering::SeqCst);
31 if prev == 0 || checkpoint == prev + 1 {
32 let _ = self.tx.send_replace(checkpoint);
33 } else {
34 panic!(
36 "Package cache coordinator saw checkpoints out of order. Previous: {prev}. Current: {checkpoint}"
37 );
38 }
39 }
40
41 pub async fn wait(&self, checkpoint: u64) {
42 if self.latest.load(Ordering::SeqCst) >= checkpoint {
43 return;
44 }
45 let mut rx = self.rx.clone();
46 while rx.changed().await.is_ok() && *rx.borrow() < checkpoint {}
47 }
48}