1use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::num::NonZeroUsize;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use std::{fs, io};
19use sui_config::{NodeConfig, genesis::Genesis};
20use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
21use sui_core::execution_cache::build_execution_cache_from_env;
22use sui_network::default_mysten_network_config;
23use sui_protocol_config::Chain;
24use sui_rpc_api::Client;
25use sui_storage::object_store::http::HttpDownloaderBuilder;
26use sui_storage::object_store::util::MANIFEST_FILENAME;
27use sui_storage::object_store::util::Manifest;
28use sui_storage::object_store::util::{build_object_store, end_of_epoch_data, fetch_checkpoint};
29use sui_types::committee::QUORUM_THRESHOLD;
30use sui_types::crypto::AuthorityPublicKeyBytes;
31use sui_types::digests::ChainIdentifier;
32use sui_types::global_state_hash::GlobalStateHash;
33use sui_types::messages_grpc::LayoutGenerationOption;
34use sui_types::multiaddr::Multiaddr;
35use sui_types::{base_types::*, object::Owner};
36use tokio::sync::mpsc;
37use tokio::task::JoinHandle;
38use tokio::time::Instant;
39
40use anyhow::anyhow;
41use clap::ValueEnum;
42use eyre::ContextCompat;
43use fastcrypto::hash::MultisetHash;
44use futures::{StreamExt, TryStreamExt};
45use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
46use prometheus::Registry;
47use serde::{Deserialize, Serialize};
48use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
49use sui_core::authority::AuthorityStore;
50use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
51use sui_core::checkpoints::CheckpointStore;
52use sui_core::epoch::committee_store::CommitteeStore;
53use sui_core::storage::RocksDbStore;
54use sui_snapshot::reader::StateSnapshotReaderV1;
55use sui_snapshot::setup_db_state;
56use sui_storage::object_store::ObjectStoreGetExt;
57use sui_storage::object_store::util::{exists, get_path};
58use sui_types::full_checkpoint_content::CheckpointData;
59use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
60use sui_types::messages_grpc::{
61 ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
62 TransactionStatus,
63};
64
65use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
66use sui_core::authority::authority_store_pruner::PrunerWatermarks;
67use sui_types::storage::ReadStore;
68use tracing::info;
69use typed_store::DBMetrics;
70
71pub mod commands;
72pub mod db_tool;
73mod formal_snapshot_util;
74#[cfg(all(feature = "tideconsole", not(windows)))]
75pub mod tideconsole_cmd;
76
77#[derive(
78 Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
79)]
80pub enum SnapshotVerifyMode {
81 None,
86 #[default]
89 Normal,
90 Strict,
93}
94
95async fn make_clients(
97 sui_client: &Client,
98) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
99 let mut net_config = default_mysten_network_config();
100 net_config.connect_timeout = Some(Duration::from_secs(5));
101 let mut authority_clients = BTreeMap::new();
102
103 let active_validators = sui_client
104 .get_system_state_summary(None)
105 .await?
106 .active_validators;
107
108 for validator in active_validators {
109 let net_addr = Multiaddr::try_from(validator.net_address)
110 .unwrap()
111 .rewrite_http_to_https();
112 let tls_config = sui_tls::create_rustls_client_config(
113 sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
114 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
115 None,
116 );
117 let channel = net_config
118 .connect_lazy(&net_addr, tls_config)
119 .map_err(|err| anyhow!(err.to_string()))?;
120 let client = NetworkAuthorityClient::new(channel);
121 let public_key_bytes =
122 AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
123 authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
124 }
125
126 Ok(authority_clients)
127}
128
/// Outcome of querying one validator for an object: the object version reported by a
/// successful response (`None` when the request failed), the response or its error,
/// and the request latency in seconds.
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Answers from every queried validator for a single object id.
pub struct ObjectData {
    // The object id that was requested.
    requested_id: ObjectID,
    // One entry per queried validator: its name, its network address and the query outcome.
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
134
/// Debug-formats an optional value, printing a caller-supplied placeholder for `None`.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T: std::fmt::Debug> OptionDebug<T> for Option<T> {
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map_or_else(|| def_str.to_owned(), |value| format!("{value:?}"))
    }
}
150
151#[allow(clippy::type_complexity)]
152pub struct GroupedObjectOutput {
153 pub grouped_results: BTreeMap<
154 Option<(
155 Option<SequenceNumber>,
156 ObjectDigest,
157 TransactionDigest,
158 Owner,
159 Option<TransactionDigest>,
160 )>,
161 Vec<AuthorityName>,
162 >,
163 pub voting_power: Vec<(
164 Option<(
165 Option<SequenceNumber>,
166 ObjectDigest,
167 TransactionDigest,
168 Owner,
169 Option<TransactionDigest>,
170 )>,
171 u64,
172 )>,
173 pub available_voting_power: u64,
174 pub fully_locked: bool,
175}
176
177impl GroupedObjectOutput {
178 pub fn new(
179 object_data: ObjectData,
180 committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
181 ) -> Self {
182 let mut grouped_results = BTreeMap::new();
183 let mut voting_power = BTreeMap::new();
184 let mut available_voting_power = 0;
185 for (name, _, (version, resp, _elapsed)) in &object_data.responses {
186 let stake = committee.get(name).unwrap();
187 let key = match resp {
188 Ok(r) => {
189 let obj_digest = r.object.compute_object_reference().2;
190 let parent_tx_digest = r.object.previous_transaction;
191 let owner = r.object.owner.clone();
192 let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
193 if lock.is_none() {
194 available_voting_power += stake;
195 }
196 Some((*version, obj_digest, parent_tx_digest, owner, lock))
197 }
198 Err(_) => None,
199 };
200 let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
201 entry.push(*name);
202 let entry: &mut u64 = voting_power.entry(key).or_default();
203 *entry += stake;
204 }
205 let voting_power = voting_power
206 .into_iter()
207 .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
208 .collect::<Vec<_>>();
209 let mut fully_locked = false;
210 if !voting_power.is_empty()
211 && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
212 {
213 fully_locked = true;
214 }
215 Self {
216 grouped_results,
217 voting_power,
218 available_voting_power,
219 fully_locked,
220 }
221 }
222}
223
224#[allow(clippy::format_in_format_args)]
225impl std::fmt::Display for GroupedObjectOutput {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 writeln!(f, "available stake: {}", self.available_voting_power)?;
228 writeln!(f, "fully locked: {}", self.fully_locked)?;
229 writeln!(f, "{:<100}\n", "-".repeat(100))?;
230 for (key, stake) in &self.voting_power {
231 let val = self.grouped_results.get(key).unwrap();
232 writeln!(f, "total stake: {stake}")?;
233 match key {
234 Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
235 let lock = lock.opt_debug("no-known-lock");
236 writeln!(f, "obj ref: {obj_digest}")?;
237 writeln!(f, "parent tx: {parent_tx_digest}")?;
238 writeln!(f, "owner: {owner}")?;
239 writeln!(f, "lock: {lock}")?;
240 for (i, name) in val.iter().enumerate() {
241 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
242 }
243 }
244 None => {
245 writeln!(f, "ERROR")?;
246 for (i, name) in val.iter().enumerate() {
247 writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
248 }
249 }
250 };
251 writeln!(f, "{:<100}\n", "-".repeat(100))?;
252 }
253 Ok(())
254 }
255}
256
257struct ConciseObjectOutput(ObjectData);
258
259impl ConciseObjectOutput {
260 fn header() -> String {
261 format!(
262 "{:<20} {:<8} {:<66} {:<45} {}",
263 "validator", "version", "digest", "parent_cert", "owner"
264 )
265 }
266}
267
268impl std::fmt::Display for ConciseObjectOutput {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
271 write!(
272 f,
273 "{:<20} {:<8}",
274 format!("{:?}", name.concise()),
275 version.map(|s| s.value()).opt_debug("-")
276 )?;
277 match resp {
278 Err(_) => writeln!(
279 f,
280 "{:<66} {:<45} {:<51}",
281 "object-fetch-failed", "no-cert-available", "no-owner-available"
282 )?,
283 Ok(resp) => {
284 let obj_digest = resp.object.compute_object_reference().2;
285 let parent = resp.object.previous_transaction;
286 let owner = resp.object.owner.clone();
287 write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
288 }
289 }
290 writeln!(f)?;
291 }
292 Ok(())
293 }
294}
295
296struct VerboseObjectOutput(ObjectData);
297
298impl std::fmt::Display for VerboseObjectOutput {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 writeln!(f, "Object: {}", self.0.requested_id)?;
301
302 for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
303 writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
304 writeln!(
305 f,
306 "-- version: {} ({:.3}s)",
307 version.opt_debug("<version not available>"),
308 timespent,
309 )?;
310
311 match resp {
312 Err(e) => writeln!(f, "Error fetching object: {}", e)?,
313 Ok(resp) => {
314 writeln!(
315 f,
316 " -- object digest: {}",
317 resp.object.compute_object_reference().2
318 )?;
319 if resp.object.is_package() {
320 writeln!(f, " -- object: <Move Package>")?;
321 } else if let Some(layout) = &resp.layout {
322 writeln!(
323 f,
324 " -- object: Move Object: {}",
325 resp.object
326 .data
327 .try_as_move()
328 .unwrap()
329 .to_move_struct(layout)
330 .unwrap()
331 )?;
332 }
333 writeln!(f, " -- owner: {}", resp.object.owner)?;
334 writeln!(
335 f,
336 " -- locked by: {}",
337 resp.lock_for_debugging.opt_debug("<not locked>")
338 )?;
339 }
340 }
341 }
342 Ok(())
343 }
344}
345
346pub async fn get_object(
347 obj_id: ObjectID,
348 version: Option<u64>,
349 validator: Option<AuthorityName>,
350 clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
351) -> Result<ObjectData> {
352 let responses = join_all(
353 clients
354 .iter()
355 .filter(|(name, _)| {
356 if let Some(v) = validator {
357 v == **name
358 } else {
359 true
360 }
361 })
362 .map(|(name, (address, client))| async {
363 let object_version = get_object_impl(client, obj_id, version).await;
364 (*name, address.clone(), object_version)
365 }),
366 )
367 .await;
368
369 Ok(ObjectData {
370 requested_id: obj_id,
371 responses,
372 })
373}
374
375pub async fn get_transaction_block(
376 tx_digest: TransactionDigest,
377 show_input_tx: bool,
378 fullnode_rpc: String,
379) -> Result<String> {
380 let sui_client = Client::new(fullnode_rpc)?;
381 let clients = make_clients(&sui_client).await?;
382 let timer = Instant::now();
383 let responses = join_all(clients.iter().map(|(name, (address, client))| async {
384 let result = client
385 .handle_transaction_info_request(TransactionInfoRequest {
386 transaction_digest: tx_digest,
387 })
388 .await;
389 (
390 *name,
391 address.clone(),
392 result,
393 timer.elapsed().as_secs_f64(),
394 )
395 }))
396 .await;
397
398 let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());
400
401 let responses = responses
402 .iter()
403 .map(|r| {
404 let key =
405 r.2.as_ref()
406 .map(|ok_result| match &ok_result.status {
407 TransactionStatus::Signed(_) => None,
408 TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
409 })
410 .ok();
411 let err = r.2.as_ref().err();
412 (key, err, r)
413 })
414 .sorted_by(|(k1, err1, _), (k2, err2, _)| {
415 Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
416 })
417 .chunk_by(|(_, _err, r)| {
418 r.2.as_ref().map(|ok_result| match &ok_result.status {
419 TransactionStatus::Signed(_) => None,
420 TransactionStatus::Executed(_, effects, _) => Some((
421 ok_result.transaction.transaction_data(),
422 effects.data(),
423 effects.digest(),
424 )),
425 })
426 });
427 let mut s = String::new();
428 for (i, (key, group)) in responses.into_iter().enumerate() {
429 match key {
430 Ok(Some((tx, effects, effects_digest))) => {
431 writeln!(
432 &mut s,
433 "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
434 i, tx_digest, effects_digest,
435 )?;
436 writeln!(&mut s, "{:#?}", effects)?;
437 if show_input_tx {
438 writeln!(&mut s, "{:#?}", tx)?;
439 }
440 }
441 Ok(None) => {
442 writeln!(
443 &mut s,
444 "#{:<2} tx_digest: {:<68?} Signed but not executed",
445 i, tx_digest
446 )?;
447 if show_input_tx {
448 let validator_aware_of_tx = validator_aware_of_tx.unwrap();
450 let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
451 let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
452 transaction_digest: tx_digest,
453 }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
454 writeln!(&mut s, "{:#?}", tx_info)?;
455 }
456 }
457 other => {
458 writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
459 }
460 }
461 for (j, (_, _, res)) in group.enumerate() {
462 writeln!(
463 &mut s,
464 " {:<4} {:<20} {:<56} ({:.3}s)",
465 j,
466 res.0.concise(),
467 format!("{}", res.1),
468 res.3
469 )?;
470 }
471 writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
472 }
473 Ok(s)
474}
475
476async fn get_object_impl(
477 client: &NetworkAuthorityClient,
478 id: ObjectID,
479 version: Option<u64>,
480) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
481 let start = Instant::now();
482 let resp = client
483 .handle_object_info_request(ObjectInfoRequest {
484 object_id: id,
485 generate_layout: LayoutGenerationOption::Generate,
486 request_kind: match version {
487 None => ObjectInfoRequestKind::LatestObjectInfo,
488 Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
489 },
490 })
491 .await
492 .map_err(anyhow::Error::from);
493 let elapsed = start.elapsed().as_secs_f64();
494
495 let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
496 (resp_version.map(SequenceNumber::from), resp, elapsed)
497}
498
499pub(crate) fn make_anemo_config() -> anemo_cli::Config {
500 use sui_network::discovery::*;
501 use sui_network::state_sync::*;
502
503 anemo_cli::Config::new()
505 .add_service(
507 "Discovery",
508 anemo_cli::ServiceInfo::new().add_method(
509 "GetKnownPeersV2",
510 anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
511 ),
512 )
513 .add_service(
515 "StateSync",
516 anemo_cli::ServiceInfo::new()
517 .add_method(
518 "PushCheckpointSummary",
519 anemo_cli::ron_method!(
520 StateSyncClient,
521 push_checkpoint_summary,
522 sui_types::messages_checkpoint::CertifiedCheckpointSummary
523 ),
524 )
525 .add_method(
526 "GetCheckpointSummary",
527 anemo_cli::ron_method!(
528 StateSyncClient,
529 get_checkpoint_summary,
530 GetCheckpointSummaryRequest
531 ),
532 )
533 .add_method(
534 "GetCheckpointContents",
535 anemo_cli::ron_method!(
536 StateSyncClient,
537 get_checkpoint_contents,
538 sui_types::messages_checkpoint::CheckpointContentsDigest
539 ),
540 )
541 .add_method(
542 "GetCheckpointAvailability",
543 anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
544 ),
545 )
546}
547
/// Recursively copies the directory tree at `src` into `dst`, creating `dst` and any
/// missing parents.
///
/// An entry is skipped when its path (`src` joined with the entry name, as returned by
/// `fs::read_dir`) is listed in `skip`. A skipped directory is skipped with its whole
/// subtree. Entries that `DirEntry::file_type` does not report as directories are
/// copied with `fs::copy`.
///
/// # Errors
/// Returns the first I/O error encountered. Anything copied before the error is left
/// in place.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    copy_dir_all_inner(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursive worker for `copy_dir_all`; borrows the skip list instead of cloning it
/// for every subdirectory.
fn copy_dir_all_inner(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        let path = entry.path();
        if skip.contains(&path) {
            continue;
        }
        let target = dst.join(entry.file_name());
        if ty.is_dir() {
            copy_dir_all_inner(&path, &target, skip)?;
        } else {
            fs::copy(&path, &target)?;
        }
    }
    Ok(())
}
572
573pub async fn restore_from_db_checkpoint(
574 config: &NodeConfig,
575 db_checkpoint_path: &Path,
576) -> Result<(), anyhow::Error> {
577 copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
578 Ok(())
579}
580
581fn start_summary_sync(
582 perpetual_db: Arc<AuthorityPerpetualTables>,
583 committee_store: Arc<CommitteeStore>,
584 checkpoint_store: Arc<CheckpointStore>,
585 m: MultiProgress,
586 genesis: Genesis,
587 ingestion_url: String,
588 num_parallel_downloads: usize,
589 verify: bool,
590 end_of_epoch_checkpoint_seq_nums: Vec<u64>,
591) -> JoinHandle<Result<(), anyhow::Error>> {
592 tokio::spawn(async move {
593 let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
594 let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
595 let state_sync_store =
596 RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
597 if checkpoint_store
599 .get_checkpoint_by_digest(genesis.checkpoint().digest())
600 .unwrap()
601 .is_none()
602 {
603 checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
604 checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
605 checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
606 }
607
608 let last_checkpoint = end_of_epoch_checkpoint_seq_nums
609 .last()
610 .expect("Expected at least one checkpoint");
611
612 let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
613 let sync_progress_bar = m.add(
614 ProgressBar::new(num_to_sync).with_style(
615 ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
616 .unwrap(),
617 ),
618 );
619
620 let cloned_progress_bar = sync_progress_bar.clone();
621 let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
622 let s_instant = Instant::now();
623
624 let cloned_counter = sync_checkpoint_counter.clone();
625 let latest_synced = checkpoint_store
626 .get_highest_synced_checkpoint()?
627 .map(|c| c.sequence_number)
628 .unwrap_or(0);
629 let s_start = latest_synced
630 .checked_add(1)
631 .wrap_err("Checkpoint overflow")
632 .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
633 tokio::spawn(async move {
634 loop {
635 if cloned_progress_bar.is_finished() {
636 break;
637 }
638 let num_summaries = cloned_counter.load(Ordering::Relaxed);
639 let total_checkpoints_per_sec =
640 num_summaries as f64 / s_instant.elapsed().as_secs_f64();
641 cloned_progress_bar.set_position(s_start + num_summaries);
642 cloned_progress_bar.set_message(format!(
643 "checkpoints synced per sec: {}",
644 total_checkpoints_per_sec
645 ));
646 tokio::time::sleep(Duration::from_secs(1)).await;
647 }
648 });
649
650 read_summaries_for_list_no_verify(
651 ingestion_url,
652 num_parallel_downloads,
653 state_sync_store.clone(),
654 end_of_epoch_checkpoint_seq_nums.clone(),
655 sync_checkpoint_counter,
656 )
657 .await?;
658 sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");
659 info!("Checkpoint summary sync is complete");
660
661 let checkpoint = checkpoint_store
662 .get_checkpoint_by_sequence_number(*last_checkpoint)?
663 .ok_or(anyhow!("Failed to read last checkpoint"))?;
664 if verify {
665 let verify_progress_bar = m.add(
666 ProgressBar::new(num_to_sync).with_style(
667 ProgressStyle::with_template(
668 "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
669 )
670 .unwrap(),
671 ),
672 );
673 let cloned_verify_progress_bar = verify_progress_bar.clone();
674 let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
675 let cloned_verify_counter = verify_checkpoint_counter.clone();
676 let v_instant = Instant::now();
677
678 tokio::spawn(async move {
679 loop {
680 if cloned_verify_progress_bar.is_finished() {
681 break;
682 }
683 let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
684 let total_checkpoints_per_sec =
685 num_summaries as f64 / v_instant.elapsed().as_secs_f64();
686 cloned_verify_progress_bar.set_position(num_summaries);
687 cloned_verify_progress_bar.set_message(format!(
688 "checkpoints verified per sec: {}",
689 total_checkpoints_per_sec
690 ));
691 tokio::time::sleep(Duration::from_secs(1)).await;
692 }
693 });
694
695 for (cp_epoch, epoch_last_cp_seq_num) in
696 end_of_epoch_checkpoint_seq_nums.iter().enumerate()
697 {
698 let epoch_last_checkpoint = checkpoint_store
699 .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
700 .ok_or(anyhow!("Failed to read checkpoint"))?;
701 let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
702 "Expected committee to exist after syncing all end of epoch checkpoints",
703 );
704 epoch_last_checkpoint
705 .verify_authority_signatures(&committee)
706 .expect("Failed to verify checkpoint");
707 verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
708 }
709
710 verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
711 }
712
713 checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
714 checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
715 checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
716 checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
717 Ok::<(), anyhow::Error>(())
718 })
719}
720
721pub async fn get_latest_available_epoch(
722 snapshot_store_config: &ObjectStoreConfig,
723) -> Result<u64, anyhow::Error> {
724 let remote_object_store = if snapshot_store_config.no_sign_request {
725 snapshot_store_config.make_http()?
726 } else {
727 snapshot_store_config.make().map(Arc::new)?
728 };
729 let manifest_contents = remote_object_store
730 .get_bytes(&get_path(MANIFEST_FILENAME))
731 .await?;
732 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
733 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
734 let epoch = root_manifest
735 .available_epochs
736 .iter()
737 .max()
738 .ok_or(anyhow!("No snapshot found in manifest"))?;
739 Ok(*epoch)
740}
741
742pub async fn check_completed_snapshot(
743 snapshot_store_config: &ObjectStoreConfig,
744 epoch: EpochId,
745) -> Result<(), anyhow::Error> {
746 let success_marker = format!("epoch_{}/_SUCCESS", epoch);
747 let archive_success_marker = format!("archive/epoch_{}/_SUCCESS", epoch);
748 let remote_object_store = if snapshot_store_config.no_sign_request {
749 snapshot_store_config.make_http()?
750 } else {
751 snapshot_store_config.make().map(Arc::new)?
752 };
753
754 if exists(&remote_object_store, &get_path(success_marker.as_str())).await
756 || exists(
757 &remote_object_store,
758 &get_path(archive_success_marker.as_str()),
759 )
760 .await
761 {
762 Ok(())
763 } else {
764 Err(anyhow!(
765 "missing success marker at {}/{} or {}/{}",
766 snapshot_store_config.bucket.as_ref().unwrap_or(
767 &snapshot_store_config
768 .clone()
769 .aws_endpoint
770 .unwrap_or("unknown_bucket".to_string())
771 ),
772 success_marker,
773 snapshot_store_config.bucket.as_ref().unwrap_or(
774 &snapshot_store_config
775 .clone()
776 .aws_endpoint
777 .unwrap_or("unknown_bucket".to_string())
778 ),
779 archive_success_marker
780 ))
781 }
782}
783
784pub async fn download_formal_snapshot(
785 path: &Path,
786 epoch: EpochId,
787 genesis: &Path,
788 snapshot_store_config: ObjectStoreConfig,
789 ingestion_url: &str,
790 num_parallel_downloads: usize,
791 num_parallel_chunks: usize,
792 network: Chain,
793 verify: SnapshotVerifyMode,
794 max_retries: usize,
795 metrics_port: u16,
796) -> Result<(), anyhow::Error> {
797 let m = MultiProgress::new();
798 let msg = format!(
799 "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
800 epoch, network, verify
801 );
802 m.println(&msg).unwrap();
803 info!("{}", msg);
804
805 let path = path.join("staging").to_path_buf();
806 if path.exists() {
807 fs::remove_dir_all(path.clone())?;
808 }
809
810 let metrics_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), metrics_port);
812 let registry_service = mysten_metrics::start_prometheus_server(metrics_addr);
813 let prometheus_registry = registry_service.default_registry();
814 DBMetrics::init(registry_service.clone());
815 mysten_metrics::init_metrics(&prometheus_registry);
816
817 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
818 &path.join("store"),
819 None,
820 None,
821 ));
822 let genesis = Genesis::load(genesis)?;
823 let genesis_chain = ChainIdentifier::from(*genesis.checkpoint().digest()).chain();
824 if genesis_chain != network {
825 return Err(anyhow!(
826 "Genesis file is for chain {}, but formal snapshot download is configured for --network {}. \
827 Use the matching genesis.blob or pass the correct --network flag.",
828 genesis_chain.as_str(),
829 network.as_str(),
830 ));
831 }
832 let genesis_committee = genesis.committee();
833 let committee_store = Arc::new(CommitteeStore::new(
834 path.join("epochs"),
835 &genesis_committee,
836 None,
837 ));
838 let checkpoint_store = CheckpointStore::new(
839 &path.join("checkpoints"),
840 Arc::new(PrunerWatermarks::default()),
841 );
842
843 let end_of_epoch_checkpoint_seq_nums: Vec<_> = end_of_epoch_data(ingestion_url, vec![])
844 .await?
845 .into_iter()
846 .take((epoch + 1) as usize)
847 .collect();
848
849 let summaries_handle = start_summary_sync(
850 perpetual_db.clone(),
851 committee_store.clone(),
852 checkpoint_store.clone(),
853 m.clone(),
854 genesis.clone(),
855 ingestion_url.to_string(),
856 num_parallel_downloads,
857 verify != SnapshotVerifyMode::None,
858 end_of_epoch_checkpoint_seq_nums.clone(),
859 );
860
861 let backfill_handle = {
863 let perpetual_db = perpetual_db.clone();
864 let ingestion_url = ingestion_url.to_string();
865 let m = m.clone();
866 let end_of_epoch_checkpoint_seq_nums = end_of_epoch_checkpoint_seq_nums.clone();
867 tokio::spawn(async move {
868 backfill_epoch_transaction_digests(
869 perpetual_db,
870 epoch,
871 ingestion_url,
872 num_parallel_downloads,
873 m,
874 end_of_epoch_checkpoint_seq_nums,
875 )
876 .await
877 })
878 };
879
880 let (_abort_handle, abort_registration) = AbortHandle::new_pair();
881 let perpetual_db_clone = perpetual_db.clone();
882 let snapshot_dir = path.parent().unwrap().join("snapshot");
883 if snapshot_dir.exists() {
884 fs::remove_dir_all(snapshot_dir.clone())?;
885 }
886 let snapshot_dir_clone = snapshot_dir.clone();
887
888 let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
891 let m_clone = m.clone();
892
893 let snapshot_handle = tokio::spawn(async move {
894 let local_store_config = ObjectStoreConfig {
895 object_store: Some(ObjectStoreType::File),
896 directory: Some(snapshot_dir_clone.to_path_buf()),
897 ..Default::default()
898 };
899 let mut reader = StateSnapshotReaderV1::new(
900 epoch,
901 &snapshot_store_config,
902 &local_store_config,
903 NonZeroUsize::new(num_parallel_downloads).unwrap(),
904 m_clone,
905 false, max_retries,
907 num_parallel_chunks,
908 )
909 .await
910 .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
911 reader
912 .read(perpetual_db_clone.clone(), abort_registration, Some(sender))
913 .await
914 .unwrap_or_else(|err| panic!("Failed during read: {}", err));
915 info!("Snapshot download complete");
916 Ok::<(), anyhow::Error>(())
917 });
918 let mut root_global_state_hash = GlobalStateHash::default();
919 let mut num_live_objects = 0;
920 while let Some((partial_hash, num_objects)) = receiver.recv().await {
921 num_live_objects += num_objects;
922 root_global_state_hash.union(&partial_hash);
923 }
924 tokio::pin!(summaries_handle);
925 tokio::pin!(snapshot_handle);
926 tokio::pin!(backfill_handle);
927
928 let mut summaries_done = false;
929 let mut snapshot_done = false;
930 let mut backfill_done = false;
931
932 while !summaries_done {
934 tokio::select! {
935 result = &mut summaries_handle, if !summaries_done => {
936 summaries_done = true;
937 result.expect("Summaries task panicked")?;
938 }
939 result = &mut backfill_handle, if !backfill_done => {
940 backfill_done = true;
941 result.expect("Backfill task panicked")?;
942 }
943 result = &mut snapshot_handle, if !snapshot_done => {
944 snapshot_done = true;
945 result.expect("Snapshot task panicked")?;
946 }
947 }
948 }
949
950 let last_checkpoint = checkpoint_store
951 .get_highest_verified_checkpoint()?
952 .expect("Expected nonempty checkpoint store");
953
954 if verify != SnapshotVerifyMode::None {
956 assert_eq!(
957 last_checkpoint.epoch(),
958 epoch,
959 "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
960 last_checkpoint.sequence_number,
961 epoch,
962 last_checkpoint.epoch()
963 );
964 let commitment = last_checkpoint
965 .end_of_epoch_data
966 .as_ref()
967 .expect("Expected highest verified checkpoint to have end of epoch data")
968 .epoch_commitments
969 .last()
970 .expect(
971 "End of epoch has no commitments. This likely means that the epoch \
972 you are attempting to restore from does not support end of epoch state \
973 digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
974 and for testnet, `--epoch` must be > 12.",
975 );
976 match commitment {
977 CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
978 let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
979 assert_eq!(
980 *consensus_digest, local_digest,
981 "End of epoch {} root state digest {} does not match \
982 local root state hash {} computed from snapshot data",
983 epoch, consensus_digest.digest, local_digest.digest,
984 );
985 let progress_bar = m.add(
986 ProgressBar::new(1).with_style(
987 ProgressStyle::with_template(
988 "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
989 )
990 .unwrap(),
991 ),
992 );
993 progress_bar.finish_with_message("Verification complete");
994 }
995 _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
996 };
997 } else {
998 m.println(
999 "WARNING: Skipping snapshot verification! \
1000 This is highly discouraged unless you fully trust the source of this snapshot and its contents.
1001 If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
1002 )?;
1003 }
1004
1005 while !snapshot_done || !backfill_done {
1007 tokio::select! {
1008 result = &mut backfill_handle, if !backfill_done => {
1009 backfill_done = true;
1010 result.expect("Backfill task panicked")?;
1011 }
1012 result = &mut snapshot_handle, if !snapshot_done => {
1013 snapshot_done = true;
1014 result.expect("Snapshot task panicked")?;
1015 }
1016 }
1017 }
1018
1019 checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;
1023
1024 setup_db_state(
1025 epoch,
1026 root_global_state_hash.clone(),
1027 perpetual_db.clone(),
1028 checkpoint_store.clone(),
1029 committee_store,
1030 network,
1031 verify == SnapshotVerifyMode::Strict,
1032 num_live_objects,
1033 m.clone(),
1034 )
1035 .await?;
1036
1037 #[cfg(tidehunter)]
1040 {
1041 perpetual_db
1042 .force_rebuild_control_region()
1043 .expect("Failed to rebuild tidehunter control region after snapshot restore");
1044 println!(
1049 "Waiting for tidehunter background threads to finish before renaming staging to live"
1050 );
1051 perpetual_db.wait_for_tidehunter_background_threads();
1052 println!("Tidehunter background threads finished, proceeding with rename");
1053 }
1054
1055 let new_path = path.parent().unwrap().join("live");
1056 if new_path.exists() {
1057 fs::remove_dir_all(new_path.clone())?;
1058 }
1059 fs::rename(&path, &new_path)?;
1060 fs::remove_dir_all(snapshot_dir.clone())?;
1061 println!(
1062 "Successfully restored state from snapshot at end of epoch {}",
1063 epoch
1064 );
1065
1066 Ok(())
1067}
1068
1069async fn backfill_epoch_transaction_digests(
1070 perpetual_db: Arc<AuthorityPerpetualTables>,
1071 epoch: EpochId,
1072 ingestion_url: String,
1073 concurrency: usize,
1074 m: MultiProgress,
1075 end_of_epoch_checkpoint_seq_nums: Vec<u64>,
1076) -> Result<()> {
1077 if epoch == 0 {
1078 return Ok(());
1079 }
1080
1081 let epoch_last_cp_seq = end_of_epoch_checkpoint_seq_nums
1087 .get(epoch as usize)
1088 .ok_or_else(|| anyhow!("No checkpoint sequence found for epoch {}", epoch))?;
1089
1090 let epoch_start_cp = if epoch == 0 {
1091 0
1092 } else {
1093 end_of_epoch_checkpoint_seq_nums
1094 .get(epoch as usize - 1)
1095 .map(|cp| cp + 1)
1096 .unwrap_or(0)
1097 };
1098 let msg = format!(
1099 "Beginning transaction digest backfill for epoch: {:?}, backfilling from: {:?}..{:?}",
1100 epoch, epoch_start_cp, epoch_last_cp_seq
1101 );
1102 m.println(&msg).ok();
1103 info!("{}", msg);
1104
1105 let checkpoints_to_fetch: Vec<_> = (epoch_start_cp..=*epoch_last_cp_seq).collect();
1106 let num_checkpoints = checkpoints_to_fetch.len();
1107
1108 let progress_bar = m.add(
1109 ProgressBar::new(num_checkpoints as u64).with_style(
1110 ProgressStyle::with_template(
1111 "[{elapsed_precise}] {wide_bar} {pos}/{len} transactions backfilled ({msg})",
1112 )
1113 .unwrap(),
1114 ),
1115 );
1116
1117 let client = build_object_store(&ingestion_url, vec![]);
1118 let checkpoint_counter = Arc::new(AtomicU64::new(0));
1119 let tx_counter = Arc::new(AtomicU64::new(0));
1120 let cloned_checkpoint_counter = checkpoint_counter.clone();
1121 let cloned_progress_bar = progress_bar.clone();
1122 let start_instant = Instant::now();
1123
1124 tokio::spawn(async move {
1125 loop {
1126 if cloned_progress_bar.is_finished() {
1127 break;
1128 }
1129 let num_checkpoints_processed = cloned_checkpoint_counter.load(Ordering::Relaxed);
1130 let elapsed = start_instant.elapsed().as_secs_f64();
1131 let chkpts_per_sec = if elapsed > 0.0 {
1132 num_checkpoints_processed as f64 / elapsed
1133 } else {
1134 0.0
1135 };
1136 cloned_progress_bar.set_position(num_checkpoints_processed);
1137 cloned_progress_bar.set_message(format!("{:.1} chkpts/sec", chkpts_per_sec));
1138 tokio::time::sleep(Duration::from_millis(100)).await;
1139 }
1140 });
1141
1142 futures::stream::iter(checkpoints_to_fetch)
1143 .map(|sq| {
1144 let client = client.clone();
1145 async move {
1146 fetch_checkpoint(&client, sq)
1147 .await
1148 .map(|c| Arc::new(CheckpointData::from(c)))
1149 }
1150 })
1151 .buffer_unordered(concurrency)
1152 .try_for_each(|checkpoint| {
1153 let perpetual_db = perpetual_db.clone();
1154 let tx_counter = tx_counter.clone();
1155 let checkpoint_counter = checkpoint_counter.clone();
1156 let checkpoint_data = checkpoint;
1157
1158 async move {
1159 let tx_digests: Vec<_> = checkpoint_data
1160 .transactions
1161 .iter()
1162 .map(|tx_data| *tx_data.transaction.digest())
1163 .collect();
1164 let num_txs = tx_digests.len();
1165 perpetual_db
1166 .insert_executed_transaction_digests_batch(epoch, tx_digests.into_iter())?;
1167 tx_counter.fetch_add(num_txs as u64, Ordering::Relaxed);
1168 checkpoint_counter.fetch_add(1, Ordering::Relaxed);
1169 Ok::<(), anyhow::Error>(())
1170 }
1171 })
1172 .await?;
1173
1174 let tx_count = tx_counter.load(Ordering::Relaxed);
1175 progress_bar.finish_with_message(format!(
1176 "Backfill complete: {} transactions from {} checkpoints",
1177 tx_count, num_checkpoints
1178 ));
1179 info!(
1180 "Backfill complete: {} transactions from {} checkpoints",
1181 tx_counter.load(Ordering::Relaxed),
1182 checkpoint_counter.load(Ordering::Relaxed)
1183 );
1184
1185 Ok(())
1186}