1use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::time::Duration;
17use std::{fs, io};
18use sui_config::{NodeConfig, genesis::Genesis};
19use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
20use sui_core::execution_cache::build_execution_cache_from_env;
21use sui_data_ingestion_core::{CheckpointReader, create_remote_store_client, end_of_epoch_data};
22use sui_network::default_mysten_network_config;
23use sui_protocol_config::Chain;
24use sui_sdk::SuiClient;
25use sui_sdk::SuiClientBuilder;
26use sui_storage::object_store::http::HttpDownloaderBuilder;
27use sui_storage::object_store::util::MANIFEST_FILENAME;
28use sui_storage::object_store::util::Manifest;
29use sui_storage::object_store::util::PerEpochManifest;
30use sui_types::committee::QUORUM_THRESHOLD;
31use sui_types::crypto::AuthorityPublicKeyBytes;
32use sui_types::global_state_hash::GlobalStateHash;
33use sui_types::messages_grpc::LayoutGenerationOption;
34use sui_types::multiaddr::Multiaddr;
35use sui_types::{base_types::*, object::Owner};
36use tokio::sync::mpsc;
37use tokio::task::JoinHandle;
38use tokio::time::Instant;
39
40use anyhow::anyhow;
41use clap::ValueEnum;
42use eyre::ContextCompat;
43use fastcrypto::hash::MultisetHash;
44use futures::{StreamExt, TryStreamExt};
45use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
46use prometheus::Registry;
47use serde::{Deserialize, Serialize};
48use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
49use sui_core::authority::AuthorityStore;
50use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
51use sui_core::checkpoints::CheckpointStore;
52use sui_core::epoch::committee_store::CommitteeStore;
53use sui_core::storage::RocksDbStore;
54use sui_snapshot::reader::StateSnapshotReaderV1;
55use sui_snapshot::setup_db_state;
56use sui_storage::object_store::ObjectStoreGetExt;
57use sui_storage::object_store::util::{copy_file, exists, get_path};
58use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
59use sui_types::messages_grpc::{
60 ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
61 TransactionStatus,
62};
63
64use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
65use sui_core::authority::authority_store_pruner::PrunerWatermarks;
66use sui_types::storage::ReadStore;
67use tracing::{info, warn};
68use typed_store::DBMetrics;
69
70pub mod commands;
71#[cfg(not(tidehunter))]
72pub mod db_tool;
73mod formal_snapshot_util;
74
/// How thoroughly a formal snapshot is verified during restore.
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// Skip verification entirely (trusts the snapshot source).
    None,
    /// Default: checkpoint summaries are verified and the restored live object
    /// set is checked against the end-of-epoch root state digest.
    #[default]
    Normal,
    /// Like `Normal`, and additionally enables strict verification in
    /// `setup_db_state` (see `download_formal_snapshot`).
    Strict,
}
92
93async fn make_clients(
95 sui_client: &Arc<SuiClient>,
96) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
97 let mut net_config = default_mysten_network_config();
98 net_config.connect_timeout = Some(Duration::from_secs(5));
99 let mut authority_clients = BTreeMap::new();
100
101 let active_validators = sui_client
102 .governance_api()
103 .get_latest_sui_system_state()
104 .await?
105 .active_validators;
106
107 for validator in active_validators {
108 let net_addr = Multiaddr::try_from(validator.net_address).unwrap();
109 let tls_config = sui_tls::create_rustls_client_config(
110 sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
111 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
112 None,
113 );
114 let channel = net_config
115 .connect_lazy(&net_addr, tls_config)
116 .map_err(|err| anyhow!(err.to_string()))?;
117 let client = NetworkAuthorityClient::new(channel);
118 let public_key_bytes =
119 AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
120 authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
121 }
122
123 Ok(authority_clients)
124}
125
/// Per-validator response tuple: the object version the validator reported (if
/// the request succeeded), the raw object-info result, and the request latency
/// in seconds.
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Raw object-info responses collected from each queried validator for a
/// single object id.
pub struct ObjectData {
    // The object id that was queried.
    requested_id: ObjectID,
    // One entry per validator queried: (authority, its network address, response).
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
131
/// Renders an optional value via its `Debug` representation, substituting a
/// caller-provided fallback string when the value is absent.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        // `map_or_else` only builds the fallback string when needed.
        self.as_ref()
            .map_or_else(|| def_str.to_string(), |inner| format!("{:?}", inner))
    }
}
147
/// Object-info responses grouped by the distinct
/// (version, object digest, parent tx, owner, lock) tuple each validator
/// reported. A `None` key collects validators whose request failed.
#[allow(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    // Distinct response tuple -> validators that reported it.
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    // Same keys as `grouped_results`, paired with the total stake backing each
    // answer, sorted by stake descending (built in `GroupedObjectOutput::new`).
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    // Total stake of validators that reported no lock for the object.
    pub available_voting_power: u64,
    // True when even the best-supported answer plus the unlocked stake cannot
    // reach quorum.
    pub fully_locked: bool,
}
173
impl GroupedObjectOutput {
    /// Groups per-validator object responses by the answer they gave and
    /// tallies the stake behind each answer using the provided committee
    /// (authority -> stake) map.
    ///
    /// Panics if a responding authority is not present in `committee`.
    pub fn new(
        object_data: ObjectData,
        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
    ) -> Self {
        let mut grouped_results = BTreeMap::new();
        let mut voting_power = BTreeMap::new();
        let mut available_voting_power = 0;
        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
            let stake = committee.get(name).unwrap();
            let key = match resp {
                Ok(r) => {
                    let obj_digest = r.object.compute_object_reference().2;
                    let parent_tx_digest = r.object.previous_transaction;
                    let owner = r.object.owner.clone();
                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
                    // Stake from validators that hold no lock counts as
                    // "available" for reaching quorum on some answer.
                    if lock.is_none() {
                        available_voting_power += stake;
                    }
                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
                }
                // Failed requests are grouped together under `None`.
                Err(_) => None,
            };
            let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
            entry.push(*name);
            let entry: &mut u64 = voting_power.entry(key).or_default();
            *entry += stake;
        }
        // Sort answers by total backing stake, descending.
        let voting_power = voting_power
            .into_iter()
            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
            .collect::<Vec<_>>();
        let mut fully_locked = false;
        // Fully locked: even the best-supported answer plus all unlocked stake
        // cannot reach quorum.
        if !voting_power.is_empty()
            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
        {
            fully_locked = true;
        }
        Self {
            grouped_results,
            voting_power,
            available_voting_power,
            fully_locked,
        }
    }
}
220
/// Renders one section per distinct answer (highest stake first): object ref,
/// parent tx, owner, lock, and the validators that reported it.
#[allow(clippy::format_in_format_args)]
impl std::fmt::Display for GroupedObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "available stake: {}", self.available_voting_power)?;
        writeln!(f, "fully locked: {}", self.fully_locked)?;
        writeln!(f, "{:<100}\n", "-".repeat(100))?;
        for (key, stake) in &self.voting_power {
            let val = self.grouped_results.get(key).unwrap();
            writeln!(f, "total stake: {stake}")?;
            match key {
                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
                    let lock = lock.opt_debug("no-known-lock");
                    writeln!(f, "obj ref: {obj_digest}")?;
                    writeln!(f, "parent tx: {parent_tx_digest}")?;
                    writeln!(f, "owner: {owner}")?;
                    writeln!(f, "lock: {lock}")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
                // Validators whose object-info request failed outright.
                None => {
                    writeln!(f, "ERROR")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
            };
            writeln!(f, "{:<100}\n", "-".repeat(100))?;
        }
        Ok(())
    }
}
253
/// Compact, one-row-per-validator rendering of `ObjectData`.
struct ConciseObjectOutput(ObjectData);

