1use crate::object_store::{
5 ObjectStoreDeleteExt, ObjectStoreGetExt, ObjectStoreListExt, ObjectStorePutExt,
6};
7use anyhow::{Context, Result, anyhow};
8use backoff::ExponentialBackoff;
9use backoff::future::retry;
10use bytes::Bytes;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use indicatif::ProgressBar;
14use itertools::Itertools;
15use mysten_common::ZipDebugEqIteratorExt;
16use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
17use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
18use object_store::http::HttpBuilder;
19use object_store::local::LocalFileSystem;
20use object_store::path::Path;
21use object_store::{
22 ClientOptions, DynObjectStore, Error, ObjectStore, ObjectStoreExt, RetryConfig,
23};
24use prost::Message;
25use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
26use serde::{Deserialize, Serialize};
27use serde_json;
28use std::collections::BTreeMap;
29use std::num::NonZeroUsize;
30use std::ops::Range;
31use std::path::PathBuf;
32use std::str::FromStr;
33use std::sync::Arc;
34use std::time::Duration;
35use sui_rpc::proto::sui::rpc::v2 as proto;
36use sui_types::full_checkpoint_content::Checkpoint;
37use sui_types::messages_checkpoint::CheckpointSequenceNumber;
38use tokio::time::Instant;
39use tracing::{error, warn};
40use url::Url;
41
42pub const MANIFEST_FILENAME: &str = "MANIFEST";
43
44#[derive(Serialize, Deserialize)]
45
46pub struct Manifest {
47 pub available_epochs: Vec<u64>,
48}
49
50impl Manifest {
51 pub fn new(available_epochs: Vec<u64>) -> Self {
52 Manifest { available_epochs }
53 }
54
55 pub fn epoch_exists(&self, epoch: u64) -> bool {
56 self.available_epochs.contains(&epoch)
57 }
58}
59
/// Per-epoch manifest: one file name (relative to the epoch directory) per line.
#[derive(Debug, Clone)]
pub struct PerEpochManifest {
    /// Manifest entries, in the order they were recorded.
    pub lines: Vec<String>,
}

impl PerEpochManifest {
    /// Wraps the given entries without modification.
    pub fn new(lines: Vec<String>) -> Self {
        Self { lines }
    }

    /// Joins the entries with `\n` (no trailing newline).
    pub fn serialize_as_newline_delimited(&self) -> String {
        self.lines.join("\n")
    }

    /// Parses text produced by [`Self::serialize_as_newline_delimited`].
    ///
    /// Splitting follows [`str::lines`], so `\r\n` endings are accepted and a trailing
    /// newline does not produce an empty final entry.
    pub fn deserialize_from_newline_delimited(s: &str) -> PerEpochManifest {
        let entries: Vec<String> = s.lines().map(ToOwned::to_owned).collect();
        Self::new(entries)
    }

