sui_storage/object_store/util.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::object_store::{
    ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
};
use anyhow::{Context, Result, anyhow};
use backoff::future::retry;
use bytes::Bytes;
use futures::StreamExt;
use futures::TryStreamExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use object_store::path::Path;
use object_store::{DynObjectStore, Error, ObjectStore};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, warn};
use url::Url;

pub const MANIFEST_FILENAME: &str = "MANIFEST";

#[derive(Serialize, Deserialize)]
pub struct Manifest {
    pub available_epochs: Vec<u64>,
}

impl Manifest {
    pub fn new(available_epochs: Vec<u64>) -> Self {
        Manifest { available_epochs }
    }

    pub fn epoch_exists(&self, epoch: u64) -> bool {
        self.available_epochs.contains(&epoch)
    }
}

#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    pub fn new(lines: Vec<String>) -> Self {
        PerEpochManifest { lines }
    }

    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        PerEpochManifest {
            lines: s.lines().map(String::from).collect(),
        }
    }

    // Method to filter lines by a given prefix
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        let filtered_lines = self
            .lines
            .iter()
            .filter(|line| line.starts_with(prefix))
            .cloned()
            .collect();

        PerEpochManifest {
            lines: filtered_lines,
        }
    }
}

pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
    let bytes = retry(backoff::ExponentialBackoff::default(), || async {
        store.get_bytes(src).await.map_err(|e| {
            error!("Failed to read file from object store with error: {:?}", &e);
            backoff::Error::transient(e)
        })
    })
    .await?;
    Ok(bytes)
}

pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
    store.get_bytes(src).await.is_ok()
}

pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
    retry(backoff::ExponentialBackoff::default(), || async {
        if !bytes.is_empty() {
            store.put_bytes(src, bytes.clone()).await.map_err(|e| {
                error!("Failed to write file to object store with error: {:?}", &e);
                backoff::Error::transient(e)
            })
        } else {
            warn!("Not copying empty file: {:?}", src);
            Ok(())
        }
    })
    .await?;
    Ok(())
}

pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
    src: &Path,
    dest: &Path,
    src_store: &S,
    dest_store: &D,
) -> Result<()> {
    let bytes = get(src_store, src).await?;
    if !bytes.is_empty() {
        put(dest_store, dest, bytes).await
    } else {
        warn!("Not copying empty file: {:?}", src);
        Ok(())
    }
}

pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
    src: &[Path],
    dest: &[Path],
    src_store: &S,
    dest_store: &D,
    concurrency: NonZeroUsize,
    progress_bar: Option<ProgressBar>,
) -> Result<Vec<()>> {
    let mut instant = Instant::now();
    let progress_bar_clone = progress_bar.clone();
    let results = futures::stream::iter(src.iter().zip(dest.iter()))
        .map(|(path_in, path_out)| async move {
            let ret = copy_file(path_in, path_out, src_store, dest_store).await;
            Ok((path_out.clone(), ret))
        })
        .boxed()
        .buffer_unordered(concurrency.get())
        .try_for_each(|(path, ret)| {
            if let Some(progress_bar_clone) = &progress_bar_clone {
                progress_bar_clone.inc(1);
                progress_bar_clone.set_message(format!("file: {}", path));
                instant = Instant::now();
            }
            futures::future::ready(ret)
        })
        .await;
    Ok(results.into_iter().collect())
}

pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
    dir: &Path,
    src_store: &S,
    dest_store: &D,
    concurrency: NonZeroUsize,
) -> Result<Vec<()>> {
    let mut input_paths = vec![];
    let mut output_paths = vec![];
    let mut paths = src_store.list_objects(Some(dir)).await;
    while let Some(res) = paths.next().await {
        if let Ok(object_metadata) = res {
            input_paths.push(object_metadata.location.clone());
            output_paths.push(object_metadata.location);
        } else {
            return Err(res.err().unwrap().into());
        }
    }
    copy_files(
        &input_paths,
        &output_paths,
        src_store,
        dest_store,
        concurrency,
        None,
    )
    .await
}

pub async fn delete_files<S: ObjectStoreDeleteExt>(
    files: &[Path],
    store: &S,
    concurrency: NonZeroUsize,
) -> Result<Vec<()>> {
    let results: Vec<Result<()>> = futures::stream::iter(files)
        .map(|f| {
            retry(backoff::ExponentialBackoff::default(), || async {
                store.delete_object(f).await.map_err(|e| {
                    error!("Failed to delete file on object store with error: {:?}", &e);
                    backoff::Error::transient(e)
                })
            })
        })
        .boxed()
        .buffer_unordered(concurrency.get())
        .collect()
        .await;
    results.into_iter().collect()
}

pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
    path: &Path,
    store: &S,
    concurrency: NonZeroUsize,
) -> Result<Vec<()>> {
    let mut paths_to_delete = vec![];
    let mut paths = store.list_objects(Some(path)).await;
    while let Some(res) = paths.next().await {
        if let Ok(object_metadata) = res {
            paths_to_delete.push(object_metadata.location);
        } else {
            return Err(res.err().unwrap().into());
        }
    }
    delete_files(&paths_to_delete, store, concurrency).await
}

pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
    // Convert an `object_store::path::Path` to `std::path::PathBuf`
    let path = std::fs::canonicalize(local_dir_path)?;
    let mut url = Url::from_file_path(&path)
        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
    url.path_segments_mut()
        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
        .pop_if_empty()
        .extend(location.parts());
    let new_path = url
        .to_file_path()
        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
    Ok(new_path)
}

/// Find all child directories in the input store whose names are of the form `epoch_<num>` and
/// return a map from epoch number to the corresponding directory path. Directories with a
/// `.tmp` suffix are skipped.
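///
/// A minimal usage sketch (illustrative only; assumes the store root contains directories
/// such as `epoch_0/` and `epoch_1/`):
///
/// ```ignore
/// let dirs = find_all_dirs_with_epoch_prefix(&store, None).await?;
/// for (epoch, dir) in &dirs {
///     println!("epoch {epoch} lives under {dir}");
/// }
/// ```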
pub async fn find_all_dirs_with_epoch_prefix(
    store: &Arc<DynObjectStore>,
    prefix: Option<&Path>,
) -> anyhow::Result<BTreeMap<u64, Path>> {
    let mut dirs = BTreeMap::new();
    let entries = store.list_with_delimiter(prefix).await?;
    for entry in entries.common_prefixes {
        if let Some(filename) = entry.filename() {
            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
                continue;
            }
            let epoch = filename
                .split_once('_')
                .context("Failed to split dir name")
                .map(|(_, epoch)| epoch.parse::<u64>())??;
            dirs.insert(epoch, entry);
        }
    }
    Ok(dirs)
}

pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    let mut success_marker_found = false;
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                out.push(*epoch);
                success_marker_found = true;
            }
        }
    }

    // Also check for archived epochs in the archive/ directory
    let archive_prefix = Path::from("archive");
    if let Ok(archive_epoch_dirs) =
        find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
    {
        for (epoch, path) in archive_epoch_dirs.iter().sorted() {
            let success_marker = path.child("_SUCCESS");
            let get_result = object_store.get(&success_marker).await;
            if get_result.is_ok() && !out.contains(epoch) {
                out.push(*epoch);
            }
        }
    }

    Ok(out)
}

pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            _now = update_interval.tick() => {
                if let Ok(epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest::new(epochs);
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
            _ = recv.recv() => break,
        }
    }
    Ok(())
}

/// Find all files directly under the given prefix whose names are of the form
/// `<start>_<end>.<suffix>` and return the checkpoint sequence ranges parsed from those names.
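///
/// A minimal usage sketch (illustrative only; the `1000_2000.chk` file name and extension are
/// hypothetical):
///
/// ```ignore
/// // A file at `<prefix>/1000_2000.chk` contributes the range 1000..2000.
/// let ranges = find_all_files_with_epoch_prefix(&store, Some(&prefix)).await?;
/// assert!(ranges.contains(&(1000..2000)));
/// ```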
pub async fn find_all_files_with_epoch_prefix(
    store: &Arc<DynObjectStore>,
    prefix: Option<&Path>,
) -> anyhow::Result<Vec<Range<u64>>> {
    let mut ranges = Vec::new();
    let entries = store.list_with_delimiter(prefix).await?;
    for entry in entries.objects {
        let checkpoint_seq_range = entry
            .location
            .filename()
            .ok_or(anyhow!("Illegal file name"))?
            .split_once('.')
            .context("Failed to split file name")?
            .0
            .split_once('_')
            .context("Failed to split file name")
            .map(|(start, end)| Range {
                start: start.parse::<u64>().unwrap(),
                end: end.parse::<u64>().unwrap(),
            })?;

        ranges.push(checkpoint_seq_range);
    }
    Ok(ranges)
}

/// Find missing epoch directories in the input store and return the list of missing epoch
/// numbers. If the highest epoch directory in the store is `epoch_N`, the store is expected to
/// contain every epoch directory from `epoch_0` through `epoch_N`. Additionally, an epoch
/// directory must contain the passed-in marker file; otherwise that epoch is also considered
/// missing. The next epoch after the highest existing directory is always appended to the
/// returned list.
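///
/// A minimal usage sketch (illustrative only; `_SUCCESS` mirrors the marker used by
/// `list_all_epochs` above):
///
/// ```ignore
/// let missing = find_missing_epochs_dirs(&store, "_SUCCESS").await?;
/// for epoch in missing {
///     println!("epoch {epoch} still needs to be uploaded");
/// }
/// ```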
pub async fn find_missing_epochs_dirs(
    store: &Arc<DynObjectStore>,
    success_marker: &str,
) -> anyhow::Result<Vec<u64>> {
    let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
    let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
    let mut candidate_epoch: u64 = 0;
    let mut missing_epochs = Vec::new();
    for (epoch_num, path) in dirs {
        while candidate_epoch < *epoch_num {
            // The whole epoch directory is missing
            missing_epochs.push(candidate_epoch);
            candidate_epoch += 1;
            continue;
        }
        let success_marker = path.child(success_marker);
        let get_result = store.get(&success_marker).await;
        match get_result {
            Err(Error::NotFound { .. }) => {
                error!("No success marker found in db checkpoint for epoch: {epoch_num}");
                missing_epochs.push(*epoch_num);
            }
            Err(_) => {
                // Probably a transient error
                warn!(
                    "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
                );
            }
            Ok(_) => {
                // Nothing to do
            }
        }
        candidate_epoch += 1;
    }
    missing_epochs.push(candidate_epoch);
    Ok(missing_epochs)
}

pub fn get_path(prefix: &str) -> Path {
    Path::from(prefix)
}

/// The snapshot MANIFEST file is deliberately simple: just a newline-delimited list of all paths
/// in the snapshot directory. This simplicity makes it easy for scripts to parse the manifest and
/// download snapshots.
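///
/// For example, for the directory layout exercised in `test_write_snapshot_manifest` below, the
/// generated `epoch_0/MANIFEST` would contain something like (ordering not guaranteed):
///
/// ```text
/// file1
/// file2
/// grand_child/file2.tar.gz
/// ```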
pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
    dir: &Path,
    store: &S,
    epoch_prefix: String,
) -> Result<()> {
    let mut file_names = vec![];
    let mut paths = store.list_objects(Some(dir)).await;
    while let Some(res) = paths.next().await {
        if let Ok(object_metadata) = res {
            // trim the "epoch_XX/" dir prefix here
            let mut path_str = object_metadata.location.to_string();
            if path_str.starts_with(&epoch_prefix) {
                path_str = String::from(&path_str[epoch_prefix.len()..]);
                file_names.push(path_str);
            } else {
                warn!("Unexpected path {path_str}: expected it to be under the {epoch_prefix} dir")
            }
        } else {
            return Err(res.err().unwrap().into());
        }
    }

    let epoch_manifest = PerEpochManifest::new(file_names);
    let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
    put(
        store,
        &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
        bytes,
    )
    .await?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };
    use object_store::path::Path;
    use std::fs;
    use std::num::NonZeroUsize;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
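
    // Minimal additional checks (sketches) for the helpers above that do not need an object
    // store; the path and file names used here are illustrative.
    #[test]
    pub fn test_path_to_filesystem() -> anyhow::Result<()> {
        use crate::object_store::util::path_to_filesystem;

        let dir = TempDir::new()?;
        let local = dir.path().to_path_buf();
        // Mapping an object store location under a local directory appends the location's
        // parts to the canonicalized directory path.
        let mapped = path_to_filesystem(local.clone(), &Path::from("epoch_0/file1"))?;
        assert_eq!(
            mapped,
            std::fs::canonicalize(&local)?.join("epoch_0").join("file1")
        );
        Ok(())
    }

    #[test]
    pub fn test_per_epoch_manifest_roundtrip_and_filter() {
        use crate::object_store::util::PerEpochManifest;

        let manifest = PerEpochManifest::new(vec![
            String::from("file1"),
            String::from("grand_child/file2.tar.gz"),
        ]);
        // Serialization is newline delimited and deserialization inverts it.
        let serialized = manifest.serialize_as_newline_delimited();
        assert_eq!(serialized, "file1\ngrand_child/file2.tar.gz");
        let roundtrip = PerEpochManifest::deserialize_from_newline_delimited(&serialized);
        assert_eq!(roundtrip.lines, manifest.lines);
        // filter_by_prefix keeps only the lines that start with the given prefix.
        let filtered = manifest.filter_by_prefix("grand_child/");
        assert_eq!(
            filtered.lines,
            vec![String::from("grand_child/file2.tar.gz")]
        );
    }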
}