impl ConciseObjectOutput {
    /// Column header line matching the row format emitted by the `Display`
    /// impl for `ConciseObjectOutput`.
    fn header() -> String {
        format!(
            "{:<20} {:<8} {:<66} {:<45} {}",
            "validator", "version", "digest", "parent_cert", "owner"
        )
    }
}
264
/// One row per validator: name, version, then either the object's
/// digest/parent/owner or error placeholders when the fetch failed.
impl std::fmt::Display for ConciseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
            write!(
                f,
                "{:<20} {:<8}",
                format!("{:?}", name.concise()),
                version.map(|s| s.value()).opt_debug("-")
            )?;
            match resp {
                // Keep column widths stable even for failed fetches.
                Err(_) => writeln!(
                    f,
                    "{:<66} {:<45} {:<51}",
                    "object-fetch-failed", "no-cert-available", "no-owner-available"
                )?,
                Ok(resp) => {
                    let obj_digest = resp.object.compute_object_reference().2;
                    let parent = resp.object.previous_transaction;
                    let owner = resp.object.owner.clone();
                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
                }
            }
            writeln!(f)?;
        }
        Ok(())
    }
}
292
/// Detailed multi-line rendering of `ObjectData`: per validator, the version,
/// latency, digest, decoded Move contents (when a layout is available), owner,
/// and any debug lock.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        " -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, " -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // Decode the object's BCS contents into a readable Move
                        // struct using the layout the validator returned.
                        writeln!(
                            f,
                            " -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, " -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        " -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
342
343pub async fn get_object(
344 obj_id: ObjectID,
345 version: Option<u64>,
346 validator: Option<AuthorityName>,
347 clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
348) -> Result<ObjectData> {
349 let responses = join_all(
350 clients
351 .iter()
352 .filter(|(name, _)| {
353 if let Some(v) = validator {
354 v == **name
355 } else {
356 true
357 }
358 })
359 .map(|(name, (address, client))| async {
360 let object_version = get_object_impl(client, obj_id, version).await;
361 (*name, address.clone(), object_version)
362 }),
363 )
364 .await;
365
366 Ok(ObjectData {
367 requested_id: obj_id,
368 responses,
369 })
370}
371
/// Fetches the status of `tx_digest` from every validator and renders a report
/// grouping validators by the answer they gave (executed with a given effects
/// digest, signed-but-not-executed, or error).
///
/// When `show_input_tx` is set, the transaction data is printed too; for a
/// signed-but-unexecuted tx it is re-fetched from one validator that knows
/// about it (panics if that re-fetch fails).
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let sui_client = Arc::new(SuiClientBuilder::default().build(fullnode_rpc).await?);
    let clients = make_clients(&sui_client).await?;
    let timer = Instant::now();
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Any one validator that answered Ok; used below to fetch the input tx.
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    let responses = responses
        .iter()
        .map(|r| {
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        // Sort so identical answers become adjacent, then group them.
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // This arm implies at least one Ok response, so unwrap is safe.
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        // List every validator that gave this particular answer.
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                "        {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
472
/// Issues a single object-info request against one validator and measures the
/// round-trip latency in seconds.
async fn get_object_impl(
    client: &NetworkAuthorityClient,
    id: ObjectID,
    version: Option<u64>,
) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
    let start = Instant::now();
    let resp = client
        .handle_object_info_request(ObjectInfoRequest {
            object_id: id,
            generate_layout: LayoutGenerationOption::Generate,
            // No version means "latest"; otherwise request a specific past
            // version via the debug-only request kind.
            request_kind: match version {
                None => ObjectInfoRequestKind::LatestObjectInfo,
                Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
            },
        })
        .await
        .map_err(anyhow::Error::from);
    let elapsed = start.elapsed().as_secs_f64();

    // The version the validator reported, when the request succeeded.
    let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
    (resp_version.map(SequenceNumber::from), resp, elapsed)
}
495
/// Builds the anemo CLI service/method table for the Discovery and StateSync
/// p2p services, so their RPCs can be invoked from the command line.
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use sui_network::discovery::*;
    use sui_network::state_sync::*;

    // Each `ron_method!` maps a service method name to its client call and the
    // RON-serialized request type it accepts.
    anemo_cli::Config::new()
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeersV2",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
            ),
        )
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        sui_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        sui_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}
544
/// Recursively copies `src` into `dst`, creating `dst` as needed and skipping
/// any entry whose full path appears in `skip`.
///
/// Returns the first I/O error encountered, if any.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    // Delegate to a borrowing helper so recursion doesn't clone `skip` at
    // every directory level.
    copy_dir_all_inner(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursive worker for `copy_dir_all`; borrows the skip list instead of
/// cloning it per level.
fn copy_dir_all_inner(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        if skip.contains(&entry.path()) {
            continue;
        }
        if ty.is_dir() {
            copy_dir_all_inner(&entry.path(), &dst.join(entry.file_name()), skip)?;
        } else {
            fs::copy(entry.path(), dst.join(entry.file_name()))?;
        }
    }
    Ok(())
}
569
/// Restores a node's database by recursively copying a db checkpoint directory
/// into the node's configured db path. Nothing is skipped or transformed.
pub async fn restore_from_db_checkpoint(
    config: &NodeConfig,
    db_checkpoint_path: &Path,
) -> Result<(), anyhow::Error> {
    copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
    Ok(())
}
577
/// Spawns a task that downloads the given end-of-epoch checkpoint summaries
/// from `ingestion_url` into the checkpoint store (with a progress bar on `m`),
/// optionally verifies each epoch's final checkpoint against its committee,
/// and finally advances all checkpoint-store watermarks to the last summary.
///
/// The returned handle resolves to an error if any store operation or download
/// fails; signature-verification failures panic inside the task.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    ingestion_url: String,
    num_parallel_downloads: usize,
    verify: bool,
    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Seed the store with the genesis checkpoint if it isn't there yet.
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }

        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        let s_start = latest_synced
            .checked_add(1)
            .wrap_err("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Background task: refresh the sync progress bar once per second until
        // the bar is finished.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Download the listed end-of-epoch summaries without verifying them
        // here; verification (if requested) happens below.
        read_summaries_for_list_no_verify(
            ingestion_url,
            num_parallel_downloads,
            state_sync_store.clone(),
            end_of_epoch_checkpoint_seq_nums.clone(),
            sync_checkpoint_counter,
        )
        .await?;
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");
        info!("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Background task: refresh the verification progress bar.
            tokio::spawn(async move {
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            // Verify each epoch's final checkpoint against that epoch's
            // committee (index in the list corresponds to the epoch number).
            for (cp_epoch, epoch_last_cp_seq_num) in
                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
            {
                let epoch_last_checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                    .ok_or(anyhow!("Failed to read checkpoint"))?;
                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
                    "Expected committee to exist after syncing all end of epoch checkpoints",
                );
                epoch_last_checkpoint
                    .verify_authority_signatures(&committee)
                    .expect("Failed to verify checkpoint");
                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance every watermark to the last synced end-of-epoch checkpoint.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
717
718pub async fn get_latest_available_epoch(
719 snapshot_store_config: &ObjectStoreConfig,
720) -> Result<u64, anyhow::Error> {
721 let remote_object_store = if snapshot_store_config.no_sign_request {
722 snapshot_store_config.make_http()?
723 } else {
724 snapshot_store_config.make().map(Arc::new)?
725 };
726 let manifest_contents = remote_object_store
727 .get_bytes(&get_path(MANIFEST_FILENAME))
728 .await?;
729 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
730 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
731 let epoch = root_manifest
732 .available_epochs
733 .iter()
734 .max()
735 .ok_or(anyhow!("No snapshot found in manifest"))?;
736 Ok(*epoch)
737}
738
739pub async fn check_completed_snapshot(
740 snapshot_store_config: &ObjectStoreConfig,
741 epoch: EpochId,
742) -> Result<(), anyhow::Error> {
743 let success_marker = format!("epoch_{}/_SUCCESS", epoch);
744 let archive_success_marker = format!("archive/epoch_{}/_SUCCESS", epoch);
745 let remote_object_store = if snapshot_store_config.no_sign_request {
746 snapshot_store_config.make_http()?
747 } else {
748 snapshot_store_config.make().map(Arc::new)?
749 };
750
751 if exists(&remote_object_store, &get_path(success_marker.as_str())).await
753 || exists(
754 &remote_object_store,
755 &get_path(archive_success_marker.as_str()),
756 )
757 .await
758 {
759 Ok(())
760 } else {
761 Err(anyhow!(
762 "missing success marker at {}/{} or {}/{}",
763 snapshot_store_config.bucket.as_ref().unwrap_or(
764 &snapshot_store_config
765 .clone()
766 .aws_endpoint
767 .unwrap_or("unknown_bucket".to_string())
768 ),
769 success_marker,
770 snapshot_store_config.bucket.as_ref().unwrap_or(
771 &snapshot_store_config
772 .clone()
773 .aws_endpoint
774 .unwrap_or("unknown_bucket".to_string())
775 ),
776 archive_success_marker
777 ))
778 }
779}
780
/// Restores node state from a formal snapshot for `epoch` into `path`.
///
/// Runs three concurrent pipelines: (1) end-of-epoch checkpoint summary sync
/// (and optional verification), (2) per-epoch transaction-digest backfill, and
/// (3) the snapshot object download itself, whose partial hashes are folded
/// into a root global state hash. Depending on `verify`, the computed root
/// hash is checked against the end-of-epoch state digest commitment. On
/// success the staging directory is promoted to `<path>/live`.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    ingestion_url: &str,
    num_parallel_downloads: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    let msg = format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify
    );
    m.println(&msg).unwrap();
    info!("{}", msg);

    // Work in a staging directory; promoted to "live" only after success.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }

    let registry_service =
        mysten_metrics::start_prometheus_server("127.0.0.1:9184".parse().unwrap());
    let prometheus_registry = registry_service.default_registry();
    DBMetrics::init(registry_service.clone());
    mysten_metrics::init_metrics(&prometheus_registry);

    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
        &path.join("store"),
        None,
        None,
    ));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee()?;
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = CheckpointStore::new(
        &path.join("checkpoints"),
        Arc::new(PrunerWatermarks::default()),
    );

    // One end-of-epoch checkpoint sequence number per epoch, up to and
    // including the target epoch.
    let end_of_epoch_checkpoint_seq_nums: Vec<_> =
        end_of_epoch_data(ingestion_url.to_string(), vec![], 5)
            .await?
            .into_iter()
            .take((epoch + 1) as usize)
            .collect();

    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        ingestion_url.to_string(),
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
        end_of_epoch_checkpoint_seq_nums.clone(),
    );

    // Backfill executed-transaction digests for the target epoch in parallel
    // with the snapshot download.
    let backfill_handle = {
        let perpetual_db = perpetual_db.clone();
        let ingestion_url = ingestion_url.to_string();
        let m = m.clone();
        let end_of_epoch_checkpoint_seq_nums = end_of_epoch_checkpoint_seq_nums.clone();
        tokio::spawn(async move {
            backfill_epoch_transaction_digests(
                perpetual_db,
                epoch,
                ingestion_url,
                num_parallel_downloads,
                m,
                end_of_epoch_checkpoint_seq_nums,
            )
            .await
        })
    };

    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // The reader streams back (partial hash, object count) pairs per bucket,
    // which we fold into the root state hash below.
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
            false, max_retries,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(&perpetual_db_clone, abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        info!("Snapshot download complete");
        Ok::<(), anyhow::Error>(())
    });
    // Accumulate the root state hash from the partial hashes streamed by the
    // reader; the loop ends when the reader drops the sender.
    let mut root_global_state_hash = GlobalStateHash::default();
    let mut num_live_objects = 0;
    while let Some((partial_hash, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_global_state_hash.union(&partial_hash);
    }
    summaries_handle
        .await
        .expect("Task join failed")
        .expect("Summaries task failed");

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    if verify != SnapshotVerifyMode::None {
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        // The snapshot's computed root state hash must match the on-chain
        // end-of-epoch commitment.
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    snapshot_handle
        .await
        .expect("Task join failed")
        .expect("Snapshot restore task failed");

    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    setup_db_state(
        epoch,
        root_global_state_hash.clone(),
        perpetual_db.clone(),
        checkpoint_store.clone(),
        committee_store,
        network,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m.clone(),
    )
    .await?;

    backfill_handle.await.expect("Task join failed")?;

    // Promote the staging directory to "live" and clean up the raw snapshot.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
1009
/// Downloads every checkpoint of `epoch` from the ingestion store and records
/// the executed-transaction digests into the perpetual tables, with progress
/// reporting on `m`. A no-op for epoch 0.
async fn backfill_epoch_transaction_digests(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    epoch: EpochId,
    ingestion_url: String,
    concurrency: usize,
    m: MultiProgress,
    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
) -> Result<()> {
    if epoch == 0 {
        return Ok(());
    }

    // The list is indexed by epoch: entry N is epoch N's last checkpoint.
    let epoch_last_cp_seq = end_of_epoch_checkpoint_seq_nums
        .get(epoch as usize)
        .ok_or_else(|| anyhow!("No checkpoint sequence found for epoch {}", epoch))?;

    // NOTE(review): the `epoch == 0` arm appears unreachable given the early
    // return above; kept as-is.
    let epoch_start_cp = if epoch == 0 {
        0
    } else {
        // The epoch starts one checkpoint after the previous epoch's last.
        end_of_epoch_checkpoint_seq_nums
            .get(epoch as usize - 1)
            .map(|cp| cp + 1)
            .unwrap_or(0)
    };
    let msg = format!(
        "Beginning transaction digest backfill for epoch: {:?}, backfilling from: {:?}..{:?}",
        epoch, epoch_start_cp, epoch_last_cp_seq
    );
    m.println(&msg).ok();
    info!("{}", msg);

    let checkpoints_to_fetch: Vec<_> = (epoch_start_cp..=*epoch_last_cp_seq).collect();
    let num_checkpoints = checkpoints_to_fetch.len();

    let progress_bar = m.add(
        ProgressBar::new(num_checkpoints as u64).with_style(
            ProgressStyle::with_template(
                "[{elapsed_precise}] {wide_bar} {pos}/{len} transactions backfilled ({msg})",
            )
            .unwrap(),
        ),
    );

    let client = Arc::new(create_remote_store_client(ingestion_url, vec![], 60)?);
    let checkpoint_counter = Arc::new(AtomicU64::new(0));
    let tx_counter = Arc::new(AtomicU64::new(0));
    let cloned_checkpoint_counter = checkpoint_counter.clone();
    let cloned_progress_bar = progress_bar.clone();
    let start_instant = Instant::now();

    // Background task: refresh the progress bar until it is finished.
    tokio::spawn(async move {
        loop {
            if cloned_progress_bar.is_finished() {
                break;
            }
            let num_checkpoints_processed = cloned_checkpoint_counter.load(Ordering::Relaxed);
            let elapsed = start_instant.elapsed().as_secs_f64();
            let chkpts_per_sec = if elapsed > 0.0 {
                num_checkpoints_processed as f64 / elapsed
            } else {
                0.0
            };
            cloned_progress_bar.set_position(num_checkpoints_processed);
            cloned_progress_bar.set_message(format!("{:.1} chkpts/sec", chkpts_per_sec));
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });

    // Fetch checkpoints with bounded concurrency; abort the whole backfill on
    // the first fetch or write error.
    futures::stream::iter(checkpoints_to_fetch)
        .map(|sq| {
            let client = client.clone();
            async move {
                let res = CheckpointReader::fetch_from_object_store(&**client, sq).await;
                if let Err(ref e) = res {
                    warn!("Failed to fetch checkpoint {}: {:?}", sq, e);
                }
                res
            }
        })
        .buffer_unordered(concurrency)
        .try_for_each(|checkpoint| {
            let perpetual_db = perpetual_db.clone();
            let tx_counter = tx_counter.clone();
            let checkpoint_counter = checkpoint_counter.clone();
            let checkpoint_data = checkpoint.0;

            async move {
                let tx_digests: Vec<_> = checkpoint_data
                    .transactions
                    .iter()
                    .map(|tx_data| *tx_data.transaction.digest())
                    .collect();
                let num_txs = tx_digests.len();
                perpetual_db
                    .insert_executed_transaction_digests_batch(epoch, tx_digests.into_iter())?;
                tx_counter.fetch_add(num_txs as u64, Ordering::Relaxed);
                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                Ok::<(), anyhow::Error>(())
            }
        })
        .await?;

    let tx_count = tx_counter.load(Ordering::Relaxed);
    progress_bar.finish_with_message(format!(
        "Backfill complete: {} transactions from {} checkpoints",
        tx_count, num_checkpoints
    ));
    info!(
        "Backfill complete: {} transactions from {} checkpoints",
        tx_counter.load(Ordering::Relaxed),
        checkpoint_counter.load(Ordering::Relaxed)
    );

    Ok(())
}
1130
1131pub async fn download_db_snapshot(
1132 path: &Path,
1133 epoch: u64,
1134 snapshot_store_config: ObjectStoreConfig,
1135 skip_indexes: bool,
1136 num_parallel_downloads: usize,
1137 max_retries: usize,
1138) -> Result<(), anyhow::Error> {
1139 let remote_store = if snapshot_store_config.no_sign_request {
1140 snapshot_store_config.make_http()?
1141 } else {
1142 snapshot_store_config.make().map(Arc::new)?
1143 };
1144
1145 let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
1147 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
1148 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
1149
1150 if !root_manifest.epoch_exists(epoch) {
1151 return Err(anyhow!(
1152 "Epoch dir {} doesn't exist on the remote store",
1153 epoch
1154 ));
1155 }
1156
1157 let epoch_path = format!("epoch_{}", epoch);
1158 let epoch_dir = get_path(&epoch_path);
1159
1160 let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
1161 let epoch_manifest_contents =
1162 String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
1163 .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;
1164
1165 let epoch_manifest =
1166 PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);
1167
1168 let mut files: Vec<String> = vec![];
1169 files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
1170 files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
1171 files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
1172 if !skip_indexes {
1173 files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
1174 }
1175 let local_store = ObjectStoreConfig {
1176 object_store: Some(ObjectStoreType::File),
1177 directory: Some(path.to_path_buf()),
1178 ..Default::default()
1179 }
1180 .make()?;
1181 let m = MultiProgress::new();
1182 let path = path.to_path_buf();
1183 let snapshot_handle = tokio::spawn(async move {
1184 let progress_bar = m.add(
1185 ProgressBar::new(files.len() as u64).with_style(
1186 ProgressStyle::with_template(
1187 "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
1188 )
1189 .unwrap(),
1190 ),
1191 );
1192 let cloned_progress_bar = progress_bar.clone();
1193 let file_counter = Arc::new(AtomicUsize::new(0));
1194 futures::stream::iter(files.iter())
1195 .map(|file| {
1196 let local_store = local_store.clone();
1197 let remote_store = remote_store.clone();
1198 let counter_cloned = file_counter.clone();
1199 async move {
1200 counter_cloned.fetch_add(1, Ordering::Relaxed);
1201 let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());
1202
1203 let mut attempts = 0;
1204 let max_attempts = max_retries + 1;
1205 loop {
1206 attempts += 1;
1207 match copy_file(&file_path, &file_path, &remote_store, &local_store).await {
1208 Ok(()) => break,
1209 Err(e) if attempts >= max_attempts => {
1210 return Err(anyhow::anyhow!(
1211 "Failed to download {} after {} attempts: {}",
1212 file_path,
1213 attempts,
1214 e
1215 ));
1216 }
1217 Err(e) => {
1218 tracing::warn!(
1219 "Failed to download {} (attempt {}/{}): {}, retrying in {}ms",
1220 file_path,
1221 attempts,
1222 max_attempts,
1223 e,
1224 1000 * attempts
1225 );
1226 tokio::time::sleep(Duration::from_millis(1000 * attempts as u64))
1227 .await;
1228 }
1229 }
1230 }
1231
1232 Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
1233 }
1234 })
1235 .boxed()
1236 .buffer_unordered(num_parallel_downloads)
1237 .try_for_each(|path| {
1238 file_counter.fetch_sub(1, Ordering::Relaxed);
1239 cloned_progress_bar.inc(1);
1240 cloned_progress_bar.set_message(format!(
1241 "Downloading file: {}, #downloads_in_progress: {}",
1242 path,
1243 file_counter.load(Ordering::Relaxed)
1244 ));
1245 futures::future::ready(Ok(()))
1246 })
1247 .await?;
1248 progress_bar.finish_with_message("Snapshot file download is complete");
1249 Ok::<(), anyhow::Error>(())
1250 });
1251
1252 let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
1253 join_all(tasks)
1254 .await
1255 .into_iter()
1256 .collect::<Result<Vec<_>, _>>()?
1257 .into_iter()
1258 .for_each(|result| result.expect("Task failed"));
1259
1260 let store_dir = path.join("store");
1261 if store_dir.exists() {
1262 fs::remove_dir_all(&store_dir)?;
1263 }
1264 let epochs_dir = path.join("epochs");
1265 if epochs_dir.exists() {
1266 fs::remove_dir_all(&epochs_dir)?;
1267 }
1268 Ok(())
1269}