sui_data_ingestion_core/
lib.rs1mod executor;
5mod metrics;
6mod progress_store;
7mod reader;
8mod reducer;
9#[cfg(test)]
10mod tests;
11mod util;
12mod worker_pool;
13
14use std::sync::Arc;
15
16use anyhow::Result;
17use async_trait::async_trait;
18pub use executor::{
19 IndexerExecutor, MAX_CHECKPOINTS_IN_PROGRESS, setup_single_workflow,
20 setup_single_workflow_with_options,
21};
22pub use metrics::DataIngestionMetrics;
23pub use progress_store::{
24 ExecutorProgress, FileProgressStore, ProgressStore, ShimIndexerProgressStore, ShimProgressStore,
25};
26pub use reader::{CheckpointReader, ReaderOptions};
27use sui_types::full_checkpoint_content::CheckpointData;
28pub use util::{create_remote_store_client, end_of_epoch_data};
29pub use worker_pool::WorkerPool;
30
31#[async_trait]
32pub trait Worker: Send + Sync {
33 type Result: Send + Sync + Clone;
34 async fn process_checkpoint_arc(
35 &self,
36 checkpoint: &Arc<CheckpointData>,
37 ) -> Result<Self::Result> {
38 self.process_checkpoint(checkpoint).await
39 }
40 async fn process_checkpoint(&self, _checkpoint_data: &CheckpointData) -> Result<Self::Result> {
44 panic!("process_checkpoint not implemented")
45 }
46
47 fn preprocess_hook(&self, _: &CheckpointData) -> Result<()> {
48 Ok(())
49 }
50}
51
52#[async_trait]
53pub trait Reducer<R: Send + Sync>: Send + Sync {
54 async fn commit(&self, batch: Vec<R>) -> Result<()>;
55
56 fn should_close_batch(&self, _batch: &[R], next_item: Option<&R>) -> bool {
57 next_item.is_none()
58 }
59}