// sui_analytics_indexer/store/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Analytics store implementation with TransactionalStore support.
5//!
6//! This store supports two modes:
7//!
8//! ## Live Mode
9//! Derives watermarks from file names via bucket iteration at startup,
10//! rather than storing them separately. File uploads inherently update the watermark
11//! since file names encode checkpoint ranges.
12//!
13//! ## Migration Mode
14//! When `migration_id` is set, the store operates in migration mode:
15//! - Existing file ranges are loaded at startup and updated in-place.
16//! - Watermark is stored in a separate file: `_metadata/watermarks/{pipeline}@migration_{id}.json`
17//! - Conditional PUT with etag is used to prevent concurrent modification of data files
18
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22use std::sync::RwLock;
23use std::time::Duration;
24use std::time::Instant;
25
26use anyhow::Result;
27use anyhow::bail;
28use async_trait::async_trait;
29use object_store::PutPayload;
30use object_store::path::Path as ObjectPath;
31use scoped_futures::ScopedBoxFuture;
32use sui_indexer_alt_framework::pipeline::Processor;
33use sui_indexer_alt_framework::store::Connection;
34use sui_indexer_alt_framework::store::Store;
35use sui_indexer_alt_framework::store::TransactionalStore;
36use sui_indexer_alt_framework_store_traits::CommitterWatermark;
37use sui_indexer_alt_framework_store_traits::InitWatermark;
38use sui_indexer_alt_framework_store_traits::PrunerWatermark;
39use sui_indexer_alt_framework_store_traits::ReaderWatermark;
40use sui_indexer_alt_framework_store_traits::init_with_committer_watermark;
41use sui_types::base_types::EpochId;
42use tokio::sync::mpsc;
43use tracing::debug;
44use tracing::info;
45use tracing::warn;
46
47use crate::config::FileFormat;
48use crate::config::IndexerConfig;
49use crate::handlers::CheckpointRows;
50use crate::metrics::Metrics;
51use crate::schema::RowSchema;
52
/// Rows accumulated across commits, waiting to be flushed to a file.
#[derive(Clone)]
pub struct Batch {
    /// Per-checkpoint row groups, in the order they were added via `add`.
    pub(crate) checkpoints_rows: Vec<CheckpointRows>,
    // Total rows across all checkpoints; kept in sync by `add`.
    row_count: usize,
    /// When the batch was created.
    created_at: Instant,
    /// Explicit checkpoint range (migration mode only). If set, used for file naming.
    /// If None, checkpoint_range() computes it from the data.
    pub(crate) explicit_range: Option<Range<u64>>,
}
64
65impl Default for Batch {
66    fn default() -> Self {
67        Self {
68            checkpoints_rows: Vec::new(),
69            row_count: 0,
70            created_at: Instant::now(),
71            explicit_range: None,
72        }
73    }
74}
75
76impl Batch {
77    pub(crate) fn first_checkpoint(&self) -> Option<u64> {
78        self.checkpoints_rows.first().map(|c| c.checkpoint)
79    }
80
81    pub(crate) fn last_checkpoint(&self) -> Option<u64> {
82        self.checkpoints_rows.last().map(|c| c.checkpoint)
83    }
84
85    pub(crate) fn epoch(&self) -> Option<EpochId> {
86        self.checkpoints_rows.last().map(|c| c.epoch)
87    }
88
89    pub(crate) fn row_count(&self) -> usize {
90        self.row_count
91    }
92
93    pub(crate) fn checkpoint_count(&self) -> usize {
94        self.checkpoints_rows.len()
95    }
96
97    /// Returns the checkpoint range for this batch.
98    /// Uses explicit_range if set (migration mode), otherwise computes from data.
99    pub(crate) fn checkpoint_range(&self) -> Option<Range<u64>> {
100        self.explicit_range.clone().or_else(|| {
101            match (self.first_checkpoint(), self.last_checkpoint()) {
102                (Some(first), Some(last)) => Some(first..last + 1),
103                _ => None,
104            }
105        })
106    }
107
108    pub(crate) fn add(&mut self, checkpoint_rows: CheckpointRows) {
109        self.row_count += checkpoint_rows.len();
110        self.checkpoints_rows.push(checkpoint_rows);
111    }
112
113    /// Time elapsed since the batch was created.
114    pub(crate) fn elapsed(&self) -> Duration {
115        self.created_at.elapsed()
116    }
117}
118
119mod live;
120mod migration;
121mod uploader;
122
123pub use live::LiveStore;
124pub use migration::FileRangeEntry;
125pub use migration::FileRangeIndex;
126pub use migration::MigrationStore;
127pub use migration::WatermarkUpdateError;
128use uploader::PendingFileUpload;
129
/// The operational mode of the analytics store.
#[derive(Clone)]
pub enum StoreMode {
    /// Streaming ingestion: watermarks are derived from file names, uploads use plain `put`.
    Live(LiveStore),
    /// In-place rewrite of existing files: watermark lives in a metadata file,
    /// uploads use conditional PUT (etag) to guard against concurrent writers.
    Migration(MigrationStore),
}
136
137use crate::config::PipelineConfig;
138
/// Analytics store wrapper that delegates to an inner store mode.
///
/// Cloneable: all mutable state is behind `Arc`s, so clones share the same
/// pending batches, uploader channels, and worker handles.
#[derive(Clone)]
pub struct AnalyticsStore {
    // Live vs. migration behavior; see the module docs for the differences.
    mode: StoreMode,
    /// Accumulated rows per pipeline, keyed by pipeline name.
    pending_by_pipeline: Arc<RwLock<HashMap<String, Batch>>>,
    /// Shared metrics for all pipelines.
    metrics: Metrics,
    /// Per-pipeline upload senders. Sender is Clone so we can share it.
    uploader_senders: Arc<RwLock<HashMap<String, mpsc::Sender<PendingFileUpload>>>>,
    /// Worker handles for graceful shutdown.
    worker_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
    /// Indexer configuration.
    config: IndexerConfig,
    /// Schema for each pipeline, registered during pipeline setup.
    schemas_by_pipeline: Arc<RwLock<HashMap<String, &'static [&'static str]>>>,
}
156
/// Connection to the analytics store.
///
/// Provides access to the underlying object store for file uploads.
pub struct AnalyticsConnection<'a> {
    // Backing store; the connection itself holds no state beyond the watermark.
    store: &'a AnalyticsStore,
    /// Watermark set by the framework before commit.
    /// Used to detect file boundary crossings and end-of-processing.
    watermark: Option<CommitterWatermark>,
}
166
167impl StoreMode {
168    /// Split a batch of checkpoints into files.
169    ///
170    /// Delegates to mode-specific splitting logic:
171    /// - Live: cuts at epoch boundaries and batch size thresholds
172    /// - Migration: cuts at existing file boundaries using watermark
173    pub(crate) fn split_framework_batch_into_files(
174        &self,
175        pipeline_config: &PipelineConfig,
176        batch_from_framework: &[CheckpointRows],
177        pending_batch: Batch,
178        watermark: &CommitterWatermark,
179    ) -> (Batch, Vec<Batch>) {
180        match self {
181            StoreMode::Live(store) => store.split_framework_batch_into_files(
182                pipeline_config,
183                batch_from_framework,
184                pending_batch,
185            ),
186            StoreMode::Migration(store) => store.split_framework_batch_into_files(
187                pipeline_config,
188                batch_from_framework,
189                pending_batch,
190                watermark,
191            ),
192        }
193    }
194
195    /// Write a file to the object store.
196    ///
197    /// Delegates to mode-specific logic:
198    /// - Live mode: simple `put`
199    /// - Migration mode: verifies range matches expected, uses conditional PUT with etag/version
200    pub(crate) async fn write_to_object_store(
201        &self,
202        pipeline: &str,
203        path: &ObjectPath,
204        checkpoint_range: &Range<u64>,
205        payload: PutPayload,
206    ) -> anyhow::Result<()> {
207        match self {
208            StoreMode::Live(store) => store.write_to_object_store(path, payload).await,
209            StoreMode::Migration(store) => {
210                store
211                    .write_to_object_store(pipeline, path, checkpoint_range, payload)
212                    .await
213            }
214        }
215    }
216
217    /// Update watermark after a successful file upload.
218    ///
219    /// In migration mode, writes the watermark to the metadata file.
220    /// In live mode, this is a no-op (watermarks are derived from files).
221    pub(crate) async fn update_watermark_after_upload(
222        &self,
223        pipeline: &str,
224        epoch: u64,
225        checkpoint_hi_inclusive: u64,
226    ) -> Result<(), WatermarkUpdateError> {
227        match self {
228            StoreMode::Live(_) => Ok(()),
229            StoreMode::Migration(store) => {
230                store
231                    .update_watermark(pipeline, epoch, checkpoint_hi_inclusive)
232                    .await
233            }
234        }
235    }
236
237    /// Spawn an upload worker for this mode.
238    ///
239    /// Returns the sender for queueing files and the worker's JoinHandle.
240    pub(crate) fn spawn_uploader(
241        &self,
242        pipeline_name: String,
243        output_prefix: String,
244        metrics: Metrics,
245        config: &IndexerConfig,
246    ) -> (mpsc::Sender<PendingFileUpload>, tokio::task::JoinHandle<()>) {
247        uploader::spawn_uploader(pipeline_name, output_prefix, self.clone(), metrics, config)
248    }
249}
250
impl AnalyticsStore {
    /// Create a new analytics store.
    ///
    /// The mode (live vs migration) is determined by `config.migration_id`:
    /// - None: Live mode for streaming ingestion, sequential uploads
    /// - Some(id): Migration mode for rewriting existing files, concurrent uploads
    pub fn new(
        object_store: Arc<dyn object_store::ObjectStore>,
        config: IndexerConfig,
        metrics: Metrics,
    ) -> Self {
        let mode = if let Some(ref migration_id) = config.migration_id {
            info!(migration_id, "Enabling migration mode");
            StoreMode::Migration(MigrationStore::new(object_store, migration_id.clone()))
        } else {
            StoreMode::Live(LiveStore::new(object_store))
        };
        Self {
            mode,
            pending_by_pipeline: Arc::new(RwLock::new(HashMap::new())),
            metrics,
            uploader_senders: Arc::new(RwLock::new(HashMap::new())),
            worker_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            config,
            schemas_by_pipeline: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Find the checkpoint range for ingestion, snapping to file boundaries in migration mode.
    ///
    /// In migration mode, loads file ranges and snaps both `first_checkpoint` and
    /// `last_checkpoint` to file boundaries:
    /// - `first_checkpoint` snaps to the start of the containing file
    /// - `last_checkpoint` snaps to the end of the containing file (exclusive)
    ///
    /// Returns (adjusted_first, adjusted_last) across all pipelines.
    ///
    /// In live mode, returns the checkpoints unchanged.
    pub async fn find_checkpoint_range(
        &self,
        first_checkpoint: Option<u64>,
        last_checkpoint: Option<u64>,
    ) -> Result<(Option<u64>, Option<u64>)> {
        match &self.mode {
            StoreMode::Live(_) => Ok((first_checkpoint, last_checkpoint)),
            StoreMode::Migration(store) => {
                // Pass (pipeline_name, output_prefix) pairs.
                // pipeline_name is used as the key in the file_ranges map.
                // output_prefix is the path in the object store where files are located.
                let pipelines: Vec<_> = self
                    .config
                    .pipeline_configs()
                    .iter()
                    .map(|p| (p.pipeline.name(), p.output_prefix()))
                    .collect();
                store
                    .find_checkpoint_range(
                        pipelines.iter().map(|(name, prefix)| (*name, *prefix)),
                        first_checkpoint,
                        last_checkpoint,
                    )
                    .await
            }
        }
    }

    /// Register the schema for a pipeline. Called during pipeline setup.
    pub fn register_schema<P: Processor, T: RowSchema>(&self) {
        self.schemas_by_pipeline
            .write()
            .unwrap()
            .insert(P::NAME.to_string(), T::schema());
    }

    /// Get the schema for a pipeline.
    fn get_schema(&self, pipeline: &str) -> Option<&'static [&'static str]> {
        self.schemas_by_pipeline
            .read()
            .unwrap()
            .get(pipeline)
            .copied()
    }

    /// Get or create an uploader for a pipeline.
    ///
    /// Lazily spawns a background worker on first access.
    fn get_or_create_uploader(&self, pipeline: &str) -> mpsc::Sender<PendingFileUpload> {
        // Check if uploader already exists.
        // Fast path under the read lock; the read guard is dropped at the end
        // of this scope so the write lock below cannot deadlock against it.
        {
            let uploaders = self.uploader_senders.read().unwrap();
            if let Some(tx) = uploaders.get(pipeline) {
                return tx.clone();
            }
        }

        // Create new uploader
        let mut uploaders = self.uploader_senders.write().unwrap();
        // Double-check in case another thread created it
        // (classic double-checked pattern: read-lock miss, then re-check under write lock).
        if let Some(tx) = uploaders.get(pipeline) {
            return tx.clone();
        }

        let output_prefix = self
            .config
            .get_pipeline_config(pipeline)
            .expect("Pipeline not configured")
            .output_prefix();

        let (tx, handle) = self.mode.spawn_uploader(
            pipeline.to_string(),
            output_prefix.to_string(),
            self.metrics.clone(),
            &self.config,
        );
        uploaders.insert(pipeline.to_string(), tx.clone());

        // Track the handle for shutdown
        // Note: We can't block here, so we spawn a task to add the handle
        // NOTE(review): if `shutdown` runs before this spawned task pushes the
        // handle, shutdown will not await this worker — confirm whether that
        // window matters in practice.
        let handles = self.worker_handles.clone();
        tokio::spawn(async move {
            handles.lock().await.push(handle);
        });

        tx
    }

    /// Flush all pending batches before shutdown.
    ///
    /// This ensures any buffered data that hasn't reached batch thresholds
    /// is written to the object store before the indexer shuts down.
    async fn flush_pending_batches(&self) {
        // Take all pending batches
        // (swap in an empty map so concurrent committers start from scratch).
        let pending: HashMap<String, Batch> = {
            let mut pending_map = self.pending_by_pipeline.write().unwrap();
            std::mem::take(&mut *pending_map)
        };

        match &self.mode {
            StoreMode::Live(_) => {
                for (pipeline_name, batch) in pending {
                    if batch.checkpoint_count() == 0 {
                        continue;
                    }

                    let pipeline_config = self
                        .config
                        .get_pipeline_config(&pipeline_name)
                        .expect("Pipeline config must exist for pending batch");

                    let schema = self
                        .get_schema(&pipeline_name)
                        .expect("Schema must be registered for pending batch");

                    info!(
                        pipeline = %pipeline_name,
                        checkpoints = batch.checkpoint_count(),
                        rows = batch.row_count(),
                        "Flushing pending batch on shutdown"
                    );

                    // Unwraps are safe: checkpoint_count() > 0 guarantees the
                    // batch has at least one checkpoint, so epoch and range exist.
                    let pending_upload = PendingFileUpload {
                        epoch: batch.epoch().unwrap(),
                        checkpoint_range: batch.checkpoint_range().unwrap(),
                        file_format: pipeline_config.file_format,
                        checkpoints_rows: batch.checkpoints_rows,
                        schema,
                    };

                    let tx = self.get_or_create_uploader(&pipeline_name);
                    if tx.send(pending_upload).await.is_err() {
                        warn!(pipeline = %pipeline_name, "Failed to send final batch to uploader");
                    }
                }
            }
            StoreMode::Migration(_) => {
                // Migration mode only modifies existing files at known boundaries.
                // Any pending data that doesn't align with file boundaries is intentionally dropped.
            }
        }
    }

    /// Shutdown all upload workers, waiting for pending uploads to complete.
    pub async fn shutdown(&self) {
        // Flush any pending batches before closing channels (live mode only)
        self.flush_pending_batches().await;

        // Clear senders to signal workers to stop
        // NOTE(review): sender clones handed out by get_or_create_uploader may
        // still be alive elsewhere; workers only see channel closure once every
        // clone is dropped — verify callers drop them promptly.
        self.uploader_senders.write().unwrap().clear();

        // Wait for all workers to finish
        let mut handles = self.worker_handles.lock().await;
        for handle in handles.drain(..) {
            let _ = handle.await;
        }
    }
}
447
impl<'a> AnalyticsConnection<'a> {
    /// Get the store mode for split_batch operations.
    pub fn mode(&self) -> &StoreMode {
        &self.store.mode
    }

