sui_data_ingestion_core/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod executor;
5mod metrics;
6mod progress_store;
7mod reader;
8mod reducer;
9#[cfg(test)]
10mod tests;
11mod util;
12mod worker_pool;
13
14use std::sync::Arc;
15
16use anyhow::Result;
17use async_trait::async_trait;
18pub use executor::{
19    IndexerExecutor, MAX_CHECKPOINTS_IN_PROGRESS, setup_single_workflow,
20    setup_single_workflow_with_options,
21};
22pub use metrics::DataIngestionMetrics;
23pub use progress_store::{
24    ExecutorProgress, FileProgressStore, ProgressStore, ShimIndexerProgressStore, ShimProgressStore,
25};
26pub use reader::{CheckpointReader, ReaderOptions};
27use sui_types::full_checkpoint_content::CheckpointData;
28pub use util::{create_remote_store_client, end_of_epoch_data};
29pub use worker_pool::WorkerPool;
30
31#[async_trait]
32pub trait Worker: Send + Sync {
33    type Result: Send + Sync + Clone;
34    async fn process_checkpoint_arc(
35        &self,
36        checkpoint: &Arc<CheckpointData>,
37    ) -> Result<Self::Result> {
38        self.process_checkpoint(checkpoint).await
39    }
40    /// There is no need to implement this if you implement process_checkpoint_arc. The WorkerPool
41    /// will only call process_checkpoint_arc. This method was left in place for backwards
42    /// compatibiity.
43    async fn process_checkpoint(&self, _checkpoint_data: &CheckpointData) -> Result<Self::Result> {
44        panic!("process_checkpoint not implemented")
45    }
46
47    fn preprocess_hook(&self, _: &CheckpointData) -> Result<()> {
48        Ok(())
49    }
50}
51
52#[async_trait]
53pub trait Reducer<R: Send + Sync>: Send + Sync {
54    async fn commit(&self, batch: Vec<R>) -> Result<()>;
55
56    fn should_close_batch(&self, _batch: &[R], next_item: Option<&R>) -> bool {
57        next_item.is_none()
58    }
59}