1use crate::object_store::{
5 ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
6};
7use anyhow::{Context, Result, anyhow};
8use backoff::ExponentialBackoff;
9use backoff::future::retry;
10use bytes::Bytes;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use mysten_common::ZipDebugEqIteratorExt;
16use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
17use object_store::http::HttpBuilder;
18use object_store::local::LocalFileSystem;
19use object_store::path::Path;
20use object_store::{
21 ClientOptions, DynObjectStore, Error, ObjectStore, ObjectStoreExt, RetryConfig,
22};
23use prost::Message;
24use serde::{Deserialize, Serialize};
25use serde_json;
26use std::collections::BTreeMap;
27use std::num::NonZeroUsize;
28use std::ops::Range;
29use std::path::PathBuf;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Duration;
33use sui_rpc::proto::sui::rpc::v2 as proto;
34use sui_types::full_checkpoint_content::Checkpoint;
35use sui_types::messages_checkpoint::CheckpointSequenceNumber;
36use tokio::time::Instant;
37use tracing::{error, warn};
38use url::Url;
39
/// Object name of the manifest file listing available epochs (see [`Manifest`]
/// for the root manifest and `write_snapshot_manifest` for per-epoch ones).
pub const MANIFEST_FILENAME: &str = "MANIFEST";
41
42#[derive(Serialize, Deserialize)]
43
44pub struct Manifest {
45 pub available_epochs: Vec<u64>,
46}
47
48impl Manifest {
49 pub fn new(available_epochs: Vec<u64>) -> Self {
50 Manifest { available_epochs }
51 }
52
53 pub fn epoch_exists(&self, epoch: u64) -> bool {
54 self.available_epochs.contains(&epoch)
55 }
56}
57
/// Newline-delimited list of object paths that make up one epoch's snapshot.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps the given list of object paths.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Renders the manifest as one path per line (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses a manifest from newline-delimited text.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let lines = s.lines().map(str::to_owned).collect();
        Self { lines }
    }

    /// Returns a new manifest retaining only the entries starting with `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        let lines = self
            .lines
            .iter()
            .filter(|entry| entry.starts_with(prefix))
            .cloned()
            .collect();
        Self { lines }
    }
}
92
93pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
94 let bytes = retry(backoff::ExponentialBackoff::default(), || async {
95 store.get_bytes(src).await.map_err(|e| {
96 error!("Failed to read file from object store with error: {:?}", &e);
97 backoff::Error::transient(e)
98 })
99 })
100 .await?;
101 Ok(bytes)
102}
103
/// Returns true if the object at `src` can currently be fetched from `store`.
///
/// NOTE(review): this issues a full GET (downloading the object body) and
/// treats *any* error — including transient network failures — as
/// "does not exist"; a HEAD-style metadata check would be cheaper if the
/// underlying store supports it — confirm before relying on this for
/// correctness-critical existence checks.
pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
    store.get_bytes(src).await.is_ok()
}
107
108pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
109 retry(backoff::ExponentialBackoff::default(), || async {
110 if !bytes.is_empty() {
111 store.put_bytes(src, bytes.clone()).await.map_err(|e| {
112 error!("Failed to write file to object store with error: {:?}", &e);
113 backoff::Error::transient(e)
114 })
115 } else {
116 warn!("Not copying empty file: {:?}", src);
117 Ok(())
118 }
119 })
120 .await?;
121 Ok(())
122}
123
124pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
125 src: &Path,
126 dest: &Path,
127 src_store: &S,
128 dest_store: &D,
129) -> Result<()> {
130 let bytes = get(src_store, src).await?;
131 if !bytes.is_empty() {
132 put(dest_store, dest, bytes).await
133 } else {
134 warn!("Not copying empty file: {:?}", src);
135 Ok(())
136 }
137}
138
139pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
140 src: &[Path],
141 dest: &[Path],
142 src_store: &S,
143 dest_store: &D,
144 concurrency: NonZeroUsize,
145 progress_bar: Option<ProgressBar>,
146) -> Result<Vec<()>> {
147 let mut instant = Instant::now();
148 let progress_bar_clone = progress_bar.clone();
149 let results = futures::stream::iter(src.iter().zip_debug_eq(dest.iter()))
150 .map(|(path_in, path_out)| async move {
151 let ret = copy_file(path_in, path_out, src_store, dest_store).await;
152 Ok((path_out.clone(), ret))
153 })
154 .boxed()
155 .buffer_unordered(concurrency.get())
156 .try_for_each(|(path, ret)| {
157 if let Some(progress_bar_clone) = &progress_bar_clone {
158 progress_bar_clone.inc(1);
159 progress_bar_clone.set_message(format!("file: {}", path));
160 instant = Instant::now();
161 }
162 futures::future::ready(ret)
163 })
164 .await;
165 Ok(results.into_iter().collect())
166}
167
168pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
169 dir: &Path,
170 src_store: &S,
171 dest_store: &D,
172 concurrency: NonZeroUsize,
173) -> Result<Vec<()>> {
174 let mut input_paths = vec![];
175 let mut output_paths = vec![];
176 let mut paths = src_store.list_objects(Some(dir)).await;
177 while let Some(res) = paths.next().await {
178 if let Ok(object_metadata) = res {
179 input_paths.push(object_metadata.location.clone());
180 output_paths.push(object_metadata.location);
181 } else {
182 return Err(res.err().unwrap().into());
183 }
184 }
185 copy_files(
186 &input_paths,
187 &output_paths,
188 src_store,
189 dest_store,
190 concurrency,
191 None,
192 )
193 .await
194}
195
196pub async fn delete_files<S: ObjectStoreDeleteExt>(
197 files: &[Path],
198 store: &S,
199 concurrency: NonZeroUsize,
200) -> Result<Vec<()>> {
201 let results: Vec<Result<()>> = futures::stream::iter(files)
202 .map(|f| {
203 retry(backoff::ExponentialBackoff::default(), || async {
204 store.delete_object(f).await.map_err(|e| {
205 error!("Failed to delete file on object store with error: {:?}", &e);
206 backoff::Error::transient(e)
207 })
208 })
209 })
210 .boxed()
211 .buffer_unordered(concurrency.get())
212 .collect()
213 .await;
214 results.into_iter().collect()
215}
216
217pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
218 path: &Path,
219 store: &S,
220 concurrency: NonZeroUsize,
221) -> Result<Vec<()>> {
222 let mut paths_to_delete = vec![];
223 let mut paths = store.list_objects(Some(path)).await;
224 while let Some(res) = paths.next().await {
225 if let Ok(object_metadata) = res {
226 paths_to_delete.push(object_metadata.location);
227 } else {
228 return Err(res.err().unwrap().into());
229 }
230 }
231 delete_files(&paths_to_delete, store, concurrency).await
232}
233
/// Maps an object-store `location` onto the local filesystem rooted at
/// `local_dir_path`, returning the corresponding absolute path.
///
/// # Errors
/// Fails if `local_dir_path` cannot be canonicalized (e.g. it does not
/// exist) or if the URL round-trip through `file://` fails.
pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
    // Canonicalize first: `Url::from_file_path` requires an absolute path.
    let path = std::fs::canonicalize(local_dir_path)?;
    let mut url = Url::from_file_path(&path)
        .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
    // Append each component of `location` as a URL path segment;
    // `pop_if_empty` drops a trailing empty segment so the root URL's
    // trailing slash does not produce a doubled separator.
    url.path_segments_mut()
        .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
        .pop_if_empty()
        .extend(location.parts());
    let new_path = url
        .to_file_path()
        .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
    Ok(new_path)
}
248
249pub async fn find_all_dirs_with_epoch_prefix(
252 store: &Arc<DynObjectStore>,
253 prefix: Option<&Path>,
254) -> anyhow::Result<BTreeMap<u64, Path>> {
255 let mut dirs = BTreeMap::new();
256 let entries = store.list_with_delimiter(prefix).await?;
257 for entry in entries.common_prefixes {
258 if let Some(filename) = entry.filename() {
259 if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
260 continue;
261 }
262 let epoch = filename
263 .split_once('_')
264 .context("Failed to split dir name")
265 .map(|(_, epoch)| epoch.parse::<u64>())??;
266 dirs.insert(epoch, entry);
267 }
268 }
269 Ok(dirs)
270}
271
/// Returns every epoch whose directory contains a `_SUCCESS` marker,
/// checking the store root first and then the optional `archive/` prefix.
pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
    let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
    let mut out = vec![];
    let mut success_marker_found = false;
    for (epoch, path) in remote_epoch_dirs.iter().sorted() {
        let success_marker = path.child("_SUCCESS");
        let get_result = object_store.get(&success_marker).await;
        match get_result {
            Err(_) => {
                // NOTE(review): a missing marker is only logged before the
                // first successful marker is seen — presumably because
                // epochs before the earliest complete one are expected to
                // be absent; confirm the intent.
                if !success_marker_found {
                    error!("No success marker found for epoch: {epoch}");
                }
            }
            Ok(_) => {
                out.push(*epoch);
                success_marker_found = true;
            }
        }
    }

    // Also include epochs found under the `archive` prefix, skipping ones
    // already collected above. A failure to list the archive prefix is
    // ignored (it is treated as optional).
    let archive_prefix = Path::from("archive");
    if let Ok(archive_epoch_dirs) =
        find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
    {
        for (epoch, path) in archive_epoch_dirs.iter().sorted() {
            let success_marker = path.child("_SUCCESS");
            let get_result = object_store.get(&success_marker).await;
            if get_result.is_ok() && !out.contains(epoch) {
                out.push(*epoch);
            }
        }
    }

    Ok(out)
}
308
309pub async fn run_manifest_update_loop(
310 store: Arc<DynObjectStore>,
311 mut recv: tokio::sync::broadcast::Receiver<()>,
312) -> Result<()> {
313 let mut update_interval = tokio::time::interval(Duration::from_secs(300));
314 loop {
315 tokio::select! {
316 _now = update_interval.tick() => {
317 if let Ok(epochs) = list_all_epochs(store.clone()).await {
318 let manifest_path = Path::from(MANIFEST_FILENAME);
319 let manifest = Manifest::new(epochs);
320 let bytes = serde_json::to_string(&manifest)?;
321 put(&store, &manifest_path, Bytes::from(bytes)).await?;
322 }
323 },
324 _ = recv.recv() => break,
325 }
326 }
327 Ok(())
328}
329
330pub async fn find_all_files_with_epoch_prefix(
333 store: &Arc<DynObjectStore>,
334 prefix: Option<&Path>,
335) -> anyhow::Result<Vec<Range<u64>>> {
336 let mut ranges = Vec::new();
337 let entries = store.list_with_delimiter(prefix).await?;
338 for entry in entries.objects {
339 let checkpoint_seq_range = entry
340 .location
341 .filename()
342 .ok_or(anyhow!("Illegal file name"))?
343 .split_once('.')
344 .context("Failed to split dir name")?
345 .0
346 .split_once('_')
347 .context("Failed to split dir name")
348 .map(|(start, end)| Range {
349 start: start.parse::<u64>().unwrap(),
350 end: end.parse::<u64>().unwrap(),
351 })?;
352
353 ranges.push(checkpoint_seq_range);
354 }
355 Ok(ranges)
356}
357
358pub async fn find_missing_epochs_dirs(
364 store: &Arc<DynObjectStore>,
365 success_marker: &str,
366) -> anyhow::Result<Vec<u64>> {
367 let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
368 let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
369 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
370 let mut candidate_epoch: u64 = 0;
371 let mut missing_epochs = Vec::new();
372 for (epoch_num, path) in dirs {
373 while candidate_epoch < *epoch_num {
374 missing_epochs.push(candidate_epoch);
376 candidate_epoch += 1;
377 continue;
378 }
379 let success_marker = path.child(success_marker);
380 let get_result = store.get(&success_marker).await;
381 match get_result {
382 Err(Error::NotFound { .. }) => {
383 error!("No success marker found in db checkpoint for epoch: {epoch_num}");
384 missing_epochs.push(*epoch_num);
385 }
386 Err(_) => {
387 warn!(
389 "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
390 );
391 }
392 Ok(_) => {
393 }
395 }
396 candidate_epoch += 1
397 }
398 missing_epochs.push(candidate_epoch);
399 Ok(missing_epochs)
400}
401
/// Thin convenience wrapper converting a string prefix into an
/// object-store [`Path`].
pub fn get_path(prefix: &str) -> Path {
    Path::from(prefix)
}
405
406pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
409 dir: &Path,
410 store: &S,
411 epoch_prefix: String,
412) -> Result<()> {
413 let mut file_names = vec![];
414 let mut paths = store.list_objects(Some(dir)).await;
415 while let Some(res) = paths.next().await {
416 if let Ok(object_metadata) = res {
417 let mut path_str = object_metadata.location.to_string();
419 if path_str.starts_with(&epoch_prefix) {
420 path_str = String::from(&path_str[epoch_prefix.len()..]);
421 file_names.push(path_str);
422 } else {
423 warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
424 }
425 } else {
426 return Err(res.err().unwrap().into());
427 }
428 }
429
430 let epoch_manifest = PerEpochManifest::new(file_names);
431 let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
432 put(
433 store,
434 &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
435 bytes,
436 )
437 .await?;
438
439 Ok(())
440}
441
/// Builds an object store for `ingestion_url`, picking the backend from the
/// URL: `file://` -> local filesystem, host name starting with "s3" ->
/// Amazon S3 (applying `remote_store_options` as S3 config keys), anything
/// else -> generic HTTP store.
///
/// # Panics
/// Panics if the URL is invalid, an S3 config key is unknown, or the chosen
/// backend fails to build.
pub fn build_object_store(
    ingestion_url: &str,
    remote_store_options: Vec<(String, String)>,
) -> Arc<dyn ObjectStore> {
    let timeout_secs = 5;
    let client_options = ClientOptions::new()
        .with_timeout(Duration::from_secs(timeout_secs))
        .with_allow_http(true);
    // Retry window is one second longer than the per-request timeout.
    let retry_config = RetryConfig {
        max_retries: 10,
        retry_timeout: Duration::from_secs(timeout_secs + 1),
        ..Default::default()
    };
    let url = ingestion_url
        .parse::<Url>()
        .expect("archival ingestion url must be valid");
    if url.scheme() == "file" {
        Arc::new(
            LocalFileSystem::new_with_prefix(
                url.to_file_path()
                    .expect("archival ingestion url must have a valid file path"),
            )
            .expect("failed to create local file system store"),
        )
    } else if url.host_str().unwrap_or_default().starts_with("s3") {
        // NOTE(review): S3 detection is a heuristic on the host name (e.g.
        // "s3.amazonaws.com"); confirm it covers all expected endpoints.
        let mut builder = AmazonS3Builder::new()
            .with_client_options(client_options)
            .with_retry(retry_config)
            .with_imdsv1_fallback()
            .with_url(ingestion_url);
        for (key, value) in &remote_store_options {
            builder = builder.with_config(
                AmazonS3ConfigKey::from_str(key).expect("invalid S3 config key"),
                value.clone(),
            );
        }
        Arc::new(builder.build().expect("failed to build S3 store"))
    } else {
        Arc::new(
            HttpBuilder::new()
                .with_url(url.to_string())
                .with_client_options(client_options)
                .with_retry(retry_config)
                .build()
                .expect("failed to build HTTP store"),
        )
    }
}
490
/// Fetches checkpoint `seq` from the archive: downloads `<seq>.binpb.zst`,
/// zstd-decompresses it, decodes the protobuf, and converts it into a
/// [`Checkpoint`].
///
/// Transient failures (network, decompression, decode) are retried for up
/// to 60 seconds; a missing object is treated as permanent and fails
/// immediately.
pub async fn fetch_checkpoint(
    store: &Arc<dyn ObjectStore>,
    seq: u64,
) -> anyhow::Result<Checkpoint> {
    // Clone the Arc so the retry closure can own (and re-clone) the store
    // for each attempt.
    let store = store.clone();
    let request = move || {
        let store = store.clone();
        async move {
            use backoff::Error as BE;
            let path = Path::from(format!("{seq}.binpb.zst"));
            let bytes = store
                .get(&path)
                .await
                .map_err(|e| match e {
                    // NotFound will not be fixed by retrying.
                    object_store::Error::NotFound { .. } => {
                        BE::permanent(anyhow!("Checkpoint {seq} not found in archive"))
                    }
                    e => BE::transient(anyhow::Error::from(e)),
                })?
                .bytes()
                .await
                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            let decompressed =
                zstd::decode_all(&bytes[..]).map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            let proto_checkpoint = proto::Checkpoint::decode(&decompressed[..])
                .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
            Checkpoint::try_from(&proto_checkpoint).map_err(|e| BE::transient(anyhow!(e)))
        }
    };
    // multiplier 1.0 keeps the delay between retries constant rather than
    // growing exponentially.
    let backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(60)),
        multiplier: 1.0,
        ..Default::default()
    };
    backoff::future::retry(backoff, request).await
}
527
528pub async fn end_of_epoch_data(
529 url: &str,
530 remote_store_options: Vec<(String, String)>,
531) -> anyhow::Result<Vec<CheckpointSequenceNumber>> {
532 let store = build_object_store(url, remote_store_options);
533 let response = store.get(&Path::from("epochs.json")).await?;
534 let bytes = response.bytes().await?;
535 Ok(serde_json::from_slice(&bytes)?)
536}
537
#[cfg(test)]
mod tests {
    //! Integration-style tests exercising the recursive copy/delete helpers
    //! and per-epoch manifest writing against local-filesystem stores backed
    //! by temp directories.
    use crate::object_store::util::{
        MANIFEST_FILENAME, copy_recursively, delete_recursively, write_snapshot_manifest,
    };
    use object_store::path::Path;
    use std::fs;
    use std::num::NonZeroUsize;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    // copy_recursively must replicate a nested tree (child/file1 and
    // child/grand_child/file2) into the destination store, contents intact.
    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1, child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        let output_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(output_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // The full tree and file contents must survive the copy.
        assert!(output_path.join("child").exists());
        assert!(output_path.join("child").join("file1").exists());
        assert!(output_path.join("child").join("grand_child").exists());
        assert!(
            output_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        let content = fs::read_to_string(output_path.join("child").join("file1"))?;
        assert_eq!(content, "Lorem ipsum");
        let content =
            fs::read_to_string(output_path.join("child").join("grand_child").join("file2"))?;
        assert_eq!(content, "Lorem ipsum");
        Ok(())
    }

    // write_snapshot_manifest must list epoch_0 recursively and record each
    // path relative to the "epoch_0/" prefix in the MANIFEST file.
    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        // Source layout: epoch_0/{file1, file2, grand_child/file2.tar.gz}.
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        fs::create_dir(&epoch_0)?;
        let file1 = epoch_0.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let file2 = epoch_0.join("file2");
        fs::write(file2, b"Lorem ipsum")?;
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file3 = grandchild.join("file2.tar.gz");
        fs::write(file3, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        // Manifest must exist and list all files relative to epoch_0/.
        assert!(input_path.join("epoch_0").join(MANIFEST_FILENAME).exists());
        let content = fs::read_to_string(input_path.join("epoch_0").join(MANIFEST_FILENAME))?;
        assert!(content.contains("file2"));
        assert!(content.contains("file1"));
        assert!(content.contains("grand_child/file2.tar.gz"));
        Ok(())
    }

    // delete_recursively must remove every object under the prefix,
    // including files in nested directories.
    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        // Source layout: child/file1, child/grand_child/file2.
        let input = TempDir::new()?;
        let input_path = input.path();
        let child = input_path.join("child");
        fs::create_dir(&child)?;
        let file1 = child.join("file1");
        fs::write(file1, b"Lorem ipsum")?;
        let grandchild = child.join("grand_child");
        fs::create_dir(&grandchild)?;
        let file2 = grandchild.join("file2");
        fs::write(file2, b"Lorem ipsum")?;

        let input_store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(input_path.to_path_buf()),
            ..Default::default()
        }
        .make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        // Both files, at every nesting depth, must be gone.
        assert!(!input_path.join("child").join("file1").exists());
        assert!(
            !input_path
                .join("child")
                .join("grand_child")
                .join("file2")
                .exists()
        );
        Ok(())
    }
}
679}