sui_core/
db_checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_store_pruner::{
5    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
6};
7use crate::authority::authority_store_tables::AuthorityPerpetualTables;
8use crate::checkpoints::CheckpointStore;
9use crate::rpc_index::RpcIndexStore;
10use anyhow::Result;
11use bytes::Bytes;
12use futures::future::try_join_all;
13use object_store::path::Path;
14use object_store::{DynObjectStore, ObjectStoreExt};
15use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
16use std::fs;
17use std::num::NonZeroUsize;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_config::node::AuthorityStorePruningConfig;
22use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
23use sui_storage::object_store::util::{
24    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
25    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
26};
27use tracing::{debug, error, info};
28
29pub const SUCCESS_MARKER: &str = "_SUCCESS";
30pub const TEST_MARKER: &str = "_TEST";
31pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
32pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
33
/// Prometheus gauges tracking the health of the db-checkpoint pipeline.
pub struct DBCheckpointMetrics {
    /// First epoch for which no db checkpoint exists in the remote store.
    pub first_missing_db_checkpoint_epoch: IntGauge,
    /// Number of RocksDB checkpoints currently on local disk (i.e. not yet
    /// garbage collected).
    pub num_local_db_checkpoints: IntGauge,
}
38
39impl DBCheckpointMetrics {
40    pub fn new(registry: &Registry) -> Arc<Self> {
41        let this = Self {
42            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
43                "first_missing_db_checkpoint_epoch",
44                "First epoch for which we have no db checkpoint in remote store",
45                registry
46            )
47            .unwrap(),
48            num_local_db_checkpoints: register_int_gauge_with_registry!(
49                "num_local_db_checkpoints",
50                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
51                registry
52            )
53            .unwrap(),
54        };
55        Arc::new(this)
56    }
57}
58
/// Watches a local directory for newly produced RocksDB checkpoints,
/// optionally uploads them to a remote object store, and garbage collects
/// local copies once the configured completion markers are present.
pub struct DBCheckpointHandler {
    /// Object-store handle rooted at the local directory where db checkpoints appear
    input_object_store: Arc<DynObjectStore>,
    /// DB checkpoint directory on local filesystem
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of new db checkpoint
    interval: Duration,
    /// File markers which signal that local db checkpoint can be garbage collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction before upload
    prune_and_compact_before_upload: bool,
    /// If true, upload will block on state snapshot upload completed marker
    state_snapshot_enabled: bool,
    /// Pruning objects
    pruning_config: AuthorityStorePruningConfig,
    // Gauges registered in `new` (or on a throwaway registry in tests).
    metrics: Arc<DBCheckpointMetrics>,
}
78
79impl DBCheckpointHandler {
80    pub fn new(
81        input_path: &std::path::Path,
82        output_object_store_config: Option<&ObjectStoreConfig>,
83        interval_s: u64,
84        prune_and_compact_before_upload: bool,
85        pruning_config: AuthorityStorePruningConfig,
86        registry: &Registry,
87        state_snapshot_enabled: bool,
88    ) -> Result<Arc<Self>> {
89        let input_store_config = ObjectStoreConfig {
90            object_store: Some(ObjectStoreType::File),
91            directory: Some(input_path.to_path_buf()),
92            ..Default::default()
93        };
94        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
95        if state_snapshot_enabled {
96            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
97        }
98        Ok(Arc::new(DBCheckpointHandler {
99            input_object_store: input_store_config.make()?,
100            input_root_path: input_path.to_path_buf(),
101            output_object_store: output_object_store_config
102                .map(|config| config.make().expect("Failed to make object store")),
103            interval: Duration::from_secs(interval_s),
104            gc_markers,
105            prune_and_compact_before_upload,
106            state_snapshot_enabled,
107            pruning_config,
108            metrics: DBCheckpointMetrics::new(registry),
109        }))
110    }
111    pub fn new_for_test(
112        input_object_store_config: &ObjectStoreConfig,
113        output_object_store_config: Option<&ObjectStoreConfig>,
114        interval_s: u64,
115        prune_and_compact_before_upload: bool,
116        state_snapshot_enabled: bool,
117    ) -> Result<Arc<Self>> {
118        Ok(Arc::new(DBCheckpointHandler {
119            input_object_store: input_object_store_config.make()?,
120            input_root_path: input_object_store_config
121                .directory
122                .as_ref()
123                .unwrap()
124                .clone(),
125            output_object_store: output_object_store_config
126                .map(|config| config.make().expect("Failed to make object store")),
127            interval: Duration::from_secs(interval_s),
128            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
129            prune_and_compact_before_upload,
130            state_snapshot_enabled,
131            pruning_config: AuthorityStorePruningConfig::default(),
132            metrics: DBCheckpointMetrics::new(&Registry::default()),
133        }))
134    }
135    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
136        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
137        if self.output_object_store.is_some() {
138            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
139                self.clone(),
140                kill_sender.subscribe(),
141            ));
142            tokio::task::spawn(run_manifest_update_loop(
143                self.output_object_store.as_ref().unwrap().clone(),
144                kill_sender.subscribe(),
145            ));
146        } else {
147            // if db checkpoint remote store is not specified, cleanup loop
148            // is run to immediately mark db checkpoint upload as successful
149            // so that they can be snapshotted and garbage collected
150            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
151                self.clone(),
152                kill_sender.subscribe(),
153            ));
154        }
155        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
156            self,
157            kill_sender.subscribe(),
158        ));
159        kill_sender
160    }
161    async fn run_db_checkpoint_upload_loop(
162        self: Arc<Self>,
163        mut recv: tokio::sync::broadcast::Receiver<()>,
164    ) -> Result<()> {
165        let mut interval = tokio::time::interval(self.interval);
166        info!("DB checkpoint upload loop started");
167        loop {
168            tokio::select! {
169                _now = interval.tick() => {
170                    let local_checkpoints_by_epoch =
171                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
172                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
173                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
174                        Ok(epochs) => {
175                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
176                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
177                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
178                            }
179                        }
180                        Err(err) => {
181                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
182                        }
183                    }
184                },
185                 _ = recv.recv() => break,
186            }
187        }
188        Ok(())
189    }
190    async fn run_db_checkpoint_cleanup_loop(
191        self: Arc<Self>,
192        mut recv: tokio::sync::broadcast::Receiver<()>,
193    ) -> Result<()> {
194        let mut interval = tokio::time::interval(self.interval);
195        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
196        loop {
197            tokio::select! {
198                _now = interval.tick() => {
199                    let local_checkpoints_by_epoch =
200                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
201                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
202                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
203                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
204                    for (_, db_path) in dirs {
205                        // If db checkpoint marked as completed, skip
206                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
207                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
208                        if upload_completed_path.exists() {
209                            continue;
210                        }
211                        let bytes = Bytes::from_static(b"success");
212                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
213                        put(&self.input_object_store,
214                            &upload_completed_marker,
215                            bytes.clone(),
216                        )
217                        .await?;
218                    }
219                },
220                 _ = recv.recv() => break,
221            }
222        }
223        Ok(())
224    }
225    async fn run_db_checkpoint_gc_loop(
226        self: Arc<Self>,
227        mut recv: tokio::sync::broadcast::Receiver<()>,
228    ) -> Result<()> {
229        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
230        info!("DB checkpoint garbage collection loop started");
231        loop {
232            tokio::select! {
233                _now = gc_interval.tick() => {
234                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await
235                        && !deleted.is_empty() {
236                            info!("Garbage collected local db checkpoints: {:?}", deleted);
237                        }
238                },
239                 _ = recv.recv() => break,
240            }
241        }
242        Ok(())
243    }
244
245    async fn prune_and_compact(
246        &self,
247        db_path: PathBuf,
248        epoch: u64,
249        epoch_duration_ms: u64,
250    ) -> Result<()> {
251        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
252            &db_path.join("store"),
253            None,
254            None,
255        ));
256        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
257            &db_path.join("checkpoints"),
258        ));
259        let rpc_index = RpcIndexStore::new_without_init(&db_path);
260        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
261        info!(
262            "Pruning db checkpoint in {:?} for epoch: {epoch}",
263            db_path.display()
264        );
265        AuthorityStorePruner::prune_objects_for_eligible_epochs(
266            &perpetual_db,
267            &checkpoint_store,
268            Some(&rpc_index),
269            self.pruning_config.clone(),
270            metrics,
271            epoch_duration_ms,
272        )
273        .await?;
274        info!(
275            "Compacting db checkpoint in {:?} for epoch: {epoch}",
276            db_path.display()
277        );
278        AuthorityStorePruner::compact(&perpetual_db)?;
279        Ok(())
280    }
281    async fn upload_db_checkpoints_to_object_store(
282        &self,
283        missing_epochs: Vec<u64>,
284    ) -> Result<(), anyhow::Error> {
285        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
286        let local_checkpoints_by_epoch =
287            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
288        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
289        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
290        let object_store = self
291            .output_object_store
292            .as_ref()
293            .expect("Expected object store to exist")
294            .clone();
295        for (epoch, db_path) in dirs {
296            // Convert `db_path` to the local filesystem path to where db checkpoint is stored
297            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
298            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
299                if self.state_snapshot_enabled {
300                    let snapshot_completed_marker =
301                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
302                    if !snapshot_completed_marker.exists() {
303                        info!(
304                            "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
305                            *epoch
306                        );
307                        continue;
308                    }
309                }
310
311                if self.prune_and_compact_before_upload {
312                    // Invoke pruning and compaction on the db checkpoint
313                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
314                        .await?;
315                }
316
317                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
318                copy_recursively(
319                    db_path,
320                    &self.input_object_store,
321                    &object_store,
322                    NonZeroUsize::new(20).unwrap(),
323                )
324                .await?;
325
326                // This writes a single "MANIFEST" file which contains a list of all files that make up a db snapshot
327                write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
328                    .await?;
329                // Drop marker in the output directory that upload completed successfully
330                let bytes = Bytes::from_static(b"success");
331                let success_marker = db_path.child(SUCCESS_MARKER);
332                put(&object_store, &success_marker, bytes.clone()).await?;
333            }
334            let bytes = Bytes::from_static(b"success");
335            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
336            put(
337                &self.input_object_store,
338                &upload_completed_marker,
339                bytes.clone(),
340            )
341            .await?;
342        }
343        Ok(())
344    }
345
346    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
347        let local_checkpoints_by_epoch =
348            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
349        let mut deleted = Vec::new();
350        for (epoch, path) in local_checkpoints_by_epoch.iter() {
351            let marker_paths: Vec<Path> = self
352                .gc_markers
353                .iter()
354                .map(|marker| path.child(marker.clone()))
355                .collect();
356            let all_markers_present = try_join_all(
357                marker_paths
358                    .iter()
359                    .map(|path| self.input_object_store.get(path)),
360            )
361            .await;
362            match all_markers_present {
363                // After state snapshots, gc will also need to wait for a state snapshot
364                // upload completed marker
365                Ok(_) => {
366                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
367                    deleted.push(*epoch);
368                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
369                    fs::remove_dir_all(&local_fs_path)?;
370                }
371                Err(_) => {
372                    debug!("Not ready for deletion yet: {path}");
373                }
374            }
375        }
376        Ok(deleted)
377    }
378}
379
#[cfg(test)]
mod tests {
    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };
    use itertools::Itertools;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use sui_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use tempfile::TempDir;

    /// End-to-end happy path: a local epoch_0 checkpoint is discovered,
    /// uploaded (files + SUCCESS marker remotely, UPLOAD_COMPLETED locally),
    /// and then garbage collected once the extra _TEST marker is dropped.
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        // Local checkpoint dir for epoch 0 with two files and a nested file.
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        // The handler should see exactly the epoch_0 dir we created, and the
        // object-store path should map back to the local filesystem path.
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        // Upload copied every file and dropped both completion markers.
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop an extra gc marker meant only for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        // GC removed the local copy. (NOTE(review): the file1 assertion is
        // duplicated in the original; harmless, retained as-is.)
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    /// Simulates an interrupted upload: after epoch_0's remote SUCCESS marker
    /// is deleted, a later run must re-upload epoch_0 before (as well as)
    /// epoch_1, then gc reclaims both local checkpoints.
    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        // First upload round: only epoch_0 exists.
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Add a new db checkpoint to the local checkpoint directory
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Now delete the success marker from remote checkpointed directory
        // This is the scenario where uploads stops mid way because system stopped
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        // Checkpoint handler should copy checkpoint for epoch_0 first before copying
        // epoch_1
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop an extra gc marker meant only for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        // Both local checkpoints should be gone after gc. (NOTE(review): the
        // file1 assertions are duplicated in the original; retained as-is.)
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    /// With local checkpoints for epochs 0, 1, 3 (2 absent), after upload the
    /// first missing remote epoch is 2; deleting epoch_0's SUCCESS marker
    /// makes 0 the first missing epoch again.
    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Missing epoch 2
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        // Removing a SUCCESS marker makes that epoch "missing" again.
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    /// With only epochs 100 and 200 present locally, an empty remote store
    /// reports `[0]` as missing; after upload, every epoch gap (0..100,
    /// 101..200, 201) is reported missing.
    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}