1use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::time::Duration;
17use std::{fs, io};
18use sui_config::{NodeConfig, genesis::Genesis};
19use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
20use sui_core::execution_cache::build_execution_cache_from_env;
21use sui_data_ingestion_core::end_of_epoch_data;
22use sui_network::default_mysten_network_config;
23use sui_protocol_config::Chain;
24use sui_sdk::SuiClient;
25use sui_sdk::SuiClientBuilder;
26use sui_storage::object_store::http::HttpDownloaderBuilder;
27use sui_storage::object_store::util::MANIFEST_FILENAME;
28use sui_storage::object_store::util::Manifest;
29use sui_storage::object_store::util::PerEpochManifest;
30use sui_types::committee::QUORUM_THRESHOLD;
31use sui_types::crypto::AuthorityPublicKeyBytes;
32use sui_types::global_state_hash::GlobalStateHash;
33use sui_types::messages_grpc::LayoutGenerationOption;
34use sui_types::multiaddr::Multiaddr;
35use sui_types::{base_types::*, object::Owner};
36use tokio::sync::mpsc;
37use tokio::task::JoinHandle;
38use tokio::time::Instant;
39
40use anyhow::anyhow;
41use clap::ValueEnum;
42use eyre::ContextCompat;
43use fastcrypto::hash::MultisetHash;
44use futures::{StreamExt, TryStreamExt};
45use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
46use prometheus::Registry;
47use serde::{Deserialize, Serialize};
48use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
49use sui_core::authority::AuthorityStore;
50use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
51use sui_core::checkpoints::CheckpointStore;
52use sui_core::epoch::committee_store::CommitteeStore;
53use sui_core::storage::RocksDbStore;
54use sui_snapshot::reader::StateSnapshotReaderV1;
55use sui_snapshot::setup_db_state;
56use sui_storage::object_store::ObjectStoreGetExt;
57use sui_storage::object_store::util::{copy_file, exists, get_path};
58use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
59use sui_types::messages_grpc::{
60 ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
61 TransactionStatus,
62};
63
64use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
65use sui_core::authority::authority_store_pruner::PrunerWatermarks;
66use sui_types::storage::ReadStore;
67use tracing::info;
68use typed_store::DBMetrics;
69
pub mod commands;
// NOTE(review): presumably the RocksDB debug tooling is incompatible with the
// `tidehunter` storage backend cfg — confirm before relying on this gate.
#[cfg(not(tidehunter))]
pub mod db_tool;
mod formal_snapshot_util;
74
/// How thoroughly a downloaded formal snapshot is verified during restore.
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// Skip all verification — `download_formal_snapshot` prints a warning in
    /// this mode.
    None,
    /// Verify the accumulated root state hash against the end-of-epoch
    /// commitment (the default).
    #[default]
    Normal,
    /// Like `Normal`, and additionally passed to `setup_db_state` as a strict
    /// verification flag (`verify == Strict`).
    Strict,
}
92
93async fn make_clients(
95 sui_client: &Arc<SuiClient>,
96) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
97 let mut net_config = default_mysten_network_config();
98 net_config.connect_timeout = Some(Duration::from_secs(5));
99 let mut authority_clients = BTreeMap::new();
100
101 let active_validators = sui_client
102 .governance_api()
103 .get_latest_sui_system_state()
104 .await?
105 .active_validators;
106
107 for validator in active_validators {
108 let net_addr = Multiaddr::try_from(validator.net_address).unwrap();
109 let tls_config = sui_tls::create_rustls_client_config(
110 sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
111 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
112 None,
113 );
114 let channel = net_config
115 .connect_lazy(&net_addr, tls_config)
116 .map_err(|err| anyhow!(err.to_string()))?;
117 let client = NetworkAuthorityClient::new(channel);
118 let public_key_bytes =
119 AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
120 authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
121 }
122
123 Ok(authority_clients)
124}
125
/// Per-validator outcome of an object query: the version the validator
/// reported (if the call succeeded), the full response or error, and the
/// elapsed request time in seconds.
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Raw per-validator responses collected for a single object lookup.
pub struct ObjectData {
    requested_id: ObjectID, // the object id the query was issued for
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
131
/// Debug-formats the wrapped value, or yields `def_str` when there is none.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        // Lazily build the fallback only when the value is absent.
        self.as_ref()
            .map_or_else(|| def_str.to_string(), |value| format!("{:?}", value))
    }
}
147
/// Object-query responses grouped by the exact object state each validator
/// reported, weighted by validator stake.
#[allow(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    /// Validators keyed by reported state. The key is `None` when the query
    /// failed; otherwise it is
    /// (version, object digest, previous tx digest, owner, optional lock tx digest).
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    /// The same keys paired with the group's total stake, sorted by
    /// descending stake.
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    /// Total stake of validators that reported no lock for the object.
    pub available_voting_power: u64,
    /// True when even the best-supported state plus all unlocked stake cannot
    /// reach quorum.
    pub fully_locked: bool,
}
173
impl GroupedObjectOutput {
    /// Groups the per-validator responses in `object_data` by identical
    /// reported state and sums each group's stake from `committee`.
    pub fn new(
        object_data: ObjectData,
        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
    ) -> Self {
        let mut grouped_results = BTreeMap::new();
        let mut voting_power = BTreeMap::new();
        let mut available_voting_power = 0;
        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
            // Every responding validator is expected to appear in the
            // committee map; a miss here is a bug in the caller.
            let stake = committee.get(name).unwrap();
            let key = match resp {
                Ok(r) => {
                    let obj_digest = r.object.compute_object_reference().2;
                    let parent_tx_digest = r.object.previous_transaction;
                    let owner = r.object.owner.clone();
                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
                    // Stake from validators holding no lock is still
                    // "available" to certify a transaction.
                    if lock.is_none() {
                        available_voting_power += stake;
                    }
                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
                }
                // All failed queries collapse into the `None` group.
                Err(_) => None,
            };
            let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
            entry.push(*name);
            let entry: &mut u64 = voting_power.entry(key).or_default();
            *entry += stake;
        }
        // Order groups by descending stake so the dominant state comes first.
        let voting_power = voting_power
            .into_iter()
            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
            .collect::<Vec<_>>();
        // Fully locked: even the largest group combined with all unlocked
        // stake cannot reach quorum.
        let mut fully_locked = false;
        if !voting_power.is_empty()
            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
        {
            fully_locked = true;
        }
        Self {
            grouped_results,
            voting_power,
            available_voting_power,
            fully_locked,
        }
    }
}
220
221#[allow(clippy::format_in_format_args)]
222impl std::fmt::Display for GroupedObjectOutput {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 writeln!(f, "available stake: {}", self.available_voting_power)?;
225 writeln!(f, "fully locked: {}", self.fully_locked)?;
226 writeln!(f, "{:<100}\n", "-".repeat(100))?;
227 for (key, stake) in &self.voting_power {
228 let val = self.grouped_results.get(key).unwrap();
229 writeln!(f, "total stake: {stake}")?;
230 match key {
231 Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
232 let lock = lock.opt_debug("no-known-lock");
233 writeln!(f, "obj ref: {obj_digest}")?;
234 writeln!(f, "parent tx: {parent_tx_digest}")?;
235 writeln!(f, "owner: {owner}")?;
236 writeln!(f, "lock: {lock}")?;
237 for (i, name) in val.iter().enumerate() {
238 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
239 }
240 }
241 None => {
242 writeln!(f, "ERROR")?;
243 for (i, name) in val.iter().enumerate() {
244 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
245 }
246 }
247 };
248 writeln!(f, "{:<100}\n", "-".repeat(100))?;
249 }
250 Ok(())
251 }
252}
253
/// Tabular one-line-per-validator rendering of an object query.
struct ConciseObjectOutput(ObjectData);