    /// Get a clone of the pending rows for a pipeline.
    /// Returns default FileRows if pipeline has no pending rows.
    pub fn get_pending_batch(&self, pipeline: &str) -> Batch {
        self.store
            .pending_by_pipeline
            .read()
            .unwrap()
            .get(pipeline)
            .cloned()
            .unwrap_or_default()
    }

    /// Set the pending rows for a pipeline after successful upload.
    pub fn set_pending_batch(&self, pipeline: &str, rows: Batch) {
        self.store
            .pending_by_pipeline
            .write()
            .unwrap()
            .insert(pipeline.to_string(), rows);
    }

    /// Get the pipeline config for a pipeline.
    ///
    /// Panics if the pipeline is not present in the indexer configuration —
    /// reaching this with an unconfigured pipeline is a programming error.
    fn pipeline_config(&self, pipeline: &str) -> &PipelineConfig {
        self.store
            .config
            .get_pipeline_config(pipeline)
            .unwrap_or_else(|| panic!("Pipeline '{}' not configured", pipeline))
    }

    /// Write a file to the object store.
    ///
    /// Constructs the path from the provided parameters and delegates to the store mode.
    pub async fn write_to_object_store(
        &self,
        pipeline: &str,
        epoch: EpochId,
        checkpoint_range: Range<u64>,
        file_format: FileFormat,
        payload: PutPayload,
    ) -> anyhow::Result<()> {
        let path = construct_object_store_path(pipeline, epoch, &checkpoint_range, file_format);
        self.store
            .mode
            .write_to_object_store(pipeline, &path, &checkpoint_range, payload)
            .await
    }

