1use crate::authority::authority_store_pruner::{
5 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
6};
7use crate::authority::authority_store_tables::AuthorityPerpetualTables;
8use crate::checkpoints::CheckpointStore;
9use crate::rpc_index::RpcIndexStore;
10use anyhow::Result;
11use bytes::Bytes;
12use futures::future::try_join_all;
13use object_store::path::Path;
14use object_store::{DynObjectStore, ObjectStoreExt};
15use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
16use std::fs;
17use std::num::NonZeroUsize;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_config::node::AuthorityStorePruningConfig;
22use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
23use sui_storage::object_store::util::{
24 copy_recursively, find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs,
25 path_to_filesystem, put, run_manifest_update_loop, write_snapshot_manifest,
26};
27use tracing::{debug, error, info};
28
// Written to the remote store once an epoch dir has been fully copied and its
// manifest written; its presence means the epoch is not "missing" remotely.
pub const SUCCESS_MARKER: &str = "_SUCCESS";
// Extra gc marker required only by `new_for_test` handlers, so tests control
// exactly when a local checkpoint dir becomes eligible for deletion.
pub const TEST_MARKER: &str = "_TEST";
// Written to the LOCAL checkpoint dir once upload (or cleanup, when uploads
// are disabled) has finished; a gc precondition.
pub const UPLOAD_COMPLETED_MARKER: &str = "_UPLOAD_COMPLETED";
// Written to the LOCAL checkpoint dir by the state-snapshot flow; an extra gc
// precondition when state snapshots are enabled.
pub const STATE_SNAPSHOT_COMPLETED_MARKER: &str = "_STATE_SNAPSHOT_COMPLETED";
33
/// Prometheus gauges tracking db-checkpoint upload/gc progress.
pub struct DBCheckpointMetrics {
    /// First epoch for which no db checkpoint exists in the remote store.
    pub first_missing_db_checkpoint_epoch: IntGauge,
    /// Number of checkpoint dirs still on local disk (not yet gc'd).
    pub num_local_db_checkpoints: IntGauge,
}
38
39impl DBCheckpointMetrics {
40 pub fn new(registry: &Registry) -> Arc<Self> {
41 let this = Self {
42 first_missing_db_checkpoint_epoch: register_int_gauge_with_registry!(
43 "first_missing_db_checkpoint_epoch",
44 "First epoch for which we have no db checkpoint in remote store",
45 registry
46 )
47 .unwrap(),
48 num_local_db_checkpoints: register_int_gauge_with_registry!(
49 "num_local_db_checkpoints",
50 "Number of RocksDB checkpoints currently residing on local disk (i.e. not yet garbage collected)",
51 registry
52 )
53 .unwrap(),
54 };
55 Arc::new(this)
56 }
57}
58
/// Drives upload of local RocksDB checkpoints to a remote object store and
/// garbage collection of local checkpoint dirs once all markers are present.
pub struct DBCheckpointHandler {
    /// File-backed object store over the local checkpoint directory.
    input_object_store: Arc<DynObjectStore>,
    /// Filesystem root of the local checkpoint directory.
    input_root_path: PathBuf,
    /// Remote store to upload to; `None` disables uploads (cleanup-only mode).
    output_object_store: Option<Arc<DynObjectStore>>,
    /// Tick period of the upload/cleanup loops.
    interval: Duration,
    /// Marker filenames that must ALL exist in a checkpoint dir before gc
    /// may delete it.
    gc_markers: Vec<String>,
    /// If true, prune and compact each checkpoint before uploading it.
    prune_and_compact_before_upload: bool,
    /// If true, uploads wait for STATE_SNAPSHOT_COMPLETED_MARKER and gc also
    /// requires it.
    state_snapshot_enabled: bool,
    /// Pruning configuration used by `prune_and_compact`.
    pruning_config: AuthorityStorePruningConfig,
    metrics: Arc<DBCheckpointMetrics>,
}
78
79impl DBCheckpointHandler {
80 pub fn new(
81 input_path: &std::path::Path,
82 output_object_store_config: Option<&ObjectStoreConfig>,
83 interval_s: u64,
84 prune_and_compact_before_upload: bool,
85 pruning_config: AuthorityStorePruningConfig,
86 registry: &Registry,
87 state_snapshot_enabled: bool,
88 ) -> Result<Arc<Self>> {
89 let input_store_config = ObjectStoreConfig {
90 object_store: Some(ObjectStoreType::File),
91 directory: Some(input_path.to_path_buf()),
92 ..Default::default()
93 };
94 let mut gc_markers = vec![UPLOAD_COMPLETED_MARKER.to_string()];
95 if state_snapshot_enabled {
96 gc_markers.push(STATE_SNAPSHOT_COMPLETED_MARKER.to_string());
97 }
98 Ok(Arc::new(DBCheckpointHandler {
99 input_object_store: input_store_config.make()?,
100 input_root_path: input_path.to_path_buf(),
101 output_object_store: output_object_store_config
102 .map(|config| config.make().expect("Failed to make object store")),
103 interval: Duration::from_secs(interval_s),
104 gc_markers,
105 prune_and_compact_before_upload,
106 state_snapshot_enabled,
107 pruning_config,
108 metrics: DBCheckpointMetrics::new(registry),
109 }))
110 }
111 pub fn new_for_test(
112 input_object_store_config: &ObjectStoreConfig,
113 output_object_store_config: Option<&ObjectStoreConfig>,
114 interval_s: u64,
115 prune_and_compact_before_upload: bool,
116 state_snapshot_enabled: bool,
117 ) -> Result<Arc<Self>> {
118 Ok(Arc::new(DBCheckpointHandler {
119 input_object_store: input_object_store_config.make()?,
120 input_root_path: input_object_store_config
121 .directory
122 .as_ref()
123 .unwrap()
124 .clone(),
125 output_object_store: output_object_store_config
126 .map(|config| config.make().expect("Failed to make object store")),
127 interval: Duration::from_secs(interval_s),
128 gc_markers: vec![UPLOAD_COMPLETED_MARKER.to_string(), TEST_MARKER.to_string()],
129 prune_and_compact_before_upload,
130 state_snapshot_enabled,
131 pruning_config: AuthorityStorePruningConfig::default(),
132 metrics: DBCheckpointMetrics::new(&Registry::default()),
133 }))
134 }
135 pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
136 let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
137 if self.output_object_store.is_some() {
138 tokio::task::spawn(Self::run_db_checkpoint_upload_loop(
139 self.clone(),
140 kill_sender.subscribe(),
141 ));
142 tokio::task::spawn(run_manifest_update_loop(
143 self.output_object_store.as_ref().unwrap().clone(),
144 kill_sender.subscribe(),
145 ));
146 } else {
147 tokio::task::spawn(Self::run_db_checkpoint_cleanup_loop(
151 self.clone(),
152 kill_sender.subscribe(),
153 ));
154 }
155 tokio::task::spawn(Self::run_db_checkpoint_gc_loop(
156 self,
157 kill_sender.subscribe(),
158 ));
159 kill_sender
160 }
161 async fn run_db_checkpoint_upload_loop(
162 self: Arc<Self>,
163 mut recv: tokio::sync::broadcast::Receiver<()>,
164 ) -> Result<()> {
165 let mut interval = tokio::time::interval(self.interval);
166 info!("DB checkpoint upload loop started");
167 loop {
168 tokio::select! {
169 _now = interval.tick() => {
170 let local_checkpoints_by_epoch =
171 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
172 self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
173 match find_missing_epochs_dirs(self.output_object_store.as_ref().unwrap(), SUCCESS_MARKER).await {
174 Ok(epochs) => {
175 self.metrics.first_missing_db_checkpoint_epoch.set(epochs.first().cloned().unwrap_or(0) as i64);
176 if let Err(err) = self.upload_db_checkpoints_to_object_store(epochs).await {
177 error!("Failed to upload db checkpoint to remote store with err: {:?}", err);
178 }
179 }
180 Err(err) => {
181 error!("Failed to find missing db checkpoints in remote store: {:?}", err);
182 }
183 }
184 },
185 _ = recv.recv() => break,
186 }
187 }
188 Ok(())
189 }
190 async fn run_db_checkpoint_cleanup_loop(
191 self: Arc<Self>,
192 mut recv: tokio::sync::broadcast::Receiver<()>,
193 ) -> Result<()> {
194 let mut interval = tokio::time::interval(self.interval);
195 info!("DB checkpoint upload disabled. DB checkpoint cleanup loop started");
196 loop {
197 tokio::select! {
198 _now = interval.tick() => {
199 let local_checkpoints_by_epoch =
200 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
201 self.metrics.num_local_db_checkpoints.set(local_checkpoints_by_epoch.len() as i64);
202 let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
203 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
204 for (_, db_path) in dirs {
205 let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
207 let upload_completed_path = local_db_path.join(UPLOAD_COMPLETED_MARKER);
208 if upload_completed_path.exists() {
209 continue;
210 }
211 let bytes = Bytes::from_static(b"success");
212 let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
213 put(&self.input_object_store,
214 &upload_completed_marker,
215 bytes.clone(),
216 )
217 .await?;
218 }
219 },
220 _ = recv.recv() => break,
221 }
222 }
223 Ok(())
224 }
225 async fn run_db_checkpoint_gc_loop(
226 self: Arc<Self>,
227 mut recv: tokio::sync::broadcast::Receiver<()>,
228 ) -> Result<()> {
229 let mut gc_interval = tokio::time::interval(Duration::from_secs(30));
230 info!("DB checkpoint garbage collection loop started");
231 loop {
232 tokio::select! {
233 _now = gc_interval.tick() => {
234 if let Ok(deleted) = self.garbage_collect_old_db_checkpoints().await
235 && !deleted.is_empty() {
236 info!("Garbage collected local db checkpoints: {:?}", deleted);
237 }
238 },
239 _ = recv.recv() => break,
240 }
241 }
242 Ok(())
243 }
244
245 async fn prune_and_compact(
246 &self,
247 db_path: PathBuf,
248 epoch: u64,
249 epoch_duration_ms: u64,
250 ) -> Result<()> {
251 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
252 &db_path.join("store"),
253 None,
254 None,
255 ));
256 let checkpoint_store = Arc::new(CheckpointStore::new_for_db_checkpoint_handler(
257 &db_path.join("checkpoints"),
258 ));
259 let rpc_index = RpcIndexStore::new_without_init(&db_path);
260 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
261 info!(
262 "Pruning db checkpoint in {:?} for epoch: {epoch}",
263 db_path.display()
264 );
265 AuthorityStorePruner::prune_objects_for_eligible_epochs(
266 &perpetual_db,
267 &checkpoint_store,
268 Some(&rpc_index),
269 self.pruning_config.clone(),
270 metrics,
271 epoch_duration_ms,
272 )
273 .await?;
274 info!(
275 "Compacting db checkpoint in {:?} for epoch: {epoch}",
276 db_path.display()
277 );
278 AuthorityStorePruner::compact(&perpetual_db)?;
279 Ok(())
280 }
281 async fn upload_db_checkpoints_to_object_store(
282 &self,
283 missing_epochs: Vec<u64>,
284 ) -> Result<(), anyhow::Error> {
285 let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
286 let local_checkpoints_by_epoch =
287 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
288 let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
289 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
290 let object_store = self
291 .output_object_store
292 .as_ref()
293 .expect("Expected object store to exist")
294 .clone();
295 for (epoch, db_path) in dirs {
296 let local_db_path = path_to_filesystem(self.input_root_path.clone(), db_path)?;
298 if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
299 if self.state_snapshot_enabled {
300 let snapshot_completed_marker =
301 local_db_path.join(STATE_SNAPSHOT_COMPLETED_MARKER);
302 if !snapshot_completed_marker.exists() {
303 info!(
304 "DB checkpoint upload for epoch {} to wait until state snasphot uploaded",
305 *epoch
306 );
307 continue;
308 }
309 }
310
311 if self.prune_and_compact_before_upload {
312 self.prune_and_compact(local_db_path, *epoch, EPOCH_DURATION_MS_FOR_TESTING)
314 .await?;
315 }
316
317 info!("Copying db checkpoint for epoch: {epoch} to remote storage");
318 copy_recursively(
319 db_path,
320 &self.input_object_store,
321 &object_store,
322 NonZeroUsize::new(20).unwrap(),
323 )
324 .await?;
325
326 write_snapshot_manifest(db_path, &object_store, format!("epoch_{}/", epoch))
328 .await?;
329 let bytes = Bytes::from_static(b"success");
331 let success_marker = db_path.child(SUCCESS_MARKER);
332 put(&object_store, &success_marker, bytes.clone()).await?;
333 }
334 let bytes = Bytes::from_static(b"success");
335 let upload_completed_marker = db_path.child(UPLOAD_COMPLETED_MARKER);
336 put(
337 &self.input_object_store,
338 &upload_completed_marker,
339 bytes.clone(),
340 )
341 .await?;
342 }
343 Ok(())
344 }
345
346 async fn garbage_collect_old_db_checkpoints(&self) -> Result<Vec<u64>> {
347 let local_checkpoints_by_epoch =
348 find_all_dirs_with_epoch_prefix(&self.input_object_store, None).await?;
349 let mut deleted = Vec::new();
350 for (epoch, path) in local_checkpoints_by_epoch.iter() {
351 let marker_paths: Vec<Path> = self
352 .gc_markers
353 .iter()
354 .map(|marker| path.child(marker.clone()))
355 .collect();
356 let all_markers_present = try_join_all(
357 marker_paths
358 .iter()
359 .map(|path| self.input_object_store.get(path)),
360 )
361 .await;
362 match all_markers_present {
363 Ok(_) => {
366 info!("Deleting db checkpoint dir: {path} for epoch: {epoch}");
367 deleted.push(*epoch);
368 let local_fs_path = path_to_filesystem(self.input_root_path.clone(), path)?;
369 fs::remove_dir_all(&local_fs_path)?;
370 }
371 Err(_) => {
372 debug!("Not ready for deletion yet: {path}");
373 }
374 }
375 }
376 Ok(deleted)
377 }
378}
379
#[cfg(test)]
mod tests {
    use crate::db_checkpoint_handler::{
        DBCheckpointHandler, SUCCESS_MARKER, TEST_MARKER, UPLOAD_COMPLETED_MARKER,
    };
    use itertools::Itertools;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use sui_storage::object_store::util::{
        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem,
    };
    use tempfile::TempDir;

