sui_analytics_indexer/package_store/
package_cache_worker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7use sui_types::full_checkpoint_content::CheckpointData;
8
9use crate::{Worker, package_store::PackageCache};
10
/// Fixed name returned by [`PackageCacheWorker::name`].
///
/// NOTE(review): the value reads "manager" while the type is a "worker";
/// presumably kept as-is because the string is used as an identifier
/// elsewhere — confirm before changing it.
pub const PACKAGE_CACHE_WORKER_NAME: &str = "package_cache_manager";
12
13pub struct PackageCacheWorker {
14    package_cache: Arc<PackageCache>,
15}
16
17impl PackageCacheWorker {
18    pub fn new(package_cache: Arc<PackageCache>) -> Self {
19        Self { package_cache }
20    }
21
22    pub fn name(&self) -> &'static str {
23        PACKAGE_CACHE_WORKER_NAME
24    }
25}
26
27#[async_trait]
28impl Worker for PackageCacheWorker {
29    type Result = ();
30
31    async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
32        let sequence_number = *checkpoint_data.checkpoint_summary.sequence_number();
33        let cache = self.package_cache.clone();
34        let checkpoint_data = checkpoint_data.clone();
35
36        tokio::task::spawn_blocking(move || {
37            let all_objects = checkpoint_data
38                .transactions
39                .iter()
40                .flat_map(|txn| txn.output_objects.iter());
41            cache.update_batch(all_objects)?;
42            Ok::<(), anyhow::Error>(())
43        })
44        .await??;
45
46        self.package_cache.coordinator.mark_ready(sequence_number);
47        Ok(())
48    }
49
50    fn preprocess_hook(&self, _: &CheckpointData) -> Result<()> {
51        Ok(())
52    }
53}