sui_tool/lib.rs

// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use fastcrypto::traits::ToFromBytes;
use futures::future::AbortHandle;
use futures::future::join_all;
use itertools::Itertools;
use std::collections::BTreeMap;
use std::fmt::Write;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
use std::{fs, io};
use sui_config::{NodeConfig, genesis::Genesis};
use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
use sui_core::execution_cache::build_execution_cache_from_env;
use sui_data_ingestion_core::end_of_epoch_data;
use sui_network::default_mysten_network_config;
use sui_protocol_config::Chain;
use sui_sdk::SuiClient;
use sui_sdk::SuiClientBuilder;
use sui_storage::object_store::http::HttpDownloaderBuilder;
use sui_storage::object_store::util::MANIFEST_FILENAME;
use sui_storage::object_store::util::Manifest;
use sui_storage::object_store::util::PerEpochManifest;
use sui_types::committee::QUORUM_THRESHOLD;
use sui_types::crypto::AuthorityPublicKeyBytes;
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::messages_grpc::LayoutGenerationOption;
use sui_types::multiaddr::Multiaddr;
use sui_types::{base_types::*, object::Owner};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Instant;

use anyhow::anyhow;
use clap::ValueEnum;
use eyre::ContextCompat;
use fastcrypto::hash::MultisetHash;
use futures::{StreamExt, TryStreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use prometheus::Registry;
use serde::{Deserialize, Serialize};
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_core::authority::AuthorityStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::checkpoints::CheckpointStore;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::storage::RocksDbStore;
use sui_snapshot::reader::StateSnapshotReaderV1;
use sui_snapshot::setup_db_state;
use sui_storage::object_store::ObjectStoreGetExt;
use sui_storage::object_store::util::{copy_file, exists, get_path};
use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
use sui_types::messages_grpc::{
    ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
    TransactionStatus,
};

use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
use sui_core::authority::authority_store_pruner::PrunerWatermarks;
use sui_types::storage::ReadStore;
use tracing::info;
use typed_store::DBMetrics;

pub mod commands;
#[cfg(not(tidehunter))]
pub mod db_tool;
mod formal_snapshot_util;
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// Verification of both db state and downloaded checkpoints is skipped.
    /// This is the fastest mode, but it is unsafe and thus should only be used
    /// if you fully trust the source of both the snapshot and the checkpoint
    /// archive.
    None,
    /// Verify snapshot state during download, but skip post-restore db verification.
    /// Checkpoint verification is still performed.
    #[default]
    Normal,
    /// In addition to the behavior of `--verify normal`, verify db state post-restore
    /// against the end-of-epoch state root commitment.
    Strict,
}

// Builds authority clients for all active validators, using system state fetched via the given fullnode client.
async fn make_clients(
    sui_client: &Arc<SuiClient>,
) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
    let mut net_config = default_mysten_network_config();
    net_config.connect_timeout = Some(Duration::from_secs(5));
    let mut authority_clients = BTreeMap::new();

    let active_validators = sui_client
        .governance_api()
        .get_latest_sui_system_state()
        .await?
        .active_validators;

    for validator in active_validators {
        let net_addr = Multiaddr::try_from(validator.net_address).unwrap();
        let tls_config = sui_tls::create_rustls_client_config(
            sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
            None,
        );
        let channel = net_config
            .connect_lazy(&net_addr, tls_config)
            .map_err(|err| anyhow!(err.to_string()))?;
        let client = NetworkAuthorityClient::new(channel);
        let public_key_bytes =
            AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
        authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
    }

    Ok(authority_clients)
}

type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
pub struct ObjectData {
    requested_id: ObjectID,
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}

trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        match self {
            None => def_str.to_string(),
            Some(t) => format!("{:?}", t),
        }
    }
}

#[allow(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    pub available_voting_power: u64,
    pub fully_locked: bool,
}

impl GroupedObjectOutput {
    pub fn new(
        object_data: ObjectData,
        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
    ) -> Self {
        let mut grouped_results = BTreeMap::new();
        let mut voting_power = BTreeMap::new();
        let mut available_voting_power = 0;
        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
            let stake = committee.get(name).unwrap();
            let key = match resp {
                Ok(r) => {
                    let obj_digest = r.object.compute_object_reference().2;
                    let parent_tx_digest = r.object.previous_transaction;
                    let owner = r.object.owner.clone();
                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
                    if lock.is_none() {
                        available_voting_power += stake;
                    }
                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
                }
                Err(_) => None,
            };
            let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
            entry.push(*name);
            let entry: &mut u64 = voting_power.entry(key).or_default();
            *entry += stake;
        }
        let voting_power = voting_power
            .into_iter()
            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
            .collect::<Vec<_>>();
        let mut fully_locked = false;
        if !voting_power.is_empty()
            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
        {
            fully_locked = true;
        }
        Self {
            grouped_results,
            voting_power,
            available_voting_power,
            fully_locked,
        }
    }
}

#[allow(clippy::format_in_format_args)]
impl std::fmt::Display for GroupedObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "available stake: {}", self.available_voting_power)?;
        writeln!(f, "fully locked: {}", self.fully_locked)?;
        writeln!(f, "{:<100}\n", "-".repeat(100))?;
        for (key, stake) in &self.voting_power {
            let val = self.grouped_results.get(key).unwrap();
            writeln!(f, "total stake: {stake}")?;
            match key {
                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
                    let lock = lock.opt_debug("no-known-lock");
                    writeln!(f, "obj ref: {obj_digest}")?;
                    writeln!(f, "parent tx: {parent_tx_digest}")?;
                    writeln!(f, "owner: {owner}")?;
                    writeln!(f, "lock: {lock}")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
                None => {
                    writeln!(f, "ERROR")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
            };
            writeln!(f, "{:<100}\n", "-".repeat(100))?;
        }
        Ok(())
    }
}

struct ConciseObjectOutput(ObjectData);

impl ConciseObjectOutput {
    fn header() -> String {
        format!(
            "{:<20} {:<8} {:<66} {:<45} {}",
            "validator", "version", "digest", "parent_cert", "owner"
        )
    }
}

impl std::fmt::Display for ConciseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
            write!(
                f,
                "{:<20} {:<8}",
                format!("{:?}", name.concise()),
                version.map(|s| s.value()).opt_debug("-")
            )?;
            match resp {
                Err(_) => writeln!(
                    f,
                    "{:<66} {:<45} {:<51}",
                    "object-fetch-failed", "no-cert-available", "no-owner-available"
                )?,
                Ok(resp) => {
                    let obj_digest = resp.object.compute_object_reference().2;
                    let parent = resp.object.previous_transaction;
                    let owner = resp.object.owner.clone();
                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
                }
            }
            writeln!(f)?;
        }
        Ok(())
    }
}

struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        "  -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, "  -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        writeln!(
                            f,
                            "  -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, "  -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        "  -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}

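/// Queries each validator in `clients` (or only `validator`, if given) for the object
/// `obj_id`, optionally at a specific `version`, and collects the per-validator responses.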
pub async fn get_object(
    obj_id: ObjectID,
    version: Option<u64>,
    validator: Option<AuthorityName>,
    clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
) -> Result<ObjectData> {
    let responses = join_all(
        clients
            .iter()
            .filter(|(name, _)| {
                if let Some(v) = validator {
                    v == **name
                } else {
                    true
                }
            })
            .map(|(name, (address, client))| async {
                let object_version = get_object_impl(client, obj_id, version).await;
                (*name, address.clone(), object_version)
            }),
    )
    .await;

    Ok(ObjectData {
        requested_id: obj_id,
        responses,
    })
}

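/// Queries every validator for the given transaction digest and renders a report that
/// groups validators by the effects digest they report, including per-validator latency.
/// If `show_input_tx` is set, the input transaction data is included in the output.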
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let sui_client = Arc::new(SuiClientBuilder::default().build(fullnode_rpc).await?);
    let clients = make_clients(&sui_client).await?;
    let timer = Instant::now();
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Grab one validator that returned a successful TransactionInfoResponse
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    let responses = responses
        .iter()
        .map(|r| {
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // In this case, we expect at least one validator to know about this tx
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                "        {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}

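/// Fetches object info for `id` from a single validator, returning the reported version,
/// the raw response (or error), and the time spent on the request in seconds.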
async fn get_object_impl(
    client: &NetworkAuthorityClient,
    id: ObjectID,
    version: Option<u64>,
) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
    let start = Instant::now();
    let resp = client
        .handle_object_info_request(ObjectInfoRequest {
            object_id: id,
            generate_layout: LayoutGenerationOption::Generate,
            request_kind: match version {
                None => ObjectInfoRequestKind::LatestObjectInfo,
                Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
            },
        })
        .await
        .map_err(anyhow::Error::from);
    let elapsed = start.elapsed().as_secs_f64();

    let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
    (resp_version.map(SequenceNumber::from), resp, elapsed)
}

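/// Builds the anemo CLI configuration exposing the Discovery and StateSync RPC methods
/// so they can be invoked manually for debugging.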
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use sui_network::discovery::*;
    use sui_network::state_sync::*;

    // TODO: implement `ServiceInfo` generation in anemo-build and use it here.
    anemo_cli::Config::new()
        // Sui discovery
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeersV2",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
            ),
        )
        // Sui state sync
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        sui_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        sui_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}

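/// Recursively copies `src` into `dst`, skipping any paths listed in `skip`.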
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    fs::create_dir_all(&dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        if skip.contains(&entry.path()) {
            continue;
        }
        if ty.is_dir() {
            copy_dir_all(
                entry.path(),
                dst.as_ref().join(entry.file_name()),
                skip.clone(),
            )?;
        } else {
            fs::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
        }
    }
    Ok(())
}

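/// Restores a node's database by copying the contents of a local db checkpoint into the
/// db path taken from the node config.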
pub async fn restore_from_db_checkpoint(
    config: &NodeConfig,
    db_checkpoint_path: &Path,
) -> Result<(), anyhow::Error> {
    copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
    Ok(())
}

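/// Spawns a task that syncs end-of-epoch checkpoint summaries up to `epoch` from the
/// ingestion source, optionally verifying authority signatures against each epoch's
/// committee, and reports progress via the provided `MultiProgress`.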
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    ingestion_url: String,
    epoch: u64,
    num_parallel_downloads: usize,
    verify: bool,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        info!("Starting summary sync");
        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }

        let end_of_epoch_checkpoint_seq_nums: Vec<_> =
            end_of_epoch_data(ingestion_url.clone(), vec![], 5)
                .await?
                .into_iter()
                .take((epoch + 1) as usize)
                .collect();

        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        let s_start = latest_synced
            .checked_add(1)
            .context("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        read_summaries_for_list_no_verify(
            ingestion_url,
            num_parallel_downloads,
            state_sync_store.clone(),
            end_of_epoch_checkpoint_seq_nums.clone(),
            sync_checkpoint_counter,
        )
        .await?;
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            tokio::spawn(async move {
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            for (cp_epoch, epoch_last_cp_seq_num) in
                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
            {
                let epoch_last_checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                    .ok_or(anyhow!("Failed to read checkpoint"))?;
                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
                    "Expected committee to exist after syncing all end of epoch checkpoints",
                );
                epoch_last_checkpoint
                    .verify_authority_signatures(&committee)
                    .expect("Failed to verify checkpoint");
                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}

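/// Reads the top-level MANIFEST from the snapshot store and returns the latest epoch for
/// which a snapshot is available.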
pub async fn get_latest_available_epoch(
    snapshot_store_config: &ObjectStoreConfig,
) -> Result<u64, anyhow::Error> {
    let remote_object_store = if snapshot_store_config.no_sign_request {
        snapshot_store_config.make_http()?
    } else {
        snapshot_store_config.make().map(Arc::new)?
    };
    let manifest_contents = remote_object_store
        .get_bytes(&get_path(MANIFEST_FILENAME))
        .await?;
    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
    let epoch = root_manifest
        .available_epochs
        .iter()
        .max()
        .ok_or(anyhow!("No snapshot found in manifest"))?;
    Ok(*epoch)
}

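/// Checks that the `_SUCCESS` marker exists for the given epoch in the snapshot store,
/// i.e. that the snapshot upload for that epoch completed.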
pub async fn check_completed_snapshot(
    snapshot_store_config: &ObjectStoreConfig,
    epoch: EpochId,
) -> Result<(), anyhow::Error> {
    let success_marker = format!("epoch_{}/_SUCCESS", epoch);
    let remote_object_store = if snapshot_store_config.no_sign_request {
        snapshot_store_config.make_http()?
    } else {
        snapshot_store_config.make().map(Arc::new)?
    };
    if exists(&remote_object_store, &get_path(success_marker.as_str())).await {
        Ok(())
    } else {
        Err(anyhow!(
            "missing success marker at {}/{}",
            snapshot_store_config.bucket.as_ref().unwrap_or(
                &snapshot_store_config
                    .clone()
                    .aws_endpoint
                    .unwrap_or("unknown_bucket".to_string())
            ),
            success_marker
        ))
    }
}

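/// Downloads and restores a formal (state) snapshot for `epoch` into a staging directory
/// under `path`, syncing checkpoint summaries in parallel and verifying state according to
/// `verify`. On success the restored database is moved to a `live` directory under `path`.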
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    ingestion_url: &str,
    num_parallel_downloads: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    m.println(format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify,
    ))?;
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }

    // Start a Prometheus server so that we can serve metrics during the snapshot download
    let registry_service =
        mysten_metrics::start_prometheus_server("127.0.0.1:9184".parse().unwrap());
    let prometheus_registry = registry_service.default_registry();
    DBMetrics::init(registry_service.clone());
    mysten_metrics::init_metrics(&prometheus_registry);

    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
        &path.join("store"),
        None,
        None,
    ));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee()?;
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = CheckpointStore::new(
        &path.join("checkpoints"),
        Arc::new(PrunerWatermarks::default()),
    );

    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        ingestion_url.to_string(),
        epoch,
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
    );
    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // TODO: if verify is false, we should skip generating these and
    // not pass in a channel to the reader
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
            false, // skip_reset_local_store
            max_retries,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(&perpetual_db_clone, abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        Ok::<(), anyhow::Error>(())
    });
    let mut root_global_state_hash = GlobalStateHash::default();
    let mut num_live_objects = 0;
    while let Some((partial_hash, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_global_state_hash.union(&partial_hash);
    }
    summaries_handle
        .await
        .expect("Task join failed")
        .expect("Summaries task failed");

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    // Perform snapshot state verification
    if verify != SnapshotVerifyMode::None {
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    snapshot_handle
        .await
        .expect("Task join failed")
        .expect("Snapshot restore task failed");

    // TODO: we should ensure this map is updated for all end-of-epoch checkpoints
    // during summary sync. This happens in `insert_{verified|certified}_checkpoint`
    // in the checkpoint store, but not in the corresponding functions of the ObjectStore trait.
    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    setup_db_state(
        epoch,
        root_global_state_hash.clone(),
        perpetual_db.clone(),
        checkpoint_store,
        committee_store,
        network,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m,
    )
    .await?;

    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}

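/// Downloads a db snapshot for `epoch` from the remote store into `path`, fetching the
/// per-epoch manifest and copying the listed store, epochs, checkpoints, and (optionally)
/// indexes files with bounded parallelism and per-file retries.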
pub async fn download_db_snapshot(
    path: &Path,
    epoch: u64,
    snapshot_store_config: ObjectStoreConfig,
    skip_indexes: bool,
    num_parallel_downloads: usize,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let remote_store = if snapshot_store_config.no_sign_request {
        snapshot_store_config.make_http()?
    } else {
        snapshot_store_config.make().map(Arc::new)?
    };

    // We rely on the top-level MANIFEST file, which contains all valid epochs
    let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;

    if !root_manifest.epoch_exists(epoch) {
        return Err(anyhow!(
            "Epoch dir {} doesn't exist on the remote store",
            epoch
        ));
    }

    let epoch_path = format!("epoch_{}", epoch);
    let epoch_dir = get_path(&epoch_path);

    let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
    let epoch_manifest_contents =
        String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
            .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;

    let epoch_manifest =
        PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);

    let mut files: Vec<String> = vec![];
    files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
    files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
    files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
    if !skip_indexes {
        files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
    }
    let local_store = ObjectStoreConfig {
        object_store: Some(ObjectStoreType::File),
        directory: Some(path.to_path_buf()),
        ..Default::default()
    }
    .make()?;
    let m = MultiProgress::new();
    let path = path.to_path_buf();
    let snapshot_handle = tokio::spawn(async move {
        let progress_bar = m.add(
            ProgressBar::new(files.len() as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
                )
                .unwrap(),
            ),
        );
        let cloned_progress_bar = progress_bar.clone();
        let file_counter = Arc::new(AtomicUsize::new(0));
        futures::stream::iter(files.iter())
            .map(|file| {
                let local_store = local_store.clone();
                let remote_store = remote_store.clone();
                let counter_cloned = file_counter.clone();
                async move {
                    counter_cloned.fetch_add(1, Ordering::Relaxed);
                    let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());

                    let mut attempts = 0;
                    let max_attempts = max_retries + 1;
                    loop {
                        attempts += 1;
                        match copy_file(&file_path, &file_path, &remote_store, &local_store).await {
                            Ok(()) => break,
                            Err(e) if attempts >= max_attempts => {
                                return Err(anyhow::anyhow!(
                                    "Failed to download {} after {} attempts: {}",
                                    file_path,
                                    attempts,
                                    e
                                ));
                            }
                            Err(e) => {
                                tracing::warn!(
                                    "Failed to download {} (attempt {}/{}): {}, retrying in {}ms",
                                    file_path,
                                    attempts,
                                    max_attempts,
                                    e,
                                    1000 * attempts
                                );
                                tokio::time::sleep(Duration::from_millis(1000 * attempts as u64))
                                    .await;
                            }
                        }
                    }

                    Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
                }
            })
            .boxed()
            .buffer_unordered(num_parallel_downloads)
            .try_for_each(|path| {
                file_counter.fetch_sub(1, Ordering::Relaxed);
                cloned_progress_bar.inc(1);
                cloned_progress_bar.set_message(format!(
                    "Downloading file: {}, #downloads_in_progress: {}",
                    path,
                    file_counter.load(Ordering::Relaxed)
                ));
                futures::future::ready(Ok(()))
            })
            .await?;
        progress_bar.finish_with_message("Snapshot file download is complete");
        Ok::<(), anyhow::Error>(())
    });

    let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
    join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .for_each(|result| result.expect("Task failed"));

    let store_dir = path.join("store");
    if store_dir.exists() {
        fs::remove_dir_all(&store_dir)?;
    }
    let epochs_dir = path.join("epochs");
    if epochs_dir.exists() {
        fs::remove_dir_all(&epochs_dir)?;
    }
    Ok(())
}