sui_analytics_indexer/store/
live.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Live mode store - derives watermarks from file names.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt as _;
11use object_store::path::Path as ObjectPath;
12use sui_indexer_alt_framework_store_traits::CommitterWatermark;
13use sui_storage::object_store::util::find_all_dirs_with_epoch_prefix;
14use sui_storage::object_store::util::find_all_files_with_epoch_prefix;
15use tracing::info;
16
17use crate::config::BatchSizeConfig;
18use crate::config::PipelineConfig;
19use crate::handlers::CheckpointRows;
20use crate::store::Batch;
21
/// Live mode - derives watermarks from file names.
///
/// Used for normal streaming ingestion where files are written with checkpoint
/// ranges in their names, and watermarks are derived from those file names.
#[derive(Clone)]
pub struct LiveStore {
    // Destination bucket/filesystem that analytics files are written to and
    // scanned from when recovering the watermark.
    object_store: Arc<dyn ObjectStore>,
}
30
31impl LiveStore {
32    /// Create a new live store.
33    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
34        Self { object_store }
35    }
36
37    /// Determine the watermark by scanning file names in the object store.
38    ///
39    /// 1. Find epoch directories under `{pipeline}/epoch_*`
40    /// 2. Get the latest epoch (max epoch number)
41    /// 3. Find files in that epoch and extract checkpoint ranges from file names
42    /// 4. Return the maximum `end` value as the watermark
43    pub(crate) async fn committer_watermark(
44        &self,
45        pipeline: &str,
46    ) -> anyhow::Result<Option<CommitterWatermark>> {
47        let prefix = ObjectPath::from(pipeline);
48        let epoch_dirs = find_all_dirs_with_epoch_prefix(&self.object_store, Some(&prefix)).await?;
49
50        // Get latest epoch
51        let Some((&epoch, epoch_path)) = epoch_dirs.last_key_value() else {
52            return Ok(None); // No data yet
53        };
54
55        // Find files in latest epoch: {pipeline}/epoch_N/{start}_{end}.{format}
56        let checkpoint_ranges =
57            find_all_files_with_epoch_prefix(&self.object_store, Some(epoch_path)).await?;
58
59        // Watermark = max(end) across all files
60        let checkpoint_hi = checkpoint_ranges.iter().map(|r| r.end).max().unwrap_or(0);
61
62        // Need checkpoint_hi - 1 since ranges are exclusive and watermark is inclusive
63        if checkpoint_hi == 0 {
64            return Ok(None);
65        }
66
67        info!(
68            pipeline,
69            epoch,
70            checkpoint = checkpoint_hi - 1,
71            "Determined watermark from bucket iteration"
72        );
73
74        Ok(Some(CommitterWatermark {
75            epoch_hi_inclusive: epoch,
76            checkpoint_hi_inclusive: checkpoint_hi - 1, // Convert exclusive end to inclusive
77            tx_hi: 0,                                   // Not stored in filenames
78            timestamp_ms_hi_inclusive: 0,               // Not stored in filenames
79        }))
80    }
81
82    /// Write a file to the object store.
83    pub(crate) async fn write_to_object_store(
84        &self,
85        path: &ObjectPath,
86        payload: object_store::PutPayload,
87    ) -> anyhow::Result<()> {
88        self.object_store.put(path, payload).await?;
89        Ok(())
90    }
91
92    /// Split a batch of checkpoints into files based on epoch boundaries, batch size, and time.
93    ///
94    /// Cuts at:
95    /// - Epoch boundaries (files don't span epochs)
96    /// - Batch size thresholds (rows or checkpoint count)
97    /// - Time threshold (if `max_batch_duration_secs` is configured)
98    pub(crate) fn split_framework_batch_into_files(
99        &self,
100        pipeline_config: &PipelineConfig,
101        rows_by_checkpoint: &[CheckpointRows],
102        mut pending_batch: Batch,
103    ) -> (Batch, Vec<Batch>) {
104        let batch_size = pipeline_config
105            .batch_size
106            .as_ref()
107            .expect("batch_size not configured for pipeline");
108
109        let max_duration = Duration::from_secs(pipeline_config.force_batch_cut_after_secs);
110
111        let mut complete_batches: Vec<Batch> = Vec::new();
112
113        for checkpoint_rows in rows_by_checkpoint {
114            // Cut at epoch boundary
115            if pending_batch
116                .epoch()
117                .is_some_and(|e| e != checkpoint_rows.epoch)
118            {
119                assert!(
120                    !pending_batch.checkpoints_rows.is_empty(),
121                    "invalid state: epoch set but rows empty"
122                );
123                complete_batches.push(pending_batch);
124                pending_batch = Batch::default();
125            }
126
127            match *batch_size {
128                BatchSizeConfig::Rows(n) => {
129                    // Flush BEFORE adding so checkpoint rows stay together
130                    if pending_batch.row_count() >= n {
131                        complete_batches.push(pending_batch);
132                        pending_batch = Batch::default();
133                    }
134                    pending_batch.add(checkpoint_rows.clone());
135                }
136                BatchSizeConfig::Checkpoints(n) => {
137                    pending_batch.add(checkpoint_rows.clone());
138                    // Flush AFTER adding
139                    if pending_batch.checkpoint_count() == n {
140                        complete_batches.push(pending_batch);
141                        pending_batch = Batch::default();
142                    }
143                }
144            }
145
146            // Time-based flush (only if batch has data)
147            if pending_batch.checkpoint_count() > 0 && pending_batch.elapsed() >= max_duration {
148                complete_batches.push(pending_batch);
149                pending_batch = Batch::default();
150            }
151        }
152
153        (pending_batch, complete_batches)
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use object_store::PutPayload;
    use object_store::memory::InMemory;

    use crate::config::IndexerConfig;
    use crate::config::OutputStoreConfig;
    use crate::config::PipelineConfig;
    use crate::metrics::Metrics;
    use crate::pipeline::Pipeline;
    use crate::store::AnalyticsStore;
    use sui_indexer_alt_framework::store::Connection;
    use sui_indexer_alt_framework::store::Store;

    /// Metrics registered against a throwaway Prometheus registry.
    fn test_metrics() -> Metrics {
        let registry = prometheus::Registry::new();
        Metrics::new(&registry)
    }

    /// Minimal indexer config with a single `Checkpoint` pipeline that writes
    /// under the `test_pipeline` output prefix.
    fn test_config(object_store: Arc<dyn object_store::ObjectStore>) -> IndexerConfig {
        let checkpoint_pipeline = PipelineConfig {
            pipeline: Pipeline::Checkpoint,
            file_format: crate::config::FileFormat::Parquet,
            package_id_filter: None,
            sf_table_id: None,
            sf_checkpoint_col_id: None,
            report_sf_max_table_checkpoint: false,
            batch_size: None,
            output_prefix: Some("test_pipeline".to_string()),
            force_batch_cut_after_secs: 600,
            sequential: Default::default(),
        };
        IndexerConfig {
            output_store: OutputStoreConfig::Custom(object_store),
            work_dir: None,
            sf_account_identifier: None,
            sf_warehouse: None,
            sf_database: None,
            sf_schema: None,
            sf_username: None,
            sf_role: None,
            sf_password_file: None,
            migration_id: None,
            file_format: crate::config::FileFormat::Parquet,
            pipeline_configs: vec![checkpoint_pipeline],
            ingestion: Default::default(),
            committer: Default::default(),
            max_pending_uploads: 10,
            max_concurrent_serialization: 3,
            watermark_update_interval_secs: 60,
        }
    }

    /// Put a dummy payload at `{pipeline}/epoch_{epoch}/{start}_{end}.parquet`.
    async fn create_test_file(
        store: &Arc<dyn object_store::ObjectStore>,
        pipeline: &str,
        epoch: u64,
        start: u64,
        end: u64,
    ) {
        let key = format!(
            "{}/epoch_{}/{}_{}.parquet",
            pipeline, epoch, start, end
        );
        let payload: PutPayload = Bytes::from("test data").into();
        store.put(&ObjectPath::from(key), payload).await.unwrap();
    }

    #[tokio::test]
    async fn test_committer_watermark_multiple_epochs() {
        let object_store: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());
        // Epoch 0 - files written to output_prefix "test_pipeline"
        for (start, end) in [(0, 100), (100, 200)] {
            create_test_file(&object_store, "test_pipeline", 0, start, end).await;
        }
        // Epoch 1 (latest)
        for (start, end) in [(200, 300), (300, 400)] {
            create_test_file(&object_store, "test_pipeline", 1, start, end).await;
        }

        let config = test_config(object_store.clone());
        let store = AnalyticsStore::new(object_store, config, test_metrics());
        let mut conn = store.connect().await.unwrap();

        // Use pipeline name "Checkpoint" which maps to output_prefix "test_pipeline"
        let watermark = conn.committer_watermark("Checkpoint").await.unwrap();
        assert!(watermark.is_some());
        let watermark = watermark.unwrap();
        // Should use latest epoch (1) and max checkpoint from that epoch
        assert_eq!(watermark.epoch_hi_inclusive, 1);
        assert_eq!(watermark.checkpoint_hi_inclusive, 399); // max(300, 400) - 1
    }
}
247}