    /// Commit a batch of rows to the object store.
    ///
    /// # Background
    ///
    /// The indexer framework has limitations that require us to handle batching
    /// and serialization in the store layer:
    ///
    /// 1. **No minimum batch size**: The framework supports max batch size but not
    ///    min batch size, so there's no way to defer commits until a batch reaches
    ///    a certain size or to control which checkpoints end up in which output files.
    ///
    /// 2. **No fan-out/fan-in for batch processing**: The framework provides no way
    ///    to fan out processing of a completed batch (e.g., CPU-intensive serialization)
    ///    before committing it sequentially.
    ///
    /// To work around these limitations, this store accumulates rows across
    /// checkpoint commits and manages its own batching logic (by checkpoint count
    /// or row count). Serialization is offloaded to background workers via
    /// `spawn_blocking`, allowing multiple batches to serialize in parallel while
    /// maintaining strict checkpoint ordering for uploads.
    ///
    /// # Commit Lifecycle
    ///
    /// 1. Accumulates rows in pending buffer
    /// 2. When batch threshold is reached, sends to background upload worker
    /// 3. Worker serializes (parallel) and uploads (sequential by checkpoint order)
    ///
    /// Backpressure: If the upload channel is full, this method blocks.
    ///
    /// # Error Handling
    ///
    /// The framework assumes commit_batch is atomic - if an error is returned,
    /// the transaction is rolled back and retried. This method _never_ returns
    /// an error; object store write failures are retried internally. The
    /// implementation is idempotent, so framework retries would be safe anyway.
    pub async fn commit_batch<P: Processor>(
        &mut self,
        batch_from_framework: &[CheckpointRows],
    ) -> Result<usize> {
        let pipeline = P::NAME;
        let pipeline_config = self.pipeline_config(pipeline);

        // Split batch from framework into batches that we can upload as files.
        // The watermark is passed to detect file boundary completion in migration mode.
        let (pending_batch, complete_batches) = {
            let pending_batch = self.get_pending_batch(pipeline);
            self.store.mode.split_framework_batch_into_files(
                pipeline_config,
                batch_from_framework,
                pending_batch,
                // The framework calls set_committer_watermark before commit,
                // so a missing watermark here is a framework-contract violation.
                self.watermark
                    .as_ref()
                    .expect("watermark should be set on connection."),
            )
        };

        debug!(
            pipeline = pipeline,
            files_to_upload = complete_batches.len(),
            pending_checkpoints = pending_batch.checkpoint_count(),
            "Commit starting"
        );

        // Get the uploader for this pipeline (lazily created)
        let tx = self.store.get_or_create_uploader(pipeline);

        let mut total_rows = 0;
        for batch in complete_batches {
            total_rows += batch.row_count();

            // Complete batches always contain data, so epoch/range unwraps are safe.
            let pending_upload = PendingFileUpload {
                epoch: batch.epoch().unwrap(),
                checkpoint_range: batch.checkpoint_range().unwrap(),
                file_format: pipeline_config.file_format,
                checkpoints_rows: batch.checkpoints_rows,
                schema: self
                    .store
                    .get_schema(pipeline)
                    .unwrap_or_else(|| panic!("Schema not registered for pipeline: {}", pipeline)),
            };

            // Send to worker - BLOCKS IF CHANNEL FULL (backpressure)
            tx.send(pending_upload)
                .await
                .unwrap_or_else(|e| panic!("Upload channel closed: {}", e));
        }

        // Only the leftover (incomplete) batch stays buffered for the next commit.
        self.set_pending_batch(pipeline, pending_batch);

        debug!(
            pipeline = pipeline,
            total_rows = total_rows,
            "Commit complete, files queued for upload"
        );

        Ok(total_rows)
    }
}
599
600#[async_trait]
601impl Store for AnalyticsStore {
602    type Connection<'c> = AnalyticsConnection<'c>;
603
604    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
605        Ok(AnalyticsConnection {
606            store: self,
607            watermark: None,
608        })
609    }
610}
611
#[async_trait]
impl TransactionalStore for AnalyticsStore {
    /// Run `f` against a fresh connection.
    ///
    /// NOTE(review): there is no real transaction here — nothing is begun,
    /// committed, or rolled back; the closure simply runs once. Atomicity is
    /// instead provided by `commit_batch`, which never returns an error.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
    {
        let mut conn = self.connect().await?;
        f(&mut conn).await
    }
}
626
627#[async_trait]
628impl Connection for AnalyticsConnection<'_> {
629    /// Initialize watermark.
630    ///
631    /// In live mode: Watermarks are derived from file names, so just delegates to `committer_watermark`.
632    /// In migration mode: Delegates to `MigrationStore::init_watermark`.
633    async fn init_watermark(
634        &mut self,
635        pipeline_task: &str,
636        init_watermark: InitWatermark,
637    ) -> anyhow::Result<InitWatermark> {
638        init_with_committer_watermark(self, pipeline_task, init_watermark).await
639    }
640
641    /// Determine the watermark.
642    ///
643    /// In live mode: scans file names in the object store.
644    /// In migration mode: reads from watermark metadata file.
645    async fn committer_watermark(
646        &mut self,
647        pipeline: &str,
648    ) -> anyhow::Result<Option<CommitterWatermark>> {
649        let output_prefix = self.pipeline_config(pipeline).output_prefix().to_string();
650        match &self.store.mode {
651            StoreMode::Live(store) => store.committer_watermark(&output_prefix).await,
652            StoreMode::Migration(store) => store.committer_watermark(&output_prefix).await,
653        }
654    }
655
656    async fn reader_watermark(
657        &mut self,
658        _pipeline: &'static str,
659    ) -> anyhow::Result<Option<ReaderWatermark>> {
660        // Reader watermark not supported - no pruning in analytics indexer
661        Ok(None)
662    }
663
664    async fn pruner_watermark(
665        &mut self,
666        _pipeline: &'static str,
667        _delay: Duration,
668    ) -> anyhow::Result<Option<PrunerWatermark>> {
669        // Pruning not supported in analytics indexer
670        Ok(None)
671    }
672
673    /// Store the watermark for use in commit_batch.
674    ///
675    /// Note: This doesn't persist the watermark - that's done by the upload worker
676    /// after successful file uploads. This just captures the watermark so commit_batch
677    /// can use it to detect file boundary crossings and end-of-processing.
678    async fn set_committer_watermark(
679        &mut self,
680        _pipeline_task: &str,
681        watermark: CommitterWatermark,
682    ) -> anyhow::Result<bool> {
683        self.watermark = Some(watermark);
684        Ok(true)
685    }
686
687    async fn set_reader_watermark(
688        &mut self,
689        _pipeline: &'static str,
690        _reader_lo: u64,
691    ) -> anyhow::Result<bool> {
692        bail!("Pruning not supported by analytics store");
693    }
694
695    async fn set_pruner_watermark(
696        &mut self,
697        _pipeline: &'static str,
698        _pruner_hi: u64,
699    ) -> anyhow::Result<bool> {
700        bail!("Pruning not supported by analytics store");
701    }
702}
703
704/// Construct the object store path for an analytics file.
705/// Path format: {pipeline}/epoch_{epoch}/{start}_{end}.{ext}
706pub(crate) fn construct_object_store_path(
707    pipeline: &str,
708    epoch: EpochId,
709    checkpoint_range: &Range<u64>,
710    file_format: FileFormat,
711) -> ObjectPath {
712    let extension = match file_format {
713        FileFormat::Csv => "csv",
714        FileFormat::Parquet => "parquet",
715    };
716    ObjectPath::from(format!(
717        "{}/epoch_{}/{}_{}.{}",
718        pipeline, epoch, checkpoint_range.start, checkpoint_range.end, extension
719    ))
720}
721
/// Parse checkpoint range from filename.
/// Expected format: `{start}_{end}.{format}` (e.g., `0_100.parquet`)
///
/// Returns `None` if the name does not contain an underscore-separated pair
/// of unsigned integers before the first `.`.
pub(crate) fn parse_checkpoint_range(filename: &str) -> Option<Range<u64>> {
    // Strip everything from the first '.' onward (the extension, if any).
    let stem = match filename.find('.') {
        Some(dot) => &filename[..dot],
        None => filename,
    };
    // Split on the first '_' only; extra underscores make the end unparseable.
    let mut parts = stem.splitn(2, '_');
    let start: u64 = parts.next()?.parse().ok()?;
    let end: u64 = parts.next()?.parse().ok()?;
    Some(start..end)
}