    /// Returns a new manifest containing only the entries that start with `prefix`.
    pub fn filter_by_prefix(&self, prefix: &str) -> PerEpochManifest {
        let mut kept = Vec::new();
        for entry in &self.lines {
            if entry.starts_with(prefix) {
                kept.push(entry.clone());
            }
        }
        Self::new(kept)
    }
}
94
95pub async fn get<S: ObjectStoreGetExt>(store: &S, src: &Path) -> Result<Bytes> {
96 let bytes = retry(backoff::ExponentialBackoff::default(), || async {
97 store.get_bytes(src).await.map_err(|e| {
98 error!("Failed to read file from object store with error: {:?}", &e);
99 backoff::Error::transient(e)
100 })
101 })
102 .await?;
103 Ok(bytes)
104}
105
106pub async fn exists<S: ObjectStoreGetExt>(store: &S, src: &Path) -> bool {
107 store.get_bytes(src).await.is_ok()
108}
109
110pub async fn put<S: ObjectStorePutExt>(store: &S, src: &Path, bytes: Bytes) -> Result<()> {
111 retry(backoff::ExponentialBackoff::default(), || async {
112 if !bytes.is_empty() {
113 store.put_bytes(src, bytes.clone()).await.map_err(|e| {
114 error!("Failed to write file to object store with error: {:?}", &e);
115 backoff::Error::transient(e)
116 })
117 } else {
118 warn!("Not copying empty file: {:?}", src);
119 Ok(())
120 }
121 })
122 .await?;
123 Ok(())
124}
125
126pub async fn copy_file<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
127 src: &Path,
128 dest: &Path,
129 src_store: &S,
130 dest_store: &D,
131) -> Result<()> {
132 let bytes = get(src_store, src).await?;
133 if !bytes.is_empty() {
134 put(dest_store, dest, bytes).await
135 } else {
136 warn!("Not copying empty file: {:?}", src);
137 Ok(())
138 }
139}
140
141pub async fn copy_files<S: ObjectStoreGetExt, D: ObjectStorePutExt>(
142 src: &[Path],
143 dest: &[Path],
144 src_store: &S,
145 dest_store: &D,
146 concurrency: NonZeroUsize,
147 progress_bar: Option<ProgressBar>,
148) -> Result<Vec<()>> {
149 let mut instant = Instant::now();
150 let progress_bar_clone = progress_bar.clone();
151 let results = futures::stream::iter(src.iter().zip_debug_eq(dest.iter()))
152 .map(|(path_in, path_out)| async move {
153 let ret = copy_file(path_in, path_out, src_store, dest_store).await;
154 Ok((path_out.clone(), ret))
155 })
156 .boxed()
157 .buffer_unordered(concurrency.get())
158 .try_for_each(|(path, ret)| {
159 if let Some(progress_bar_clone) = &progress_bar_clone {
160 progress_bar_clone.inc(1);
161 progress_bar_clone.set_message(format!("file: {}", path));
162 instant = Instant::now();
163 }
164 futures::future::ready(ret)
165 })
166 .await;
167 Ok(results.into_iter().collect())
168}
169
170pub async fn copy_recursively<S: ObjectStoreGetExt + ObjectStoreListExt, D: ObjectStorePutExt>(
171 dir: &Path,
172 src_store: &S,
173 dest_store: &D,
174 concurrency: NonZeroUsize,
175) -> Result<Vec<()>> {
176 let mut input_paths = vec![];
177 let mut output_paths = vec![];
178 let mut paths = src_store.list_objects(Some(dir)).await;
179 while let Some(res) = paths.next().await {
180 if let Ok(object_metadata) = res {
181 input_paths.push(object_metadata.location.clone());
182 output_paths.push(object_metadata.location);
183 } else {
184 return Err(res.err().unwrap().into());
185 }
186 }
187 copy_files(
188 &input_paths,
189 &output_paths,
190 src_store,
191 dest_store,
192 concurrency,
193 None,
194 )
195 .await
196}
197
198pub async fn delete_files<S: ObjectStoreDeleteExt>(
199 files: &[Path],
200 store: &S,
201 concurrency: NonZeroUsize,
202) -> Result<Vec<()>> {
203 let results: Vec<Result<()>> = futures::stream::iter(files)
204 .map(|f| {
205 retry(backoff::ExponentialBackoff::default(), || async {
206 store.delete_object(f).await.map_err(|e| {
207 error!("Failed to delete file on object store with error: {:?}", &e);
208 backoff::Error::transient(e)
209 })
210 })
211 })
212 .boxed()
213 .buffer_unordered(concurrency.get())
214 .collect()
215 .await;
216 results.into_iter().collect()
217}
218
219pub async fn delete_recursively<S: ObjectStoreDeleteExt + ObjectStoreListExt>(
220 path: &Path,
221 store: &S,
222 concurrency: NonZeroUsize,
223) -> Result<Vec<()>> {
224 let mut paths_to_delete = vec![];
225 let mut paths = store.list_objects(Some(path)).await;
226 while let Some(res) = paths.next().await {
227 if let Ok(object_metadata) = res {
228 paths_to_delete.push(object_metadata.location);
229 } else {
230 return Err(res.err().unwrap().into());
231 }
232 }
233 delete_files(&paths_to_delete, store, concurrency).await
234}
235
236pub fn path_to_filesystem(local_dir_path: PathBuf, location: &Path) -> anyhow::Result<PathBuf> {
237 let path = std::fs::canonicalize(local_dir_path)?;
239 let mut url = Url::from_file_path(&path)
240 .map_err(|_| anyhow!("Failed to parse input path: {}", path.display()))?;
241 url.path_segments_mut()
242 .map_err(|_| anyhow!("Failed to get path segments: {}", path.display()))?
243 .pop_if_empty()
244 .extend(location.parts());
245 let new_path = url
246 .to_file_path()
247 .map_err(|_| anyhow!("Failed to convert url to path: {}", url.as_str()))?;
248 Ok(new_path)
249}
250
251pub async fn find_all_dirs_with_epoch_prefix(
254 store: &Arc<DynObjectStore>,
255 prefix: Option<&Path>,
256) -> anyhow::Result<BTreeMap<u64, Path>> {
257 let mut dirs = BTreeMap::new();
258 let entries = store.list_with_delimiter(prefix).await?;
259 for entry in entries.common_prefixes {
260 if let Some(filename) = entry.filename() {
261 if !filename.starts_with("epoch_") || filename.ends_with(".tmp") {
262 continue;
263 }
264 let epoch = filename
265 .split_once('_')
266 .context("Failed to split dir name")
267 .map(|(_, epoch)| epoch.parse::<u64>())??;
268 dirs.insert(epoch, entry);
269 }
270 }
271 Ok(dirs)
272}
273
274pub async fn list_all_epochs(object_store: Arc<DynObjectStore>) -> Result<Vec<u64>> {
275 let remote_epoch_dirs = find_all_dirs_with_epoch_prefix(&object_store, None).await?;
276 let mut out = vec![];
277 let mut success_marker_found = false;
278 for (epoch, path) in remote_epoch_dirs.iter().sorted() {
279 let success_marker = path.child("_SUCCESS");
280 let get_result = object_store.get(&success_marker).await;
281 match get_result {
282 Err(_) => {
283 if !success_marker_found {
284 error!("No success marker found for epoch: {epoch}");
285 }
286 }
287 Ok(_) => {
288 out.push(*epoch);
289 success_marker_found = true;
290 }
291 }
292 }
293
294 let archive_prefix = Path::from("archive");
296 if let Ok(archive_epoch_dirs) =
297 find_all_dirs_with_epoch_prefix(&object_store, Some(&archive_prefix)).await
298 {
299 for (epoch, path) in archive_epoch_dirs.iter().sorted() {
300 let success_marker = path.child("_SUCCESS");
301 let get_result = object_store.get(&success_marker).await;
302 if get_result.is_ok() && !out.contains(epoch) {
303 out.push(*epoch);
304 }
305 }
306 }
307
308 Ok(out)
309}
310
311pub async fn run_manifest_update_loop(
312 store: Arc<DynObjectStore>,
313 mut recv: tokio::sync::broadcast::Receiver<()>,
314) -> Result<()> {
315 let mut update_interval = tokio::time::interval(Duration::from_secs(300));
316 loop {
317 tokio::select! {
318 _now = update_interval.tick() => {
319 if let Ok(epochs) = list_all_epochs(store.clone()).await {
320 let manifest_path = Path::from(MANIFEST_FILENAME);
321 let manifest = Manifest::new(epochs);
322 let bytes = serde_json::to_string(&manifest)?;
323 put(&store, &manifest_path, Bytes::from(bytes)).await?;
324 }
325 },
326 _ = recv.recv() => break,
327 }
328 }
329 Ok(())
330}
331
332pub async fn find_all_files_with_epoch_prefix(
335 store: &Arc<DynObjectStore>,
336 prefix: Option<&Path>,
337) -> anyhow::Result<Vec<Range<u64>>> {
338 let mut ranges = Vec::new();
339 let entries = store.list_with_delimiter(prefix).await?;
340 for entry in entries.objects {
341 let checkpoint_seq_range = entry
342 .location
343 .filename()
344 .ok_or(anyhow!("Illegal file name"))?
345 .split_once('.')
346 .context("Failed to split dir name")?
347 .0
348 .split_once('_')
349 .context("Failed to split dir name")
350 .map(|(start, end)| Range {
351 start: start.parse::<u64>().unwrap(),
352 end: end.parse::<u64>().unwrap(),
353 })?;
354
355 ranges.push(checkpoint_seq_range);
356 }
357 Ok(ranges)
358}
359
360pub async fn find_missing_epochs_dirs(
366 store: &Arc<DynObjectStore>,
367 success_marker: &str,
368) -> anyhow::Result<Vec<u64>> {
369 let remote_checkpoints_by_epoch = find_all_dirs_with_epoch_prefix(store, None).await?;
370 let mut dirs: Vec<_> = remote_checkpoints_by_epoch.iter().collect();
371 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
372 let mut candidate_epoch: u64 = 0;
373 let mut missing_epochs = Vec::new();
374 for (epoch_num, path) in dirs {
375 while candidate_epoch < *epoch_num {
376 missing_epochs.push(candidate_epoch);
378 candidate_epoch += 1;
379 continue;
380 }
381 let success_marker = path.child(success_marker);
382 let get_result = store.get(&success_marker).await;
383 match get_result {
384 Err(Error::NotFound { .. }) => {
385 error!("No success marker found in db checkpoint for epoch: {epoch_num}");
386 missing_epochs.push(*epoch_num);
387 }
388 Err(_) => {
389 warn!(
391 "Failed while trying to read success marker in db checkpoint for epoch: {epoch_num}"
392 );
393 }
394 Ok(_) => {
395 }
397 }
398 candidate_epoch += 1
399 }
400 missing_epochs.push(candidate_epoch);
401 Ok(missing_epochs)
402}
403
404pub fn get_path(prefix: &str) -> Path {
405 Path::from(prefix)
406}
407
408pub async fn write_snapshot_manifest<S: ObjectStoreListExt + ObjectStorePutExt>(
411 dir: &Path,
412 store: &S,
413 epoch_prefix: String,
414) -> Result<()> {
415 let mut file_names = vec![];
416 let mut paths = store.list_objects(Some(dir)).await;
417 while let Some(res) = paths.next().await {
418 if let Ok(object_metadata) = res {
419 let mut path_str = object_metadata.location.to_string();
421 if path_str.starts_with(&epoch_prefix) {
422 path_str = String::from(&path_str[epoch_prefix.len()..]);
423 file_names.push(path_str);
424 } else {
425 warn!("{path_str}, should be coming from the files in the {epoch_prefix} dir",)
426 }
427 } else {
428 return Err(res.err().unwrap().into());
429 }
430 }
431
432 let epoch_manifest = PerEpochManifest::new(file_names);
433 let bytes = Bytes::from(epoch_manifest.serialize_as_newline_delimited());
434 put(
435 store,
436 &Path::from(format!("{}/{}", dir, MANIFEST_FILENAME)),
437 bytes,
438 )
439 .await?;
440
441 Ok(())
442}
443
444pub fn build_object_store(
445 ingestion_url: &str,
446 remote_store_options: Vec<(String, String)>,
447 remote_store_headers: Vec<(String, String)>,
448) -> Arc<dyn ObjectStore> {
449 let timeout_secs = 5;
450 let mut client_options = ClientOptions::new()
451 .with_timeout(Duration::from_secs(timeout_secs))
452 .with_allow_http(true);
453 if !remote_store_headers.is_empty() {
454 let mut headers = HeaderMap::new();
455 for (name, value) in &remote_store_headers {
456 headers.insert(
457 HeaderName::from_bytes(name.as_bytes()).expect("invalid remote store header name"),
458 HeaderValue::from_str(value).expect("invalid remote store header value"),
459 );
460 }
461 client_options = client_options.with_default_headers(headers);
462 }
463 let retry_config = RetryConfig {
464 max_retries: 10,
465 retry_timeout: Duration::from_secs(timeout_secs + 1),
466 ..Default::default()
467 };
468 let url = ingestion_url
469 .parse::<Url>()
470 .expect("archival ingestion url must be valid");
471 if url.scheme() == "file" {
472 Arc::new(
473 LocalFileSystem::new_with_prefix(
474 url.to_file_path()
475 .expect("archival ingestion url must have a valid file path"),
476 )
477 .expect("failed to create local file system store"),
478 )
479 } else if url.scheme() == "gs" {
480 let mut builder = GoogleCloudStorageBuilder::new()
481 .with_client_options(client_options)
482 .with_retry(retry_config)
483 .with_url(ingestion_url);
484 for (key, value) in &remote_store_options {
485 builder = builder.with_config(
486 GoogleConfigKey::from_str(key).expect("invalid GCS config key"),
487 value.clone(),
488 );
489 }
490 Arc::new(builder.build().expect("failed to build GCS store"))
491 } else if url.host_str().unwrap_or_default().starts_with("s3") {
492 let mut builder = AmazonS3Builder::new()
493 .with_client_options(client_options)
494 .with_retry(retry_config)
495 .with_imdsv1_fallback()
496 .with_url(ingestion_url);
497 for (key, value) in &remote_store_options {
498 builder = builder.with_config(
499 AmazonS3ConfigKey::from_str(key).expect("invalid S3 config key"),
500 value.clone(),
501 );
502 }
503 Arc::new(builder.build().expect("failed to build S3 store"))
504 } else {
505 Arc::new(
506 HttpBuilder::new()
507 .with_url(url.to_string())
508 .with_client_options(client_options)
509 .with_retry(retry_config)
510 .build()
511 .expect("failed to build HTTP store"),
512 )
513 }
514}
515
516pub async fn fetch_checkpoint(
517 store: &Arc<dyn ObjectStore>,
518 seq: u64,
519) -> anyhow::Result<Checkpoint> {
520 let store = store.clone();
521 let request = move || {
522 let store = store.clone();
523 async move {
524 use backoff::Error as BE;
525 let path = Path::from(format!("{seq}.binpb.zst"));
526 let bytes = store
527 .get(&path)
528 .await
529 .map_err(|e| match e {
530 object_store::Error::NotFound { .. } => {
531 BE::permanent(anyhow!("Checkpoint {seq} not found in archive"))
532 }
533 e => BE::transient(anyhow::Error::from(e)),
534 })?
535 .bytes()
536 .await
537 .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
538 let decompressed =
539 zstd::decode_all(&bytes[..]).map_err(|e| BE::transient(anyhow::Error::from(e)))?;
540 let proto_checkpoint = proto::Checkpoint::decode(&decompressed[..])
541 .map_err(|e| BE::transient(anyhow::Error::from(e)))?;
542 Checkpoint::try_from(&proto_checkpoint).map_err(|e| BE::transient(anyhow!(e)))
543 }
544 };
545 let backoff = ExponentialBackoff {
546 max_elapsed_time: Some(Duration::from_secs(60)),
547 multiplier: 1.0,
548 ..Default::default()
549 };
550 backoff::future::retry(backoff, request).await
551}
552
553pub async fn end_of_epoch_data(
554 url: &str,
555 remote_store_options: Vec<(String, String)>,
556) -> anyhow::Result<Vec<CheckpointSequenceNumber>> {
557 let store = build_object_store(url, remote_store_options, vec![]);
558 let response = store.get(&Path::from("epochs.json")).await?;
559 let bytes = response.bytes().await?;
560 Ok(serde_json::from_slice(&bytes)?)
561}
562
#[cfg(test)]
mod tests {
    use crate::object_store::util::{
        MANIFEST_FILENAME, PerEpochManifest, copy_recursively, delete_recursively,
        write_snapshot_manifest,
    };
    use object_store::path::Path;
    use std::fs;
    use std::num::NonZeroUsize;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    /// Payload written to every fixture file.
    const CONTENTS: &[u8] = b"Lorem ipsum";

