sui_storage/object_store/
util.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::object_store::{
5    ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
6};
7use anyhow::{Context, Result, anyhow};
8use backoff::ExponentialBackoff;
9use backoff::future::retry;
10use bytes::Bytes;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use mysten_common::ZipDebugEqIteratorExt;
16use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
17use object_store::http::HttpBuilder;
18use object_store::local::LocalFileSystem;
19use object_store::path::Path;
20use object_store::{
21    ClientOptions, DynObjectStore, Error, ObjectStore, ObjectStoreExt, RetryConfig,
22};
23use prost::Message;
24use serde::{Deserialize, Serialize};
25use serde_json;
26use std::collections::BTreeMap;
27use std::num::NonZeroUsize;
28use std::ops::Range;
29use std::path::PathBuf;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Duration;
33use sui_rpc::proto::sui::rpc::v2 as proto;
34use sui_types::full_checkpoint_content::Checkpoint;
35use sui_types::messages_checkpoint::CheckpointSequenceNumber;
36use tokio::time::Instant;
37use tracing::{error, warn};
38use url::Url;
39
/// Object name of the manifest written at the store root (and per epoch dir).
pub const MANIFEST_FILENAME: &str = "MANIFEST";
41
42#[derive(Serialize, Deserialize)]
43
44pub struct Manifest {
45    pub available_epochs: Vec<u64>,
46}
47
48impl Manifest {
49    pub fn new(available_epochs: Vec<u64>) -> Self {
50        Manifest { available_epochs }
51    }
52
53    pub fn epoch_exists(&self, epoch: u64) -> bool {
54        self.available_epochs.contains(&epoch)
55    }
56}
57
/// Per-epoch `MANIFEST` contents: one relative file path per line,
/// newline delimited so shell scripts can parse it trivially.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps the given file paths in a manifest.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as newline-delimited text (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses newline-delimited text back into a manifest.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(str::to_owned).collect();
        PerEpochManifest { lines }
    }

    /// Returns a new manifest keeping only the lines that start with `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        PerEpochManifest {
            lines: self
                .lines
                .iter()
                .filter(|line| line.starts_with(prefix))
                .map(String::clone)
                .collect(),
        }
    }
}
92
93pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
94    let bytes = retry(backoff::ExponentialBackoff::default(), || async {
95        store.get_bytes(src).await.map_err(|e| {
96            error!("Failed to read file from object store with error: {:?}", &e);
97            backoff::Error::transient(e)
98        })
99    })
100    .await?;
101    Ok(bytes)
102}
103
104pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
105    store.get_bytes(src).await.is_ok()
106}
107
108pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
109    retry(backoff::ExponentialBackoff::default(), || async {
110        if !bytes.is_empty() {
111            store.put_bytes(src, bytes.clone()).await.map_err(|e| {
112                error!("Failed to write file to object store with error: {:?}", &e);
113                backoff::Error::transient(e)
114            })
115        } else {
116            warn!("Not copying empty file: {:?}", src);
117            Ok(())
118        }
119    })
120    .await?;
121    Ok(())
122}
123
124pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
125    src: &Path,
126    dest: &Path,
127    src_store: &S,
128    dest_store: &D,
129) -> Result<()> {
130    let bytes = get(src_store, src).await?;
131    if !bytes.is_empty() {
132        put(dest_store, dest, bytes).await
133    } else {
134        warn!("Not copying empty file: {:?}", src);
135        Ok(())
136    }
137}
138
139pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
140    src: &[Path],
141    dest: &[Path],
142    src_store: &S,
143    dest_store: &D,
144    concurrency: NonZeroUsize,
145    progress_bar: Option<ProgressBar>,
146) -> Result<Vec<()>> {
147    let mut instant = Instant::now();
148    let progress_bar_clone = progress_bar.clone();
149    let results = futures::stream::iter(src.iter().zip_debug_eq(dest.iter()))
150        .map(|(path_in, path_out)| async move {
151            let ret = copy_file(path_in, path_out, src_store, dest_store).await;
152            Ok((path_out.clone(), ret))
153        })
154        .boxed()
155        .buffer_unordered(concurrency.get())
156        .try_for_each(|(path, ret)| {
157            if let Some(progress_bar_clone) = &progress_bar_clone {
158                progress_bar_clone.inc(1);
159                progress_bar_clone.set_message(format!("file: {}", path));
160                instant = Instant::now();
161            }
162            futures::future::ready(ret)
163        })
164        .await;
165    Ok(results.into_iter().collect())
166}
167
168pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
169    dir: &Path,
170    src_store: &S,
171    dest_store: &D,
172    concurrency: NonZeroUsize,
173) -> Result<Vec<()>> {
174    let mut input_paths = vec![];
175    let mut output_paths = vec![];
176    let mut paths = src_store.list_objects(Some(dir)).await;
177    while let Some(res) = paths.next().await {
178        if let Ok(object_metadata) = res {
179            input_paths.push(object_metadata.location.clone());
180            output_paths.push(object_metadata.location);
181        } else {
182            return Err(res.err().unwrap().into());
183        }
184    }
185    copy_files(
186        &input_paths,
187        &output_paths,
188        src_store,
189        dest_store,
190        concurrency,
191        None,
192    )
193    .await
194}
195
196pub async fn delete_files<S: ObjectStoreDeleteExt>(
197    files: &[Path],
198    store: &S,
199    concurrency: NonZeroUsize,
200) -> Result<Vec<()>> {
201    let results: Vec<Result<()>> = futures::stream::iter(files)
202        .map(|f| {
203            retry(backoff::ExponentialBackoff::default(), || async {
204                store.delete_object(f).await.map_err(|e| {
205                    error!("Failed to delete file on object store with error: {:?}", &e);
206                    backoff::Error::transient(e)
207                })
208            })
209        })
210        .boxed()
211        .buffer_unordered(concurrency.get())
212        .collect()
213        .await;
214    results.into_iter().collect()
215}
216
217pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
218    path: &Path,
219    store: &S,
220    concurrency: NonZeroUsize,
221) -> Result<Vec<()>> {
222    let mut paths_to_delete = vec![];
223    let mut paths = store.list_objects(Some(path)).await;
224    while let Some(res) = paths.next().await {
225        if let Ok(object_metadata) = res {
226            paths_to_delete.push(object_metadata.location);
227        } else {
228            return Err(res.err().unwrap().into());
229        }
230    }
231    delete_files(&paths_to_delete, store, concurrency).await
232}
233
/// Converts an `object_store::path::Path` rooted at `local_dir_path` into an
/// absolute `std::path::PathBuf` on the local filesystem.
///
/// `local_dir_path` must exist — it is canonicalized first, so the call fails
/// with an `Err` if it does not.
pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
    // Convert an `object_store::path::Path` to `std::path::PathBuf`
    let path = std::fs::canonicalize(local_dir_path)?;
    // Round-trip through a file:// URL so each object-store segment is appended
    // with correct, platform-appropriate separators.
    let mut url = Url::from_file_path(&path)
        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
    url.path_segments_mut()
        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
        .pop_if_empty()
        .extend(location.parts());
    let new_path = url
        .to_file_path()
        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
    Ok(new_path)
}
248
249/// This function will find all child directories in the input store which are of the form "epoch_num"
250/// and return a map of epoch number to the directory path
251pub async fn find_all_dirs_with_epoch_prefix(
252    store: &Arc<DynObjectStore>,
253    prefix: Option<&Path>,
254) -> anyhow::Result<BTreeMap<u64, Path>> {
255    let mut dirs = BTreeMap::new();
256    let entries = store.list_with_delimiter(prefix).await?;
257    for entry in entries.common_prefixes {
258        if let Some(filename) = entry.filename() {
259            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
260                continue;
261            }
262            let epoch = filename
263                .split_once('_')
264                .context("Failed to split dir name")
265                .map(|(_, epoch)| epoch.parse::<u64>())??;
266            dirs.insert(epoch, entry);
267        }
268    }
269    Ok(dirs)
270}
271
/// Lists all epochs that have a `_SUCCESS` marker: first the top-level
/// `epoch_N` dirs in ascending order, then any additional epochs found under
/// `archive/` (appended afterward, de-duplicated against the first pass).
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    let mut success_marker_found = false;
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                // Only logged before the first marker is seen; missing markers
                // after that point are skipped silently. NOTE(review): presumably
                // later epochs may still be mid-upload — confirm intended.
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                out.push(*epoch);
                success_marker_found = true;
            }
        }
    }

    // Also check for archived epochs in the archive/ directory
    let archive_prefix = Path::from("archive");
    // Archive listing is best-effort: a listing error leaves `out` unchanged.
    if let Ok(archive_epoch_dirs) =
        find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
    {
        for (epoch, path) in archive_epoch_dirs.iter().sorted() {
            let success_marker = path.child("_SUCCESS");
            let get_result = object_store.get(&success_marker).await;
            // De-duplicate against epochs already found at the top level.
            if get_result.is_ok() && !out.contains(epoch) {
                out.push(*epoch);
            }
        }
    }

    Ok(out)
}
308
/// Periodically (every 300s) rewrites the root `MANIFEST` object as the JSON
/// list of available epochs; exits cleanly when `recv` yields (or its senders
/// drop, since any `recv()` result ends the loop).
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            _now = update_interval.tick() => {
                // A failed epoch listing is skipped silently; the next tick retries.
                // Serialization or upload failures, however, abort the loop with Err.
                if let Ok(epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest::new(epochs);
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
             _ = recv.recv() => break,
        }
    }
    Ok(())
}
329
330/// This function will find all child directories in the input store which are of the form "epoch_num"
331/// and return a map of epoch number to the directory path
332pub async fn find_all_files_with_epoch_prefix(
333    store: &Arc<DynObjectStore>,
334    prefix: Option<&Path>,
335) -> anyhow::Result<Vec<Range<u64>>> {
336    let mut ranges = Vec::new();
337    let entries = store.list_with_delimiter(prefix).await?;
338    for entry in entries.objects {
339        let checkpoint_seq_range = entry
340            .location
341            .filename()
342            .ok_or(anyhow!("Illegal file name"))?
343            .split_once('.')
344            .context("Failed to split dir name")?
345            .0
346            .split_once('_')
347            .context("Failed to split dir name")
348            .map(|(start, end)| Range {
349                start: start.parse::<u64>().unwrap(),
350                end: end.parse::<u64>().unwrap(),
351            })?;
352
353        ranges.push(checkpoint_seq_range);
354    }
355    Ok(ranges)
356}
357
358/// This function will find missing epoch directories in the input store and return a list of such
359/// epoch numbers. If the highest epoch directory in the store is `epoch_N` then it is expected that the
360/// store will have all epoch directories from `epoch_0` to `epoch_N`. Additionally, any epoch directory
361/// should have the passed in marker file present or else that epoch number is already considered as
362/// missing
363pub async fn find_missing_epochs_dirs(
364    store: &Arc<DynObjectStore>,
365    success_marker: &str,
366) -> anyhow::Result<Vec<u64>> {
367    let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
368    let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
369    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
370    let mut candidate_epoch: u64 = 0;
371    let mut missing_epochs = Vec::new();
372    for (epoch_num, path) in dirs {
373        while candidate_epoch < *epoch_num {
374            // The whole epoch directory is missing
375            missing_epochs.push(candidate_epoch);
376            candidate_epoch += 1;
377            continue;
378        }
379        let success_marker = path.child(success_marker);
380        let get_result = store.get(&success_marker).await;
381        match get_result {
382            Err(Error::NotFound { .. }) => {
383                error!("No success marker found in db checkpoint for epoch: {epoch_num}");
384                missing_epochs.push(*epoch_num);
385            }
386            Err(_) => {
387                // Probably a transient error
388                warn!(
389                    "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
390                );
391            }
392            Ok(_) => {
393                // Nothing to do
394            }
395        }
396        candidate_epoch += 1
397    }
398    missing_epochs.push(candidate_epoch);
399    Ok(missing_epochs)
400}
401
402pub fn get_path(prefix: &str) -> Path {
403    Path::from(prefix)
404}
405
406// Snapshot MANIFEST file is very simple. Just a newline delimited list of all paths in the snapshot directory
407// this simplicty enables easy parsing for scripts to download snapshots
408pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
409    dir: &Path,
410    store: &S,
411    epoch_prefix: String,
412) -> Result<()> {
413    let mut file_names = vec![];
414    let mut paths = store.list_objects(Some(dir)).await;
415    while let Some(res) = paths.next().await {
416        if let Ok(object_metadata) = res {
417            // trim the "epoch_XX/" dir prefix here
418            let mut path_str = object_metadata.location.to_string();
419            if path_str.starts_with(&epoch_prefix) {
420                path_str = String::from(&path_str[epoch_prefix.len()..]);
421                file_names.push(path_str);
422            } else {
423                warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
424            }
425        } else {
426            return Err(res.err().unwrap().into());
427        }
428    }
429
430    let epoch_manifest = PerEpochManifest::new(file_names);
431    let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
432    put(
433        store,
434        &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
435        bytes,
436    )
437    .await?;
438
439    Ok(())
440}
441
/// Builds an `ObjectStore` client for `ingestion_url`.
///
/// Dispatch:
/// - `file://...` -> `LocalFileSystem` rooted at the URL's path
///   (`remote_store_options` ignored)
/// - host starting with "s3" -> Amazon S3, configured from
///   `remote_store_options` key/value pairs
/// - anything else -> generic HTTP store
///
/// # Panics
/// Panics on an unparsable URL, invalid file path, invalid S3 config key, or
/// builder failure — a bad ingestion URL is treated as a startup error.
pub fn build_object_store(
    ingestion_url: &str,
    remote_store_options: Vec<(String, String)>,
) -> Arc<dyn ObjectStore> {
    // 5s per-request timeout; plain-HTTP (non-TLS) endpoints allowed.
    let timeout_secs = 5;
    let client_options = ClientOptions::new()
        .with_timeout(Duration::from_secs(timeout_secs))
        .with_allow_http(true);
    // Retry window is kept just above the single-request timeout.
    let retry_config = RetryConfig {
        max_retries: 10,
        retry_timeout: Duration::from_secs(timeout_secs + 1),
        ..Default::default()
    };
    let url = ingestion_url
        .parse::<Url>()
        .expect("archival ingestion url must be valid");
    if url.scheme() == "file" {
        Arc::new(
            LocalFileSystem::new_with_prefix(
                url.to_file_path()
                    .expect("archival ingestion url must have a valid file path"),
            )
            .expect("failed to create local file system store"),
        )
    } else if url.host_str().unwrap_or_default().starts_with("s3") {
        // NOTE(review): S3 is chosen whenever the host name starts with "s3"
        // (e.g. "s3.amazonaws.com"); confirm this heuristic covers all intended
        // endpoints and no unintended ones.
        let mut builder = AmazonS3Builder::new()
            .with_client_options(client_options)
            .with_retry(retry_config)
            // Allow credential lookup via EC2 instance metadata (IMDSv1 fallback).
            .with_imdsv1_fallback()
            .with_url(ingestion_url);
        for (key, value) in &remote_store_options {
            builder = builder.with_config(
                AmazonS3ConfigKey::from_str(key).expect("invalid S3 config key"),
                value.clone(),
            );
        }
        Arc::new(builder.build().expect("failed to build S3 store"))
    } else {
        Arc::new(
            HttpBuilder::new()
                .with_url(url.to_string())
                .with_client_options(client_options)
                .with_retry(retry_config)
                .build()
                .expect("failed to build HTTP store"),
        )
    }
}
490
/// Fetches checkpoint `seq` from the archive store.
///
/// The object is expected at `<seq>.binpb.zst`: a zstd-compressed,
/// protobuf-encoded checkpoint. Transient fetch/decompress/decode errors are
/// retried with backoff for up to 60s; a missing object fails immediately.
pub async fn fetch_checkpoint(
    store: &Arc<dyn ObjectStore>,
    seq: u64,
) -> anyhow::Result<Checkpoint> {
    let store = store.clone();
    // `retry` needs a factory it can call once per attempt, so each invocation
    // clones the Arc and builds a fresh future.
    let request = move || {
        let store = store.clone();
        async move {
            use backoff::Error as BE;
            let path = Path::from(format!("{seq}.binpb.zst"));
            let bytes = store
                .get(&path)
                .await
                .map_err(|e| match e {
                    // NotFound is permanent: retrying will not make the
                    // checkpoint appear within the backoff window.
                    object_store::Error::NotFound { .. } => {
                        BE::permanent(anyhow!("Checkpoint {seq} not found in archive"))
                    }
                    e => BE::transient(anyhow::Error::from(e)),
                })?
                .bytes()
                .await
                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            let decompressed =
                zstd::decode_all(&bytes[..]).map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            let proto_checkpoint = proto::Checkpoint::decode(&decompressed[..])
                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            Checkpoint::try_from(&proto_checkpoint).map_err(|e| BE::transient(anyhow!(e)))
        }
    };
    // multiplier 1.0 => effectively constant retry interval, capped at 60s total.
    let backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(60)),
        multiplier: 1.0,
        ..Default::default()
    };
    backoff::future::retry(backoff, request).await
}
527
528pub async fn end_of_epoch_data(
529    url: &str,
530    remote_store_options: Vec<(String, String)>,
531) -> anyhow::Result<Vec<CheckpointSequenceNumber>> {
532    let store = build_object_store(url, remote_store_options);
533    let response = store.get(&Path::from("epochs.json")).await?;
534    let bytes = response.bytes().await?;
535    Ok(serde_json::from_slice(&bytes)?)
536}
537
#[cfg(test)]
mod tests {
    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };
    use object_store::path::Path;
    use std::fs;
    use std::num::NonZeroUsize;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    /// Copies a nested directory tree between two local-file object stores and
    /// verifies the structure and file contents survive the copy.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // The whole tree must be mirrored under the output root.
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    /// Writes a per-epoch MANIFEST for an `epoch_0` tree and checks that the
    /// listed paths are relative to the epoch directory (prefix trimmed).
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Layout: epoch_0/file1, epoch_0/file2, epoch_0/grand_child/file2.tar.gz.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // MANIFEST lands inside the epoch dir and lists epoch-relative paths.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    /// Deletes a nested directory tree from a local-file object store and
    /// verifies all files under it are gone.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Layout: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}