1use std::collections::BTreeMap;
10use std::collections::HashMap;
11use std::ops::Range;
12use std::sync::Arc;
13use std::sync::RwLock;
14
15use anyhow::Context;
16use anyhow::Result;
17use anyhow::anyhow;
18use object_store::Error as ObjectStoreError;
19use object_store::ObjectStore;
20use object_store::ObjectStoreExt as _;
21use object_store::PutMode;
22use object_store::PutOptions;
23use object_store::PutPayload;
24use object_store::UpdateVersion;
25use object_store::path::Path as ObjectPath;
26use sui_indexer_alt_framework_store_traits::CommitterWatermark;
27use sui_storage::object_store::util::find_all_dirs_with_epoch_prefix;
28use thiserror::Error;
29use tracing::debug;
30use tracing::info;
31
32use crate::config::PipelineConfig;
33use crate::handlers::CheckpointRows;
34use crate::store::Batch;
35
/// Errors that can occur while persisting a pipeline's migration watermark.
#[derive(Error, Debug)]
pub enum WatermarkUpdateError {
    /// A conditional put failed its precondition: another writer updated the
    /// watermark object since we last read it.
    #[error("Concurrent writer detected on watermark {path}: {message}")]
    ConcurrentWriter { path: String, message: String },

    /// Any other failure (serialization, transport, ...) that may succeed on retry.
    #[error("Transient error updating watermark: {0}")]
    Transient(#[from] anyhow::Error),
}
47
/// Cached `(e_tag, version)` pair from the last read or write of a watermark object,
/// used to build conditional (compare-and-swap) updates.
type VersionInfo = (Option<String>, Option<String>);

/// JSON progress record persisted per pipeline per migration run.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct MigrationWatermark {
    /// Highest checkpoint (inclusive) recorded as migrated.
    pub checkpoint_hi_inclusive: u64,
    /// Epoch (inclusive) associated with that checkpoint.
    pub epoch_hi_inclusive: u64,
}
58
/// Coordinates a migration run: tracks the ranges of existing files per pipeline,
/// persists per-pipeline watermarks, and performs conditional writes to the store.
#[derive(Clone)]
pub struct MigrationStore {
    /// Backing store for both data files and watermark objects.
    object_store: Arc<dyn ObjectStore>,
    /// Identifier that namespaces this run's watermark files.
    migration_id: String,
    /// Per-pipeline index of existing files, keyed by pipeline name.
    /// Populated by `find_checkpoint_range`.
    file_ranges: Arc<RwLock<HashMap<String, FileRangeIndex>>>,
    /// Last observed `(e_tag, version)` for each pipeline's watermark object,
    /// used for compare-and-swap updates.
    watermark_versions: Arc<RwLock<HashMap<String, VersionInfo>>>,
    /// First checkpoint per pipeline after snapping to a file boundary.
    adjusted_start_checkpoints: Arc<RwLock<HashMap<String, u64>>>,
}
77
/// The checkpoint range covered by one existing file, plus the epoch it lives under.
/// The range is half-open: `[start, end)`.
#[derive(Debug, Clone)]
pub struct FileRangeEntry {
    /// First checkpoint in the file (inclusive).
    pub start: u64,
    /// One past the last checkpoint in the file (exclusive).
    pub end: u64,
    /// Epoch directory the file was found under.
    pub epoch: u64,
}
88
/// Ordered index of a pipeline's file ranges, keyed by each file's start checkpoint.
#[derive(Debug, Default, Clone)]
pub struct FileRangeIndex {
    /// Map from a file's start checkpoint to its full range entry.
    ranges: BTreeMap<u64, FileRangeEntry>,
}
99
impl MigrationStore {
    /// Creates a store that reads/writes migration state for `migration_id`
    /// through `object_store`. The internal caches start empty.
    pub fn new(object_store: Arc<dyn ObjectStore>, migration_id: String) -> Self {
        Self {
            object_store,
            migration_id,
            file_ranges: Arc::new(RwLock::new(HashMap::new())),
            watermark_versions: Arc::new(RwLock::new(HashMap::new())),
            adjusted_start_checkpoints: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Loads each pipeline's existing file-range index and snaps the requested
    /// checkpoint bounds to file boundaries.
    ///
    /// `pipelines` yields `(pipeline_name, output_prefix)` pairs. Returns
    /// `(first, last)` where `first` is the minimum snapped start across all
    /// pipelines and `last` is the maximum snapped end (inclusive), falling back
    /// to the caller's `last_checkpoint` when no end was snapped.
    ///
    /// Errors if any pipeline has no file at/after `first_checkpoint` or (when
    /// given) at/before `last_checkpoint`.
    ///
    /// Side effect: caches the loaded indexes in `self.file_ranges` and the
    /// per-pipeline adjusted starts in `self.adjusted_start_checkpoints`.
    pub async fn find_checkpoint_range(
        &self,
        pipelines: impl Iterator<Item = (&str, &str)>,
        first_checkpoint: Option<u64>,
        last_checkpoint: Option<u64>,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let mut file_ranges = HashMap::new();
        let mut adjusted_starts = HashMap::new();
        let mut min_adjusted_start: Option<u64> = None;
        let mut max_adjusted_end: Option<u64> = None;

        for (pipeline_name, output_prefix) in pipelines {
            // min_epoch is None: list every epoch directory for this pipeline.
            let index =
                FileRangeIndex::load_from_store(&self.object_store, output_prefix, None).await?;

            // Snap the requested (or default 0) start down/forward to a file start.
            let first_cp = first_checkpoint.unwrap_or(0);
            let adjusted = index.snap_to_boundary(first_cp).ok_or_else(|| {
                anyhow!(
                    "No files at or after checkpoint {} for pipeline '{}'. Nothing to migrate.",
                    first_cp,
                    pipeline_name
                )
            })?;
            adjusted_starts.insert(pipeline_name.to_string(), adjusted);
            min_adjusted_start = Some(min_adjusted_start.map_or(adjusted, |m| m.min(adjusted)));

            info!(
                pipeline = pipeline_name,
                output_prefix,
                requested_checkpoint = first_cp,
                adjusted_checkpoint = adjusted,
                "Snapped first_checkpoint to file boundary"
            );

            if let Some(last_cp) = last_checkpoint {
                // Snap the requested end to the last checkpoint of a file.
                let adjusted = index.snap_end_to_boundary(last_cp).ok_or_else(|| {
                    anyhow!(
                        "No files at or before checkpoint {} for pipeline '{}'. Nothing to migrate.",
                        last_cp,
                        pipeline_name
                    )
                })?;
                max_adjusted_end = Some(max_adjusted_end.map_or(adjusted, |m| m.max(adjusted)));

                info!(
                    pipeline = pipeline_name,
                    requested_checkpoint = last_cp,
                    adjusted_checkpoint = adjusted,
                    "Snapped last_checkpoint to file boundary"
                );
            }

            file_ranges.insert(pipeline_name.to_string(), index);
        }

        debug!(
            pipeline_keys = ?file_ranges.keys().collect::<Vec<_>>(),
            "Loaded file ranges for migration"
        );
        *self.file_ranges.write().unwrap() = file_ranges;
        *self.adjusted_start_checkpoints.write().unwrap() = adjusted_starts;

        Ok((min_adjusted_start, max_adjusted_end.or(last_checkpoint)))
    }

    /// This run's migration identifier.
    pub fn migration_id(&self) -> &str {
        &self.migration_id
    }

    /// Shared handle to the per-pipeline file-range indexes.
    pub fn file_ranges(&self) -> &Arc<RwLock<HashMap<String, FileRangeIndex>>> {
        &self.file_ranges
    }

    /// Reads the pipeline's migration watermark from the object store.
    ///
    /// Returns `Ok(None)` when no watermark file exists yet. On success the
    /// object's `(e_tag, version)` is cached so that the next `update_watermark`
    /// can perform a compare-and-swap against this exact version.
    ///
    /// Note: `tx_hi` and `timestamp_ms_hi_inclusive` are zeroed — the migration
    /// watermark file only records epoch and checkpoint.
    pub(crate) async fn committer_watermark(
        &self,
        pipeline: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        let path = migration_watermark_path(pipeline, &self.migration_id);
        match self.object_store.get(&path).await {
            Ok(result) => {
                // Cache version info before consuming the body, for later CAS updates.
                let e_tag = result.meta.e_tag.clone();
                let version = result.meta.version.clone();
                self.watermark_versions
                    .write()
                    .unwrap()
                    .insert(pipeline.to_string(), (e_tag, version));

                let bytes = result.bytes().await?;
                let watermark: MigrationWatermark = serde_json::from_slice(&bytes)
                    .context("Failed to parse migration watermark from object store")?;
                info!(
                    pipeline,
                    migration_id = self.migration_id,
                    epoch = watermark.epoch_hi_inclusive,
                    checkpoint = watermark.checkpoint_hi_inclusive,
                    "Migration mode: found progress from watermark file"
                );
                Ok(Some(CommitterWatermark {
                    epoch_hi_inclusive: watermark.epoch_hi_inclusive,
                    checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
                    tx_hi: 0,
                    timestamp_ms_hi_inclusive: 0,
                }))
            }
            Err(ObjectStoreError::NotFound { .. }) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    /// Writes the pipeline's migration watermark with a conditional put.
    ///
    /// Uses `PutMode::Update` against the cached `(e_tag, version)` when one is
    /// known, otherwise `PutMode::Create` (first write). A precondition failure
    /// is surfaced as `WatermarkUpdateError::ConcurrentWriter`; all other
    /// failures are `Transient`. On success the cached version info is refreshed
    /// from the put result.
    pub(crate) async fn update_watermark(
        &self,
        pipeline: &str,
        epoch_hi_inclusive: u64,
        checkpoint_hi_inclusive: u64,
    ) -> std::result::Result<(), WatermarkUpdateError> {
        let path = migration_watermark_path(pipeline, &self.migration_id);
        let json = serde_json::to_vec(&MigrationWatermark {
            checkpoint_hi_inclusive,
            epoch_hi_inclusive,
        })
        .map_err(|e| WatermarkUpdateError::Transient(e.into()))?;

        let (e_tag, version) = self
            .watermark_versions
            .read()
            .unwrap()
            .get(pipeline)
            .cloned()
            .unwrap_or((None, None));

        // CAS when we have seen a prior version; otherwise the object must not
        // already exist.
        let mode = if e_tag.is_some() || version.is_some() {
            PutMode::Update(UpdateVersion { e_tag, version })
        } else {
            PutMode::Create
        };

        let result = self
            .object_store
            .put_opts(
                &path,
                json.into(),
                PutOptions {
                    mode,
                    ..Default::default()
                },
            )
            .await
            .map_err(|e| match e {
                ObjectStoreError::Precondition { path, source } => {
                    WatermarkUpdateError::ConcurrentWriter {
                        path: path.to_string(),
                        message: source.to_string(),
                    }
                }
                other => WatermarkUpdateError::Transient(other.into()),
            })?;

        // Remember the new version so the next update can CAS against it.
        self.watermark_versions
            .write()
            .unwrap()
            .insert(pipeline.to_string(), (result.e_tag, result.version));

        tracing::debug!(
            pipeline,
            migration_id = self.migration_id,
            checkpoint = checkpoint_hi_inclusive,
            epoch = epoch_hi_inclusive,
            "Updated migration watermark"
        );

        Ok(())
    }

    /// Splits the framework's per-checkpoint rows into batches that align with
    /// the pipeline's existing file boundaries.
    ///
    /// Returns `(pending, complete)`: `pending` is the still-open batch to carry
    /// into the next call; `complete` holds batches whose `explicit_range` has
    /// been set to exactly one existing file's `[start, end)` range.
    ///
    /// A batch is completed either when a checkpoint at/past the current file's
    /// exclusive end is seen, or when `watermark` shows the current file's final
    /// checkpoint has already been reached.
    ///
    /// Panics if a non-empty checkpoint does not fall inside any known file
    /// range (migration only rewrites existing files), or if the pipeline's
    /// ranges were never loaded via `find_checkpoint_range`.
    pub(crate) fn split_framework_batch_into_files(
        &self,
        pipeline_config: &PipelineConfig,
        batch_from_framework: &[CheckpointRows],
        mut pending_batch: Batch,
        watermark: &CommitterWatermark,
    ) -> (Batch, Vec<Batch>) {
        let mut complete_batches: Vec<Batch> = Vec::new();
        let pipeline = pipeline_config.pipeline.name();

        let ranges = self
            .file_ranges
            .read()
            .unwrap()
            .get(pipeline)
            .cloned()
            .expect("migration ranges not loaded for pipeline");

        for checkpoint_rows in batch_from_framework {
            let cp = checkpoint_rows.checkpoint;

            // If this checkpoint lies at/past the exclusive end of the file that
            // contains the pending batch's first checkpoint, close that batch.
            if let Some(first) = pending_batch.first_checkpoint()
                && let Some(entry) = ranges.find_containing(first)
                && cp >= entry.end
            {
                debug!(
                    pipeline,
                    current_cp = cp,
                    file_start = entry.start,
                    file_end = entry.end,
                    "Crossed file boundary - completing batch"
                );
                pending_batch.explicit_range = Some(entry.start..entry.end);
                complete_batches.push(pending_batch);
                pending_batch = Batch::default();
            }

            pending_batch.add(checkpoint_rows.clone());

            // Rows outside every known file range are a hard error: migration
            // can only rewrite files that already exist.
            if let Some(first) = pending_batch.first_checkpoint()
                && ranges.find_containing(first).is_none()
                && !checkpoint_rows.is_empty()
            {
                panic!(
                    "Migration error: checkpoint {} has {} rows but is not in any existing file range for pipeline '{}'. \
                    File ranges cover checkpoints {:?} to {:?}.",
                    first,
                    checkpoint_rows.len(),
                    pipeline,
                    ranges.first_checkpoint(),
                    ranges.last_checkpoint_exclusive(),
                );
            }
        }

        // The watermark may already prove the current file is fully ingested
        // (entry.end is exclusive, so the file's last checkpoint is end - 1).
        if let Some(first) = pending_batch.first_checkpoint()
            && let Some(entry) = ranges.find_containing(first)
            && watermark.checkpoint_hi_inclusive >= entry.end - 1
        {
            debug!(
                pipeline,
                watermark_cp = watermark.checkpoint_hi_inclusive,
                file_start = entry.start,
                file_end = entry.end,
                "File complete per watermark - completing batch"
            );
            pending_batch.explicit_range = Some(entry.start..entry.end);
            complete_batches.push(pending_batch);
            pending_batch = Batch::default();
        }

        (pending_batch, complete_batches)
    }

    /// Overwrites an existing data file at `path`, guarded against concurrent
    /// writers.
    ///
    /// Validates that `checkpoint_range` matches exactly one indexed file for
    /// `pipeline` (both endpoints must agree), requires the destination object
    /// to already exist (HEAD), then performs a conditional put pinned to the
    /// observed etag/version.
    pub(crate) async fn write_to_object_store(
        &self,
        pipeline: &str,
        path: &ObjectPath,
        checkpoint_range: &Range<u64>,
        payload: PutPayload,
    ) -> anyhow::Result<()> {
        // Scope the read lock so it is released before any await point.
        {
            let ranges = self.file_ranges.read().unwrap();
            let pipeline_ranges = ranges.get(pipeline).expect("migration ranges not loaded");

            let entry = pipeline_ranges
                .find_containing(checkpoint_range.start)
                .ok_or_else(|| {
                    anyhow!(
                        "No file in index for checkpoint range {:?} - migration requires existing files",
                        checkpoint_range
                    )
                })?;

            anyhow::ensure!(
                entry.start == checkpoint_range.start,
                "checkpoint range start mismatch: expected {}, got {}",
                entry.start,
                checkpoint_range.start
            );
            anyhow::ensure!(
                entry.end == checkpoint_range.end,
                "checkpoint range end mismatch: expected {}, got {}",
                entry.end,
                checkpoint_range.end
            );
        }

        // Fetch current metadata so the put below can CAS on etag/version.
        let meta = self.object_store.head(path).await.map_err(|e| {
            anyhow!(
                "Failed to get metadata for {} (file should exist for migration): {}",
                path,
                e
            )
        })?;

        self.put_conditional(
            path,
            payload,
            meta.e_tag.as_deref(),
            meta.version.as_deref(),
        )
        .await
    }

    /// Performs a conditional put: `PutMode::Update` pinned to the expected
    /// etag/version when either is known, `PutMode::Create` otherwise.
    /// Precondition and already-exists failures are mapped to descriptive
    /// errors; anything else propagates as-is.
    async fn put_conditional(
        &self,
        path: &ObjectPath,
        payload: PutPayload,
        expected_etag: Option<&str>,
        expected_version: Option<&str>,
    ) -> anyhow::Result<()> {
        let mode = if expected_version.is_some() || expected_etag.is_some() {
            PutMode::Update(UpdateVersion {
                e_tag: expected_etag.map(String::from),
                version: expected_version.map(String::from),
            })
        } else {
            PutMode::Create
        };

        self.object_store
            .put_opts(
                path,
                payload,
                PutOptions {
                    mode,
                    ..Default::default()
                },
            )
            .await
            .map_err(|e| match e {
                ObjectStoreError::Precondition { path, source } => {
                    anyhow!(
                        "Concurrent writer detected - etag mismatch for {}: {}",
                        path,
                        source
                    )
                }
                ObjectStoreError::AlreadyExists { path, source } => {
                    anyhow!(
                        "File already exists (expected for conditional create): {}: {}",
                        path,
                        source
                    )
                }
                _ => e.into(),
            })?;
        Ok(())
    }
}
526
527impl FileRangeIndex {
528 pub fn new() -> Self {
530 Self::default()
531 }
532
533 pub fn insert(&mut self, entry: FileRangeEntry) {
535 self.ranges.insert(entry.start, entry);
536 }
537
538 pub fn len(&self) -> usize {
540 self.ranges.len()
541 }
542
543 pub fn is_empty(&self) -> bool {
545 self.ranges.is_empty()
546 }
547
548 pub fn find_containing(&self, checkpoint: u64) -> Option<&FileRangeEntry> {
550 self.ranges
552 .range(..=checkpoint)
553 .next_back()
554 .filter(|(_, entry)| checkpoint < entry.end)
555 .map(|(_, entry)| entry)
556 }
557
558 pub fn find_next_boundary(&self, checkpoint: u64) -> Option<&FileRangeEntry> {
560 self.ranges
561 .range(checkpoint..)
562 .next()
563 .map(|(_, entry)| entry)
564 }
565
566 pub fn snap_to_boundary(&self, checkpoint: u64) -> Option<u64> {
572 if let Some(entry) = self.find_containing(checkpoint) {
574 return Some(entry.start);
575 }
576 if let Some(entry) = self.find_next_boundary(checkpoint) {
578 return Some(entry.start);
579 }
580 None
582 }
583
584 pub fn snap_end_to_boundary(&self, checkpoint: u64) -> Option<u64> {
592 if let Some(entry) = self.find_containing(checkpoint) {
594 return Some(entry.end - 1);
596 }
597 if let Some((_, entry)) = self.ranges.range(..checkpoint).next_back() {
600 return Some(entry.end - 1);
601 }
602 None
604 }
605
606 pub fn first_checkpoint(&self) -> Option<u64> {
608 self.ranges.keys().next().copied()
609 }
610
611 pub fn last_checkpoint_exclusive(&self) -> Option<u64> {
613 self.ranges.values().map(|e| e.end).max()
614 }
615
616 pub async fn load_from_store(
624 store: &Arc<dyn ObjectStore>,
625 pipeline: &str,
626 min_epoch: Option<u64>,
627 ) -> Result<Self> {
628 let mut index = Self::new();
629
630 let prefix = ObjectPath::from(pipeline);
632 let epoch_dirs = find_all_dirs_with_epoch_prefix(store, Some(&prefix)).await?;
633
634 let skipped_epochs = min_epoch
635 .map(|min| epoch_dirs.range(..min).count())
636 .unwrap_or(0);
637
638 for (epoch, epoch_path) in epoch_dirs {
639 if let Some(min) = min_epoch
641 && epoch < min
642 {
643 continue;
644 }
645
646 let list_result = store.list_with_delimiter(Some(&epoch_path)).await?;
648
649 for obj in list_result.objects {
650 let Some(filename) = obj.location.filename() else {
652 continue;
653 };
654 let Some(range) = super::parse_checkpoint_range(filename) else {
655 continue;
656 };
657
658 index.insert(FileRangeEntry {
659 start: range.start,
660 end: range.end,
661 epoch,
662 });
663 }
664 }
665
666 info!(
667 pipeline,
668 num_files = index.len(),
669 skipped_epochs,
670 min_epoch,
671 first_checkpoint = ?index.first_checkpoint(),
672 last_checkpoint = ?index.last_checkpoint_exclusive(),
673 "Loaded existing file ranges"
674 );
675
676 Ok(index)
677 }
678}
679
680pub(crate) fn migration_watermark_path(pipeline: &str, migration_id: &str) -> ObjectPath {
684 ObjectPath::from(format!(
685 "_metadata/watermarks/{}@migration_{}.json",
686 pipeline, migration_id
687 ))
688}
689
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    /// Builds an index from `(start, end, epoch)` triples.
    fn build_index(entries: &[(u64, u64, u64)]) -> FileRangeIndex {
        let mut index = FileRangeIndex::new();
        for &(start, end, epoch) in entries {
            index.insert(FileRangeEntry { start, end, epoch });
        }
        index
    }

    #[tokio::test]
    async fn test_migration_mode_watermark() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let migration_store = MigrationStore::new(store.clone(), "test_migration".into());

        // No watermark file exists yet.
        let initial = migration_store
            .committer_watermark("test_pipeline")
            .await
            .unwrap();
        assert!(initial.is_none());

        migration_store
            .update_watermark("test_pipeline", 5, 500)
            .await
            .unwrap();

        // The update should round-trip through the object store.
        let watermark = migration_store
            .committer_watermark("test_pipeline")
            .await
            .unwrap()
            .expect("watermark should exist after update");
        assert_eq!(watermark.epoch_hi_inclusive, 5);
        assert_eq!(watermark.checkpoint_hi_inclusive, 500);
    }

    #[test]
    fn test_parse_checkpoint_range() {
        use super::super::parse_checkpoint_range;

        // Well-formed `<start>_<end>.<ext>` names parse into half-open ranges.
        assert_eq!(parse_checkpoint_range("0_100.parquet"), Some(0..100));
        assert_eq!(parse_checkpoint_range("100_200.csv"), Some(100..200));
        assert_eq!(
            parse_checkpoint_range("1234_5678.parquet"),
            Some(1234..5678)
        );

        // Anything else is rejected.
        for bad in ["invalid", "no_extension", "a_b.parquet"] {
            assert_eq!(parse_checkpoint_range(bad), None);
        }
    }

    #[test]
    fn test_find_containing() {
        let index = build_index(&[(0, 100, 0), (100, 200, 0), (200, 300, 1)]);

        // Mid-file lookup.
        let entry = index.find_containing(50).unwrap();
        assert_eq!((entry.start, entry.end), (0, 100));

        // A boundary checkpoint belongs to the file that starts there.
        let entry = index.find_containing(100).unwrap();
        assert_eq!((entry.start, entry.end), (100, 200));

        // Exclusive end of the last file is not contained.
        assert!(index.find_containing(300).is_none());
    }

    #[test]
    fn test_find_next_boundary() {
        let index = build_index(&[(0, 100, 0), (100, 200, 0)]);

        assert_eq!(index.find_next_boundary(0).unwrap().start, 0);
        assert_eq!(index.find_next_boundary(50).unwrap().start, 100);
        assert!(index.find_next_boundary(200).is_none());
    }

    #[test]
    fn test_snap_to_boundary() {
        // Two files with a gap: [0, 100) and [200, 300).
        let index = build_index(&[(0, 100, 0), (200, 300, 1)]);

        // Inside the first file: snap down to its start.
        for cp in [0, 50, 99] {
            assert_eq!(index.snap_to_boundary(cp), Some(0));
        }
        // In the gap: snap forward to the next file's start.
        for cp in [100, 150, 199] {
            assert_eq!(index.snap_to_boundary(cp), Some(200));
        }
        // Inside the second file.
        for cp in [200, 250, 299] {
            assert_eq!(index.snap_to_boundary(cp), Some(200));
        }
        // Past every file: nothing to snap to.
        for cp in [300, 1000] {
            assert_eq!(index.snap_to_boundary(cp), None);
        }
    }

    #[test]
    fn test_snap_end_to_boundary() {
        // Two files with a gap: [0, 100) and [200, 300).
        let index = build_index(&[(0, 100, 0), (200, 300, 1)]);

        // Inside the first file: snap to its last inclusive checkpoint.
        for cp in [0, 50, 99] {
            assert_eq!(index.snap_end_to_boundary(cp), Some(99));
        }
        // In the gap: snap back to the preceding file's end.
        for cp in [100, 150, 199] {
            assert_eq!(index.snap_end_to_boundary(cp), Some(99));
        }
        // Inside the second file.
        for cp in [200, 250, 299] {
            assert_eq!(index.snap_end_to_boundary(cp), Some(299));
        }
        // Past every file: snap back to the last file's end.
        for cp in [300, 1000] {
            assert_eq!(index.snap_end_to_boundary(cp), Some(299));
        }
    }
}