sui_analytics_indexer/package_store/
package_cache_worker.rs1use anyhow::Result;
5use async_trait::async_trait;
6use std::sync::Arc;
7use sui_types::full_checkpoint_content::CheckpointData;
8
9use crate::{Worker, package_store::PackageCache};
10
/// Static identifier returned by [`PackageCacheWorker::name`].
///
/// NOTE(review): the value says "manager" while the type is a "worker";
/// presumably kept for compatibility with whatever keys off this name —
/// confirm before changing.
pub const PACKAGE_CACHE_WORKER_NAME: &str = "package_cache_manager";
12
13pub struct PackageCacheWorker {
14 package_cache: Arc<PackageCache>,
15}
16
17impl PackageCacheWorker {
18 pub fn new(package_cache: Arc<PackageCache>) -> Self {
19 Self { package_cache }
20 }
21
22 pub fn name(&self) -> &'static str {
23 PACKAGE_CACHE_WORKER_NAME
24 }
25}
26
27#[async_trait]
28impl Worker for PackageCacheWorker {
29 type Result = ();
30
31 async fn process_checkpoint_arc(&self, checkpoint_data: &Arc<CheckpointData>) -> Result<()> {
32 let sequence_number = *checkpoint_data.checkpoint_summary.sequence_number();
33 let cache = self.package_cache.clone();
34 let checkpoint_data = checkpoint_data.clone();
35
36 tokio::task::spawn_blocking(move || {
37 let all_objects = checkpoint_data
38 .transactions
39 .iter()
40 .flat_map(|txn| txn.output_objects.iter());
41 cache.update_batch(all_objects)?;
42 Ok::<(), anyhow::Error>(())
43 })
44 .await??;
45
46 self.package_cache.coordinator.mark_ready(sequence_number);
47 Ok(())
48 }
49
50 fn preprocess_hook(&self, _: &CheckpointData) -> Result<()> {
51 Ok(())
52 }
53}