use crate::authority::authority_store_pruner::{
    AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
};
use crate::authority::authority_store_tables::AuthorityPerpetualTables;
use crate::checkpoints::CheckpointStore;
use crate::rpc_index::RpcIndexStore;
use anyhow::Result;
use bytes::Bytes;
use futures::future::try_join_all;
use object_store::DynObjectStore;
use object_store::path::Path;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::fs;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_config::node::AuthorityStorePruningConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_storage::object_store::util::{
    copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
    path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
};
use tracing::{debug, error, info};

pub const SUCCESS_MARKER: &str = "_SUCCESS";
pub const TEST_MARKER: &str = "_TEST";
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";

pub struct DBCheckpointMetrics {
    pub first_missing_db_checkpoint_epoch: IntGauge,
    pub num_local_db_checkpoints: IntGauge,
}

impl DBCheckpointMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
                "first_missing_db_checkpoint_epoch",
                "First epoch for which we have no db checkpoint in remote store",
                registry
            )
            .unwrap(),
            num_local_db_checkpoints: register_int_gauge_with_registry!(
                "num_local_db_checkpoints",
                "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

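/// Periodically uploads local RocksDB checkpoints to a remote object store and
/// garbage collects local checkpoint directories once all configured marker
/// files are present. When no remote store is configured, only the cleanup and
/// garbage collection loops run.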
pub struct DBCheckpointHandler {
    /// Local object store wrapping the directory where db checkpoints are written
    input_object_store: Arc<DynObjectStore>,
    /// Root path of the local db checkpoint directory
    input_root_path: PathBuf,
    /// Remote object store where db checkpoints are uploaded; `None` disables upload
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Time interval to check for presence of new db checkpoints
    interval: Duration,
    /// Marker files which must all be present before a local db checkpoint is garbage collected
    gc_markers: Vec<String>,
    /// Whether to prune and compact a db checkpoint before uploading it
    prune_and_compact_before_upload: bool,
    /// Whether uploads must wait for the state snapshot completed marker
    state_snapshot_enabled: bool,
    /// Pruning configuration used when pruning before upload
    pruning_config: AuthorityStorePruningConfig,
    /// Metrics for db checkpoint handling
    metrics: Arc<DBCheckpointMetrics>,
}

impl DBCheckpointHandler {
    pub fn new(
        input_path: &std::path::Path,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        pruning_config: AuthorityStorePruningConfig,
        registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        };
        let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
        if state_snapshot_enabled {
            gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
        }
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_store_config.make()?,
            input_root_path: input_path.to_path_buf(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers,
            prune_and_compact_before_upload,
            state_snapshot_enabled,
            pruning_config,
            metrics: DBCheckpointMetrics::new(registry),
        }))
    }
    pub fn new_for_test(
        input_object_store_config: &ObjectStoreConfig,
        output_object_store_config: Option<&ObjectStoreConfig>,
        interval_s: u64,
        prune_and_compact_before_upload: bool,
        state_snapshot_enabled: bool,
    ) -> Result<Arc<Self>> {
        Ok(Arc::new(DBCheckpointHandler {
            input_object_store: input_object_store_config.make()?,
            input_root_path: input_object_store_config
                .directory
                .as_ref()
                .unwrap()
                .clone(),
            output_object_store: output_object_store_config
                .map(|config| config.make().expect("Failed to make object store")),
            interval: Duration::from_secs(interval_s),
            gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
            prune_and_compact_before_upload,
            state_snapshot_enabled,
            pruning_config: AuthorityStorePruningConfig::default(),
            metrics: DBCheckpointMetrics::new(&Registry::default()),
        }))
    }
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        if self.output_object_store.is_some() {
            tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
            tokio::task::spawn(run_manifest_update_loop(
                self.output_object_store.as_ref().unwrap().clone(),
                kill_sender.subscribe(),
            ));
        } else {
            // With no remote store configured, the cleanup loop immediately marks
            // local db checkpoints as upload-completed so that they can be
            // snapshotted and garbage collected.
            tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
                self.clone(),
                kill_sender.subscribe(),
            ));
        }
        tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
            self,
            kill_sender.subscribe(),
        ));
        kill_sender
    }
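    /// Periodically finds epochs that are missing from the remote store and
    /// uploads the corresponding local db checkpoints.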
    async fn run_db_checkpoint_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
                        Ok(epochs) => {
                            self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
                            if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
                                error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing db checkpoints in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }
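    /// Runs when no remote store is configured: marks every local db checkpoint
    /// as upload-completed so it becomes eligible for garbage collection.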
    async fn run_db_checkpoint_cleanup_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let local_checkpoints_by_epoch =
                        find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
                    self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
                    let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
                    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
                    for (_, db_path) in dirs {
                        let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
                        // Skip db checkpoints that are already marked as upload-completed
                        let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
                        if upload_completed_path.exists() {
                            continue;
                        }
                        let bytes = Bytes::from_static(b"success");
                        let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
                        put(
                            &self.input_object_store,
                            &upload_completed_marker,
                            bytes.clone(),
                        )
                        .await?;
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }
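    /// Runs garbage collection of local db checkpoints every 30 seconds until
    /// the kill signal is received.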
    async fn run_db_checkpoint_gc_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
        info!("DB checkpoint garbage collection loop started");
        loop {
            tokio::select! {
                _now = gc_interval.tick() => {
                    if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await
                        && !deleted.is_empty() {
                        info!("Garbage collected local db checkpoints: {:?}", deleted);
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

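    /// Opens the copied db checkpoint, prunes objects for all eligible epochs,
    /// and compacts the perpetual tables before the checkpoint is uploaded.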
    async fn prune_and_compact(
        &self,
        db_path: PathBuf,
        epoch: u64,
        epoch_duration_ms: u64,
    ) -> Result<()> {
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
            &db_path.join("store"),
            None,
            None,
        ));
        let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
            &db_path.join("checkpoints"),
        ));
        let rpc_index = RpcIndexStore::new_without_init(&db_path);
        let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
        info!(
            "Pruning db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::prune_objects_for_eligible_epochs(
            &perpetual_db,
            &checkpoint_store,
            Some(&rpc_index),
            None,
            self.pruning_config.clone(),
            metrics,
            epoch_duration_ms,
        )
        .await?;
        info!(
            "Compacting db checkpoint in {:?} for epoch: {epoch}",
            db_path.display()
        );
        AuthorityStorePruner::compact(&perpetual_db)?;
        Ok(())
    }
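    /// Copies each local db checkpoint for a missing epoch to the remote store,
    /// writes a manifest and success marker remotely, and then marks the local
    /// checkpoint as upload-completed.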
    async fn upload_db_checkpoints_to_object_store(
        &self,
        missing_epochs: Vec<u64>,
    ) -> Result<(), anyhow::Error> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        let object_store = self
            .output_object_store
            .as_ref()
            .expect("Expected object store to exist")
            .clone();
        for (epoch, db_path) in dirs {
            let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                if self.state_snapshot_enabled {
                    let snapshot_completed_marker =
                        local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
                    if !snapshot_completed_marker.exists() {
                        info!(
                            "DB checkpoint upload for epoch {} will wait until the state snapshot is uploaded",
                            *epoch
                        );
                        continue;
                    }
                }

                if self.prune_and_compact_before_upload {
                    self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
                        .await?;
                }

                info!("Copying db checkpoint for epoch: {epoch} to remote storage");
                copy_recursively(
                    db_path,
                    &self.input_object_store,
                    &object_store,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;

                write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
                    .await?;
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&object_store, &success_marker, bytes.clone()).await?;
            }
            let bytes = Bytes::from_static(b"success");
            let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
            put(
                &self.input_object_store,
                &upload_completed_marker,
                bytes.clone(),
            )
            .await?;
        }
        Ok(())
    }

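    /// Deletes local db checkpoint directories for which all gc marker files are
    /// present and returns the epochs that were removed.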
    async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
        let mut deleted = Vec::new();
        for (epoch, path) in local_checkpoints_by_epoch.iter() {
            let marker_paths: Vec<Path> = self
                .gc_markers
                .iter()
                .map(|marker| path.child(marker.clone()))
                .collect();
            let all_markers_present = try_join_all(
                marker_paths
                    .iter()
                    .map(|path| self.input_object_store.get(path)),
            )
            .await;
            match all_markers_present {
                Ok(_) => {
                    // All gc markers are present, so the local checkpoint directory
                    // can be deleted safely.
                    info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
                    deleted.push(*epoch);
                    let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
                    fs::remove_dir_all(&local_fs_path)?;
                }
                Err(_) => {
                    debug!("Not ready for deletion yet: {path}");
                }
            }
        }
        Ok(deleted)
    }
}

#[cfg(test)]
mod tests {
    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };
    use itertools::Itertools;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use sui_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Drop the test gc marker so the local checkpoint becomes eligible for
        // garbage collection.
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // Add a new local db checkpoint for epoch 1.
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Remove the remote success marker for epoch 0 to simulate an
        // interrupted upload; the next run should resume and re-upload it.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        // Epoch 2 is deliberately skipped to leave a hole in the local checkpoints.
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}