// File: sui_core/db_checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_store_pruner::{
5    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
6};
7use crate::authority::authority_store_tables::AuthorityPerpetualTables;
8use crate::checkpoints::CheckpointStore;
9use crate::rpc_index::RpcIndexStore;
10use anyhow::Result;
11use bytes::Bytes;
12use futures::future::try_join_all;
13use object_store::DynObjectStore;
14use object_store::path::Path;
15use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
16use std::fs;
17use std::num::NonZeroUsize;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_config::node::AuthorityStorePruningConfig;
22use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
23use sui_storage::object_store::util::{
24    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
25    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
26};
27use tracing::{debug, error, info};
28
/// Dropped in the remote store next to an uploaded checkpoint once the copy and
/// MANIFEST write finished; its absence marks the epoch as "missing" remotely.
pub const SUCCESS_MARKER: &str = "_SUCCESS";
/// Extra gc marker used only by tests to gate garbage collection.
pub const TEST_MARKER: &str = "_TEST";
/// Dropped in the *local* checkpoint dir once upload completed (or immediately,
/// by the cleanup loop, when no remote store is configured); gc waits on it.
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
/// Dropped in the local checkpoint dir once the state snapshot for the epoch was
/// uploaded; upload (and gc, when snapshots are enabled) waits on it.
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
33
/// Prometheus metrics exported by the db checkpoint handler.
pub struct DBCheckpointMetrics {
    /// First epoch for which no db checkpoint exists in the remote store.
    pub first_missing_db_checkpoint_epoch: IntGauge,
    /// Number of db checkpoints currently on local disk (not yet garbage collected).
    pub num_local_db_checkpoints: IntGauge,
}
38
39impl DBCheckpointMetrics {
40    pub fn new(registry: &Registry) -> Arc<Self> {
41        let this = Self {
42            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
43                "first_missing_db_checkpoint_epoch",
44                "First epoch for which we have no db checkpoint in remote store",
45                registry
46            )
47            .unwrap(),
48            num_local_db_checkpoints: register_int_gauge_with_registry!(
49                "num_local_db_checkpoints",
50                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
51                registry
52            )
53            .unwrap(),
54        };
55        Arc::new(this)
56    }
57}
58
/// Watches a local directory for per-epoch RocksDB checkpoints, optionally
/// uploads them to a remote object store, and garbage collects local copies
/// once all required completion markers are present.
pub struct DBCheckpointHandler {
    /// Object store rooted at the local directory where db checkpoints are stored
    input_object_store: Arc<DynObjectStore>,
    /// The same db checkpoint directory, as a local filesystem path
    input_root_path: PathBuf,
    /// Bucket on cloud object store where db checkpoints will be copied;
    /// `None` disables uploads (the cleanup loop runs instead)
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of new db checkpoint
    interval: Duration,
    /// File markers which signal that local db checkpoint can be garbage collected
    gc_markers: Vec<String>,
    /// Boolean flag to enable/disable object pruning and manual compaction before upload
    prune_and_compact_before_upload: bool,
    /// If true, upload will block on state snapshot upload completed marker
    state_snapshot_enabled: bool,
    /// Pruning configuration applied when pruning checkpoints before upload
    pruning_config: AuthorityStorePruningConfig,
    /// Metrics: local checkpoint count and first missing remote epoch
    metrics: Arc<DBCheckpointMetrics>,
}
78
79impl DBCheckpointHandler {
80    pub fn new(
81        input_path: &std::path::Path,
82        output_object_store_config: Option<&ObjectStoreConfig>,
83        interval_s: u64,
84        prune_and_compact_before_upload: bool,
85        pruning_config: AuthorityStorePruningConfig,
86        registry: &Registry,
87        state_snapshot_enabled: bool,
88    ) -> Result<Arc<Self>> {
89        let input_store_config = ObjectStoreConfig {
90            object_store: Some(ObjectStoreType::File),
91            directory: Some(input_path.to_path_buf()),
92            ..Default::default()
93        };
94        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
95        if state_snapshot_enabled {
96            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
97        }
98        Ok(Arc::new(DBCheckpointHandler {
99            input_object_store: input_store_config.make()?,
100            input_root_path: input_path.to_path_buf(),
101            output_object_store: output_object_store_config
102                .map(|config| config.make().expect("Failed to make object store")),
103            interval: Duration::from_secs(interval_s),
104            gc_markers,
105            prune_and_compact_before_upload,
106            state_snapshot_enabled,
107            pruning_config,
108            metrics: DBCheckpointMetrics::new(registry),
109        }))
110    }
111    pub fn new_for_test(
112        input_object_store_config: &ObjectStoreConfig,
113        output_object_store_config: Option<&ObjectStoreConfig>,
114        interval_s: u64,
115        prune_and_compact_before_upload: bool,
116        state_snapshot_enabled: bool,
117    ) -> Result<Arc<Self>> {
118        Ok(Arc::new(DBCheckpointHandler {
119            input_object_store: input_object_store_config.make()?,
120            input_root_path: input_object_store_config
121                .directory
122                .as_ref()
123                .unwrap()
124                .clone(),
125            output_object_store: output_object_store_config
126                .map(|config| config.make().expect("Failed to make object store")),
127            interval: Duration::from_secs(interval_s),
128            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
129            prune_and_compact_before_upload,
130            state_snapshot_enabled,
131            pruning_config: AuthorityStorePruningConfig::default(),
132            metrics: DBCheckpointMetrics::new(&Registry::default()),
133        }))
134    }
135    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
136        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
137        if self.output_object_store.is_some() {
138            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
139                self.clone(),
140                kill_sender.subscribe(),
141            ));
142            tokio::task::spawn(run_manifest_update_loop(
143                self.output_object_store.as_ref().unwrap().clone(),
144                kill_sender.subscribe(),
145            ));
146        } else {
147            // if db checkpoint remote store is not specified, cleanup loop
148            // is run to immediately mark db checkpoint upload as successful
149            // so that they can be snapshotted and garbage collected
150            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
151                self.clone(),
152                kill_sender.subscribe(),
153            ));
154        }
155        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
156            self,
157            kill_sender.subscribe(),
158        ));
159        kill_sender
160    }
161    async fn run_db_checkpoint_upload_loop(
162        self: Arc<Self>,
163        mut recv: tokio::sync::broadcast::Receiver<()>,
164    ) -> Result<()> {
165        let mut interval = tokio::time::interval(self.interval);
166        info!("DB checkpoint upload loop started");
167        loop {
168            tokio::select! {
169                _now = interval.tick() => {
170                    let local_checkpoints_by_epoch =
171                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
172                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
173                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
174                        Ok(epochs) => {
175                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
176                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
177                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
178                            }
179                        }
180                        Err(err) => {
181                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
182                        }
183                    }
184                },
185                 _ = recv.recv() => break,
186            }
187        }
188        Ok(())
189    }
190    async fn run_db_checkpoint_cleanup_loop(
191        self: Arc<Self>,
192        mut recv: tokio::sync::broadcast::Receiver<()>,
193    ) -> Result<()> {
194        let mut interval = tokio::time::interval(self.interval);
195        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
196        loop {
197            tokio::select! {
198                _now = interval.tick() => {
199                    let local_checkpoints_by_epoch =
200                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
201                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
202                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
203                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
204                    for (_, db_path) in dirs {
205                        // If db checkpoint marked as completed, skip
206                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
207                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
208                        if upload_completed_path.exists() {
209                            continue;
210                        }
211                        let bytes = Bytes::from_static(b"success");
212                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
213                        put(&self.input_object_store,
214                            &upload_completed_marker,
215                            bytes.clone(),
216                        )
217                        .await?;
218                    }
219                },
220                 _ = recv.recv() => break,
221            }
222        }
223        Ok(())
224    }
225    async fn run_db_checkpoint_gc_loop(
226        self: Arc<Self>,
227        mut recv: tokio::sync::broadcast::Receiver<()>,
228    ) -> Result<()> {
229        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
230        info!("DB checkpoint garbage collection loop started");
231        loop {
232            tokio::select! {
233                _now = gc_interval.tick() => {
234                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await
235                        && !deleted.is_empty() {
236                            info!("Garbage collected local db checkpoints: {:?}", deleted);
237                        }
238                },
239                 _ = recv.recv() => break,
240            }
241        }
242        Ok(())
243    }
244
245    async fn prune_and_compact(
246        &self,
247        db_path: PathBuf,
248        epoch: u64,
249        epoch_duration_ms: u64,
250    ) -> Result<()> {
251        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
252            &db_path.join("store"),
253            None,
254            None,
255        ));
256        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
257            &db_path.join("checkpoints"),
258        ));
259        let rpc_index = RpcIndexStore::new_without_init(&db_path);
260        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
261        info!(
262            "Pruning db checkpoint in {:?} for epoch: {epoch}",
263            db_path.display()
264        );
265        AuthorityStorePruner::prune_objects_for_eligible_epochs(
266            &perpetual_db,
267            &checkpoint_store,
268            Some(&rpc_index),
269            None,
270            self.pruning_config.clone(),
271            metrics,
272            epoch_duration_ms,
273        )
274        .await?;
275        info!(
276            "Compacting db checkpoint in {:?} for epoch: {epoch}",
277            db_path.display()
278        );
279        AuthorityStorePruner::compact(&perpetual_db)?;
280        Ok(())
281    }
282    async fn upload_db_checkpoints_to_object_store(
283        &self,
284        missing_epochs: Vec<u64>,
285    ) -> Result<(), anyhow::Error> {
286        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
287        let local_checkpoints_by_epoch =
288            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
289        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
290        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
291        let object_store = self
292            .output_object_store
293            .as_ref()
294            .expect("Expected object store to exist")
295            .clone();
296        for (epoch, db_path) in dirs {
297            // Convert `db_path` to the local filesystem path to where db checkpoint is stored
298            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
299            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
300                if self.state_snapshot_enabled {
301                    let snapshot_completed_marker =
302                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
303                    if !snapshot_completed_marker.exists() {
304                        info!(
305                            "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
306                            *epoch
307                        );
308                        continue;
309                    }
310                }
311
312                if self.prune_and_compact_before_upload {
313                    // Invoke pruning and compaction on the db checkpoint
314                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
315                        .await?;
316                }
317
318                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
319                copy_recursively(
320                    db_path,
321                    &self.input_object_store,
322                    &object_store,
323                    NonZeroUsize::new(20).unwrap(),
324                )
325                .await?;
326
327                // This writes a single "MANIFEST" file which contains a list of all files that make up a db snapshot
328                write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
329                    .await?;
330                // Drop marker in the output directory that upload completed successfully
331                let bytes = Bytes::from_static(b"success");
332                let success_marker = db_path.child(SUCCESS_MARKER);
333                put(&object_store, &success_marker, bytes.clone()).await?;
334            }
335            let bytes = Bytes::from_static(b"success");
336            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
337            put(
338                &self.input_object_store,
339                &upload_completed_marker,
340                bytes.clone(),
341            )
342            .await?;
343        }
344        Ok(())
345    }
346
347    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
348        let local_checkpoints_by_epoch =
349            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
350        let mut deleted = Vec::new();
351        for (epoch, path) in local_checkpoints_by_epoch.iter() {
352            let marker_paths: Vec<Path> = self
353                .gc_markers
354                .iter()
355                .map(|marker| path.child(marker.clone()))
356                .collect();
357            let all_markers_present = try_join_all(
358                marker_paths
359                    .iter()
360                    .map(|path| self.input_object_store.get(path)),
361            )
362            .await;
363            match all_markers_present {
364                // After state snapshots, gc will also need to wait for a state snapshot
365                // upload completed marker
366                Ok(_) => {
367                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
368                    deleted.push(*epoch);
369                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
370                    fs::remove_dir_all(&local_fs_path)?;
371                }
372                Err(_) => {
373                    debug!("Not ready for deletion yet: {path}");
374                }
375            }
376        }
377        Ok(deleted)
378    }
379}
380
#[cfg(test)]
mod tests {
    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };
    use itertools::Itertools;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use sui_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use tempfile::TempDir;

    /// End-to-end happy path: discover a local epoch_0 checkpoint, upload it,
    /// verify the remote copy and markers, then garbage collect the local copy.
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop an extra gc marker meant only for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    /// Simulates an interrupted upload: after deleting the remote SUCCESS
    /// marker for epoch_0, the handler must re-upload epoch_0 before epoch_1.
    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Add a new db checkpoint to the local checkpoint directory
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Now delete the success marker from remote checkpointed directory
        // This is the scenario where uploads stops mid way because system stopped
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        // Checkpoint handler should copy checkpoint for epoch_0 first before copying
        // epoch_1
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop an extra gc marker meant only for gc to trigger
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    /// With local checkpoints for epochs 0, 1 and 3 (2 missing), the first
    /// missing remote epoch after upload must be 2; deleting epoch_0's SUCCESS
    /// marker makes 0 the first missing epoch again.
    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Missing epoch 2
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    /// With only epochs 100 and 200 present locally, all epochs in the gaps
    /// (0..100, 101..200) plus the one past the newest upload (201) must be
    /// reported missing remotely after upload.
    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}