1use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::time::Duration;
17use std::{fs, io};
18use sui_config::{NodeConfig, genesis::Genesis};
19use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
20use sui_core::execution_cache::build_execution_cache_from_env;
21use sui_network::default_mysten_network_config;
22use sui_protocol_config::Chain;
23use sui_rpc_api::Client;
24use sui_storage::object_store::http::HttpDownloaderBuilder;
25use sui_storage::object_store::util::MANIFEST_FILENAME;
26use sui_storage::object_store::util::Manifest;
27use sui_storage::object_store::util::PerEpochManifest;
28use sui_storage::object_store::util::{build_object_store, end_of_epoch_data, fetch_checkpoint};
29use sui_types::committee::QUORUM_THRESHOLD;
30use sui_types::crypto::AuthorityPublicKeyBytes;
31use sui_types::global_state_hash::GlobalStateHash;
32use sui_types::messages_grpc::LayoutGenerationOption;
33use sui_types::multiaddr::Multiaddr;
34use sui_types::{base_types::*, object::Owner};
35use tokio::sync::mpsc;
36use tokio::task::JoinHandle;
37use tokio::time::Instant;
38
39use anyhow::anyhow;
40use clap::ValueEnum;
41use eyre::ContextCompat;
42use fastcrypto::hash::MultisetHash;
43use futures::{StreamExt, TryStreamExt};
44use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
45use prometheus::Registry;
46use serde::{Deserialize, Serialize};
47use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
48use sui_core::authority::AuthorityStore;
49use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
50use sui_core::checkpoints::CheckpointStore;
51use sui_core::epoch::committee_store::CommitteeStore;
52use sui_core::storage::RocksDbStore;
53use sui_snapshot::reader::StateSnapshotReaderV1;
54use sui_snapshot::setup_db_state;
55use sui_storage::object_store::ObjectStoreGetExt;
56use sui_storage::object_store::util::{copy_file, exists, get_path};
57use sui_types::full_checkpoint_content::CheckpointData;
58use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
59use sui_types::messages_grpc::{
60 ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
61 TransactionStatus,
62};
63
64use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
65use sui_core::authority::authority_store_pruner::PrunerWatermarks;
66use sui_types::storage::ReadStore;
67use tracing::info;
68use typed_store::DBMetrics;
69
70pub mod commands;
71pub mod db_tool;
72mod formal_snapshot_util;
73
/// Verification level applied when restoring state from a formal snapshot
/// (consumed by `download_formal_snapshot`).
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// Skip verification entirely (restore code prints a warning for this mode).
    None,
    /// Default: checkpoint summaries are verified and the locally computed
    /// root state hash is checked against the end-of-epoch commitment.
    #[default]
    Normal,
    /// Everything `Normal` does, plus stricter downstream checks
    /// (`setup_db_state` is invoked with strict verification enabled).
    Strict,
}
91
92async fn make_clients(
94 sui_client: &Client,
95) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
96 let mut net_config = default_mysten_network_config();
97 net_config.connect_timeout = Some(Duration::from_secs(5));
98 let mut authority_clients = BTreeMap::new();
99
100 let active_validators = sui_client
101 .get_system_state_summary(None)
102 .await?
103 .active_validators;
104
105 for validator in active_validators {
106 let net_addr = Multiaddr::try_from(validator.net_address).unwrap();
107 let tls_config = sui_tls::create_rustls_client_config(
108 sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
109 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
110 None,
111 );
112 let channel = net_config
113 .connect_lazy(&net_addr, tls_config)
114 .map_err(|err| anyhow!(err.to_string()))?;
115 let client = NetworkAuthorityClient::new(channel);
116 let public_key_bytes =
117 AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
118 authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
119 }
120
121 Ok(authority_clients)
122}
123
/// Per-validator response tuple: (version reported by the validator,
/// object-info result, seconds the request took).
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Raw per-validator responses gathered for a single object query.
pub struct ObjectData {
    // The object id that was queried.
    requested_id: ObjectID,
    // One entry per queried validator: (name, address, response tuple).
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
129
/// Helper for debug-printing optional values with a caller-supplied
/// fallback string.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    /// `Debug`-format the contained value, or return `def_str` when `None`.
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map_or_else(|| def_str.to_string(), |value| format!("{:?}", value))
    }
}
145
/// Object query responses grouped by the distinct object state each
/// validator reported, together with stake totals per state.
#[allow(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    // Maps an observed state -- (version, object digest, previous tx
    // digest, owner, optional lock digest), or `None` for a failed fetch --
    // to the validators that reported it.
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    // Same keys as `grouped_results`, paired with the total stake backing
    // each state, sorted by stake descending (see `GroupedObjectOutput::new`).
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    // Total stake of validators that reported the object without a lock.
    pub available_voting_power: u64,
    // True when even the most-voted state plus all unlocked stake cannot
    // reach quorum.
    pub fully_locked: bool,
}
171
172impl GroupedObjectOutput {
173 pub fn new(
174 object_data: ObjectData,
175 committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
176 ) -> Self {
177 let mut grouped_results = BTreeMap::new();
178 let mut voting_power = BTreeMap::new();
179 let mut available_voting_power = 0;
180 for (name, _, (version, resp, _elapsed)) in &object_data.responses {
181 let stake = committee.get(name).unwrap();
182 let key = match resp {
183 Ok(r) => {
184 let obj_digest = r.object.compute_object_reference().2;
185 let parent_tx_digest = r.object.previous_transaction;
186 let owner = r.object.owner.clone();
187 let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
188 if lock.is_none() {
189 available_voting_power += stake;
190 }
191 Some((*version, obj_digest, parent_tx_digest, owner, lock))
192 }
193 Err(_) => None,
194 };
195 let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
196 entry.push(*name);
197 let entry: &mut u64 = voting_power.entry(key).or_default();
198 *entry += stake;
199 }
200 let voting_power = voting_power
201 .into_iter()
202 .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
203 .collect::<Vec<_>>();
204 let mut fully_locked = false;
205 if !voting_power.is_empty()
206 && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
207 {
208 fully_locked = true;
209 }
210 Self {
211 grouped_results,
212 voting_power,
213 available_voting_power,
214 fully_locked,
215 }
216 }
217}
218
219#[allow(clippy::format_in_format_args)]
220impl std::fmt::Display for GroupedObjectOutput {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 writeln!(f, "available stake: {}", self.available_voting_power)?;
223 writeln!(f, "fully locked: {}", self.fully_locked)?;
224 writeln!(f, "{:<100}\n", "-".repeat(100))?;
225 for (key, stake) in &self.voting_power {
226 let val = self.grouped_results.get(key).unwrap();
227 writeln!(f, "total stake: {stake}")?;
228 match key {
229 Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
230 let lock = lock.opt_debug("no-known-lock");
231 writeln!(f, "obj ref: {obj_digest}")?;
232 writeln!(f, "parent tx: {parent_tx_digest}")?;
233 writeln!(f, "owner: {owner}")?;
234 writeln!(f, "lock: {lock}")?;
235 for (i, name) in val.iter().enumerate() {
236 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
237 }
238 }
239 None => {
240 writeln!(f, "ERROR")?;
241 for (i, name) in val.iter().enumerate() {
242 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
243 }
244 }
245 };
246 writeln!(f, "{:<100}\n", "-".repeat(100))?;
247 }
248 Ok(())
249 }
250}
251
252struct ConciseObjectOutput(ObjectData);
253
254impl ConciseObjectOutput {
255 fn header() -> String {
256 format!(
257 "{:<20} {:<8} {:<66} {:<45} {}",
258 "validator", "version", "digest", "parent_cert", "owner"
259 )
260 }
261}
262
263impl std::fmt::Display for ConciseObjectOutput {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
266 write!(
267 f,
268 "{:<20} {:<8}",
269 format!("{:?}", name.concise()),
270 version.map(|s| s.value()).opt_debug("-")
271 )?;
272 match resp {
273 Err(_) => writeln!(
274 f,
275 "{:<66} {:<45} {:<51}",
276 "object-fetch-failed", "no-cert-available", "no-owner-available"
277 )?,
278 Ok(resp) => {
279 let obj_digest = resp.object.compute_object_reference().2;
280 let parent = resp.object.previous_transaction;
281 let owner = resp.object.owner.clone();
282 write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
283 }
284 }
285 writeln!(f)?;
286 }
287 Ok(())
288 }
289}
290
/// Detailed multi-line view of an object query: per-validator digest,
/// decoded Move contents (when a layout is available), owner, and lock.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        " -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, " -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // NOTE(review): both unwraps below panic if the
                        // validator returns a layout that does not match the
                        // object's BCS contents -- acceptable for a debug
                        // tool, but worth confirming.
                        writeln!(
                            f,
                            " -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, " -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        " -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
340
341pub async fn get_object(
342 obj_id: ObjectID,
343 version: Option<u64>,
344 validator: Option<AuthorityName>,
345 clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
346) -> Result<ObjectData> {
347 let responses = join_all(
348 clients
349 .iter()
350 .filter(|(name, _)| {
351 if let Some(v) = validator {
352 v == **name
353 } else {
354 true
355 }
356 })
357 .map(|(name, (address, client))| async {
358 let object_version = get_object_impl(client, obj_id, version).await;
359 (*name, address.clone(), object_version)
360 }),
361 )
362 .await;
363
364 Ok(ObjectData {
365 requested_id: obj_id,
366 responses,
367 })
368}
369
/// Query every validator for `tx_digest` and render a report that groups
/// validators by the (transaction, effects) they returned, so disagreement
/// between validators is immediately visible.
///
/// When `show_input_tx` is set, the input transaction data is included for
/// executed groups (and re-fetched from a knowing validator for
/// signed-but-not-executed groups).
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let sui_client = Client::new(fullnode_rpc)?;
    let clients = make_clients(&sui_client).await?;
    let timer = Instant::now();
    // Fan out the request to all validators concurrently, recording latency.
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Remember one validator that knows the tx, in case we need to re-fetch
    // its input data below.
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    // Sort responses so identical outcomes are adjacent, then group them:
    // key is Ok(None) for signed-only, Ok(Some(..)) for executed (with
    // effects digest), Err for fetch failures.
    let responses = responses
        .iter()
        .map(|r| {
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    // Render one section per distinct outcome, then list the validators in
    // that group with their response latency.
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // Signed-only responses carry no tx data; re-fetch from a
                    // validator known (above) to have the transaction.
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                " {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
470
471async fn get_object_impl(
472 client: &NetworkAuthorityClient,
473 id: ObjectID,
474 version: Option<u64>,
475) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
476 let start = Instant::now();
477 let resp = client
478 .handle_object_info_request(ObjectInfoRequest {
479 object_id: id,
480 generate_layout: LayoutGenerationOption::Generate,
481 request_kind: match version {
482 None => ObjectInfoRequestKind::LatestObjectInfo,
483 Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
484 },
485 })
486 .await
487 .map_err(anyhow::Error::from);
488 let elapsed = start.elapsed().as_secs_f64();
489
490 let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
491 (resp_version.map(SequenceNumber::from), resp, elapsed)
492}
493
494pub(crate) fn make_anemo_config() -> anemo_cli::Config {
495 use sui_network::discovery::*;
496 use sui_network::state_sync::*;
497
498 anemo_cli::Config::new()
500 .add_service(
502 "Discovery",
503 anemo_cli::ServiceInfo::new().add_method(
504 "GetKnownPeersV2",
505 anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
506 ),
507 )
508 .add_service(
510 "StateSync",
511 anemo_cli::ServiceInfo::new()
512 .add_method(
513 "PushCheckpointSummary",
514 anemo_cli::ron_method!(
515 StateSyncClient,
516 push_checkpoint_summary,
517 sui_types::messages_checkpoint::CertifiedCheckpointSummary
518 ),
519 )
520 .add_method(
521 "GetCheckpointSummary",
522 anemo_cli::ron_method!(
523 StateSyncClient,
524 get_checkpoint_summary,
525 GetCheckpointSummaryRequest
526 ),
527 )
528 .add_method(
529 "GetCheckpointContents",
530 anemo_cli::ron_method!(
531 StateSyncClient,
532 get_checkpoint_contents,
533 sui_types::messages_checkpoint::CheckpointContentsDigest
534 ),
535 )
536 .add_method(
537 "GetCheckpointAvailability",
538 anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
539 ),
540 )
541}
542
/// Recursively copy the contents of `src` into `dst`, creating `dst` as
/// needed and skipping any entry whose full path appears in `skip`.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    fs::create_dir_all(&dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let file_type = entry.file_type()?;
        let source = entry.path();
        if skip.contains(&source) {
            continue;
        }
        let target = dst.as_ref().join(entry.file_name());
        if file_type.is_dir() {
            // Recurse, carrying the skip list down into subdirectories.
            copy_dir_all(source, target, skip.clone())?;
        } else {
            fs::copy(source, target)?;
        }
    }
    Ok(())
}
567
568pub async fn restore_from_db_checkpoint(
569 config: &NodeConfig,
570 db_checkpoint_path: &Path,
571) -> Result<(), anyhow::Error> {
572 copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
573 Ok(())
574}
575
/// Spawn a background task that downloads the end-of-epoch checkpoint
/// summaries listed in `end_of_epoch_checkpoint_seq_nums` into
/// `checkpoint_store` (optionally verifying each epoch's authority
/// signatures when `verify` is set), then advances the store's highest
/// verified/synced/executed/pruned watermarks to the final checkpoint.
///
/// Progress for both the sync and verify phases is rendered on `m`.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    ingestion_url: String,
    num_parallel_downloads: usize,
    verify: bool,
    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Seed the store with the genesis checkpoint if it isn't there yet.
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }

        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        let s_start = latest_synced
            .checked_add(1)
            .wrap_err("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Progress-reporting loop: polls the shared counter once a second
        // until the bar is finished by the main task.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Download all requested end-of-epoch summaries without signature
        // verification (verification, when requested, happens below).
        read_summaries_for_list_no_verify(
            ingestion_url,
            num_parallel_downloads,
            state_sync_store.clone(),
            end_of_epoch_checkpoint_seq_nums.clone(),
            sync_checkpoint_counter,
        )
        .await?;
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");
        info!("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Same polling pattern as the sync progress reporter above.
            tokio::spawn(async move {
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            // Verify each epoch's last checkpoint against that epoch's
            // committee (the committees become available as the end-of-epoch
            // checkpoints are synced).
            for (cp_epoch, epoch_last_cp_seq_num) in
                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
            {
                let epoch_last_checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                    .ok_or(anyhow!("Failed to read checkpoint"))?;
                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
                    "Expected committee to exist after syncing all end of epoch checkpoints",
                );
                epoch_last_checkpoint
                    .verify_authority_signatures(&committee)
                    .expect("Failed to verify checkpoint");
                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance all watermarks to the final synced checkpoint.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
715
716pub async fn get_latest_available_epoch(
717 snapshot_store_config: &ObjectStoreConfig,
718) -> Result<u64, anyhow::Error> {
719 let remote_object_store = if snapshot_store_config.no_sign_request {
720 snapshot_store_config.make_http()?
721 } else {
722 snapshot_store_config.make().map(Arc::new)?
723 };
724 let manifest_contents = remote_object_store
725 .get_bytes(&get_path(MANIFEST_FILENAME))
726 .await?;
727 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
728 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
729 let epoch = root_manifest
730 .available_epochs
731 .iter()
732 .max()
733 .ok_or(anyhow!("No snapshot found in manifest"))?;
734 Ok(*epoch)
735}
736
737pub async fn check_completed_snapshot(
738 snapshot_store_config: &ObjectStoreConfig,
739 epoch: EpochId,
740) -> Result<(), anyhow::Error> {
741 let success_marker = format!("epoch_{}/_SUCCESS", epoch);
742 let archive_success_marker = format!("archive/epoch_{}/_SUCCESS", epoch);
743 let remote_object_store = if snapshot_store_config.no_sign_request {
744 snapshot_store_config.make_http()?
745 } else {
746 snapshot_store_config.make().map(Arc::new)?
747 };
748
749 if exists(&remote_object_store, &get_path(success_marker.as_str())).await
751 || exists(
752 &remote_object_store,
753 &get_path(archive_success_marker.as_str()),
754 )
755 .await
756 {
757 Ok(())
758 } else {
759 Err(anyhow!(
760 "missing success marker at {}/{} or {}/{}",
761 snapshot_store_config.bucket.as_ref().unwrap_or(
762 &snapshot_store_config
763 .clone()
764 .aws_endpoint
765 .unwrap_or("unknown_bucket".to_string())
766 ),
767 success_marker,
768 snapshot_store_config.bucket.as_ref().unwrap_or(
769 &snapshot_store_config
770 .clone()
771 .aws_endpoint
772 .unwrap_or("unknown_bucket".to_string())
773 ),
774 archive_success_marker
775 ))
776 }
777}
778
/// Restore node state from a formal snapshot for `epoch` into `path`.
///
/// Runs three tasks concurrently: checkpoint-summary sync (see
/// `start_summary_sync`), per-epoch transaction-digest backfill, and the
/// snapshot object download itself. Depending on `verify`, the locally
/// computed root state hash is checked against the epoch's on-chain
/// commitment before the database state is finalized and the staging
/// directory is promoted to `live`.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    ingestion_url: &str,
    num_parallel_downloads: usize,
    num_parallel_chunks: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    let msg = format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify
    );
    m.println(&msg).unwrap();
    info!("{}", msg);

    // Work in a staging directory; it is renamed to `live` only on success.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }

    let registry_service =
        mysten_metrics::start_prometheus_server("127.0.0.1:9184".parse().unwrap());
    let prometheus_registry = registry_service.default_registry();
    DBMetrics::init(registry_service.clone());
    mysten_metrics::init_metrics(&prometheus_registry);

    // Open the stores the restore will populate.
    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
        &path.join("store"),
        None,
        None,
    ));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee();
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = CheckpointStore::new(
        &path.join("checkpoints"),
        Arc::new(PrunerWatermarks::default()),
    );

    // Sequence numbers of each epoch's final checkpoint, for epochs 0..=epoch.
    let end_of_epoch_checkpoint_seq_nums: Vec<_> = end_of_epoch_data(ingestion_url, vec![])
        .await?
        .into_iter()
        .take((epoch + 1) as usize)
        .collect();

    // Task 1: sync (and optionally verify) end-of-epoch checkpoint summaries.
    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        ingestion_url.to_string(),
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
        end_of_epoch_checkpoint_seq_nums.clone(),
    );

    // Task 2: backfill executed-transaction digests for the restored epoch.
    let backfill_handle = {
        let perpetual_db = perpetual_db.clone();
        let ingestion_url = ingestion_url.to_string();
        let m = m.clone();
        let end_of_epoch_checkpoint_seq_nums = end_of_epoch_checkpoint_seq_nums.clone();
        tokio::spawn(async move {
            backfill_epoch_transaction_digests(
                perpetual_db,
                epoch,
                ingestion_url,
                num_parallel_downloads,
                m,
                end_of_epoch_checkpoint_seq_nums,
            )
            .await
        })
    };

    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // The reader streams (partial state hash, object count) pairs back over
    // this channel as it ingests the snapshot.
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    // Task 3: download the snapshot objects into the perpetual db.
    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
            false, max_retries,
            num_parallel_chunks,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(perpetual_db_clone.clone(), abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        info!("Snapshot download complete");
        Ok::<(), anyhow::Error>(())
    });
    // Accumulate the global state hash from the partial hashes streamed by
    // the reader; this drains until the reader drops the sender.
    let mut root_global_state_hash = GlobalStateHash::default();
    let mut num_live_objects = 0;
    while let Some((partial_hash, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_global_state_hash.union(&partial_hash);
    }
    tokio::pin!(summaries_handle);
    tokio::pin!(snapshot_handle);
    tokio::pin!(backfill_handle);

    let mut summaries_done = false;
    let mut snapshot_done = false;
    let mut backfill_done = false;

    // Wait (at least) for summary sync to finish: the digest verification
    // below needs the highest verified checkpoint from it.
    while !summaries_done {
        tokio::select! {
            result = &mut summaries_handle, if !summaries_done => {
                summaries_done = true;
                result.expect("Summaries task panicked")?;
            }
            result = &mut backfill_handle, if !backfill_done => {
                backfill_done = true;
                result.expect("Backfill task panicked")?;
            }
            result = &mut snapshot_handle, if !snapshot_done => {
                snapshot_done = true;
                result.expect("Snapshot task panicked")?;
            }
        }
    }

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    // Verify the locally computed root state hash against the end-of-epoch
    // commitment carried by the last checkpoint of the epoch.
    if verify != SnapshotVerifyMode::None {
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    // Drain the remaining tasks before finalizing state.
    while !snapshot_done || !backfill_done {
        tokio::select! {
            result = &mut backfill_handle, if !backfill_done => {
                backfill_done = true;
                result.expect("Backfill task panicked")?;
            }
            result = &mut snapshot_handle, if !snapshot_done => {
                snapshot_done = true;
                result.expect("Snapshot task panicked")?;
            }
        }
    }

    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    // Finalize epoch/db state; strict mode enables additional verification
    // inside `setup_db_state`.
    setup_db_state(
        epoch,
        root_global_state_hash.clone(),
        perpetual_db.clone(),
        checkpoint_store.clone(),
        committee_store,
        network,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m.clone(),
    )
    .await?;

    // NOTE(review): tidehunter-only post-restore step; assumed required for
    // the restored db to be readable under that backend -- confirm.
    #[cfg(tidehunter)]
    perpetual_db
        .force_rebuild_control_region()
        .expect("Failed to rebuild tidehunter control region after snapshot restore");

    // Promote the staging directory to `live` and clean up the snapshot
    // download scratch space.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
1042
1043async fn backfill_epoch_transaction_digests(
1044 perpetual_db: Arc<AuthorityPerpetualTables>,
1045 epoch: EpochId,
1046 ingestion_url: String,
1047 concurrency: usize,
1048 m: MultiProgress,
1049 end_of_epoch_checkpoint_seq_nums: Vec<u64>,
1050) -> Result<()> {
1051 if epoch == 0 {
1052 return Ok(());
1053 }
1054
1055 let epoch_last_cp_seq = end_of_epoch_checkpoint_seq_nums
1061 .get(epoch as usize)
1062 .ok_or_else(|| anyhow!("No checkpoint sequence found for epoch {}", epoch))?;
1063
1064 let epoch_start_cp = if epoch == 0 {
1065 0
1066 } else {
1067 end_of_epoch_checkpoint_seq_nums
1068 .get(epoch as usize - 1)
1069 .map(|cp| cp + 1)
1070 .unwrap_or(0)
1071 };
1072 let msg = format!(
1073 "Beginning transaction digest backfill for epoch: {:?}, backfilling from: {:?}..{:?}",
1074 epoch, epoch_start_cp, epoch_last_cp_seq
1075 );
1076 m.println(&msg).ok();
1077 info!("{}", msg);
1078
1079 let checkpoints_to_fetch: Vec<_> = (epoch_start_cp..=*epoch_last_cp_seq).collect();
1080 let num_checkpoints = checkpoints_to_fetch.len();
1081
1082 let progress_bar = m.add(
1083 ProgressBar::new(num_checkpoints as u64).with_style(
1084 ProgressStyle::with_template(
1085 "[{elapsed_precise}] {wide_bar} {pos}/{len} transactions backfilled ({msg})",
1086 )
1087 .unwrap(),
1088 ),
1089 );
1090
1091 let client = build_object_store(&ingestion_url, vec![]);
1092 let checkpoint_counter = Arc::new(AtomicU64::new(0));
1093 let tx_counter = Arc::new(AtomicU64::new(0));
1094 let cloned_checkpoint_counter = checkpoint_counter.clone();
1095 let cloned_progress_bar = progress_bar.clone();
1096 let start_instant = Instant::now();
1097
1098 tokio::spawn(async move {
1099 loop {
1100 if cloned_progress_bar.is_finished() {
1101 break;
1102 }
1103 let num_checkpoints_processed = cloned_checkpoint_counter.load(Ordering::Relaxed);
1104 let elapsed = start_instant.elapsed().as_secs_f64();
1105 let chkpts_per_sec = if elapsed > 0.0 {
1106 num_checkpoints_processed as f64 / elapsed
1107 } else {
1108 0.0
1109 };
1110 cloned_progress_bar.set_position(num_checkpoints_processed);
1111 cloned_progress_bar.set_message(format!("{:.1} chkpts/sec", chkpts_per_sec));
1112 tokio::time::sleep(Duration::from_millis(100)).await;
1113 }
1114 });
1115
1116 futures::stream::iter(checkpoints_to_fetch)
1117 .map(|sq| {
1118 let client = client.clone();
1119 async move {
1120 fetch_checkpoint(&client, sq)
1121 .await
1122 .map(|c| Arc::new(CheckpointData::from(c)))
1123 }
1124 })
1125 .buffer_unordered(concurrency)
1126 .try_for_each(|checkpoint| {
1127 let perpetual_db = perpetual_db.clone();
1128 let tx_counter = tx_counter.clone();
1129 let checkpoint_counter = checkpoint_counter.clone();
1130 let checkpoint_data = checkpoint;
1131
1132 async move {
1133 let tx_digests: Vec<_> = checkpoint_data
1134 .transactions
1135 .iter()
1136 .map(|tx_data| *tx_data.transaction.digest())
1137 .collect();
1138 let num_txs = tx_digests.len();
1139 perpetual_db
1140 .insert_executed_transaction_digests_batch(epoch, tx_digests.into_iter())?;
1141 tx_counter.fetch_add(num_txs as u64, Ordering::Relaxed);
1142 checkpoint_counter.fetch_add(1, Ordering::Relaxed);
1143 Ok::<(), anyhow::Error>(())
1144 }
1145 })
1146 .await?;
1147
1148 let tx_count = tx_counter.load(Ordering::Relaxed);
1149 progress_bar.finish_with_message(format!(
1150 "Backfill complete: {} transactions from {} checkpoints",
1151 tx_count, num_checkpoints
1152 ));
1153 info!(
1154 "Backfill complete: {} transactions from {} checkpoints",
1155 tx_counter.load(Ordering::Relaxed),
1156 checkpoint_counter.load(Ordering::Relaxed)
1157 );
1158
1159 Ok(())
1160}
1161
1162pub async fn download_db_snapshot(
1163 path: &Path,
1164 epoch: u64,
1165 snapshot_store_config: ObjectStoreConfig,
1166 skip_indexes: bool,
1167 num_parallel_downloads: usize,
1168 max_retries: usize,
1169) -> Result<(), anyhow::Error> {
1170 let remote_store = if snapshot_store_config.no_sign_request {
1171 snapshot_store_config.make_http()?
1172 } else {
1173 snapshot_store_config.make().map(Arc::new)?
1174 };
1175
1176 let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
1178 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
1179 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
1180
1181 if !root_manifest.epoch_exists(epoch) {
1182 return Err(anyhow!(
1183 "Epoch dir {} doesn't exist on the remote store",
1184 epoch
1185 ));
1186 }
1187
1188 let epoch_path = format!("epoch_{}", epoch);
1189 let epoch_dir = get_path(&epoch_path);
1190
1191 let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
1192 let epoch_manifest_contents =
1193 String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
1194 .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;
1195
1196 let epoch_manifest =
1197 PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);
1198
1199 let mut files: Vec<String> = vec![];
1200 files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
1201 files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
1202 files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
1203 if !skip_indexes {
1204 files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
1205 }
1206 let local_store = ObjectStoreConfig {
1207 object_store: Some(ObjectStoreType::File),
1208 directory: Some(path.to_path_buf()),
1209 ..Default::default()
1210 }
1211 .make()?;
1212 let m = MultiProgress::new();
1213 let path = path.to_path_buf();
1214 let snapshot_handle = tokio::spawn(async move {
1215 let progress_bar = m.add(
1216 ProgressBar::new(files.len() as u64).with_style(
1217 ProgressStyle::with_template(
1218 "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
1219 )
1220 .unwrap(),
1221 ),
1222 );
1223 let cloned_progress_bar = progress_bar.clone();
1224 let file_counter = Arc::new(AtomicUsize::new(0));
1225 futures::stream::iter(files.iter())
1226 .map(|file| {
1227 let local_store = local_store.clone();
1228 let remote_store = remote_store.clone();
1229 let counter_cloned = file_counter.clone();
1230 async move {
1231 counter_cloned.fetch_add(1, Ordering::Relaxed);
1232 let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());
1233
1234 let mut attempts = 0;
1235 let max_attempts = max_retries + 1;
1236 loop {
1237 attempts += 1;
1238 match copy_file(&file_path, &file_path, &remote_store, &local_store).await {
1239 Ok(()) => break,
1240 Err(e) if attempts >= max_attempts => {
1241 return Err(anyhow::anyhow!(
1242 "Failed to download {} after {} attempts: {}",
1243 file_path,
1244 attempts,
1245 e
1246 ));
1247 }
1248 Err(e) => {
1249 tracing::warn!(
1250 "Failed to download {} (attempt {}/{}): {}, retrying in {}ms",
1251 file_path,
1252 attempts,
1253 max_attempts,
1254 e,
1255 1000 * attempts
1256 );
1257 tokio::time::sleep(Duration::from_millis(1000 * attempts as u64))
1258 .await;
1259 }
1260 }
1261 }
1262
1263 Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
1264 }
1265 })
1266 .boxed()
1267 .buffer_unordered(num_parallel_downloads)
1268 .try_for_each(|path| {
1269 file_counter.fetch_sub(1, Ordering::Relaxed);
1270 cloned_progress_bar.inc(1);
1271 cloned_progress_bar.set_message(format!(
1272 "Downloading file: {}, #downloads_in_progress: {}",
1273 path,
1274 file_counter.load(Ordering::Relaxed)
1275 ));
1276 futures::future::ready(Ok(()))
1277 })
1278 .await?;
1279 progress_bar.finish_with_message("Snapshot file download is complete");
1280 Ok::<(), anyhow::Error>(())
1281 });
1282
1283 let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
1284 join_all(tasks)
1285 .await
1286 .into_iter()
1287 .collect::<Result<Vec<_>, _>>()?
1288 .into_iter()
1289 .for_each(|result| result.expect("Task failed"));
1290
1291 let store_dir = path.join("store");
1292 if store_dir.exists() {
1293 fs::remove_dir_all(&store_dir)?;
1294 }
1295 let epochs_dir = path.join("epochs");
1296 if epochs_dir.exists() {
1297 fs::remove_dir_all(&epochs_dir)?;
1298 }
1299 Ok(())
1300}