    // End-to-end happy path: one local epoch dir is discovered, uploaded
    // (files + SUCCESS_MARKER remotely, UPLOAD_COMPLETED_MARKER locally),
    // and then garbage collected once the TEST_MARKER is added.
    #[tokio::test]
    async fn test_basic() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&db_checkpoint_handler.input_object_store, None)
                .await?;
        assert!(!local_checkpoints_by_epoch.is_empty());
        assert_eq!(*local_checkpoints_by_epoch.first_key_value().unwrap().0, 0);
        assert_eq!(
            path_to_filesystem(
                db_checkpoint_handler.input_root_path.clone(),
                local_checkpoints_by_epoch.first_key_value().unwrap().1
            )
            .unwrap(),
            std::fs::canonicalize(local_epoch0_checkpoint.clone()).unwrap()
        );
        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        // gc requires TEST_MARKER (added by new_for_test) before deleting.
        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;

        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    // Removing the remote SUCCESS_MARKER makes the epoch "missing" again, so
    // a later upload pass re-uploads it alongside newer epochs.
    #[tokio::test]
    async fn test_upload_resumes() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");

        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();
        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        fs::create_dir(&local_epoch0_checkpoint)?;
        let file1 = local_epoch0_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch0_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch0_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let file1 = local_epoch1_checkpoint.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = local_epoch1_checkpoint.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let nested_dir = local_epoch1_checkpoint.join("data");
        fs::create_dir(&nested_dir)?;
        let file3 = nested_dir.join("file3");
        fs::write(file3, b"Lorem ipsum")?;

        // Simulate an interrupted/incomplete epoch-0 upload.
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;
        assert!(remote_epoch0_checkpoint.join("file1").exists());
        assert!(remote_epoch0_checkpoint.join("file2").exists());
        assert!(remote_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch0_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch0_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let remote_epoch1_checkpoint = remote_checkpoint_dir_path.join("epoch_1");
        assert!(remote_epoch1_checkpoint.join("file1").exists());
        assert!(remote_epoch1_checkpoint.join("file2").exists());
        assert!(remote_epoch1_checkpoint.join("data").join("file3").exists());
        assert!(remote_epoch1_checkpoint.join(SUCCESS_MARKER).exists());
        assert!(
            local_epoch1_checkpoint
                .join(UPLOAD_COMPLETED_MARKER)
                .exists()
        );

        let test_marker = local_epoch0_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;
        let test_marker = local_epoch1_checkpoint.join(TEST_MARKER);
        fs::write(test_marker, b"Lorem ipsum")?;

        db_checkpoint_handler
            .garbage_collect_old_db_checkpoints()
            .await?;
        assert!(!local_epoch0_checkpoint.join("file1").exists());
        assert!(!local_epoch0_checkpoint.join("file2").exists());
        assert!(!local_epoch0_checkpoint.join("data").join("file3").exists());
        assert!(!local_epoch1_checkpoint.join("file1").exists());
        assert!(!local_epoch1_checkpoint.join("file2").exists());
        assert!(!local_epoch1_checkpoint.join("data").join("file3").exists());
        Ok(())
    }

    // With local epochs 0, 1 and 3 uploaded, the first remotely-missing
    // epoch is 2; deleting epoch 0's SUCCESS_MARKER moves it back to 0.
    #[tokio::test]
    async fn test_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch0_checkpoint = checkpoint_dir_path.join("epoch_0");
        fs::create_dir(&local_epoch0_checkpoint)?;
        let local_epoch1_checkpoint = checkpoint_dir_path.join("epoch_1");
        fs::create_dir(&local_epoch1_checkpoint)?;
        let local_epoch3_checkpoint = checkpoint_dir_path.join("epoch_3");
        fs::create_dir(&local_epoch3_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 2);

        let remote_epoch0_checkpoint = remote_checkpoint_dir_path.join("epoch_0");
        fs::remove_file(remote_epoch0_checkpoint.join(SUCCESS_MARKER))?;

        let first_missing_epoch = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?
        .first()
        .cloned()
        .unwrap();
        assert_eq!(first_missing_epoch, 0);

        Ok(())
    }

    // With only epochs 100 and 200 present locally, after upload the
    // missing set is every epoch in [0,100), (100,200) plus 201.
    #[tokio::test]
    async fn test_range_missing_epochs() -> anyhow::Result<()> {
        let checkpoint_dir = TempDir::new()?;
        let checkpoint_dir_path = checkpoint_dir.path();
        let local_epoch100_checkpoint = checkpoint_dir_path.join("epoch_100");
        fs::create_dir(&local_epoch100_checkpoint)?;
        let local_epoch200_checkpoint = checkpoint_dir_path.join("epoch_200");
        fs::create_dir(&local_epoch200_checkpoint)?;
        let remote_checkpoint_dir = TempDir::new()?;
        let remote_checkpoint_dir_path = remote_checkpoint_dir.path();

        let input_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };

        let output_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(remote_checkpoint_dir_path.to_path_buf()),
            ..Default::default()
        };
        let db_checkpoint_handler = DBCheckpointHandler::new_for_test(
            &input_store_config,
            Some(&output_store_config),
            10,
            false,
            false,
        )?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        assert_eq!(missing_epochs, vec![0]);
        db_checkpoint_handler
            .upload_db_checkpoints_to_object_store(missing_epochs)
            .await?;

        let missing_epochs = find_missing_epochs_dirs(
            db_checkpoint_handler.output_object_store.as_ref().unwrap(),
            SUCCESS_MARKER,
        )
        .await?;
        let mut expected_missing_epochs: Vec<u64> = (0..100).collect();
        expected_missing_epochs.extend((101..200).collect_vec().iter());
        expected_missing_epochs.push(201);
        assert_eq!(missing_epochs, expected_missing_epochs);
        Ok(())
    }
}