impl ConciseObjectOutput {
    /// Column header matching the row layout written by `Display`.
    fn header() -> String {
        format!(
            "{:<20} {:<8} {:<66} {:<45} {}",
            "validator", "version", "digest", "parent_cert", "owner"
        )
    }
}
264
impl std::fmt::Display for ConciseObjectOutput {
    /// One row per validator response, columns aligned with `header()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
            write!(
                f,
                "{:<20} {:<8}",
                format!("{:?}", name.concise()),
                version.map(|s| s.value()).opt_debug("-")
            )?;
            match resp {
                // NOTE(review): the error row omits the leading space that the
                // success row includes, so its columns shift by one character.
                Err(_) => writeln!(
                    f,
                    "{:<66} {:<45} {:<51}",
                    "object-fetch-failed", "no-cert-available", "no-owner-available"
                )?,
                Ok(resp) => {
                    let obj_digest = resp.object.compute_object_reference().2;
                    let parent = resp.object.previous_transaction;
                    let owner = resp.object.owner.clone();
                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
                }
            }
            writeln!(f)?;
        }
        Ok(())
    }
}
292
/// Full multi-line dump of every validator's response for an object,
/// including decoded Move contents and debug lock information.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        " -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, " -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // NOTE(review): these unwraps panic if the object is
                        // not a Move object or fails to decode against
                        // `layout` — tolerable for a debug tool, but confirm.
                        writeln!(
                            f,
                            " -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, " -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        " -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
342
343pub async fn get_object(
344 obj_id: ObjectID,
345 version: Option<u64>,
346 validator: Option<AuthorityName>,
347 clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
348) -> Result<ObjectData> {
349 let responses = join_all(
350 clients
351 .iter()
352 .filter(|(name, _)| {
353 if let Some(v) = validator {
354 v == **name
355 } else {
356 true
357 }
358 })
359 .map(|(name, (address, client))| async {
360 let object_version = get_object_impl(client, obj_id, version).await;
361 (*name, address.clone(), object_version)
362 }),
363 )
364 .await;
365
366 Ok(ObjectData {
367 requested_id: obj_id,
368 responses,
369 })
370}
371
/// Fetches the status of `tx_digest` from every validator and renders a
/// human-readable report, grouping validators by the outcome they reported
/// (executed effects digest, signed-only, or error).
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let sui_client = Arc::new(SuiClientBuilder::default().build(fullnode_rpc).await?);
    let clients = make_clients(&sui_client).await?;
    let timer = Instant::now();
    // Query all validators concurrently, recording per-validator latency.
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Any validator that answered successfully; used later to re-fetch the
    // full input transaction when requested.
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    let responses = responses
        .iter()
        .map(|r| {
            // Sort key: Ok(Some(effects digest)) for executed txs,
            // Ok(None) for signed-but-not-executed, Err(_) for failures.
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        // Sorting makes identical outcomes adjacent so chunk_by groups them.
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // Re-fetch from a validator known to have seen the tx so
                    // the full input transaction can be printed. Panics if
                    // that validator stops answering mid-run.
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        // List every validator in this outcome group with its latency.
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                "  {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
472
473async fn get_object_impl(
474 client: &NetworkAuthorityClient,
475 id: ObjectID,
476 version: Option<u64>,
477) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
478 let start = Instant::now();
479 let resp = client
480 .handle_object_info_request(ObjectInfoRequest {
481 object_id: id,
482 generate_layout: LayoutGenerationOption::Generate,
483 request_kind: match version {
484 None => ObjectInfoRequestKind::LatestObjectInfo,
485 Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
486 },
487 })
488 .await
489 .map_err(anyhow::Error::from);
490 let elapsed = start.elapsed().as_secs_f64();
491
492 let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
493 (resp_version.map(SequenceNumber::from), resp, elapsed)
494}
495
/// Builds the anemo-cli routing table for issuing ad-hoc RPCs against a
/// node's Discovery and StateSync anemo services. The `ron_method!` macro
/// presumably encodes requests/responses as RON — confirm in anemo-cli docs.
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use sui_network::discovery::*;
    use sui_network::state_sync::*;

    anemo_cli::Config::new()
        // Peer-exchange RPC exposed by the discovery service.
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeersV2",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
            ),
        )
        // Checkpoint push/fetch RPCs exposed by state sync.
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        sui_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        sui_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}
544
/// Recursively copies `src` into `dst` (creating `dst` if needed), skipping
/// any entry whose exact path appears in `skip`.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    fs::create_dir_all(&dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let file_type = entry.file_type()?;
        let source_path = entry.path();
        if skip.contains(&source_path) {
            continue;
        }
        let target_path = dst.as_ref().join(entry.file_name());
        if file_type.is_dir() {
            // Recurse with the same skip list for nested directories.
            copy_dir_all(source_path, target_path, skip.clone())?;
        } else {
            fs::copy(source_path, target_path)?;
        }
    }
    Ok(())
}
569
570pub async fn restore_from_db_checkpoint(
571 config: &NodeConfig,
572 db_checkpoint_path: &Path,
573) -> Result<(), anyhow::Error> {
574 copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
575 Ok(())
576}
577
/// Spawns a task that syncs all end-of-epoch checkpoint summaries up to and
/// including `epoch` from `ingestion_url`, optionally verifies their
/// authority signatures, and finally advances the checkpoint store's
/// verified/synced/executed/pruned watermarks to the target epoch's last
/// checkpoint.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    ingestion_url: String,
    epoch: u64,
    num_parallel_downloads: usize,
    verify: bool,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        info!("Starting summary sync");
        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Seed the store with the genesis checkpoint if it isn't present yet.
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }

        // Sequence numbers of the final checkpoint of each epoch, truncated
        // to epochs 0..=epoch.
        let end_of_epoch_checkpoint_seq_nums: Vec<_> =
            end_of_epoch_data(ingestion_url.clone(), vec![], 5)
                .await?
                .into_iter()
                .take((epoch + 1) as usize)
                .collect();

        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        let s_start = latest_synced
            .checked_add(1)
            .context("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Background task that refreshes the sync progress bar once per
        // second until the bar is finished below.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Download the end-of-epoch summaries without signature verification;
        // the optional verification pass below runs once committees exist.
        read_summaries_for_list_no_verify(
            ingestion_url,
            num_parallel_downloads,
            state_sync_store.clone(),
            end_of_epoch_checkpoint_seq_nums.clone(),
            sync_checkpoint_counter,
        )
        .await?;
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Progress-bar refresher for the verification pass.
            tokio::spawn(async move {
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            // Verify each epoch's last checkpoint against that epoch's
            // committee.
            for (cp_epoch, epoch_last_cp_seq_num) in
                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
            {
                let epoch_last_checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                    .ok_or(anyhow!("Failed to read checkpoint"))?;
                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
                    "Expected committee to exist after syncing all end of epoch checkpoints",
                );
                epoch_last_checkpoint
                    .verify_authority_signatures(&committee)
                    .expect("Failed to verify checkpoint");
                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance all watermarks to the target epoch's final checkpoint.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
724
725pub async fn get_latest_available_epoch(
726 snapshot_store_config: &ObjectStoreConfig,
727) -> Result<u64, anyhow::Error> {
728 let remote_object_store = if snapshot_store_config.no_sign_request {
729 snapshot_store_config.make_http()?
730 } else {
731 snapshot_store_config.make().map(Arc::new)?
732 };
733 let manifest_contents = remote_object_store
734 .get_bytes(&get_path(MANIFEST_FILENAME))
735 .await?;
736 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
737 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
738 let epoch = root_manifest
739 .available_epochs
740 .iter()
741 .max()
742 .ok_or(anyhow!("No snapshot found in manifest"))?;
743 Ok(*epoch)
744}
745
746pub async fn check_completed_snapshot(
747 snapshot_store_config: &ObjectStoreConfig,
748 epoch: EpochId,
749) -> Result<(), anyhow::Error> {
750 let success_marker = format!("epoch_{}/_SUCCESS", epoch);
751 let remote_object_store = if snapshot_store_config.no_sign_request {
752 snapshot_store_config.make_http()?
753 } else {
754 snapshot_store_config.make().map(Arc::new)?
755 };
756 if exists(&remote_object_store, &get_path(success_marker.as_str())).await {
757 Ok(())
758 } else {
759 Err(anyhow!(
760 "missing success marker at {}/{}",
761 snapshot_store_config.bucket.as_ref().unwrap_or(
762 &snapshot_store_config
763 .clone()
764 .aws_endpoint
765 .unwrap_or("unknown_bucket".to_string())
766 ),
767 success_marker
768 ))
769 }
770}
771
/// Downloads and restores a formal (state) snapshot for `epoch`.
///
/// The restore is staged under `path/staging`: checkpoint summaries sync from
/// `ingestion_url` concurrently with the snapshot object download, the
/// accumulated root state hash is checked against the on-chain end-of-epoch
/// commitment (per `verify`), and only after DB state setup succeeds is the
/// staging dir renamed to `path/live`.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    ingestion_url: &str,
    num_parallel_downloads: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    m.println(format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify,
    ))?;
    // Start from a clean staging directory.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }

    let registry_service =
        mysten_metrics::start_prometheus_server("127.0.0.1:9184".parse().unwrap());
    let prometheus_registry = registry_service.default_registry();
    DBMetrics::init(registry_service.clone());
    mysten_metrics::init_metrics(&prometheus_registry);

    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
        &path.join("store"),
        None,
        None,
    ));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee()?;
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = CheckpointStore::new(
        &path.join("checkpoints"),
        Arc::new(PrunerWatermarks::default()),
    );

    // Checkpoint summaries sync concurrently with the snapshot download below.
    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        ingestion_url.to_string(),
        epoch,
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
    );
    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // The snapshot reader streams (partial hash, object count) pairs over
    // this channel as it restores each partition.
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
            false, max_retries,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(&perpetual_db_clone, abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        Ok::<(), anyhow::Error>(())
    });
    // Fold the per-partition partial hashes into the root state hash as they
    // arrive.
    let mut root_global_state_hash = GlobalStateHash::default();
    let mut num_live_objects = 0;
    while let Some((partial_hash, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_global_state_hash.union(&partial_hash);
    }
    summaries_handle
        .await
        .expect("Task join failed")
        .expect("Summaries task failed");

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    if verify != SnapshotVerifyMode::None {
        // The highest verified checkpoint must belong to the requested epoch.
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
                // The hash accumulated from snapshot data must match the
                // digest committed on-chain at end of epoch.
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    snapshot_handle
        .await
        .expect("Task join failed")
        .expect("Snapshot restore task failed");

    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    // Finalize DB state (strict verification is enabled only in Strict mode).
    setup_db_state(
        epoch,
        root_global_state_hash.clone(),
        perpetual_db.clone(),
        checkpoint_store,
        committee_store,
        network,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m,
    )
    .await?;

    // Swap the fully restored staging dir into place as `live` and clean up.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
966
/// Downloads a RocksDB snapshot for `epoch` from the remote store into
/// `path`, fetching the files listed in the epoch's MANIFEST in parallel with
/// per-file retry and linear backoff.
pub async fn download_db_snapshot(
    path: &Path,
    epoch: u64,
    snapshot_store_config: ObjectStoreConfig,
    skip_indexes: bool,
    num_parallel_downloads: usize,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let remote_store = if snapshot_store_config.no_sign_request {
        snapshot_store_config.make_http()?
    } else {
        snapshot_store_config.make().map(Arc::new)?
    };

    // The root MANIFEST lists which epochs have snapshots available.
    let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;

    if !root_manifest.epoch_exists(epoch) {
        return Err(anyhow!(
            "Epoch dir {} doesn't exist on the remote store",
            epoch
        ));
    }

    let epoch_path = format!("epoch_{}", epoch);
    let epoch_dir = get_path(&epoch_path);

    // The per-epoch MANIFEST is newline-delimited and lists every file in the
    // epoch's snapshot.
    let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
    let epoch_manifest_contents =
        String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
            .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;

    let epoch_manifest =
        PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);

    // Select which column-family directories to download; indexes are optional.
    let mut files: Vec<String> = vec![];
    files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
    files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
    files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
    if !skip_indexes {
        files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
    }
    let local_store = ObjectStoreConfig {
        object_store: Some(ObjectStoreType::File),
        directory: Some(path.to_path_buf()),
        ..Default::default()
    }
    .make()?;
    let m = MultiProgress::new();
    let path = path.to_path_buf();
    let snapshot_handle = tokio::spawn(async move {
        let progress_bar = m.add(
            ProgressBar::new(files.len() as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
                )
                .unwrap(),
            ),
        );
        let cloned_progress_bar = progress_bar.clone();
        // Counts downloads currently in flight, for the progress message.
        let file_counter = Arc::new(AtomicUsize::new(0));
        futures::stream::iter(files.iter())
            .map(|file| {
                let local_store = local_store.clone();
                let remote_store = remote_store.clone();
                let counter_cloned = file_counter.clone();
                async move {
                    counter_cloned.fetch_add(1, Ordering::Relaxed);
                    let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());

                    // Retry with linear backoff (1s, 2s, ...) for up to
                    // `max_retries` additional attempts.
                    let mut attempts = 0;
                    let max_attempts = max_retries + 1;
                    loop {
                        attempts += 1;
                        match copy_file(&file_path, &file_path, &remote_store, &local_store).await {
                            Ok(()) => break,
                            Err(e) if attempts >= max_attempts => {
                                return Err(anyhow::anyhow!(
                                    "Failed to download {} after {} attempts: {}",
                                    file_path,
                                    attempts,
                                    e
                                ));
                            }
                            Err(e) => {
                                tracing::warn!(
                                    "Failed to download {} (attempt {}/{}): {}, retrying in {}ms",
                                    file_path,
                                    attempts,
                                    max_attempts,
                                    e,
                                    1000 * attempts
                                );
                                tokio::time::sleep(Duration::from_millis(1000 * attempts as u64))
                                    .await;
                            }
                        }
                    }

                    Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
                }
            })
            .boxed()
            .buffer_unordered(num_parallel_downloads)
            .try_for_each(|path| {
                file_counter.fetch_sub(1, Ordering::Relaxed);
                cloned_progress_bar.inc(1);
                cloned_progress_bar.set_message(format!(
                    "Downloading file: {}, #downloads_in_progress: {}",
                    path,
                    file_counter.load(Ordering::Relaxed)
                ));
                futures::future::ready(Ok(()))
            })
            .await?;
        progress_bar.finish_with_message("Snapshot file download is complete");
        Ok::<(), anyhow::Error>(())
    });

    let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
    join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .for_each(|result| result.expect("Task failed"));

    // NOTE(review): downloaded files land under `path/epoch_{N}/...` (same
    // remote and local key in copy_file above), so these top-level `store`
    // and `epochs` dirs are presumably stale leftovers from a previous run —
    // confirm against the restore flow that consumes this layout.
    let store_dir = path.join("store");
    if store_dir.exists() {
        fs::remove_dir_all(&store_dir)?;
    }
    let epochs_dir = path.join("epochs");
    if epochs_dir.exists() {
        fs::remove_dir_all(&epochs_dir)?;
    }
    Ok(())
}