sui_analytics_indexer/store/
migration.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Migration mode store - uses explicit watermark files and conditional PUT.
5//!
6//! In migration mode, we rewrite existing files (e.g., adding new columns to parquet files).
7//! This module provides the store implementation and utilities to track existing file ranges.
8
9use std::collections::BTreeMap;
10use std::collections::HashMap;
11use std::ops::Range;
12use std::sync::Arc;
13use std::sync::RwLock;
14
15use anyhow::Context;
16use anyhow::Result;
17use anyhow::anyhow;
18use object_store::Error as ObjectStoreError;
19use object_store::ObjectStore;
20use object_store::ObjectStoreExt as _;
21use object_store::PutMode;
22use object_store::PutOptions;
23use object_store::PutPayload;
24use object_store::UpdateVersion;
25use object_store::path::Path as ObjectPath;
26use sui_indexer_alt_framework_store_traits::CommitterWatermark;
27use sui_storage::object_store::util::find_all_dirs_with_epoch_prefix;
28use thiserror::Error;
29use tracing::debug;
30use tracing::info;
31
32use crate::config::PipelineConfig;
33use crate::handlers::CheckpointRows;
34use crate::store::Batch;
35
/// Error type for watermark updates.
///
/// Distinguishes fatal concurrency conflicts (another writer touched the
/// watermark file since we last read it) from transient failures that the
/// caller may safely retry.
#[derive(Error, Debug)]
pub enum WatermarkUpdateError {
    /// Precondition failure - concurrent writer detected. This is fatal.
    #[error("Concurrent writer detected on watermark {path}: {message}")]
    ConcurrentWriter { path: String, message: String },

