// sui_tool/lib.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::num::NonZeroUsize;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::time::Duration;
17use std::{fs, io};
18use sui_config::{NodeConfig, genesis::Genesis};
19use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
20use sui_core::execution_cache::build_execution_cache_from_env;
21use sui_network::default_mysten_network_config;
22use sui_protocol_config::Chain;
23use sui_rpc_api::Client;
24use sui_storage::object_store::http::HttpDownloaderBuilder;
25use sui_storage::object_store::util::MANIFEST_FILENAME;
26use sui_storage::object_store::util::Manifest;
27use sui_storage::object_store::util::PerEpochManifest;
28use sui_storage::object_store::util::{build_object_store, end_of_epoch_data, fetch_checkpoint};
29use sui_types::committee::QUORUM_THRESHOLD;
30use sui_types::crypto::AuthorityPublicKeyBytes;
31use sui_types::global_state_hash::GlobalStateHash;
32use sui_types::messages_grpc::LayoutGenerationOption;
33use sui_types::multiaddr::Multiaddr;
34use sui_types::{base_types::*, object::Owner};
35use tokio::sync::mpsc;
36use tokio::task::JoinHandle;
37use tokio::time::Instant;
38
39use anyhow::anyhow;
40use clap::ValueEnum;
41use eyre::ContextCompat;
42use fastcrypto::hash::MultisetHash;
43use futures::{StreamExt, TryStreamExt};
44use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
45use prometheus::Registry;
46use serde::{Deserialize, Serialize};
47use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
48use sui_core::authority::AuthorityStore;
49use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
50use sui_core::checkpoints::CheckpointStore;
51use sui_core::epoch::committee_store::CommitteeStore;
52use sui_core::storage::RocksDbStore;
53use sui_snapshot::reader::StateSnapshotReaderV1;
54use sui_snapshot::setup_db_state;
55use sui_storage::object_store::ObjectStoreGetExt;
56use sui_storage::object_store::util::{copy_file, exists, get_path};
57use sui_types::full_checkpoint_content::CheckpointData;
58use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
59use sui_types::messages_grpc::{
60    ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
61    TransactionStatus,
62};
63
64use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
65use sui_core::authority::authority_store_pruner::PrunerWatermarks;
66use sui_types::storage::ReadStore;
67use tracing::info;
68use typed_store::DBMetrics;
69
70pub mod commands;
71pub mod db_tool;
72mod formal_snapshot_util;
73
74#[derive(
75    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
76)]
77pub enum SnapshotVerifyMode {
78    /// verification of both db state and downloaded checkpoints are skipped.
79    /// This is the fastest mode, but is unsafe, and thus should only be used
80    /// if you fully trust the source for both the snapshot and the checkpoint
81    /// archive.
82    None,
83    /// verify snapshot state during download, but no post-restore db verification.
84    /// Checkpoint verification is performed.
85    #[default]
86    Normal,
87    /// In ADDITION to the behavior of `--verify normal`, verify db state post-restore
88    /// against the end of epoch state root commitment.
89    Strict,
90}
91
92// This functions requires at least one of genesis or fullnode_rpc to be `Some`.
93async fn make_clients(
94    sui_client: &Client,
95) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
96    let mut net_config = default_mysten_network_config();
97    net_config.connect_timeout = Some(Duration::from_secs(5));
98    let mut authority_clients = BTreeMap::new();
99
100    let active_validators = sui_client
101        .get_system_state_summary(None)
102        .await?
103        .active_validators;
104
105    for validator in active_validators {
106        let net_addr = Multiaddr::try_from(validator.net_address).unwrap();
107        let tls_config = sui_tls::create_rustls_client_config(
108            sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
109            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
110            None,
111        );
112        let channel = net_config
113            .connect_lazy(&net_addr, tls_config)
114            .map_err(|err| anyhow!(err.to_string()))?;
115        let client = NetworkAuthorityClient::new(channel);
116        let public_key_bytes =
117            AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
118        authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
119    }
120
121    Ok(authority_clients)
122}
123
/// One validator's answer to an object query:
/// (version the validator reported, the raw object info result, seconds the request took).
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Raw per-validator responses collected for a single object query.
pub struct ObjectData {
    // The object id the query was issued for.
    requested_id: ObjectID,
    // One entry per queried validator: name, network address, and its response.
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
129
/// Renders an `Option<T: Debug>` as the inner value's `Debug` form, or a
/// caller-supplied placeholder string when the value is absent.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map_or_else(|| def_str.to_owned(), |value| format!("{value:?}"))
    }
}
145
/// Object-query responses grouped by the distinct object state each
/// validator reported, together with the stake behind each distinct state.
#[allow(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    /// Maps an observed object state — (version, object digest, parent tx
    /// digest, owner, optional lock digest) — to the validators that reported
    /// it. The `None` key collects validators whose request failed.
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    /// Same keys as `grouped_results`, paired with total stake per group and
    /// sorted by stake in descending order.
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    /// Total stake of validators that reported the object without a lock.
    pub available_voting_power: u64,
    /// True when even the largest group plus the unlocked stake cannot reach
    /// quorum, i.e. the object is equivocated/locked beyond recovery.
    pub fully_locked: bool,
}
171
172impl GroupedObjectOutput {
173    pub fn new(
174        object_data: ObjectData,
175        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
176    ) -> Self {
177        let mut grouped_results = BTreeMap::new();
178        let mut voting_power = BTreeMap::new();
179        let mut available_voting_power = 0;
180        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
181            let stake = committee.get(name).unwrap();
182            let key = match resp {
183                Ok(r) => {
184                    let obj_digest = r.object.compute_object_reference().2;
185                    let parent_tx_digest = r.object.previous_transaction;
186                    let owner = r.object.owner.clone();
187                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
188                    if lock.is_none() {
189                        available_voting_power += stake;
190                    }
191                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
192                }
193                Err(_) => None,
194            };
195            let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
196            entry.push(*name);
197            let entry: &mut u64 = voting_power.entry(key).or_default();
198            *entry += stake;
199        }
200        let voting_power = voting_power
201            .into_iter()
202            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
203            .collect::<Vec<_>>();
204        let mut fully_locked = false;
205        if !voting_power.is_empty()
206            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
207        {
208            fully_locked = true;
209        }
210        Self {
211            grouped_results,
212            voting_power,
213            available_voting_power,
214            fully_locked,
215        }
216    }
217}
218
219#[allow(clippy::format_in_format_args)]
220impl std::fmt::Display for GroupedObjectOutput {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        writeln!(f, "available stake: {}", self.available_voting_power)?;
223        writeln!(f, "fully locked: {}", self.fully_locked)?;
224        writeln!(f, "{:<100}\n", "-".repeat(100))?;
225        for (key, stake) in &self.voting_power {
226            let val = self.grouped_results.get(key).unwrap();
227            writeln!(f, "total stake: {stake}")?;
228            match key {
229                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
230                    let lock = lock.opt_debug("no-known-lock");
231                    writeln!(f, "obj ref: {obj_digest}")?;
232                    writeln!(f, "parent tx: {parent_tx_digest}")?;
233                    writeln!(f, "owner: {owner}")?;
234                    writeln!(f, "lock: {lock}")?;
235                    for (i, name) in val.iter().enumerate() {
236                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
237                    }
238                }
239                None => {
240                    writeln!(f, "ERROR")?;
241                    for (i, name) in val.iter().enumerate() {
242                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
243                    }
244                }
245            };
246            writeln!(f, "{:<100}\n", "-".repeat(100))?;
247        }
248        Ok(())
249    }
250}
251
252struct ConciseObjectOutput(ObjectData);
253
254impl ConciseObjectOutput {
255    fn header() -> String {
256        format!(
257            "{:<20} {:<8} {:<66} {:<45} {}",
258            "validator", "version", "digest", "parent_cert", "owner"
259        )
260    }
261}
262
263impl std::fmt::Display for ConciseObjectOutput {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
266            write!(
267                f,
268                "{:<20} {:<8}",
269                format!("{:?}", name.concise()),
270                version.map(|s| s.value()).opt_debug("-")
271            )?;
272            match resp {
273                Err(_) => writeln!(
274                    f,
275                    "{:<66} {:<45} {:<51}",
276                    "object-fetch-failed", "no-cert-available", "no-owner-available"
277                )?,
278                Ok(resp) => {
279                    let obj_digest = resp.object.compute_object_reference().2;
280                    let parent = resp.object.previous_transaction;
281                    let owner = resp.object.owner.clone();
282                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
283                }
284            }
285            writeln!(f)?;
286        }
287        Ok(())
288    }
289}
290
/// Detailed multi-line rendering of `ObjectData`: one section per validator
/// with version, digest, contents, owner, and lock information.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        "  -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, "  -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // NOTE(review): these unwraps panic during formatting if the
                        // object bytes don't match the returned layout — tolerable in
                        // a debug tool, but worth confirming.
                        writeln!(
                            f,
                            "  -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, "  -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        "  -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
340
341pub async fn get_object(
342    obj_id: ObjectID,
343    version: Option<u64>,
344    validator: Option<AuthorityName>,
345    clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
346) -> Result<ObjectData> {
347    let responses = join_all(
348        clients
349            .iter()
350            .filter(|(name, _)| {
351                if let Some(v) = validator {
352                    v == **name
353                } else {
354                    true
355                }
356            })
357            .map(|(name, (address, client))| async {
358                let object_version = get_object_impl(client, obj_id, version).await;
359                (*name, address.clone(), object_version)
360            }),
361    )
362    .await;
363
364    Ok(ObjectData {
365        requested_id: obj_id,
366        responses,
367    })
368}
369
/// Queries every validator for `tx_digest` and renders a report grouping
/// validators by the transaction status/effects they returned.
///
/// When `show_input_tx` is set, the transaction data is also printed for
/// executed groups; for signed-but-not-executed groups it is re-fetched from
/// one validator that knows the transaction (panics if that re-fetch fails).
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let sui_client = Client::new(fullnode_rpc)?;
    let clients = make_clients(&sui_client).await?;
    let timer = Instant::now();
    // Fan the request out to all validators concurrently, recording per-call latency.
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Grab one validator that return Some(TransactionInfoResponse)
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    // Sort, then group: `chunk_by` only merges adjacent equal keys, so the
    // sort is what brings agreeing validators together.
    let responses = responses
        .iter()
        .map(|r| {
            // Key: Ok(Some(digest)) = executed, Ok(None) = only signed,
            // Err = the request itself failed.
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            // Executed: print effects (and optionally the input transaction).
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            // Signed but not executed: optionally re-fetch the transaction
            // from a validator known to have it.
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // In this case, we expect at least one validator knows about this tx
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            // Request failures: dump the error group as-is.
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        // List every validator in this group with its response latency.
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                "        {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
470
471async fn get_object_impl(
472    client: &NetworkAuthorityClient,
473    id: ObjectID,
474    version: Option<u64>,
475) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
476    let start = Instant::now();
477    let resp = client
478        .handle_object_info_request(ObjectInfoRequest {
479            object_id: id,
480            generate_layout: LayoutGenerationOption::Generate,
481            request_kind: match version {
482                None => ObjectInfoRequestKind::LatestObjectInfo,
483                Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
484            },
485        })
486        .await
487        .map_err(anyhow::Error::from);
488    let elapsed = start.elapsed().as_secs_f64();
489
490    let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
491    (resp_version.map(SequenceNumber::from), resp, elapsed)
492}
493
/// Builds the anemo-cli service registry used to issue ad-hoc RPCs against a
/// Sui node's anemo endpoints: peer discovery and checkpoint state sync.
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use sui_network::discovery::*;
    use sui_network::state_sync::*;

    // TODO: implement `ServiceInfo` generation in anemo-build and use here.
    anemo_cli::Config::new()
        // Sui discovery
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeersV2",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
            ),
        )
        // Sui state sync
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        sui_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        sui_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}
542
/// Recursively copies the contents of `src` into `dst` (creating `dst` and
/// any missing parents), skipping every entry whose full path appears in
/// `skip`. Non-directory entries are copied with `fs::copy`.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    // Delegate to a borrowing worker so recursion doesn't clone the skip
    // list once per subdirectory.
    copy_dir_all_inner(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursive worker for [`copy_dir_all`]; borrows the skip list.
fn copy_dir_all_inner(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        // Check the skip list before stat-ing the entry, so skipped paths
        // can't cause errors.
        if skip.contains(&entry.path()) {
            continue;
        }
        let target = dst.join(entry.file_name());
        if entry.file_type()?.is_dir() {
            copy_dir_all_inner(&entry.path(), &target, skip)?;
        } else {
            fs::copy(entry.path(), target)?;
        }
    }
    Ok(())
}
567
568pub async fn restore_from_db_checkpoint(
569    config: &NodeConfig,
570    db_checkpoint_path: &Path,
571) -> Result<(), anyhow::Error> {
572    copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
573    Ok(())
574}
575
/// Spawns a task that syncs all end-of-epoch checkpoint summaries into
/// `checkpoint_store` (downloading from `ingestion_url` with up to
/// `num_parallel_downloads` concurrent fetches), optionally verifies each
/// epoch's final checkpoint signature against that epoch's committee, and
/// then advances the store's synced/verified/executed/pruned watermarks to
/// the last checkpoint in the list.
///
/// `end_of_epoch_checkpoint_seq_nums` must be non-empty (the task panics
/// otherwise). Progress is rendered on the `m` MultiProgress.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    ingestion_url: String,
    num_parallel_downloads: usize,
    verify: bool,
    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }

        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        // NOTE(review): `wrap_err` (eyre) attaches "Checkpoint overflow", but the
        // error is immediately discarded by `map_err` and replaced with the anyhow
        // message, so the eyre context is never surfaced.
        let s_start = latest_synced
            .checked_add(1)
            .wrap_err("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Progress reporter: polls the shared counter once a second until the
        // bar is marked finished.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Download summaries without signature verification; verification (if
        // requested) happens in the block below.
        read_summaries_for_list_no_verify(
            ingestion_url,
            num_parallel_downloads,
            state_sync_store.clone(),
            end_of_epoch_checkpoint_seq_nums.clone(),
            sync_checkpoint_counter,
        )
        .await?;
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");
        info!("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Same once-a-second polling pattern as the sync bar above.
            tokio::spawn(async move {
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            // The loop index doubles as the epoch id when looking up the
            // committee for each end-of-epoch checkpoint.
            for (cp_epoch, epoch_last_cp_seq_num) in
                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
            {
                let epoch_last_checkpoint = checkpoint_store
                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                    .ok_or(anyhow!("Failed to read checkpoint"))?;
                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
                    "Expected committee to exist after syncing all end of epoch checkpoints",
                );
                epoch_last_checkpoint
                    .verify_authority_signatures(&committee)
                    .expect("Failed to verify checkpoint");
                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance every watermark to the final restored checkpoint.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
715
716pub async fn get_latest_available_epoch(
717    snapshot_store_config: &ObjectStoreConfig,
718) -> Result<u64, anyhow::Error> {
719    let remote_object_store = if snapshot_store_config.no_sign_request {
720        snapshot_store_config.make_http()?
721    } else {
722        snapshot_store_config.make().map(Arc::new)?
723    };
724    let manifest_contents = remote_object_store
725        .get_bytes(&get_path(MANIFEST_FILENAME))
726        .await?;
727    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
728        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
729    let epoch = root_manifest
730        .available_epochs
731        .iter()
732        .max()
733        .ok_or(anyhow!("No snapshot found in manifest"))?;
734    Ok(*epoch)
735}
736
737pub async fn check_completed_snapshot(
738    snapshot_store_config: &ObjectStoreConfig,
739    epoch: EpochId,
740) -> Result<(), anyhow::Error> {
741    let success_marker = format!("epoch_{}/_SUCCESS", epoch);
742    let archive_success_marker = format!("archive/epoch_{}/_SUCCESS", epoch);
743    let remote_object_store = if snapshot_store_config.no_sign_request {
744        snapshot_store_config.make_http()?
745    } else {
746        snapshot_store_config.make().map(Arc::new)?
747    };
748
749    // Check regular location first, then archive location
750    if exists(&remote_object_store, &get_path(success_marker.as_str())).await
751        || exists(
752            &remote_object_store,
753            &get_path(archive_success_marker.as_str()),
754        )
755        .await
756    {
757        Ok(())
758    } else {
759        Err(anyhow!(
760            "missing success marker at {}/{} or {}/{}",
761            snapshot_store_config.bucket.as_ref().unwrap_or(
762                &snapshot_store_config
763                    .clone()
764                    .aws_endpoint
765                    .unwrap_or("unknown_bucket".to_string())
766            ),
767            success_marker,
768            snapshot_store_config.bucket.as_ref().unwrap_or(
769                &snapshot_store_config
770                    .clone()
771                    .aws_endpoint
772                    .unwrap_or("unknown_bucket".to_string())
773            ),
774            archive_success_marker
775        ))
776    }
777}
778
/// Restores node state from a formal (object-set) snapshot taken at the end of
/// `epoch`, staging everything under `path/staging` and promoting it to
/// `path/live` only after a fully successful restore.
///
/// Three tasks run concurrently:
///   * checkpoint-summary sync (provides the end-of-epoch commitment used for
///     verification),
///   * transaction-digest backfill for the restored epoch, and
///   * the snapshot object download itself, which streams per-chunk
///     `(partial_hash, num_objects)` pairs back over a channel.
///
/// Unless `verify` is `SnapshotVerifyMode::None`, the ECMH digest accumulated
/// from the downloaded chunks is checked against the epoch commitment recorded
/// in the highest verified checkpoint.
///
/// # Arguments
/// * `path` - base directory; data is staged in `path/staging`, scratch files
///   in `path/snapshot`, final output in `path/live`.
/// * `genesis` - path to the genesis blob (initial committee).
/// * `snapshot_store_config` - remote object store holding the formal snapshot.
/// * `ingestion_url` - checkpoint ingestion endpoint for summaries/backfill.
/// * `num_parallel_downloads` / `num_parallel_chunks` - download parallelism.
/// * `network` - chain identifier forwarded to `setup_db_state`.
/// * `max_retries` - per-operation retry budget for the snapshot reader.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    ingestion_url: &str,
    num_parallel_downloads: usize,
    num_parallel_chunks: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    max_retries: usize,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    let msg = format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify
    );
    m.println(&msg).unwrap();
    info!("{}", msg);

    // Work in a fresh staging directory; wipe any leftovers from a prior run.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }

    // Start prometheus server so that we can serve metrics during snapshot download
    let registry_service =
        mysten_metrics::start_prometheus_server("127.0.0.1:9184".parse().unwrap());
    let prometheus_registry = registry_service.default_registry();
    DBMetrics::init(registry_service.clone());
    mysten_metrics::init_metrics(&prometheus_registry);

    // Open the staging databases the restore writes into.
    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
        &path.join("store"),
        None,
        None,
    ));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee();
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = CheckpointStore::new(
        &path.join("checkpoints"),
        Arc::new(PrunerWatermarks::default()),
    );

    // End-of-epoch checkpoint sequence numbers for epochs 0..=epoch, fetched
    // from the ingestion endpoint.
    let end_of_epoch_checkpoint_seq_nums: Vec<_> = end_of_epoch_data(ingestion_url, vec![])
        .await?
        .into_iter()
        .take((epoch + 1) as usize)
        .collect();

    // Kick off checkpoint-summary sync; summary verification is enabled for
    // any mode other than `None`.
    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        ingestion_url.to_string(),
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
        end_of_epoch_checkpoint_seq_nums.clone(),
    );

    // Start transaction backfill in parallel with summary sync
    let backfill_handle = {
        let perpetual_db = perpetual_db.clone();
        let ingestion_url = ingestion_url.to_string();
        let m = m.clone();
        let end_of_epoch_checkpoint_seq_nums = end_of_epoch_checkpoint_seq_nums.clone();
        tokio::spawn(async move {
            backfill_epoch_transaction_digests(
                perpetual_db,
                epoch,
                ingestion_url,
                num_parallel_downloads,
                m,
                end_of_epoch_checkpoint_seq_nums,
            )
            .await
        })
    };

    // NOTE(review): the abort handle is never invoked here; the binding keeps
    // it alive for the duration of the restore so the reader's registration
    // remains usable.
    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    // Scratch directory for the raw downloaded snapshot files; removed at the end.
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // Channel over which the snapshot reader streams one
    // (partial_hash, num_objects) pair per completed chunk.
    // TODO if verify is false, we should skip generating these and
    // not pass in a channel to the reader
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    // Download the formal snapshot and ingest its objects into the perpetual store.
    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
            false, // skip_reset_local_store
            max_retries,
            num_parallel_chunks,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(perpetual_db_clone.clone(), abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        info!("Snapshot download complete");
        Ok::<(), anyhow::Error>(())
    });
    // Fold together the per-chunk partial hashes as they arrive; this loop
    // terminates once the reader task finishes and drops its sender.
    let mut root_global_state_hash = GlobalStateHash::default();
    let mut num_live_objects = 0;
    while let Some((partial_hash, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_global_state_hash.union(&partial_hash);
    }
    tokio::pin!(summaries_handle);
    tokio::pin!(snapshot_handle);
    tokio::pin!(backfill_handle);

    let mut summaries_done = false;
    let mut snapshot_done = false;
    let mut backfill_done = false;

    // Wait for summaries (required for verification) while monitoring other tasks for early failures
    while !summaries_done {
        tokio::select! {
            result = &mut summaries_handle, if !summaries_done => {
                summaries_done = true;
                result.expect("Summaries task panicked")?;
            }
            result = &mut backfill_handle, if !backfill_done => {
                backfill_done = true;
                result.expect("Backfill task panicked")?;
            }
            result = &mut snapshot_handle, if !snapshot_done => {
                snapshot_done = true;
                result.expect("Snapshot task panicked")?;
            }
        }
    }

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    // Perform snapshot state verification
    if verify != SnapshotVerifyMode::None {
        // The highest verified checkpoint must belong to the restored epoch;
        // otherwise summary sync and the snapshot disagree about the target.
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        // Compare the on-chain ECMH live-object-set commitment with the hash
        // accumulated locally from the snapshot chunks above.
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    // Wait for remaining tasks to complete
    while !snapshot_done || !backfill_done {
        tokio::select! {
            result = &mut backfill_handle, if !backfill_done => {
                backfill_done = true;
                result.expect("Backfill task panicked")?;
            }
            result = &mut snapshot_handle, if !snapshot_done => {
                snapshot_done = true;
                result.expect("Snapshot task panicked")?;
            }
        }
    }

    // TODO we should ensure this map is being updated for all end of epoch
    // checkpoints during summary sync. This happens in `insert_{verified|certified}_checkpoint`
    // in checkpoint store, but not in the corresponding functions in ObjectStore trait
    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    // Finalize DB state. `verify == Strict` is forwarded as a flag —
    // presumably enabling a deeper verification pass; confirm in `setup_db_state`.
    setup_db_state(
        epoch,
        root_global_state_hash.clone(),
        perpetual_db.clone(),
        checkpoint_store.clone(),
        committee_store,
        network,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m.clone(),
    )
    .await?;

    // After a large backfill, rebuild the tidehunter control region to reclaim disk space
    // and reduce startup time. No-op when compiled without tidehunter.
    #[cfg(tidehunter)]
    perpetual_db
        .force_rebuild_control_region()
        .expect("Failed to rebuild tidehunter control region after snapshot restore");

    // Promote staging -> live (replacing any previous live dir) and remove the
    // raw snapshot scratch directory.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
1042
1043async fn backfill_epoch_transaction_digests(
1044    perpetual_db: Arc<AuthorityPerpetualTables>,
1045    epoch: EpochId,
1046    ingestion_url: String,
1047    concurrency: usize,
1048    m: MultiProgress,
1049    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
1050) -> Result<()> {
1051    if epoch == 0 {
1052        return Ok(());
1053    }
1054
1055    // Use end_of_epoch_checkpoint_seq_nums to get checkpoint ranges
1056    // we're backfilling up to the last checkpoint of the previous epoch
1057    // end_of_epoch_checkpoint_seq_nums[890] == end of epoch checkpoint for epoch_891
1058    // if restoring from epoch_891, we want to backfill checkpoints from end of epoch_890 to end of epoch_891
1059    // (when restoring from epoch_891, node will immediately start in epoch 892, so prev_epoch == epoch_891)
1060    let epoch_last_cp_seq = end_of_epoch_checkpoint_seq_nums
1061        .get(epoch as usize)
1062        .ok_or_else(|| anyhow!("No checkpoint sequence found for epoch {}", epoch))?;
1063
1064    let epoch_start_cp = if epoch == 0 {
1065        0
1066    } else {
1067        end_of_epoch_checkpoint_seq_nums
1068            .get(epoch as usize - 1)
1069            .map(|cp| cp + 1)
1070            .unwrap_or(0)
1071    };
1072    let msg = format!(
1073        "Beginning transaction digest backfill for epoch: {:?}, backfilling from: {:?}..{:?}",
1074        epoch, epoch_start_cp, epoch_last_cp_seq
1075    );
1076    m.println(&msg).ok();
1077    info!("{}", msg);
1078
1079    let checkpoints_to_fetch: Vec<_> = (epoch_start_cp..=*epoch_last_cp_seq).collect();
1080    let num_checkpoints = checkpoints_to_fetch.len();
1081
1082    let progress_bar = m.add(
1083        ProgressBar::new(num_checkpoints as u64).with_style(
1084            ProgressStyle::with_template(
1085                "[{elapsed_precise}] {wide_bar} {pos}/{len} transactions backfilled ({msg})",
1086            )
1087            .unwrap(),
1088        ),
1089    );
1090
1091    let client = build_object_store(&ingestion_url, vec![]);
1092    let checkpoint_counter = Arc::new(AtomicU64::new(0));
1093    let tx_counter = Arc::new(AtomicU64::new(0));
1094    let cloned_checkpoint_counter = checkpoint_counter.clone();
1095    let cloned_progress_bar = progress_bar.clone();
1096    let start_instant = Instant::now();
1097
1098    tokio::spawn(async move {
1099        loop {
1100            if cloned_progress_bar.is_finished() {
1101                break;
1102            }
1103            let num_checkpoints_processed = cloned_checkpoint_counter.load(Ordering::Relaxed);
1104            let elapsed = start_instant.elapsed().as_secs_f64();
1105            let chkpts_per_sec = if elapsed > 0.0 {
1106                num_checkpoints_processed as f64 / elapsed
1107            } else {
1108                0.0
1109            };
1110            cloned_progress_bar.set_position(num_checkpoints_processed);
1111            cloned_progress_bar.set_message(format!("{:.1} chkpts/sec", chkpts_per_sec));
1112            tokio::time::sleep(Duration::from_millis(100)).await;
1113        }
1114    });
1115
1116    futures::stream::iter(checkpoints_to_fetch)
1117        .map(|sq| {
1118            let client = client.clone();
1119            async move {
1120                fetch_checkpoint(&client, sq)
1121                    .await
1122                    .map(|c| Arc::new(CheckpointData::from(c)))
1123            }
1124        })
1125        .buffer_unordered(concurrency)
1126        .try_for_each(|checkpoint| {
1127            let perpetual_db = perpetual_db.clone();
1128            let tx_counter = tx_counter.clone();
1129            let checkpoint_counter = checkpoint_counter.clone();
1130            let checkpoint_data = checkpoint;
1131
1132            async move {
1133                let tx_digests: Vec<_> = checkpoint_data
1134                    .transactions
1135                    .iter()
1136                    .map(|tx_data| *tx_data.transaction.digest())
1137                    .collect();
1138                let num_txs = tx_digests.len();
1139                perpetual_db
1140                    .insert_executed_transaction_digests_batch(epoch, tx_digests.into_iter())?;
1141                tx_counter.fetch_add(num_txs as u64, Ordering::Relaxed);
1142                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
1143                Ok::<(), anyhow::Error>(())
1144            }
1145        })
1146        .await?;
1147
1148    let tx_count = tx_counter.load(Ordering::Relaxed);
1149    progress_bar.finish_with_message(format!(
1150        "Backfill complete: {} transactions from {} checkpoints",
1151        tx_count, num_checkpoints
1152    ));
1153    info!(
1154        "Backfill complete: {} transactions from {} checkpoints",
1155        tx_counter.load(Ordering::Relaxed),
1156        checkpoint_counter.load(Ordering::Relaxed)
1157    );
1158
1159    Ok(())
1160}
1161
1162pub async fn download_db_snapshot(
1163    path: &Path,
1164    epoch: u64,
1165    snapshot_store_config: ObjectStoreConfig,
1166    skip_indexes: bool,
1167    num_parallel_downloads: usize,
1168    max_retries: usize,
1169) -> Result<(), anyhow::Error> {
1170    let remote_store = if snapshot_store_config.no_sign_request {
1171        snapshot_store_config.make_http()?
1172    } else {
1173        snapshot_store_config.make().map(Arc::new)?
1174    };
1175
1176    // We rely on the top level MANIFEST file which contains all valid epochs
1177    let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
1178    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
1179        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
1180
1181    if !root_manifest.epoch_exists(epoch) {
1182        return Err(anyhow!(
1183            "Epoch dir {} doesn't exist on the remote store",
1184            epoch
1185        ));
1186    }
1187
1188    let epoch_path = format!("epoch_{}", epoch);
1189    let epoch_dir = get_path(&epoch_path);
1190
1191    let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
1192    let epoch_manifest_contents =
1193        String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
1194            .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;
1195
1196    let epoch_manifest =
1197        PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);
1198
1199    let mut files: Vec<String> = vec![];
1200    files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
1201    files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
1202    files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
1203    if !skip_indexes {
1204        files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
1205    }
1206    let local_store = ObjectStoreConfig {
1207        object_store: Some(ObjectStoreType::File),
1208        directory: Some(path.to_path_buf()),
1209        ..Default::default()
1210    }
1211    .make()?;
1212    let m = MultiProgress::new();
1213    let path = path.to_path_buf();
1214    let snapshot_handle = tokio::spawn(async move {
1215        let progress_bar = m.add(
1216            ProgressBar::new(files.len() as u64).with_style(
1217                ProgressStyle::with_template(
1218                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
1219                )
1220                .unwrap(),
1221            ),
1222        );
1223        let cloned_progress_bar = progress_bar.clone();
1224        let file_counter = Arc::new(AtomicUsize::new(0));
1225        futures::stream::iter(files.iter())
1226            .map(|file| {
1227                let local_store = local_store.clone();
1228                let remote_store = remote_store.clone();
1229                let counter_cloned = file_counter.clone();
1230                async move {
1231                    counter_cloned.fetch_add(1, Ordering::Relaxed);
1232                    let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());
1233
1234                    let mut attempts = 0;
1235                    let max_attempts = max_retries + 1;
1236                    loop {
1237                        attempts += 1;
1238                        match copy_file(&file_path, &file_path, &remote_store, &local_store).await {
1239                            Ok(()) => break,
1240                            Err(e) if attempts >= max_attempts => {
1241                                return Err(anyhow::anyhow!(
1242                                    "Failed to download {} after {} attempts: {}",
1243                                    file_path,
1244                                    attempts,
1245                                    e
1246                                ));
1247                            }
1248                            Err(e) => {
1249                                tracing::warn!(
1250                                    "Failed to download {} (attempt {}/{}): {}, retrying in {}ms",
1251                                    file_path,
1252                                    attempts,
1253                                    max_attempts,
1254                                    e,
1255                                    1000 * attempts
1256                                );
1257                                tokio::time::sleep(Duration::from_millis(1000 * attempts as u64))
1258                                    .await;
1259                            }
1260                        }
1261                    }
1262
1263                    Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
1264                }
1265            })
1266            .boxed()
1267            .buffer_unordered(num_parallel_downloads)
1268            .try_for_each(|path| {
1269                file_counter.fetch_sub(1, Ordering::Relaxed);
1270                cloned_progress_bar.inc(1);
1271                cloned_progress_bar.set_message(format!(
1272                    "Downloading file: {}, #downloads_in_progress: {}",
1273                    path,
1274                    file_counter.load(Ordering::Relaxed)
1275                ));
1276                futures::future::ready(Ok(()))
1277            })
1278            .await?;
1279        progress_bar.finish_with_message("Snapshot file download is complete");
1280        Ok::<(), anyhow::Error>(())
1281    });
1282
1283    let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
1284    join_all(tasks)
1285        .await
1286        .into_iter()
1287        .collect::<Result<Vec<_>, _>>()?
1288        .into_iter()
1289        .for_each(|result| result.expect("Task failed"));
1290
1291    let store_dir = path.join("store");
1292    if store_dir.exists() {
1293        fs::remove_dir_all(&store_dir)?;
1294    }
1295    let epochs_dir = path.join("epochs");
1296    if epochs_dir.exists() {
1297        fs::remove_dir_all(&epochs_dir)?;
1298    }
1299    Ok(())
1300}