sui_storage/object_store/
util.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::object_store::{
5    ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
6};
7use anyhow::{Context, Result, anyhow};
8use backoff::ExponentialBackoff;
9use backoff::future::retry;
10use bytes::Bytes;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use mysten_common::ZipDebugEqIteratorExt;
16use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
17use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
18use object_store::http::HttpBuilder;
19use object_store::local::LocalFileSystem;
20use object_store::path::Path;
21use object_store::{
22    ClientOptions, DynObjectStore, Error, ObjectStore, ObjectStoreExt, RetryConfig,
23};
24use prost::Message;
25use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
26use serde::{Deserialize, Serialize};
27use serde_json;
28use std::collections::BTreeMap;
29use std::num::NonZeroUsize;
30use std::ops::Range;
31use std::path::PathBuf;
32use std::str::FromStr;
33use std::sync::Arc;
34use std::time::Duration;
35use sui_rpc::proto::sui::rpc::v2 as proto;
36use sui_types::full_checkpoint_content::Checkpoint;
37use sui_types::messages_checkpoint::CheckpointSequenceNumber;
38use tokio::time::Instant;
39use tracing::{error, warn};
40use url::Url;
41
42pub const MANIFEST_FILENAME: &str = "MANIFEST";
43
44#[derive(Serialize, Deserialize)]
45
46pub struct Manifest {
47    pub available_epochs: Vec<u64>,
48}
49
50impl Manifest {
51    pub fn new(available_epochs: Vec<u64>) -> Self {
52        Manifest { available_epochs }
53    }
54
55    pub fn epoch_exists(&self, epoch: u64) -> bool {
56        self.available_epochs.contains(&epoch)
57    }
58}
59
60#[derive(Debug, Clone)]
61pub struct PerEpochManifest {
62    pub lines: Vec<String>,
63}
64
65impl PerEpochManifest {
66    pub fn new(lines: Vec<String>) -> Self {
67        PerEpochManifest { lines }
68    }
69
70    pub fn serialize_as_newline_delimited(&self) -> String {
71        self.lines.join("\n")
72    }
73
74    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
75        PerEpochManifest {
76            lines: s.lines().map(String::from).collect(),
77        }
78    }
79
80    // Method to filter lines by a given prefix
81    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
82        let filtered_lines = self
83            .lines
84            .iter()
85            .filter(|line| line.starts_with(prefix))
86            .cloned()
87            .collect();
88
89        PerEpochManifest {
90            lines: filtered_lines,
91        }
92    }
93}
94
95pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
96    let bytes = retry(backoff::ExponentialBackoff::default(), || async {
97        store.get_bytes(src).await.map_err(|e| {
98            error!("Failed to read file from object store with error: {:?}", &e);
99            backoff::Error::transient(e)
100        })
101    })
102    .await?;
103    Ok(bytes)
104}
105
106pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
107    store.get_bytes(src).await.is_ok()
108}
109
110pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
111    retry(backoff::ExponentialBackoff::default(), || async {
112        if !bytes.is_empty() {
113            store.put_bytes(src, bytes.clone()).await.map_err(|e| {
114                error!("Failed to write file to object store with error: {:?}", &e);
115                backoff::Error::transient(e)
116            })
117        } else {
118            warn!("Not copying empty file: {:?}", src);
119            Ok(())
120        }
121    })
122    .await?;
123    Ok(())
124}
125
126pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
127    src: &Path,
128    dest: &Path,
129    src_store: &S,
130    dest_store: &D,
131) -> Result<()> {
132    let bytes = get(src_store, src).await?;
133    if !bytes.is_empty() {
134        put(dest_store, dest, bytes).await
135    } else {
136        warn!("Not copying empty file: {:?}", src);
137        Ok(())
138    }
139}
140
141pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
142    src: &[Path],
143    dest: &[Path],
144    src_store: &S,
145    dest_store: &D,
146    concurrency: NonZeroUsize,
147    progress_bar: Option<ProgressBar>,
148) -> Result<Vec<()>> {
149    let mut instant = Instant::now();
150    let progress_bar_clone = progress_bar.clone();
151    let results = futures::stream::iter(src.iter().zip_debug_eq(dest.iter()))
152        .map(|(path_in, path_out)| async move {
153            let ret = copy_file(path_in, path_out, src_store, dest_store).await;
154            Ok((path_out.clone(), ret))
155        })
156        .boxed()
157        .buffer_unordered(concurrency.get())
158        .try_for_each(|(path, ret)| {
159            if let Some(progress_bar_clone) = &progress_bar_clone {
160                progress_bar_clone.inc(1);
161                progress_bar_clone.set_message(format!("file: {}", path));
162                instant = Instant::now();
163            }
164            futures::future::ready(ret)
165        })
166        .await;
167    Ok(results.into_iter().collect())
168}
169
170pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
171    dir: &Path,
172    src_store: &S,
173    dest_store: &D,
174    concurrency: NonZeroUsize,
175) -> Result<Vec<()>> {
176    let mut input_paths = vec![];
177    let mut output_paths = vec![];
178    let mut paths = src_store.list_objects(Some(dir)).await;
179    while let Some(res) = paths.next().await {
180        if let Ok(object_metadata) = res {
181            input_paths.push(object_metadata.location.clone());
182            output_paths.push(object_metadata.location);
183        } else {
184            return Err(res.err().unwrap().into());
185        }
186    }
187    copy_files(
188        &input_paths,
189        &output_paths,
190        src_store,
191        dest_store,
192        concurrency,
193        None,
194    )
195    .await
196}
197
198pub async fn delete_files<S: ObjectStoreDeleteExt>(
199    files: &[Path],
200    store: &S,
201    concurrency: NonZeroUsize,
202) -> Result<Vec<()>> {
203    let results: Vec<Result<()>> = futures::stream::iter(files)
204        .map(|f| {
205            retry(backoff::ExponentialBackoff::default(), || async {
206                store.delete_object(f).await.map_err(|e| {
207                    error!("Failed to delete file on object store with error: {:?}", &e);
208                    backoff::Error::transient(e)
209                })
210            })
211        })
212        .boxed()
213        .buffer_unordered(concurrency.get())
214        .collect()
215        .await;
216    results.into_iter().collect()
217}
218
219pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
220    path: &Path,
221    store: &S,
222    concurrency: NonZeroUsize,
223) -> Result<Vec<()>> {
224    let mut paths_to_delete = vec![];
225    let mut paths = store.list_objects(Some(path)).await;
226    while let Some(res) = paths.next().await {
227        if let Ok(object_metadata) = res {
228            paths_to_delete.push(object_metadata.location);
229        } else {
230            return Err(res.err().unwrap().into());
231        }
232    }
233    delete_files(&paths_to_delete, store, concurrency).await
234}
235
236pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
237    // Convert an `object_store::path::Path` to `std::path::PathBuf`
238    let path = std::fs::canonicalize(local_dir_path)?;
239    let mut url = Url::from_file_path(&path)
240        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
241    url.path_segments_mut()
242        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
243        .pop_if_empty()
244        .extend(location.parts());
245    let new_path = url
246        .to_file_path()
247        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
248    Ok(new_path)
249}
250
251/// This function will find all child directories in the input store which are of the form "epoch_num"
252/// and return a map of epoch number to the directory path
253pub async fn find_all_dirs_with_epoch_prefix(
254    store: &Arc<DynObjectStore>,
255    prefix: Option<&Path>,
256) -> anyhow::Result<BTreeMap<u64, Path>> {
257    let mut dirs = BTreeMap::new();
258    let entries = store.list_with_delimiter(prefix).await?;
259    for entry in entries.common_prefixes {
260        if let Some(filename) = entry.filename() {
261            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
262                continue;
263            }
264            let epoch = filename
265                .split_once('_')
266                .context("Failed to split dir name")
267                .map(|(_, epoch)| epoch.parse::<u64>())??;
268            dirs.insert(epoch, entry);
269        }
270    }
271    Ok(dirs)
272}
273
274pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
275    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
276    let mut out = vec![];
277    let mut success_marker_found = false;
278    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
279        let success_marker = path.child("_SUCCESS");
280        let get_result = object_store.get(&success_marker).await;
281        match get_result {
282            Err(_) => {
283                if !success_marker_found {
284                    error!("No success marker found for epoch: {epoch}");
285                }
286            }
287            Ok(_) => {
288                out.push(*epoch);
289                success_marker_found = true;
290            }
291        }
292    }
293
294    // Also check for archived epochs in the archive/ directory
295    let archive_prefix = Path::from("archive");
296    if let Ok(archive_epoch_dirs) =
297        find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
298    {
299        for (epoch, path) in archive_epoch_dirs.iter().sorted() {
300            let success_marker = path.child("_SUCCESS");
301            let get_result = object_store.get(&success_marker).await;
302            if get_result.is_ok() && !out.contains(epoch) {
303                out.push(*epoch);
304            }
305        }
306    }
307
308    Ok(out)
309}
310
311pub async fn run_manifest_update_loop(
312    store: Arc<DynObjectStore>,
313    mut recv: tokio::sync::broadcast::Receiver<()>,
314) -> Result<()> {
315    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
316    loop {
317        tokio::select! {
318            _now = update_interval.tick() => {
319                if let Ok(epochs) = list_all_epochs(store.clone()).await {
320                    let manifest_path = Path::from(MANIFEST_FILENAME);
321                    let manifest = Manifest::new(epochs);
322                    let bytes = serde_json::to_string(&manifest)?;
323                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
324                }
325            },
326             _ = recv.recv() => break,
327        }
328    }
329    Ok(())
330}
331
332/// This function will find all child directories in the input store which are of the form "epoch_num"
333/// and return a map of epoch number to the directory path
334pub async fn find_all_files_with_epoch_prefix(
335    store: &Arc<DynObjectStore>,
336    prefix: Option<&Path>,
337) -> anyhow::Result<Vec<Range<u64>>> {
338    let mut ranges = Vec::new();
339    let entries = store.list_with_delimiter(prefix).await?;
340    for entry in entries.objects {
341        let checkpoint_seq_range = entry
342            .location
343            .filename()
344            .ok_or(anyhow!("Illegal file name"))?
345            .split_once('.')
346            .context("Failed to split dir name")?
347            .0
348            .split_once('_')
349            .context("Failed to split dir name")
350            .map(|(start, end)| Range {
351                start: start.parse::<u64>().unwrap(),
352                end: end.parse::<u64>().unwrap(),
353            })?;
354
355        ranges.push(checkpoint_seq_range);
356    }
357    Ok(ranges)
358}
359
360/// This function will find missing epoch directories in the input store and return a list of such
361/// epoch numbers. If the highest epoch directory in the store is `epoch_N` then it is expected that the
362/// store will have all epoch directories from `epoch_0` to `epoch_N`. Additionally, any epoch directory
363/// should have the passed in marker file present or else that epoch number is already considered as
364/// missing
365pub async fn find_missing_epochs_dirs(
366    store: &Arc<DynObjectStore>,
367    success_marker: &str,
368) -> anyhow::Result<Vec<u64>> {
369    let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
370    let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
371    dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
372    let mut candidate_epoch: u64 = 0;
373    let mut missing_epochs = Vec::new();
374    for (epoch_num, path) in dirs {
375        while candidate_epoch < *epoch_num {
376            // The whole epoch directory is missing
377            missing_epochs.push(candidate_epoch);
378            candidate_epoch += 1;
379            continue;
380        }
381        let success_marker = path.child(success_marker);
382        let get_result = store.get(&success_marker).await;
383        match get_result {
384            Err(Error::NotFound { .. }) => {
385                error!("No success marker found in db checkpoint for epoch: {epoch_num}");
386                missing_epochs.push(*epoch_num);
387            }
388            Err(_) => {
389                // Probably a transient error
390                warn!(
391                    "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
392                );
393            }
394            Ok(_) => {
395                // Nothing to do
396            }
397        }
398        candidate_epoch += 1
399    }
400    missing_epochs.push(candidate_epoch);
401    Ok(missing_epochs)
402}
403
404pub fn get_path(prefix: &str) -> Path {
405    Path::from(prefix)
406}
407
408// Snapshot MANIFEST file is very simple. Just a newline delimited list of all paths in the snapshot directory
409// this simplicty enables easy parsing for scripts to download snapshots
410pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
411    dir: &Path,
412    store: &S,
413    epoch_prefix: String,
414) -> Result<()> {
415    let mut file_names = vec![];
416    let mut paths = store.list_objects(Some(dir)).await;
417    while let Some(res) = paths.next().await {
418        if let Ok(object_metadata) = res {
419            // trim the "epoch_XX/" dir prefix here
420            let mut path_str = object_metadata.location.to_string();
421            if path_str.starts_with(&epoch_prefix) {
422                path_str = String::from(&path_str[epoch_prefix.len()..]);
423                file_names.push(path_str);
424            } else {
425                warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
426            }
427        } else {
428            return Err(res.err().unwrap().into());
429        }
430    }
431
432    let epoch_manifest = PerEpochManifest::new(file_names);
433    let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
434    put(
435        store,
436        &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
437        bytes,
438    )
439    .await?;
440
441    Ok(())
442}
443
444pub fn build_object_store(
445    ingestion_url: &str,
446    remote_store_options: Vec<(String, String)>,
447    remote_store_headers: Vec<(String, String)>,
448) -> Arc<dyn ObjectStore> {
449    let timeout_secs = 5;
450    let mut client_options = ClientOptions::new()
451        .with_timeout(Duration::from_secs(timeout_secs))
452        .with_allow_http(true);
453    if !remote_store_headers.is_empty() {
454        let mut headers = HeaderMap::new();
455        for (name, value) in &remote_store_headers {
456            headers.insert(
457                HeaderName::from_bytes(name.as_bytes()).expect("invalid remote store header name"),
458                HeaderValue::from_str(value).expect("invalid remote store header value"),
459            );
460        }
461        client_options = client_options.with_default_headers(headers);
462    }
463    let retry_config = RetryConfig {
464        max_retries: 10,
465        retry_timeout: Duration::from_secs(timeout_secs + 1),
466        ..Default::default()
467    };
468    let url = ingestion_url
469        .parse::<Url>()
470        .expect("archival ingestion url must be valid");
471    if url.scheme() == "file" {
472        Arc::new(
473            LocalFileSystem::new_with_prefix(
474                url.to_file_path()
475                    .expect("archival ingestion url must have a valid file path"),
476            )
477            .expect("failed to create local file system store"),
478        )
479    } else if url.scheme() == "gs" {
480        let mut builder = GoogleCloudStorageBuilder::new()
481            .with_client_options(client_options)
482            .with_retry(retry_config)
483            .with_url(ingestion_url);
484        for (key, value) in &remote_store_options {
485            builder = builder.with_config(
486                GoogleConfigKey::from_str(key).expect("invalid GCS config key"),
487                value.clone(),
488            );
489        }
490        Arc::new(builder.build().expect("failed to build GCS store"))
491    } else if url.host_str().unwrap_or_default().starts_with("s3") {
492        let mut builder = AmazonS3Builder::new()
493            .with_client_options(client_options)
494            .with_retry(retry_config)
495            .with_imdsv1_fallback()
496            .with_url(ingestion_url);
497        for (key, value) in &remote_store_options {
498            builder = builder.with_config(
499                AmazonS3ConfigKey::from_str(key).expect("invalid S3 config key"),
500                value.clone(),
501            );
502        }
503        Arc::new(builder.build().expect("failed to build S3 store"))
504    } else {
505        Arc::new(
506            HttpBuilder::new()
507                .with_url(url.to_string())
508                .with_client_options(client_options)
509                .with_retry(retry_config)
510                .build()
511                .expect("failed to build HTTP store"),
512        )
513    }
514}
515
516pub async fn fetch_checkpoint(
517    store: &Arc<dyn ObjectStore>,
518    seq: u64,
519) -> anyhow::Result<Checkpoint> {
520    let store = store.clone();
521    let request = move || {
522        let store = store.clone();
523        async move {
524            use backoff::Error as BE;
525            let path = Path::from(format!("{seq}.binpb.zst"));
526            let bytes = store
527                .get(&path)
528                .await
529                .map_err(|e| match e {
530                    object_store::Error::NotFound { .. } => {
531                        BE::permanent(anyhow!("Checkpoint {seq} not found in archive"))
532                    }
533                    e => BE::transient(anyhow::Error::from(e)),
534                })?
535                .bytes()
536                .await
537                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
538            let decompressed =
539                zstd::decode_all(&bytes[..]).map_err(|e| BE::transient(anyhow::Error::from(e)))?;
540            let proto_checkpoint = proto::Checkpoint::decode(&decompressed[..])
541                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
542            Checkpoint::try_from(&proto_checkpoint).map_err(|e| BE::transient(anyhow!(e)))
543        }
544    };
545    let backoff = ExponentialBackoff {
546        max_elapsed_time: Some(Duration::from_secs(60)),
547        multiplier: 1.0,
548        ..Default::default()
549    };
550    backoff::future::retry(backoff, request).await
551}
552
553pub async fn end_of_epoch_data(
554    url: &str,
555    remote_store_options: Vec<(String, String)>,
556) -> anyhow::Result<Vec<CheckpointSequenceNumber>> {
557    let store = build_object_store(url, remote_store_options, vec![]);
558    let response = store.get(&Path::from("epochs.json")).await?;
559    let bytes = response.bytes().await?;
560    Ok(serde_json::from_slice(&bytes)?)
561}
562
563#[cfg(test)]
564mod tests {
565    use crate::object_store::util::{
566        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
567    };
568    use object_store::path::Path;
569    use std::fs;
570    use std::num::NonZeroUsize;
571    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
572    use tempfile::TempDir;
573
574    #[tokio::test]
575    pub async fn test_copy_recursively() -> anyhow::Result<()> {
576        let input = TempDir::new()?;
577        let input_path = input.path();
578        let child = input_path.join("child");
579        fs::create_dir(&child)?;
580        let file1 = child.join("file1");
581        fs::write(file1, b"Lorem ipsum")?;
582        let grandchild = child.join("grand_child");
583        fs::create_dir(&grandchild)?;
584        let file2 = grandchild.join("file2");
585        fs::write(file2, b"Lorem ipsum")?;
586
587        let output = TempDir::new()?;
588        let output_path = output.path();
589
590        let input_store = ObjectStoreConfig {
591            object_store: Some(ObjectStoreType::File),
592            directory: Some(input_path.to_path_buf()),
593            ..Default::default()
594        }
595        .make()?;
596
597        let output_store = ObjectStoreConfig {
598            object_store: Some(ObjectStoreType::File),
599            directory: Some(output_path.to_path_buf()),
600            ..Default::default()
601        }
602        .make()?;
603
604        copy_recursively(
605            &Path::from("child"),
606            &input_store,
607            &output_store,
608            NonZeroUsize::new(1).unwrap(),
609        )
610        .await?;
611
612        assert!(output_path.join("child").exists());
613        assert!(output_path.join("child").join("file1").exists());
614        assert!(output_path.join("child").join("grand_child").exists());
615        assert!(
616            output_path
617                .join("child")
618                .join("grand_child")
619                .join("file2")
620                .exists()
621        );
622        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
623        assert_eq!(content, "Lorem ipsum");
624        let content =
625            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
626        assert_eq!(content, "Lorem ipsum");
627        Ok(())
628    }
629
630    #[tokio::test]
631    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
632        let input = TempDir::new()?;
633        let input_path = input.path();
634        let epoch_0 = input_path.join("epoch_0");
635        fs::create_dir(&epoch_0)?;
636        let file1 = epoch_0.join("file1");
637        fs::write(file1, b"Lorem ipsum")?;
638        let file2 = epoch_0.join("file2");
639        fs::write(file2, b"Lorem ipsum")?;
640        let grandchild = epoch_0.join("grand_child");
641        fs::create_dir(&grandchild)?;
642        let file3 = grandchild.join("file2.tar.gz");
643        fs::write(file3, b"Lorem ipsum")?;
644
645        let input_store = ObjectStoreConfig {
646            object_store: Some(ObjectStoreType::File),
647            directory: Some(input_path.to_path_buf()),
648            ..Default::default()
649        }
650        .make()?;
651
652        write_snapshot_manifest(
653            &Path::from("epoch_0"),
654            &input_store,
655            String::from("epoch_0/"),
656        )
657        .await?;
658
659        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
660        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
661        assert!(content.contains("file2"));
662        assert!(content.contains("file1"));
663        assert!(content.contains("grand_child/file2.tar.gz"));
664        Ok(())
665    }
666
667    #[tokio::test]
668    pub async fn test_delete_recursively() -> anyhow::Result<()> {
669        let input = TempDir::new()?;
670        let input_path = input.path();
671        let child = input_path.join("child");
672        fs::create_dir(&child)?;
673        let file1 = child.join("file1");
674        fs::write(file1, b"Lorem ipsum")?;
675        let grandchild = child.join("grand_child");
676        fs::create_dir(&grandchild)?;
677        let file2 = grandchild.join("file2");
678        fs::write(file2, b"Lorem ipsum")?;
679
680        let input_store = ObjectStoreConfig {
681            object_store: Some(ObjectStoreType::File),
682            directory: Some(input_path.to_path_buf()),
683            ..Default::default()
684        }
685        .make()?;
686
687        delete_recursively(
688            &Path::from("child"),
689            &input_store,
690            NonZeroUsize::new(1).unwrap(),
691        )
692        .await?;
693
694        assert!(!input_path.join("child").join("file1").exists());
695        assert!(
696            !input_path
697                .join("child")
698                .join("grand_child")
699                .join("file2")
700                .exists()
701        );
702        Ok(())
703    }
704}