    /// Transient error - can be retried.
    #[error("Transient error updating watermark: {0}")]
    Transient(#[from] anyhow::Error),
}
47
/// Version info (etag, version) for conditional PUT operations.
/// Either component may be absent depending on the backing store
/// (GCS uses the version/generation, other stores use the e_tag).
type VersionInfo = (Option<String>, Option<String>);
50
/// Simple watermark struct for JSON serialization.
///
/// Persisted per pipeline as a JSON file (see [`migration_watermark_path`])
/// to track migration progress independently of the data files.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct MigrationWatermark {
    /// Highest checkpoint (inclusive) successfully migrated.
    pub checkpoint_hi_inclusive: u64,
    /// Epoch of the watermark - used to skip scanning earlier epochs on restart.
    pub epoch_hi_inclusive: u64,
}
58
/// Migration mode - uses explicit watermark files and conditional PUT.
///
/// Used for rewriting existing files (e.g., adding new columns to parquet files).
/// Tracks progress separately via watermark files and uses conditional PUT to
/// ensure we are overwriting the files we expect.
///
/// Cloning is cheap: all mutable state is behind shared `Arc<RwLock<..>>`
/// handles, so clones observe the same indexes and cached versions.
#[derive(Clone)]
pub struct MigrationStore {
    object_store: Arc<dyn ObjectStore>,
    /// Migration identifier.
    migration_id: String,
    /// Pipeline -> FileRangeIndex (target ranges).
    /// Populated by `find_checkpoint_range` before processing starts.
    file_ranges: Arc<RwLock<HashMap<String, FileRangeIndex>>>,
    /// Pipeline -> (etag, version) for conditional PUT on watermark files.
    /// Refreshed on every watermark read and successful write.
    watermark_versions: Arc<RwLock<HashMap<String, VersionInfo>>>,
    /// Pre-computed per-pipeline adjusted starting checkpoints.
    /// Set during pre-loading in build_analytics_indexer.
    adjusted_start_checkpoints: Arc<RwLock<HashMap<String, u64>>>,
}
77
/// Entry for a single file range in the index.
///
/// Mirrors one existing object-store file named `{start}_{end}.{format}`
/// inside an `epoch_*` directory.
#[derive(Debug, Clone)]
pub struct FileRangeEntry {
    /// Start checkpoint (inclusive).
    pub start: u64,
    /// End checkpoint (exclusive).
    pub end: u64,
    /// Epoch this file belongs to.
    pub epoch: u64,
}
88
/// Index of existing file ranges for a pipeline.
///
/// In migration mode, this is loaded at startup to track target file ranges.
/// Progress is tracked separately via a watermark file.
#[derive(Debug, Default, Clone)]
pub struct FileRangeIndex {
    /// Map from start_checkpoint -> FileRangeEntry.
    /// Sorted by start checkpoint for efficient lookups.
    /// NOTE: entries are keyed by start only; two files sharing a start
    /// checkpoint would overwrite each other on insert.
    ranges: BTreeMap<u64, FileRangeEntry>,
}
99
100impl MigrationStore {
101    /// Create a new migration store.
102    pub fn new(object_store: Arc<dyn ObjectStore>, migration_id: String) -> Self {
103        Self {
104            object_store,
105            migration_id,
106            file_ranges: Arc::new(RwLock::new(HashMap::new())),
107            watermark_versions: Arc::new(RwLock::new(HashMap::new())),
108            adjusted_start_checkpoints: Arc::new(RwLock::new(HashMap::new())),
109        }
110    }
111
    /// Load file ranges and find the starting/ending checkpoints for migration.
    ///
    /// This snaps `first_checkpoint` to file boundaries:
    /// - If checkpoint is inside a file → snap to file start
    /// - If checkpoint is in a gap → snap to next file start
    /// - If no files at or after checkpoint → error
    ///
    /// This also snaps `last_checkpoint` to file boundaries:
    /// - If checkpoint is inside a file → snap to file end (exclusive)
    /// - If checkpoint is in a gap → snap to previous file end (exclusive)
    /// - If no files at or before checkpoint → error
    ///
    /// Returns (adjusted_first, adjusted_last) where:
    /// - adjusted_first is the minimum adjusted start across all pipelines
    /// - adjusted_last is the maximum adjusted end across all pipelines
    ///
    /// Each pipeline is specified as (pipeline_name, output_prefix) where:
    /// - pipeline_name is used as the key in file_ranges map (for lookup by handlers)
    /// - output_prefix is the path prefix in the object store where files are located
    ///
    /// Side effect: populates `self.file_ranges` and
    /// `self.adjusted_start_checkpoints` for later use by handlers.
    pub async fn find_checkpoint_range(
        &self,
        pipelines: impl Iterator<Item = (&str, &str)>,
        first_checkpoint: Option<u64>,
        last_checkpoint: Option<u64>,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let mut file_ranges = HashMap::new();
        let mut adjusted_starts = HashMap::new();
        // Aggregated across pipelines: take the widest window so every
        // pipeline sees all checkpoints its files need.
        let mut min_adjusted_start: Option<u64> = None;
        let mut max_adjusted_end: Option<u64> = None;

        for (pipeline_name, output_prefix) in pipelines {
            // Load file ranges from the output_prefix path in the object store
            let index =
                FileRangeIndex::load_from_store(&self.object_store, output_prefix, None).await?;

            // Snap starting checkpoint to file boundary.
            // If first_checkpoint is not specified, snap from 0 to find the first available file.
            let first_cp = first_checkpoint.unwrap_or(0);
            let adjusted = index.snap_to_boundary(first_cp).ok_or_else(|| {
                anyhow!(
                    "No files at or after checkpoint {} for pipeline '{}'. Nothing to migrate.",
                    first_cp,
                    pipeline_name
                )
            })?;
            // Key by pipeline_name for lookup by handlers
            adjusted_starts.insert(pipeline_name.to_string(), adjusted);
            min_adjusted_start = Some(min_adjusted_start.map_or(adjusted, |m| m.min(adjusted)));

            info!(
                pipeline = pipeline_name,
                output_prefix,
                requested_checkpoint = first_cp,
                adjusted_checkpoint = adjusted,
                "Snapped first_checkpoint to file boundary"
            );

            // Compute adjusted ending checkpoint if last_checkpoint specified
            if let Some(last_cp) = last_checkpoint {
                let adjusted = index.snap_end_to_boundary(last_cp).ok_or_else(|| {
                    anyhow!(
                        "No files at or before checkpoint {} for pipeline '{}'. Nothing to migrate.",
                        last_cp,
                        pipeline_name
                    )
                })?;
                // Use max across pipelines so all pipelines get all their data
                max_adjusted_end = Some(max_adjusted_end.map_or(adjusted, |m| m.max(adjusted)));

                info!(
                    pipeline = pipeline_name,
                    requested_checkpoint = last_cp,
                    adjusted_checkpoint = adjusted,
                    "Snapped last_checkpoint to file boundary"
                );
            }

            // Key by pipeline_name for lookup by handlers
            file_ranges.insert(pipeline_name.to_string(), index);
        }

        // Store the loaded data
        debug!(
            pipeline_keys = ?file_ranges.keys().collect::<Vec<_>>(),
            "Loaded file ranges for migration"
        );
        *self.file_ranges.write().unwrap() = file_ranges;
        *self.adjusted_start_checkpoints.write().unwrap() = adjusted_starts;

        // Return adjusted checkpoints.
        // min_adjusted_start is always set (we snap from 0 if first_checkpoint not specified).
        // last_checkpoint is inclusive in framework (uses ..=), and snap_end_to_boundary
        // returns end - 1 which is also inclusive, so they match.
        Ok((min_adjusted_start, max_adjusted_end.or(last_checkpoint)))
    }
207
    /// Identifier for this migration run; also embedded in the watermark
    /// file name (see [`migration_watermark_path`]).
    pub fn migration_id(&self) -> &str {
        &self.migration_id
    }
211
    /// Shared handle to the per-pipeline file range indexes.
    ///
    /// Empty until `find_checkpoint_range` has populated it.
    pub fn file_ranges(&self) -> &Arc<RwLock<HashMap<String, FileRangeIndex>>> {
        &self.file_ranges
    }
215
216    /// Read watermark from metadata file and cache its etag/version.
217    pub(crate) async fn committer_watermark(
218        &self,
219        pipeline: &str,
220    ) -> anyhow::Result<Option<CommitterWatermark>> {
221        let path = migration_watermark_path(pipeline, &self.migration_id);
222        match self.object_store.get(&path).await {
223            Ok(result) => {
224                // Capture etag and version for conditional PUT
225                let e_tag = result.meta.e_tag.clone();
226                let version = result.meta.version.clone();
227                self.watermark_versions
228                    .write()
229                    .unwrap()
230                    .insert(pipeline.to_string(), (e_tag, version));
231
232                let bytes = result.bytes().await?;
233                let watermark: MigrationWatermark = serde_json::from_slice(&bytes)
234                    .context("Failed to parse migration watermark from object store")?;
235                info!(
236                    pipeline,
237                    migration_id = self.migration_id,
238                    epoch = watermark.epoch_hi_inclusive,
239                    checkpoint = watermark.checkpoint_hi_inclusive,
240                    "Migration mode: found progress from watermark file"
241                );
242                Ok(Some(CommitterWatermark {
243                    epoch_hi_inclusive: watermark.epoch_hi_inclusive,
244                    checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
245                    tx_hi: 0,
246                    timestamp_ms_hi_inclusive: 0,
247                }))
248            }
249            Err(ObjectStoreError::NotFound { .. }) => Ok(None),
250            Err(e) => Err(e.into()),
251        }
252    }
253
254    /// Update watermark for a single pipeline after successful file upload.
255    ///
256    /// Called by the upload worker after each file is successfully uploaded.
257    /// This provides incremental progress tracking for crash recovery.
258    ///
259    /// Returns `WatermarkUpdateError::ConcurrentWriter` on precondition failure (fatal),
260    /// or `WatermarkUpdateError::Transient` on other errors (can be retried).
261    pub(crate) async fn update_watermark(
262        &self,
263        pipeline: &str,
264        epoch_hi_inclusive: u64,
265        checkpoint_hi_inclusive: u64,
266    ) -> std::result::Result<(), WatermarkUpdateError> {
267        let path = migration_watermark_path(pipeline, &self.migration_id);
268        let json = serde_json::to_vec(&MigrationWatermark {
269            checkpoint_hi_inclusive,
270            epoch_hi_inclusive,
271        })
272        .map_err(|e| WatermarkUpdateError::Transient(e.into()))?;
273
274        // Look up cached etag/version for conditional PUT
275        let (e_tag, version) = self
276            .watermark_versions
277            .read()
278            .unwrap()
279            .get(pipeline)
280            .cloned()
281            .unwrap_or((None, None));
282
283        let mode = if e_tag.is_some() || version.is_some() {
284            PutMode::Update(UpdateVersion { e_tag, version })
285        } else {
286            PutMode::Create
287        };
288
289        let result = self
290            .object_store
291            .put_opts(
292                &path,
293                json.into(),
294                PutOptions {
295                    mode,
296                    ..Default::default()
297                },
298            )
299            .await
300            .map_err(|e| match e {
301                ObjectStoreError::Precondition { path, source } => {
302                    WatermarkUpdateError::ConcurrentWriter {
303                        path: path.to_string(),
304                        message: source.to_string(),
305                    }
306                }
307                other => WatermarkUpdateError::Transient(other.into()),
308            })?;
309
310        // Update cached etag/version
311        self.watermark_versions
312            .write()
313            .unwrap()
314            .insert(pipeline.to_string(), (result.e_tag, result.version));
315
316        tracing::debug!(
317            pipeline,
318            migration_id = self.migration_id,
319            checkpoint = checkpoint_hi_inclusive,
320            epoch = epoch_hi_inclusive,
321            "Updated migration watermark"
322        );
323
324        Ok(())
325    }
326
    /// Split a batch of checkpoints into files based on existing file boundaries.
    ///
    /// In migration mode, we match the boundaries of existing files to ensure
    /// we can use conditional PUT with the correct e_tag/version.
    ///
    /// The watermark indicates the highest checkpoint processed by the framework,
    /// including empty checkpoints. It's used to detect when the final file is
    /// complete (when there's no "next row" to trigger boundary detection).
    ///
    /// Returns the (possibly still incomplete) pending batch along with any
    /// batches that now cover a full existing file and are ready to upload.
    pub(crate) fn split_framework_batch_into_files(
        &self,
        pipeline_config: &PipelineConfig,
        batch_from_framework: &[CheckpointRows],
        mut pending_batch: Batch,
        watermark: &CommitterWatermark,
    ) -> (Batch, Vec<Batch>) {
        let mut complete_batches: Vec<Batch> = Vec::new();
        let pipeline = pipeline_config.pipeline.name();

        // Clone the index out so the read lock is not held for the whole loop.
        let ranges = self
            .file_ranges
            .read()
            .unwrap()
            .get(pipeline)
            .cloned()
            .expect("migration ranges not loaded for pipeline");

        for checkpoint_rows in batch_from_framework {
            let cp = checkpoint_rows.checkpoint;

            // Check if we've crossed a file boundary BEFORE adding this checkpoint.
            // This handles sparse checkpoints - the framework only sends checkpoints with rows,
            // so we might skip from e.g. 6500 to 6600, missing the exact boundary at 6543.
            if let Some(first) = pending_batch.first_checkpoint()
                && let Some(entry) = ranges.find_containing(first)
                && cp >= entry.end
            {
                debug!(
                    pipeline,
                    current_cp = cp,
                    file_start = entry.start,
                    file_end = entry.end,
                    "Crossed file boundary - completing batch"
                );
                // Record the target file's exact range so the uploader can
                // verify it against the index before the conditional PUT.
                pending_batch.explicit_range = Some(entry.start..entry.end);
                complete_batches.push(pending_batch);
                pending_batch = Batch::default();
            }

            pending_batch.add(checkpoint_rows.clone());

            // Validate that checkpoints with rows are in a known file range
            if let Some(first) = pending_batch.first_checkpoint()
                && ranges.find_containing(first).is_none()
                && !checkpoint_rows.is_empty()
            {
                panic!(
                    "Migration error: checkpoint {} has {} rows but is not in any existing file range for pipeline '{}'. \
                     File ranges cover checkpoints {:?} to {:?}.",
                    first,
                    checkpoint_rows.len(),
                    pipeline,
                    ranges.first_checkpoint(),
                    ranges.last_checkpoint_exclusive(),
                );
            }
        }

        // Check if the pending batch's file is complete based on watermark.
        // This handles the final batch when there's no "next row" to trigger
        // the boundary detection above.
        if let Some(first) = pending_batch.first_checkpoint()
            && let Some(entry) = ranges.find_containing(first)
            && watermark.checkpoint_hi_inclusive >= entry.end - 1
        {
            debug!(
                pipeline,
                watermark_cp = watermark.checkpoint_hi_inclusive,
                file_start = entry.start,
                file_end = entry.end,
                "File complete per watermark - completing batch"
            );
            pending_batch.explicit_range = Some(entry.start..entry.end);
            complete_batches.push(pending_batch);
            pending_batch = Batch::default();
        }

        (pending_batch, complete_batches)
    }
415
416    /// Write a file to the object store with conditional update.
417    ///
418    /// Verifies the checkpoint range matches an existing file in the index,
419    /// then fetches current metadata via HEAD for conditional PUT.
420    ///
421    /// Errors if the file doesn't exist (migration mode requires existing files to replace).
422    pub(crate) async fn write_to_object_store(
423        &self,
424        pipeline: &str,
425        path: &ObjectPath,
426        checkpoint_range: &Range<u64>,
427        payload: PutPayload,
428    ) -> anyhow::Result<()> {
429        // Verify this range exists in our index
430        {
431            let ranges = self.file_ranges.read().unwrap();
432            let pipeline_ranges = ranges.get(pipeline).expect("migration ranges not loaded");
433
434            let entry = pipeline_ranges
435                .find_containing(checkpoint_range.start)
436                .ok_or_else(|| {
437                    anyhow!(
438                        "No file in index for checkpoint range {:?} - migration requires existing files",
439                        checkpoint_range
440                    )
441                })?;
442
443            // Verify the range matches exactly
444            anyhow::ensure!(
445                entry.start == checkpoint_range.start,
446                "checkpoint range start mismatch: expected {}, got {}",
447                entry.start,
448                checkpoint_range.start
449            );
450            anyhow::ensure!(
451                entry.end == checkpoint_range.end,
452                "checkpoint range end mismatch: expected {}, got {}",
453                entry.end,
454                checkpoint_range.end
455            );
456        }
457
458        // Fetch current metadata for conditional PUT (GCS requires version/generation)
459        let meta = self.object_store.head(path).await.map_err(|e| {
460            anyhow!(
461                "Failed to get metadata for {} (file should exist for migration): {}",
462                path,
463                e
464            )
465        })?;
466
467        self.put_conditional(
468            path,
469            payload,
470            meta.e_tag.as_deref(),
471            meta.version.as_deref(),
472        )
473        .await
474    }
475
476    /// Put a file with conditional update for migration mode.
477    ///
478    /// Uses `PutMode::Update` with etag/version for atomic replacement to prevent
479    /// concurrent modification. GCS requires version (generation), other stores use e_tag.
480    async fn put_conditional(
481        &self,
482        path: &ObjectPath,
483        payload: PutPayload,
484        expected_etag: Option<&str>,
485        expected_version: Option<&str>,
486    ) -> anyhow::Result<()> {
487        let mode = if expected_version.is_some() || expected_etag.is_some() {
488            PutMode::Update(UpdateVersion {
489                e_tag: expected_etag.map(String::from),
490                version: expected_version.map(String::from),
491            })
492        } else {
493            PutMode::Create
494        };
495
496        self.object_store
497            .put_opts(
498                path,
499                payload,
500                PutOptions {
501                    mode,
502                    ..Default::default()
503                },
504            )
505            .await
506            .map_err(|e| match e {
507                ObjectStoreError::Precondition { path, source } => {
508                    anyhow!(
509                        "Concurrent writer detected - etag mismatch for {}: {}",
510                        path,
511                        source
512                    )
513                }
514                ObjectStoreError::AlreadyExists { path, source } => {
515                    anyhow!(
516                        "File already exists (expected for conditional create): {}: {}",
517                        path,
518                        source
519                    )
520                }
521                _ => e.into(),
522            })?;
523        Ok(())
524    }
525}
526
527impl FileRangeIndex {
    /// Create a new empty index.
    ///
    /// Equivalent to `FileRangeIndex::default()`; entries are added via `insert`.
    pub fn new() -> Self {
        Self::default()
    }
532
    /// Insert a file range entry.
    ///
    /// Entries are keyed by `entry.start`; inserting a second entry with the
    /// same start checkpoint replaces the previous one.
    pub fn insert(&mut self, entry: FileRangeEntry) {
        self.ranges.insert(entry.start, entry);
    }
537
    /// Get the number of file ranges (one entry per indexed file).
    pub fn len(&self) -> usize {
        self.ranges.len()
    }
542
    /// Check if the index is empty.
    ///
    /// True when no file ranges have been inserted or loaded.
    pub fn is_empty(&self) -> bool {
        self.ranges.is_empty()
    }
547
548    /// Find the file range that contains the given checkpoint.
549    pub fn find_containing(&self, checkpoint: u64) -> Option<&FileRangeEntry> {
550        // Find the largest start <= checkpoint
551        self.ranges
552            .range(..=checkpoint)
553            .next_back()
554            .filter(|(_, entry)| checkpoint < entry.end)
555            .map(|(_, entry)| entry)
556    }
557
558    /// Find the next file boundary at or after the given checkpoint.
559    pub fn find_next_boundary(&self, checkpoint: u64) -> Option<&FileRangeEntry> {
560        self.ranges
561            .range(checkpoint..)
562            .next()
563            .map(|(_, entry)| entry)
564    }
565
566    /// Snap a start checkpoint to file boundaries for migration.
567    ///
568    /// - If checkpoint is inside a file → returns file start
569    /// - If checkpoint is in a gap → returns next file start
570    /// - If no files at or after checkpoint → returns None (error case)
571    pub fn snap_to_boundary(&self, checkpoint: u64) -> Option<u64> {
572        // Check if checkpoint is inside a file
573        if let Some(entry) = self.find_containing(checkpoint) {
574            return Some(entry.start);
575        }
576        // Check for next file after checkpoint
577        if let Some(entry) = self.find_next_boundary(checkpoint) {
578            return Some(entry.start);
579        }
580        // No files at or after checkpoint
581        None
582    }
583
584    /// Snap an end checkpoint to file boundaries for migration.
585    ///
586    /// Returns the last checkpoint (inclusive) that should be processed to complete
587    /// the file containing the given checkpoint:
588    /// - If checkpoint is inside a file → returns file's last checkpoint (end - 1)
589    /// - If checkpoint is in a gap → returns previous file's last checkpoint (end - 1)
590    /// - If no files at or before checkpoint → returns None (error case)
591    pub fn snap_end_to_boundary(&self, checkpoint: u64) -> Option<u64> {
592        // Check if checkpoint is inside a file
593        if let Some(entry) = self.find_containing(checkpoint) {
594            // Return last checkpoint in file (end is exclusive, so end - 1 is last inclusive)
595            return Some(entry.end - 1);
596        }
597        // Checkpoint is in a gap - find the previous file's end
598        // Look for the largest start < checkpoint
599        if let Some((_, entry)) = self.ranges.range(..checkpoint).next_back() {
600            return Some(entry.end - 1);
601        }
602        // No files at or before checkpoint
603        None
604    }
605
606    /// Get the first checkpoint across all ranges.
607    pub fn first_checkpoint(&self) -> Option<u64> {
608        self.ranges.keys().next().copied()
609    }
610
    /// Get the last checkpoint across all ranges (exclusive).
    ///
    /// Scans every entry's `end` rather than taking the last key, since the
    /// map is ordered by start and an earlier range could in principle end
    /// later than the last one.
    pub fn last_checkpoint_exclusive(&self) -> Option<u64> {
        self.ranges.values().map(|e| e.end).max()
    }
615
616    /// Load file range index from object store.
617    ///
618    /// This lists all files in the pipeline directory to build the index of
619    /// target ranges for migration. Progress is tracked via a separate watermark file.
620    ///
621    /// If `min_epoch` is provided, only epochs >= min_epoch are scanned, which
622    /// reduces startup time when resuming a migration.
623    pub async fn load_from_store(
624        store: &Arc<dyn ObjectStore>,
625        pipeline: &str,
626        min_epoch: Option<u64>,
627    ) -> Result<Self> {
628        let mut index = Self::new();
629
630        // Find all epoch directories under {pipeline}/epoch_*
631        let prefix = ObjectPath::from(pipeline);
632        let epoch_dirs = find_all_dirs_with_epoch_prefix(store, Some(&prefix)).await?;
633
634        let skipped_epochs = min_epoch
635            .map(|min| epoch_dirs.range(..min).count())
636            .unwrap_or(0);
637
638        for (epoch, epoch_path) in epoch_dirs {
639            // Skip epochs before the watermark
640            if let Some(min) = min_epoch
641                && epoch < min
642            {
643                continue;
644            }
645
646            // List files in this epoch directory
647            let list_result = store.list_with_delimiter(Some(&epoch_path)).await?;
648
649            for obj in list_result.objects {
650                // Parse checkpoint range from filename: {start}_{end}.{format}
651                let Some(filename) = obj.location.filename() else {
652                    continue;
653                };
654                let Some(range) = super::parse_checkpoint_range(filename) else {
655                    continue;
656                };
657
658                index.insert(FileRangeEntry {
659                    start: range.start,
660                    end: range.end,
661                    epoch,
662                });
663            }
664        }
665
666        info!(
667            pipeline,
668            num_files = index.len(),
669            skipped_epochs,
670            min_epoch,
671            first_checkpoint = ?index.first_checkpoint(),
672            last_checkpoint = ?index.last_checkpoint_exclusive(),
673            "Loaded existing file ranges"
674        );
675
676        Ok(index)
677    }
678}
679
680/// Construct the path for a migration watermark file.
681///
682/// Format: `_metadata/watermarks/{pipeline}@migration_{migration_id}.json`
683pub(crate) fn migration_watermark_path(pipeline: &str, migration_id: &str) -> ObjectPath {
684    ObjectPath::from(format!(
685        "_metadata/watermarks/{}@migration_{}.json",
686        pipeline, migration_id
687    ))
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693    use object_store::memory::InMemory;
694
    // Round-trips a watermark through the in-memory object store: a fresh
    // store has no watermark, an update creates the JSON file, and a re-read
    // recovers the same epoch/checkpoint values.
    #[tokio::test]
    async fn test_migration_mode_watermark() {
        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        // Create migration store directly to test watermark updates
        let migration_store = MigrationStore::new(object_store.clone(), "test_migration".into());

        // No watermark file yet
        let watermark = migration_store
            .committer_watermark("test_pipeline")
            .await
            .unwrap();
        assert!(watermark.is_none());

        // Update watermark (simulating what uploader does after upload)
        migration_store
            .update_watermark("test_pipeline", 5, 500)
            .await
            .unwrap();

        // Read it back
        let watermark = migration_store
            .committer_watermark("test_pipeline")
            .await
            .unwrap();
        assert!(watermark.is_some());
        let watermark = watermark.unwrap();
        assert_eq!(watermark.epoch_hi_inclusive, 5);
        assert_eq!(watermark.checkpoint_hi_inclusive, 500);
    }
725
726    #[test]
727    fn test_parse_checkpoint_range() {
728        use super::super::parse_checkpoint_range;
729        assert_eq!(parse_checkpoint_range("0_100.parquet"), Some(0..100));
730        assert_eq!(parse_checkpoint_range("100_200.csv"), Some(100..200));
731        assert_eq!(
732            parse_checkpoint_range("1234_5678.parquet"),
733            Some(1234..5678)
734        );
735        assert_eq!(parse_checkpoint_range("invalid"), None);
736        assert_eq!(parse_checkpoint_range("no_extension"), None);
737        assert_eq!(parse_checkpoint_range("a_b.parquet"), None);
738    }
739
740    #[test]
741    fn test_find_containing() {
742        let mut index = FileRangeIndex::new();
743        index.insert(FileRangeEntry {
744            start: 0,
745            end: 100,
746            epoch: 0,
747        });
748        index.insert(FileRangeEntry {
749            start: 100,
750            end: 200,
751            epoch: 0,
752        });
753        index.insert(FileRangeEntry {
754            start: 200,
755            end: 300,
756            epoch: 1,
757        });
758
759        // Test checkpoint in first range
760        let result = index.find_containing(50);
761        assert!(result.is_some());
762        let entry = result.unwrap();
763        assert_eq!(entry.start, 0);
764        assert_eq!(entry.end, 100);
765
766        // Test checkpoint at boundary (should be in second range)
767        let result = index.find_containing(100);
768        assert!(result.is_some());
769        let entry = result.unwrap();
770        assert_eq!(entry.start, 100);
771        assert_eq!(entry.end, 200);
772
773        // Test checkpoint not in any range
774        let result = index.find_containing(300);
775        assert!(result.is_none());
776    }
777
778    #[test]
779    fn test_find_next_boundary() {
780        let mut index = FileRangeIndex::new();
781        index.insert(FileRangeEntry {
782            start: 0,
783            end: 100,
784            epoch: 0,
785        });
786        index.insert(FileRangeEntry {
787            start: 100,
788            end: 200,
789            epoch: 0,
790        });
791
792        // From checkpoint 0, next boundary is at 0
793        let result = index.find_next_boundary(0);
794        assert!(result.is_some());
795        assert_eq!(result.unwrap().start, 0);
796
797        // From checkpoint 50, next boundary is at 100
798        let result = index.find_next_boundary(50);
799        assert!(result.is_some());
800        assert_eq!(result.unwrap().start, 100);
801
802        // From checkpoint 200, no more boundaries
803        let result = index.find_next_boundary(200);
804        assert!(result.is_none());
805    }
806
807    #[test]
808    fn test_snap_to_boundary() {
809        let mut index = FileRangeIndex::new();
810        // Files: 0-100, 200-300 (gap at 100-200)
811        index.insert(FileRangeEntry {
812            start: 0,
813            end: 100,
814            epoch: 0,
815        });
816        index.insert(FileRangeEntry {
817            start: 200,
818            end: 300,
819            epoch: 1,
820        });
821
822        // Checkpoint inside first file → snap to file start
823        assert_eq!(index.snap_to_boundary(50), Some(0));
824        assert_eq!(index.snap_to_boundary(0), Some(0));
825        assert_eq!(index.snap_to_boundary(99), Some(0));
826
827        // Checkpoint in gap → snap to next file start
828        assert_eq!(index.snap_to_boundary(100), Some(200));
829        assert_eq!(index.snap_to_boundary(150), Some(200));
830        assert_eq!(index.snap_to_boundary(199), Some(200));
831
832        // Checkpoint inside second file → snap to file start
833        assert_eq!(index.snap_to_boundary(200), Some(200));
834        assert_eq!(index.snap_to_boundary(250), Some(200));
835        assert_eq!(index.snap_to_boundary(299), Some(200));
836
837        // Checkpoint beyond all files → None (error case)
838        assert_eq!(index.snap_to_boundary(300), None);
839        assert_eq!(index.snap_to_boundary(1000), None);
840    }
841
842    #[test]
843    fn test_snap_end_to_boundary() {
844        let mut index = FileRangeIndex::new();
845        // Files: 0-100, 200-300 (gap at 100-200)
846        // File ranges are [start, end) so file 0-100 contains checkpoints 0-99
847        index.insert(FileRangeEntry {
848            start: 0,
849            end: 100,
850            epoch: 0,
851        });
852        index.insert(FileRangeEntry {
853            start: 200,
854            end: 300,
855            epoch: 1,
856        });
857
858        // Checkpoint inside first file → snap to last checkpoint in file (99)
859        assert_eq!(index.snap_end_to_boundary(0), Some(99));
860        assert_eq!(index.snap_end_to_boundary(50), Some(99));
861        assert_eq!(index.snap_end_to_boundary(99), Some(99));
862
863        // Checkpoint in gap → snap to previous file's last checkpoint (99)
864        assert_eq!(index.snap_end_to_boundary(100), Some(99));
865        assert_eq!(index.snap_end_to_boundary(150), Some(99));
866        assert_eq!(index.snap_end_to_boundary(199), Some(99));
867
868        // Checkpoint inside second file → snap to last checkpoint in file (299)
869        assert_eq!(index.snap_end_to_boundary(200), Some(299));
870        assert_eq!(index.snap_end_to_boundary(250), Some(299));
871        assert_eq!(index.snap_end_to_boundary(299), Some(299));
872
873        // Checkpoint at or beyond second file end → snap to last checkpoint (299)
874        assert_eq!(index.snap_end_to_boundary(300), Some(299));
875        assert_eq!(index.snap_end_to_boundary(1000), Some(299));
876    }
877}