    /// Config for a local-filesystem object store rooted at `dir`.
    fn local_store_config(dir: &std::path::Path) -> ObjectStoreConfig {
        ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(dir.to_path_buf()),
            ..Default::default()
        }
    }

    /// Creates `root/child/file1` and `root/child/grand_child/file2`, both holding `CONTENTS`.
    fn write_child_tree(root: &std::path::Path) -> std::io::Result<()> {
        let child = root.join("child");
        let grandchild = child.join("grand_child");
        fs::create_dir_all(&grandchild)?;
        fs::write(child.join("file1"), CONTENTS)?;
        fs::write(grandchild.join("file2"), CONTENTS)?;
        Ok(())
    }

    #[tokio::test]
    pub async fn test_copy_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        write_child_tree(input.path())?;
        let output = TempDir::new()?;
        let output_path = output.path();

        let input_store = local_store_config(input.path()).make()?;
        let output_store = local_store_config(output_path).make()?;

        copy_recursively(
            &Path::from("child"),
            &input_store,
            &output_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        let child = output_path.join("child");
        let grandchild = child.join("grand_child");
        assert!(child.exists());
        assert!(grandchild.exists());
        // Reading also proves each file exists.
        assert_eq!(fs::read_to_string(child.join("file1"))?, "Lorem ipsum");
        assert_eq!(fs::read_to_string(grandchild.join("file2"))?, "Lorem ipsum");
        Ok(())
    }

    #[tokio::test]
    pub async fn test_write_snapshot_manifest() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        let epoch_0 = input_path.join("epoch_0");
        let grandchild = epoch_0.join("grand_child");
        fs::create_dir_all(&grandchild)?;
        fs::write(epoch_0.join("file1"), CONTENTS)?;
        fs::write(epoch_0.join("file2"), CONTENTS)?;
        fs::write(grandchild.join("file2.tar.gz"), CONTENTS)?;

        let input_store = local_store_config(input_path).make()?;

        write_snapshot_manifest(
            &Path::from("epoch_0"),
            &input_store,
            String::from("epoch_0/"),
        )
        .await?;

        let manifest_path = epoch_0.join(MANIFEST_FILENAME);
        assert!(manifest_path.exists());
        let content = fs::read_to_string(&manifest_path)?;
        // Listing order is not guaranteed, so compare the sorted entries exactly. A substring
        // check would let "file2" be satisfied by "grand_child/file2.tar.gz".
        let mut entries = PerEpochManifest::deserialize_from_newline_delimited(&content).lines;
        entries.sort();
        assert_eq!(entries, ["file1", "file2", "grand_child/file2.tar.gz"]);
        Ok(())
    }

    #[tokio::test]
    pub async fn test_delete_recursively() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();
        write_child_tree(input_path)?;

        let input_store = local_store_config(input_path).make()?;

        delete_recursively(
            &Path::from("child"),
            &input_store,
            NonZeroUsize::new(1).unwrap(),
        )
        .await?;

        let child = input_path.join("child");
        assert!(!child.join("file1").exists());
        assert!(!child.join("grand_child").join("file2").exists());
        Ok(())
    }
}