// sui_storage/object_store/util.rs
use crate::object_store::{
5 ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
6};
7use anyhow::{Context, Result, anyhow};
8use backoff::future::retry;
9use bytes::Bytes;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use indicatif::ProgressBar;
13use itertools::Itertools;
14use object_store::path::Path;
15use object_store::{DynObjectStore, Error, ObjectStore};
16use serde::{Deserialize, Serialize};
17use std::collections::BTreeMap;
18use std::num::NonZeroUsize;
19use std::ops::Range;
20use std::path::PathBuf;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::time::Instant;
24use tracing::{error, warn};
25use url::Url;
26
/// Object name of the top-level manifest listing available epochs.
pub const MANIFEST_FILENAME: &str = "MANIFEST";
28
29#[derive(Serialize, Deserialize)]
30
31pub struct Manifest {
32 pub available_epochs: Vec<u64>,
33}
34
35impl Manifest {
36 pub fn new(available_epochs: Vec<u64>) -> Self {
37 Manifest { available_epochs }
38 }
39
40 pub fn epoch_exists(&self, epoch: u64) -> bool {
41 self.available_epochs.contains(&epoch)
42 }
43}
44
/// Listing of the files that make up a single epoch's snapshot, one
/// store-relative path per entry.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps a list of file paths.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as newline-delimited text (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses newline-delimited text back into a manifest.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(ToOwned::to_owned).collect();
        PerEpochManifest { lines }
    }

    /// Returns a new manifest keeping only the entries that start with
    /// `prefix`; the original is left untouched.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        PerEpochManifest {
            lines: self
                .lines
                .iter()
                .filter(|line| line.starts_with(prefix))
                .cloned()
                .collect(),
        }
    }
}
79
80pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
81 let bytes = retry(backoff::ExponentialBackoff::default(), || async {
82 store.get_bytes(src).await.map_err(|e| {
83 error!("Failed to read file from object store with error: {:?}", &e);
84 backoff::Error::transient(e)
85 })
86 })
87 .await?;
88 Ok(bytes)
89}
90
91pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
92 store.get_bytes(src).await.is_ok()
93}
94
95pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
96 retry(backoff::ExponentialBackoff::default(), || async {
97 if !bytes.is_empty() {
98 store.put_bytes(src, bytes.clone()).await.map_err(|e| {
99 error!("Failed to write file to object store with error: {:?}", &e);
100 backoff::Error::transient(e)
101 })
102 } else {
103 warn!("Not copying empty file: {:?}", src);
104 Ok(())
105 }
106 })
107 .await?;
108 Ok(())
109}
110
111pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
112 src: &Path,
113 dest: &Path,
114 src_store: &S,
115 dest_store: &D,
116) -> Result<()> {
117 let bytes = get(src_store, src).await?;
118 if !bytes.is_empty() {
119 put(dest_store, dest, bytes).await
120 } else {
121 warn!("Not copying empty file: {:?}", src);
122 Ok(())
123 }
124}
125
126pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
127 src: &[Path],
128 dest: &[Path],
129 src_store: &S,
130 dest_store: &D,
131 concurrency: NonZeroUsize,
132 progress_bar: Option<ProgressBar>,
133) -> Result<Vec<()>> {
134 let mut instant = Instant::now();
135 let progress_bar_clone = progress_bar.clone();
136 let results = futures::stream::iter(src.iter().zip(dest.iter()))
137 .map(|(path_in, path_out)| async move {
138 let ret = copy_file(path_in, path_out, src_store, dest_store).await;
139 Ok((path_out.clone(), ret))
140 })
141 .boxed()
142 .buffer_unordered(concurrency.get())
143 .try_for_each(|(path, ret)| {
144 if let Some(progress_bar_clone) = &progress_bar_clone {
145 progress_bar_clone.inc(1);
146 progress_bar_clone.set_message(format!("file: {}", path));
147 instant = Instant::now();
148 }
149 futures::future::ready(ret)
150 })
151 .await;
152 Ok(results.into_iter().collect())
153}
154
155pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
156 dir: &Path,
157 src_store: &S,
158 dest_store: &D,
159 concurrency: NonZeroUsize,
160) -> Result<Vec<()>> {
161 let mut input_paths = vec![];
162 let mut output_paths = vec![];
163 let mut paths = src_store.list_objects(Some(dir)).await;
164 while let Some(res) = paths.next().await {
165 if let Ok(object_metadata) = res {
166 input_paths.push(object_metadata.location.clone());
167 output_paths.push(object_metadata.location);
168 } else {
169 return Err(res.err().unwrap().into());
170 }
171 }
172 copy_files(
173 &input_paths,
174 &output_paths,
175 src_store,
176 dest_store,
177 concurrency,
178 None,
179 )
180 .await
181}
182
183pub async fn delete_files<S: ObjectStoreDeleteExt>(
184 files: &[Path],
185 store: &S,
186 concurrency: NonZeroUsize,
187) -> Result<Vec<()>> {
188 let results: Vec<Result<()>> = futures::stream::iter(files)
189 .map(|f| {
190 retry(backoff::ExponentialBackoff::default(), || async {
191 store.delete_object(f).await.map_err(|e| {
192 error!("Failed to delete file on object store with error: {:?}", &e);
193 backoff::Error::transient(e)
194 })
195 })
196 })
197 .boxed()
198 .buffer_unordered(concurrency.get())
199 .collect()
200 .await;
201 results.into_iter().collect()
202}
203
204pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
205 path: &Path,
206 store: &S,
207 concurrency: NonZeroUsize,
208) -> Result<Vec<()>> {
209 let mut paths_to_delete = vec![];
210 let mut paths = store.list_objects(Some(path)).await;
211 while let Some(res) = paths.next().await {
212 if let Ok(object_metadata) = res {
213 paths_to_delete.push(object_metadata.location);
214 } else {
215 return Err(res.err().unwrap().into());
216 }
217 }
218 delete_files(&paths_to_delete, store, concurrency).await
219}
220
221pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
222 let path = std::fs::canonicalize(local_dir_path)?;
224 let mut url = Url::from_file_path(&path)
225 .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
226 url.path_segments_mut()
227 .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
228 .pop_if_empty()
229 .extend(location.parts());
230 let new_path = url
231 .to_file_path()
232 .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
233 Ok(new_path)
234}
235
/// Scans the immediate children of `prefix` (or the store root when `None`)
/// and returns the directories named `epoch_<N>`, keyed by their parsed
/// epoch number.
///
/// Directories not starting with `epoch_`, and in-progress ones ending in
/// `.tmp`, are skipped. Returns `Err` if an epoch suffix fails to parse.
pub async fn find_all_dirs_with_epoch_prefix(
    store: &Arc<DynObjectStore>,
    prefix: Option<&Path>,
) -> anyhow::Result<BTreeMap<u64, Path>> {
    let mut dirs = BTreeMap::new();
    // list_with_delimiter lists a single level of hierarchy; directories
    // appear as `common_prefixes` rather than objects.
    let entries = store.list_with_delimiter(prefix).await?;
    for entry in entries.common_prefixes {
        if let Some(filename) = entry.filename() {
            if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
                continue;
            }
            // split_once('_') yields ("epoch", "<N>"); the double `??`
            // unwraps first the split-failure context, then the parse error.
            let epoch = filename
                .split_once('_')
                .context("Failed to split dir name")
                .map(|(_, epoch)| epoch.parse::<u64>())??;
            dirs.insert(epoch, entry);
        }
    }
    Ok(dirs)
}
258
/// Returns, in ascending order, the epochs whose snapshot directories carry
/// a `_SUCCESS` marker — checking both the store root and the `archive/`
/// prefix. Epochs found only under `archive/` are appended after the
/// root-level ones, deduplicated against them.
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    let mut success_marker_found = false;
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                // Missing markers are only logged before the first marked
                // epoch is seen; later gaps are silently skipped.
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                out.push(*epoch);
                success_marker_found = true;
            }
        }
    }

    // Best-effort second pass over archived epochs: a listing failure here
    // is ignored rather than propagated.
    let archive_prefix = Path::from("archive");
    if let Ok(archive_epoch_dirs) =
        find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
    {
        for (epoch, path) in archive_epoch_dirs.iter().sorted() {
            let success_marker = path.child("_SUCCESS");
            let get_result = object_store.get(&success_marker).await;
            if get_result.is_ok() && !out.contains(epoch) {
                out.push(*epoch);
            }
        }
    }

    Ok(out)
}
295
/// Periodically (every 300 seconds) rebuilds the `MANIFEST` object from the
/// epochs currently visible in `store`, until a message (or channel close)
/// arrives on `recv`.
///
/// A failed epoch listing skips that round silently; serialization or
/// upload failures abort the loop with an error.
pub async fn run_manifest_update_loop(
    store: Arc<DynObjectStore>,
    mut recv: tokio::sync::broadcast::Receiver<()>,
) -> Result<()> {
    let mut update_interval = tokio::time::interval(Duration::from_secs(300));
    loop {
        tokio::select! {
            _now = update_interval.tick() => {
                if let Ok(epochs) = list_all_epochs(store.clone()).await {
                    let manifest_path = Path::from(MANIFEST_FILENAME);
                    let manifest = Manifest::new(epochs);
                    let bytes = serde_json::to_string(&manifest)?;
                    put(&store, &manifest_path, Bytes::from(bytes)).await?;
                }
            },
            // The `_` pattern matches both Ok(()) and a recv error, so the
            // loop also exits when the broadcast channel is closed.
            _ = recv.recv() => break,
        }
    }
    Ok(())
}
316
317pub async fn find_all_files_with_epoch_prefix(
320 store: &Arc<DynObjectStore>,
321 prefix: Option<&Path>,
322) -> anyhow::Result<Vec<Range<u64>>> {
323 let mut ranges = Vec::new();
324 let entries = store.list_with_delimiter(prefix).await?;
325 for entry in entries.objects {
326 let checkpoint_seq_range = entry
327 .location
328 .filename()
329 .ok_or(anyhow!("Illegal file name"))?
330 .split_once('.')
331 .context("Failed to split dir name")?
332 .0
333 .split_once('_')
334 .context("Failed to split dir name")
335 .map(|(start, end)| Range {
336 start: start.parse::<u64>().unwrap(),
337 end: end.parse::<u64>().unwrap(),
338 })?;
339
340 ranges.push(checkpoint_seq_range);
341 }
342 Ok(ranges)
343}
344
345pub async fn find_missing_epochs_dirs(
351 store: &Arc<DynObjectStore>,
352 success_marker: &str,
353) -> anyhow::Result<Vec<u64>> {
354 let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
355 let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
356 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
357 let mut candidate_epoch: u64 = 0;
358 let mut missing_epochs = Vec::new();
359 for (epoch_num, path) in dirs {
360 while candidate_epoch < *epoch_num {
361 missing_epochs.push(candidate_epoch);
363 candidate_epoch += 1;
364 continue;
365 }
366 let success_marker = path.child(success_marker);
367 let get_result = store.get(&success_marker).await;
368 match get_result {
369 Err(Error::NotFound { .. }) => {
370 error!("No success marker found in db checkpoint for epoch: {epoch_num}");
371 missing_epochs.push(*epoch_num);
372 }
373 Err(_) => {
374 warn!(
376 "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
377 );
378 }
379 Ok(_) => {
380 }
382 }
383 candidate_epoch += 1
384 }
385 missing_epochs.push(candidate_epoch);
386 Ok(missing_epochs)
387}
388
389pub fn get_path(prefix: &str) -> Path {
390 Path::from(prefix)
391}
392
393pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
396 dir: &Path,
397 store: &S,
398 epoch_prefix: String,
399) -> Result<()> {
400 let mut file_names = vec![];
401 let mut paths = store.list_objects(Some(dir)).await;
402 while let Some(res) = paths.next().await {
403 if let Ok(object_metadata) = res {
404 let mut path_str = object_metadata.location.to_string();
406 if path_str.starts_with(&epoch_prefix) {
407 path_str = String::from(&path_str[epoch_prefix.len()..]);
408 file_names.push(path_str);
409 } else {
410 warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
411 }
412 } else {
413 return Err(res.err().unwrap().into());
414 }
415 }
416
417 let epoch_manifest = PerEpochManifest::new(file_names);
418 let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
419 put(
420 store,
421 &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
422 bytes,
423 )
424 .await?;
425
426 Ok(())
427}
428
#[cfg(test)]
mod tests {
    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };
    use object_store::path::Path;
    use std::fs;
    use std::num::NonZeroUsize;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    // Copies a nested directory tree between two local file-backed stores
    // and verifies the structure and file contents survive the copy.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Fixture: input/child/file1 and input/child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        // Copy the whole "child" subtree with a concurrency of 1.
        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    // Writes a MANIFEST for an epoch_0 tree and checks that nested files
    // appear in it with epoch-relative paths.
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Fixture: epoch_0/file1, epoch_0/file2, epoch_0/grand_child/file2.tar.gz.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // The manifest itself lands inside the epoch directory.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    // Deletes a nested directory tree through the store API and verifies
    // every file under it is gone.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Fixture: child/file1 and child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}