sui_tool/
lib.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use fastcrypto::traits::ToFromBytes;
7use futures::future::AbortHandle;
8use futures::future::join_all;
9use itertools::Itertools;
10use std::collections::BTreeMap;
11use std::fmt::Write;
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::num::NonZeroUsize;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use std::{fs, io};
19use sui_config::{NodeConfig, genesis::Genesis};
20use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
21use sui_core::execution_cache::build_execution_cache_from_env;
22use sui_network::default_mysten_network_config;
23use sui_protocol_config::Chain;
24use sui_rpc_api::Client;
25use sui_storage::object_store::http::HttpDownloaderBuilder;
26use sui_storage::object_store::util::MANIFEST_FILENAME;
27use sui_storage::object_store::util::Manifest;
28use sui_storage::object_store::util::{build_object_store, end_of_epoch_data, fetch_checkpoint};
29use sui_types::committee::QUORUM_THRESHOLD;
30use sui_types::crypto::AuthorityPublicKeyBytes;
31use sui_types::digests::ChainIdentifier;
32use sui_types::global_state_hash::GlobalStateHash;
33use sui_types::messages_grpc::LayoutGenerationOption;
34use sui_types::multiaddr::Multiaddr;
35use sui_types::{base_types::*, object::Owner};
36use tokio::sync::mpsc;
37use tokio::task::JoinHandle;
38use tokio::time::Instant;
39
40use anyhow::anyhow;
41use clap::ValueEnum;
42use eyre::ContextCompat;
43use fastcrypto::hash::MultisetHash;
44use futures::{StreamExt, TryStreamExt};
45use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
46use prometheus::Registry;
47use serde::{Deserialize, Serialize};
48use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
49use sui_core::authority::AuthorityStore;
50use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
51use sui_core::checkpoints::CheckpointStore;
52use sui_core::epoch::committee_store::CommitteeStore;
53use sui_core::storage::RocksDbStore;
54use sui_snapshot::reader::StateSnapshotReaderV1;
55use sui_snapshot::setup_db_state;
56use sui_storage::object_store::ObjectStoreGetExt;
57use sui_storage::object_store::util::{exists, get_path};
58use sui_types::full_checkpoint_content::CheckpointData;
59use sui_types::messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest};
60use sui_types::messages_grpc::{
61    ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest,
62    TransactionStatus,
63};
64
65use crate::formal_snapshot_util::read_summaries_for_list_no_verify;
66use sui_core::authority::authority_store_pruner::PrunerWatermarks;
67use sui_types::storage::ReadStore;
68use tracing::info;
69use typed_store::DBMetrics;
70
71pub mod commands;
72pub mod db_tool;
73mod formal_snapshot_util;
74#[cfg(all(feature = "tideconsole", not(windows)))]
75pub mod tideconsole_cmd;
76
77#[derive(
78    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
79)]
80pub enum SnapshotVerifyMode {
81    /// verification of both db state and downloaded checkpoints are skipped.
82    /// This is the fastest mode, but is unsafe, and thus should only be used
83    /// if you fully trust the source for both the snapshot and the checkpoint
84    /// archive.
85    None,
86    /// verify snapshot state during download, but no post-restore db verification.
87    /// Checkpoint verification is performed.
88    #[default]
89    Normal,
90    /// In ADDITION to the behavior of `--verify normal`, verify db state post-restore
91    /// against the end of epoch state root commitment.
92    Strict,
93}
94
95// This functions requires at least one of genesis or fullnode_rpc to be `Some`.
96async fn make_clients(
97    sui_client: &Client,
98) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
99    let mut net_config = default_mysten_network_config();
100    net_config.connect_timeout = Some(Duration::from_secs(5));
101    let mut authority_clients = BTreeMap::new();
102
103    let active_validators = sui_client
104        .get_system_state_summary(None)
105        .await?
106        .active_validators;
107
108    for validator in active_validators {
109        let net_addr = Multiaddr::try_from(validator.net_address)
110            .unwrap()
111            .rewrite_http_to_https();
112        let tls_config = sui_tls::create_rustls_client_config(
113            sui_types::crypto::NetworkPublicKey::from_bytes(&validator.network_pubkey_bytes)?,
114            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
115            None,
116        );
117        let channel = net_config
118            .connect_lazy(&net_addr, tls_config)
119            .map_err(|err| anyhow!(err.to_string()))?;
120        let client = NetworkAuthorityClient::new(channel);
121        let public_key_bytes =
122            AuthorityPublicKeyBytes::from_bytes(&validator.protocol_pubkey_bytes)?;
123        authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
124    }
125
126    Ok(authority_clients)
127}
128
129type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
130pub struct ObjectData {
131    requested_id: ObjectID,
132    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
133}
134
/// Debug-formats the contents of an `Option`, falling back to a placeholder for `None`.
trait OptionDebug<T> {
    /// Returns the `Debug` rendering of the wrapped value, or `def_str` when there is none.
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T: std::fmt::Debug> OptionDebug<T> for Option<T> {
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map_or_else(|| def_str.to_string(), |inner| format!("{inner:?}"))
    }
}
150
151#[allow(clippy::type_complexity)]
152pub struct GroupedObjectOutput {
153    pub grouped_results: BTreeMap<
154        Option<(
155            Option<SequenceNumber>,
156            ObjectDigest,
157            TransactionDigest,
158            Owner,
159            Option<TransactionDigest>,
160        )>,
161        Vec<AuthorityName>,
162    >,
163    pub voting_power: Vec<(
164        Option<(
165            Option<SequenceNumber>,
166            ObjectDigest,
167            TransactionDigest,
168            Owner,
169            Option<TransactionDigest>,
170        )>,
171        u64,
172    )>,
173    pub available_voting_power: u64,
174    pub fully_locked: bool,
175}
176
177impl GroupedObjectOutput {
178    pub fn new(
179        object_data: ObjectData,
180        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
181    ) -> Self {
182        let mut grouped_results = BTreeMap::new();
183        let mut voting_power = BTreeMap::new();
184        let mut available_voting_power = 0;
185        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
186            let stake = committee.get(name).unwrap();
187            let key = match resp {
188                Ok(r) => {
189                    let obj_digest = r.object.compute_object_reference().2;
190                    let parent_tx_digest = r.object.previous_transaction;
191                    let owner = r.object.owner.clone();
192                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
193                    if lock.is_none() {
194                        available_voting_power += stake;
195                    }
196                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
197                }
198                Err(_) => None,
199            };
200            let entry = grouped_results.entry(key.clone()).or_insert_with(Vec::new);
201            entry.push(*name);
202            let entry: &mut u64 = voting_power.entry(key).or_default();
203            *entry += stake;
204        }
205        let voting_power = voting_power
206            .into_iter()
207            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
208            .collect::<Vec<_>>();
209        let mut fully_locked = false;
210        if !voting_power.is_empty()
211            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
212        {
213            fully_locked = true;
214        }
215        Self {
216            grouped_results,
217            voting_power,
218            available_voting_power,
219            fully_locked,
220        }
221    }
222}
223
224#[allow(clippy::format_in_format_args)]
225impl std::fmt::Display for GroupedObjectOutput {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        writeln!(f, "available stake: {}", self.available_voting_power)?;
228        writeln!(f, "fully locked: {}", self.fully_locked)?;
229        writeln!(f, "{:<100}\n", "-".repeat(100))?;
230        for (key, stake) in &self.voting_power {
231            let val = self.grouped_results.get(key).unwrap();
232            writeln!(f, "total stake: {stake}")?;
233            match key {
234                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
235                    let lock = lock.opt_debug("no-known-lock");
236                    writeln!(f, "obj ref: {obj_digest}")?;
237                    writeln!(f, "parent tx: {parent_tx_digest}")?;
238                    writeln!(f, "owner: {owner}")?;
239                    writeln!(f, "lock: {lock}")?;
240                    for (i, name) in val.iter().enumerate() {
241                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
242                    }
243                }
244                None => {
245                    writeln!(f, "ERROR")?;
246                    for (i, name) in val.iter().enumerate() {
247                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
248                    }
249                }
250            };
251            writeln!(f, "{:<100}\n", "-".repeat(100))?;
252        }
253        Ok(())
254    }
255}
256
257struct ConciseObjectOutput(ObjectData);
258
259impl ConciseObjectOutput {
260    fn header() -> String {
261        format!(
262            "{:<20} {:<8} {:<66} {:<45} {}",
263            "validator", "version", "digest", "parent_cert", "owner"
264        )
265    }
266}
267
268impl std::fmt::Display for ConciseObjectOutput {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
271            write!(
272                f,
273                "{:<20} {:<8}",
274                format!("{:?}", name.concise()),
275                version.map(|s| s.value()).opt_debug("-")
276            )?;
277            match resp {
278                Err(_) => writeln!(
279                    f,
280                    "{:<66} {:<45} {:<51}",
281                    "object-fetch-failed", "no-cert-available", "no-owner-available"
282                )?,
283                Ok(resp) => {
284                    let obj_digest = resp.object.compute_object_reference().2;
285                    let parent = resp.object.previous_transaction;
286                    let owner = resp.object.owner.clone();
287                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
288                }
289            }
290            writeln!(f)?;
291        }
292        Ok(())
293    }
294}
295
296struct VerboseObjectOutput(ObjectData);
297
298impl std::fmt::Display for VerboseObjectOutput {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        writeln!(f, "Object: {}", self.0.requested_id)?;
301
302        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
303            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
304            writeln!(
305                f,
306                "-- version: {} ({:.3}s)",
307                version.opt_debug("<version not available>"),
308                timespent,
309            )?;
310
311            match resp {
312                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
313                Ok(resp) => {
314                    writeln!(
315                        f,
316                        "  -- object digest: {}",
317                        resp.object.compute_object_reference().2
318                    )?;
319                    if resp.object.is_package() {
320                        writeln!(f, "  -- object: <Move Package>")?;
321                    } else if let Some(layout) = &resp.layout {
322                        writeln!(
323                            f,
324                            "  -- object: Move Object: {}",
325                            resp.object
326                                .data
327                                .try_as_move()
328                                .unwrap()
329                                .to_move_struct(layout)
330                                .unwrap()
331                        )?;
332                    }
333                    writeln!(f, "  -- owner: {}", resp.object.owner)?;
334                    writeln!(
335                        f,
336                        "  -- locked by: {}",
337                        resp.lock_for_debugging.opt_debug("<not locked>")
338                    )?;
339                }
340            }
341        }
342        Ok(())
343    }
344}
345
346pub async fn get_object(
347    obj_id: ObjectID,
348    version: Option<u64>,
349    validator: Option<AuthorityName>,
350    clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
351) -> Result<ObjectData> {
352    let responses = join_all(
353        clients
354            .iter()
355            .filter(|(name, _)| {
356                if let Some(v) = validator {
357                    v == **name
358                } else {
359                    true
360                }
361            })
362            .map(|(name, (address, client))| async {
363                let object_version = get_object_impl(client, obj_id, version).await;
364                (*name, address.clone(), object_version)
365            }),
366    )
367    .await;
368
369    Ok(ObjectData {
370        requested_id: obj_id,
371        responses,
372    })
373}
374
375pub async fn get_transaction_block(
376    tx_digest: TransactionDigest,
377    show_input_tx: bool,
378    fullnode_rpc: String,
379) -> Result<String> {
380    let sui_client = Client::new(fullnode_rpc)?;
381    let clients = make_clients(&sui_client).await?;
382    let timer = Instant::now();
383    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
384        let result = client
385            .handle_transaction_info_request(TransactionInfoRequest {
386                transaction_digest: tx_digest,
387            })
388            .await;
389        (
390            *name,
391            address.clone(),
392            result,
393            timer.elapsed().as_secs_f64(),
394        )
395    }))
396    .await;
397
398    // Grab one validator that return Some(TransactionInfoResponse)
399    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());
400
401    let responses = responses
402        .iter()
403        .map(|r| {
404            let key =
405                r.2.as_ref()
406                    .map(|ok_result| match &ok_result.status {
407                        TransactionStatus::Signed(_) => None,
408                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
409                    })
410                    .ok();
411            let err = r.2.as_ref().err();
412            (key, err, r)
413        })
414        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
415            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
416        })
417        .chunk_by(|(_, _err, r)| {
418            r.2.as_ref().map(|ok_result| match &ok_result.status {
419                TransactionStatus::Signed(_) => None,
420                TransactionStatus::Executed(_, effects, _) => Some((
421                    ok_result.transaction.transaction_data(),
422                    effects.data(),
423                    effects.digest(),
424                )),
425            })
426        });
427    let mut s = String::new();
428    for (i, (key, group)) in responses.into_iter().enumerate() {
429        match key {
430            Ok(Some((tx, effects, effects_digest))) => {
431                writeln!(
432                    &mut s,
433                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
434                    i, tx_digest, effects_digest,
435                )?;
436                writeln!(&mut s, "{:#?}", effects)?;
437                if show_input_tx {
438                    writeln!(&mut s, "{:#?}", tx)?;
439                }
440            }
441            Ok(None) => {
442                writeln!(
443                    &mut s,
444                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
445                    i, tx_digest
446                )?;
447                if show_input_tx {
448                    // In this case, we expect at least one validator knows about this tx
449                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
450                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
451                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
452                        transaction_digest: tx_digest,
453                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
454                    writeln!(&mut s, "{:#?}", tx_info)?;
455                }
456            }
457            other => {
458                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
459            }
460        }
461        for (j, (_, _, res)) in group.enumerate() {
462            writeln!(
463                &mut s,
464                "        {:<4} {:<20} {:<56} ({:.3}s)",
465                j,
466                res.0.concise(),
467                format!("{}", res.1),
468                res.3
469            )?;
470        }
471        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
472    }
473    Ok(s)
474}
475
476async fn get_object_impl(
477    client: &NetworkAuthorityClient,
478    id: ObjectID,
479    version: Option<u64>,
480) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
481    let start = Instant::now();
482    let resp = client
483        .handle_object_info_request(ObjectInfoRequest {
484            object_id: id,
485            generate_layout: LayoutGenerationOption::Generate,
486            request_kind: match version {
487                None => ObjectInfoRequestKind::LatestObjectInfo,
488                Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
489            },
490        })
491        .await
492        .map_err(anyhow::Error::from);
493    let elapsed = start.elapsed().as_secs_f64();
494
495    let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
496    (resp_version.map(SequenceNumber::from), resp, elapsed)
497}
498
499pub(crate) fn make_anemo_config() -> anemo_cli::Config {
500    use sui_network::discovery::*;
501    use sui_network::state_sync::*;
502
503    // TODO: implement `ServiceInfo` generation in anemo-build and use here.
504    anemo_cli::Config::new()
505        // Sui discovery
506        .add_service(
507            "Discovery",
508            anemo_cli::ServiceInfo::new().add_method(
509                "GetKnownPeersV2",
510                anemo_cli::ron_method!(DiscoveryClient, get_known_peers_v2, ()),
511            ),
512        )
513        // Sui state sync
514        .add_service(
515            "StateSync",
516            anemo_cli::ServiceInfo::new()
517                .add_method(
518                    "PushCheckpointSummary",
519                    anemo_cli::ron_method!(
520                        StateSyncClient,
521                        push_checkpoint_summary,
522                        sui_types::messages_checkpoint::CertifiedCheckpointSummary
523                    ),
524                )
525                .add_method(
526                    "GetCheckpointSummary",
527                    anemo_cli::ron_method!(
528                        StateSyncClient,
529                        get_checkpoint_summary,
530                        GetCheckpointSummaryRequest
531                    ),
532                )
533                .add_method(
534                    "GetCheckpointContents",
535                    anemo_cli::ron_method!(
536                        StateSyncClient,
537                        get_checkpoint_contents,
538                        sui_types::messages_checkpoint::CheckpointContentsDigest
539                    ),
540                )
541                .add_method(
542                    "GetCheckpointAvailability",
543                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
544                ),
545        )
546}
547
/// Recursively copies the directory tree at `src` into `dst`, creating `dst`
/// (and any missing parents) as needed.
///
/// Entries whose full path (as yielded by reading the source directory) is
/// listed in `skip` are not copied; a skipped directory is skipped together
/// with everything under it.
///
/// # Errors
///
/// Returns the first I/O error hit while creating directories, reading
/// directory entries or copying files.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    copy_dir_recursive(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursive worker for `copy_dir_all`; borrows `skip` so the list is not
/// cloned for every subdirectory.
fn copy_dir_recursive(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let file_type = entry.file_type()?;
        let entry_path = entry.path();
        if skip.contains(&entry_path) {
            continue;
        }
        let target = dst.join(entry.file_name());
        if file_type.is_dir() {
            copy_dir_recursive(&entry_path, &target, skip)?;
        } else {
            fs::copy(&entry_path, &target)?;
        }
    }
    Ok(())
}
572
573pub async fn restore_from_db_checkpoint(
574    config: &NodeConfig,
575    db_checkpoint_path: &Path,
576) -> Result<(), anyhow::Error> {
577    copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
578    Ok(())
579}
580
581fn start_summary_sync(
582    perpetual_db: Arc<AuthorityPerpetualTables>,
583    committee_store: Arc<CommitteeStore>,
584    checkpoint_store: Arc<CheckpointStore>,
585    m: MultiProgress,
586    genesis: Genesis,
587    ingestion_url: String,
588    num_parallel_downloads: usize,
589    verify: bool,
590    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
591) -> JoinHandle<Result<(), anyhow::Error>> {
592    tokio::spawn(async move {
593        let store = AuthorityStore::open_no_genesis(perpetual_db, false, &Registry::default())?;
594        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
595        let state_sync_store =
596            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
597        // Only insert the genesis checkpoint if the DB is empty and doesn't have it already
598        if checkpoint_store
599            .get_checkpoint_by_digest(genesis.checkpoint().digest())
600            .unwrap()
601            .is_none()
602        {
603            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
604            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
605            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
606        }
607
608        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
609            .last()
610            .expect("Expected at least one checkpoint");
611
612        let num_to_sync = end_of_epoch_checkpoint_seq_nums.len() as u64;
613        let sync_progress_bar = m.add(
614            ProgressBar::new(num_to_sync).with_style(
615                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
616                    .unwrap(),
617            ),
618        );
619
620        let cloned_progress_bar = sync_progress_bar.clone();
621        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
622        let s_instant = Instant::now();
623
624        let cloned_counter = sync_checkpoint_counter.clone();
625        let latest_synced = checkpoint_store
626            .get_highest_synced_checkpoint()?
627            .map(|c| c.sequence_number)
628            .unwrap_or(0);
629        let s_start = latest_synced
630            .checked_add(1)
631            .wrap_err("Checkpoint overflow")
632            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
633        tokio::spawn(async move {
634            loop {
635                if cloned_progress_bar.is_finished() {
636                    break;
637                }
638                let num_summaries = cloned_counter.load(Ordering::Relaxed);
639                let total_checkpoints_per_sec =
640                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
641                cloned_progress_bar.set_position(s_start + num_summaries);
642                cloned_progress_bar.set_message(format!(
643                    "checkpoints synced per sec: {}",
644                    total_checkpoints_per_sec
645                ));
646                tokio::time::sleep(Duration::from_secs(1)).await;
647            }
648        });
649
650        read_summaries_for_list_no_verify(
651            ingestion_url,
652            num_parallel_downloads,
653            state_sync_store.clone(),
654            end_of_epoch_checkpoint_seq_nums.clone(),
655            sync_checkpoint_counter,
656        )
657        .await?;
658        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");
659        info!("Checkpoint summary sync is complete");
660
661        let checkpoint = checkpoint_store
662            .get_checkpoint_by_sequence_number(*last_checkpoint)?
663            .ok_or(anyhow!("Failed to read last checkpoint"))?;
664        if verify {
665            let verify_progress_bar = m.add(
666                ProgressBar::new(num_to_sync).with_style(
667                    ProgressStyle::with_template(
668                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
669                    )
670                    .unwrap(),
671                ),
672            );
673            let cloned_verify_progress_bar = verify_progress_bar.clone();
674            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
675            let cloned_verify_counter = verify_checkpoint_counter.clone();
676            let v_instant = Instant::now();
677
678            tokio::spawn(async move {
679                loop {
680                    if cloned_verify_progress_bar.is_finished() {
681                        break;
682                    }
683                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
684                    let total_checkpoints_per_sec =
685                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
686                    cloned_verify_progress_bar.set_position(num_summaries);
687                    cloned_verify_progress_bar.set_message(format!(
688                        "checkpoints verified per sec: {}",
689                        total_checkpoints_per_sec
690                    ));
691                    tokio::time::sleep(Duration::from_secs(1)).await;
692                }
693            });
694
695            for (cp_epoch, epoch_last_cp_seq_num) in
696                end_of_epoch_checkpoint_seq_nums.iter().enumerate()
697            {
698                let epoch_last_checkpoint = checkpoint_store
699                    .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
700                    .ok_or(anyhow!("Failed to read checkpoint"))?;
701                let committee = state_sync_store.get_committee(cp_epoch as u64).expect(
702                    "Expected committee to exist after syncing all end of epoch checkpoints",
703                );
704                epoch_last_checkpoint
705                    .verify_authority_signatures(&committee)
706                    .expect("Failed to verify checkpoint");
707                verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
708            }
709
710            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
711        }
712
713        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
714        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
715        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
716        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
717        Ok::<(), anyhow::Error>(())
718    })
719}
720
721pub async fn get_latest_available_epoch(
722    snapshot_store_config: &ObjectStoreConfig,
723) -> Result<u64, anyhow::Error> {
724    let remote_object_store = if snapshot_store_config.no_sign_request {
725        snapshot_store_config.make_http()?
726    } else {
727        snapshot_store_config.make().map(Arc::new)?
728    };
729    let manifest_contents = remote_object_store
730        .get_bytes(&get_path(MANIFEST_FILENAME))
731        .await?;
732    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
733        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
734    let epoch = root_manifest
735        .available_epochs
736        .iter()
737        .max()
738        .ok_or(anyhow!("No snapshot found in manifest"))?;
739    Ok(*epoch)
740}
741
742pub async fn check_completed_snapshot(
743    snapshot_store_config: &ObjectStoreConfig,
744    epoch: EpochId,
745) -> Result<(), anyhow::Error> {
746    let success_marker = format!("epoch_{}/_SUCCESS", epoch);
747    let archive_success_marker = format!("archive/epoch_{}/_SUCCESS", epoch);
748    let remote_object_store = if snapshot_store_config.no_sign_request {
749        snapshot_store_config.make_http()?
750    } else {
751        snapshot_store_config.make().map(Arc::new)?
752    };
753
754    // Check regular location first, then archive location
755    if exists(&remote_object_store, &get_path(success_marker.as_str())).await
756        || exists(
757            &remote_object_store,
758            &get_path(archive_success_marker.as_str()),
759        )
760        .await
761    {
762        Ok(())
763    } else {
764        Err(anyhow!(
765            "missing success marker at {}/{} or {}/{}",
766            snapshot_store_config.bucket.as_ref().unwrap_or(
767                &snapshot_store_config
768                    .clone()
769                    .aws_endpoint
770                    .unwrap_or("unknown_bucket".to_string())
771            ),
772            success_marker,
773            snapshot_store_config.bucket.as_ref().unwrap_or(
774                &snapshot_store_config
775                    .clone()
776                    .aws_endpoint
777                    .unwrap_or("unknown_bucket".to_string())
778            ),
779            archive_success_marker
780        ))
781    }
782}
783
784pub async fn download_formal_snapshot(
785    path: &Path,
786    epoch: EpochId,
787    genesis: &Path,
788    snapshot_store_config: ObjectStoreConfig,
789    ingestion_url: &str,
790    num_parallel_downloads: usize,
791    num_parallel_chunks: usize,
792    network: Chain,
793    verify: SnapshotVerifyMode,
794    max_retries: usize,
795    metrics_port: u16,
796) -> Result<(), anyhow::Error> {
797    let m = MultiProgress::new();
798    let msg = format!(
799        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
800        epoch, network, verify
801    );
802    m.println(&msg).unwrap();
803    info!("{}", msg);
804
805    let path = path.join("staging").to_path_buf();
806    if path.exists() {
807        fs::remove_dir_all(path.clone())?;
808    }
809
810    // Start prometheus server so that we can serve metrics during snapshot download
811    let metrics_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), metrics_port);
812    let registry_service = mysten_metrics::start_prometheus_server(metrics_addr);
813    let prometheus_registry = registry_service.default_registry();
814    DBMetrics::init(registry_service.clone());
815    mysten_metrics::init_metrics(&prometheus_registry);
816
817    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(
818        &path.join("store"),
819        None,
820        None,
821    ));
822    let genesis = Genesis::load(genesis)?;
823    let genesis_chain = ChainIdentifier::from(*genesis.checkpoint().digest()).chain();
824    if genesis_chain != network {
825        return Err(anyhow!(
826            "Genesis file is for chain {}, but formal snapshot download is configured for --network {}. \
827            Use the matching genesis.blob or pass the correct --network flag.",
828            genesis_chain.as_str(),
829            network.as_str(),
830        ));
831    }
832    let genesis_committee = genesis.committee();
833    let committee_store = Arc::new(CommitteeStore::new(
834        path.join("epochs"),
835        &genesis_committee,
836        None,
837    ));
838    let checkpoint_store = CheckpointStore::new(
839        &path.join("checkpoints"),
840        Arc::new(PrunerWatermarks::default()),
841    );
842
843    let end_of_epoch_checkpoint_seq_nums: Vec<_> = end_of_epoch_data(ingestion_url, vec![])
844        .await?
845        .into_iter()
846        .take((epoch + 1) as usize)
847        .collect();
848
849    let summaries_handle = start_summary_sync(
850        perpetual_db.clone(),
851        committee_store.clone(),
852        checkpoint_store.clone(),
853        m.clone(),
854        genesis.clone(),
855        ingestion_url.to_string(),
856        num_parallel_downloads,
857        verify != SnapshotVerifyMode::None,
858        end_of_epoch_checkpoint_seq_nums.clone(),
859    );
860
861    // Start transaction backfill in parallel with summary sync
862    let backfill_handle = {
863        let perpetual_db = perpetual_db.clone();
864        let ingestion_url = ingestion_url.to_string();
865        let m = m.clone();
866        let end_of_epoch_checkpoint_seq_nums = end_of_epoch_checkpoint_seq_nums.clone();
867        tokio::spawn(async move {
868            backfill_epoch_transaction_digests(
869                perpetual_db,
870                epoch,
871                ingestion_url,
872                num_parallel_downloads,
873                m,
874                end_of_epoch_checkpoint_seq_nums,
875            )
876            .await
877        })
878    };
879
880    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
881    let perpetual_db_clone = perpetual_db.clone();
882    let snapshot_dir = path.parent().unwrap().join("snapshot");
883    if snapshot_dir.exists() {
884        fs::remove_dir_all(snapshot_dir.clone())?;
885    }
886    let snapshot_dir_clone = snapshot_dir.clone();
887
888    // TODO if verify is false, we should skip generating these and
889    // not pass in a channel to the reader
890    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
891    let m_clone = m.clone();
892
893    let snapshot_handle = tokio::spawn(async move {
894        let local_store_config = ObjectStoreConfig {
895            object_store: Some(ObjectStoreType::File),
896            directory: Some(snapshot_dir_clone.to_path_buf()),
897            ..Default::default()
898        };
899        let mut reader = StateSnapshotReaderV1::new(
900            epoch,
901            &snapshot_store_config,
902            &local_store_config,
903            NonZeroUsize::new(num_parallel_downloads).unwrap(),
904            m_clone,
905            false, // skip_reset_local_store
906            max_retries,
907            num_parallel_chunks,
908        )
909        .await
910        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
911        reader
912            .read(perpetual_db_clone.clone(), abort_registration, Some(sender))
913            .await
914            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
915        info!("Snapshot download complete");
916        Ok::<(), anyhow::Error>(())
917    });
918    let mut root_global_state_hash = GlobalStateHash::default();
919    let mut num_live_objects = 0;
920    while let Some((partial_hash, num_objects)) = receiver.recv().await {
921        num_live_objects += num_objects;
922        root_global_state_hash.union(&partial_hash);
923    }
924    tokio::pin!(summaries_handle);
925    tokio::pin!(snapshot_handle);
926    tokio::pin!(backfill_handle);
927
928    let mut summaries_done = false;
929    let mut snapshot_done = false;
930    let mut backfill_done = false;
931
932    // Wait for summaries (required for verification) while monitoring other tasks for early failures
933    while !summaries_done {
934        tokio::select! {
935            result = &mut summaries_handle, if !summaries_done => {
936                summaries_done = true;
937                result.expect("Summaries task panicked")?;
938            }
939            result = &mut backfill_handle, if !backfill_done => {
940                backfill_done = true;
941                result.expect("Backfill task panicked")?;
942            }
943            result = &mut snapshot_handle, if !snapshot_done => {
944                snapshot_done = true;
945                result.expect("Snapshot task panicked")?;
946            }
947        }
948    }
949
950    let last_checkpoint = checkpoint_store
951        .get_highest_verified_checkpoint()?
952        .expect("Expected nonempty checkpoint store");
953
954    // Perform snapshot state verification
955    if verify != SnapshotVerifyMode::None {
956        assert_eq!(
957            last_checkpoint.epoch(),
958            epoch,
959            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
960            last_checkpoint.sequence_number,
961            epoch,
962            last_checkpoint.epoch()
963        );
964        let commitment = last_checkpoint
965            .end_of_epoch_data
966            .as_ref()
967            .expect("Expected highest verified checkpoint to have end of epoch data")
968            .epoch_commitments
969            .last()
970            .expect(
971                "End of epoch has no commitments. This likely means that the epoch \
972                you are attempting to restore from does not support end of epoch state \
973                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
974                and for testnet, `--epoch` must be > 12.",
975            );
976        match commitment {
977            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
978                let local_digest: ECMHLiveObjectSetDigest = root_global_state_hash.digest().into();
979                assert_eq!(
980                    *consensus_digest, local_digest,
981                    "End of epoch {} root state digest {} does not match \
982                    local root state hash {} computed from snapshot data",
983                    epoch, consensus_digest.digest, local_digest.digest,
984                );
985                let progress_bar = m.add(
986                    ProgressBar::new(1).with_style(
987                        ProgressStyle::with_template(
988                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
989                        )
990                        .unwrap(),
991                    ),
992                );
993                progress_bar.finish_with_message("Verification complete");
994            }
995            _ => return Err(anyhow!("Expected ECMHLiveObjectSetDigest")),
996        };
997    } else {
998        m.println(
999            "WARNING: Skipping snapshot verification! \
1000            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
1001            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
1002        )?;
1003    }
1004
1005    // Wait for remaining tasks to complete
1006    while !snapshot_done || !backfill_done {
1007        tokio::select! {
1008            result = &mut backfill_handle, if !backfill_done => {
1009                backfill_done = true;
1010                result.expect("Backfill task panicked")?;
1011            }
1012            result = &mut snapshot_handle, if !snapshot_done => {
1013                snapshot_done = true;
1014                result.expect("Snapshot task panicked")?;
1015            }
1016        }
1017    }
1018
1019    // TODO we should ensure this map is being updated for all end of epoch
1020    // checkpoints during summary sync. This happens in `insert_{verified|certified}_checkpoint`
1021    // in checkpoint store, but not in the corresponding functions in ObjectStore trait
1022    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;
1023
1024    setup_db_state(
1025        epoch,
1026        root_global_state_hash.clone(),
1027        perpetual_db.clone(),
1028        checkpoint_store.clone(),
1029        committee_store,
1030        network,
1031        verify == SnapshotVerifyMode::Strict,
1032        num_live_objects,
1033        m.clone(),
1034    )
1035    .await?;
1036
1037    // After a large backfill, rebuild the tidehunter control region to reclaim disk space
1038    // and reduce startup time. No-op when compiled without tidehunter.
1039    #[cfg(tidehunter)]
1040    {
1041        perpetual_db
1042            .force_rebuild_control_region()
1043            .expect("Failed to rebuild tidehunter control region after snapshot restore");
1044        // The tidehunter Db spawns background threads (periodic snapshot, relocator,
1045        // flusher pool, etc.) that own file handles inside the staging directory.
1046        // Wait for them to exit before renaming staging -> live, otherwise a
1047        // periodic timer could try to open a new file under the (now missing) path.
1048        println!(
1049            "Waiting for tidehunter background threads to finish before renaming staging to live"
1050        );
1051        perpetual_db.wait_for_tidehunter_background_threads();
1052        println!("Tidehunter background threads finished, proceeding with rename");
1053    }
1054
1055    let new_path = path.parent().unwrap().join("live");
1056    if new_path.exists() {
1057        fs::remove_dir_all(new_path.clone())?;
1058    }
1059    fs::rename(&path, &new_path)?;
1060    fs::remove_dir_all(snapshot_dir.clone())?;
1061    println!(
1062        "Successfully restored state from snapshot at end of epoch {}",
1063        epoch
1064    );
1065
1066    Ok(())
1067}
1068
1069async fn backfill_epoch_transaction_digests(
1070    perpetual_db: Arc<AuthorityPerpetualTables>,
1071    epoch: EpochId,
1072    ingestion_url: String,
1073    concurrency: usize,
1074    m: MultiProgress,
1075    end_of_epoch_checkpoint_seq_nums: Vec<u64>,
1076) -> Result<()> {
1077    if epoch == 0 {
1078        return Ok(());
1079    }
1080
1081    // Use end_of_epoch_checkpoint_seq_nums to get checkpoint ranges
1082    // we're backfilling up to the last checkpoint of the previous epoch
1083    // end_of_epoch_checkpoint_seq_nums[890] == end of epoch checkpoint for epoch_891
1084    // if restoring from epoch_891, we want to backfill checkpoints from end of epoch_890 to end of epoch_891
1085    // (when restoring from epoch_891, node will immediately start in epoch 892, so prev_epoch == epoch_891)
1086    let epoch_last_cp_seq = end_of_epoch_checkpoint_seq_nums
1087        .get(epoch as usize)
1088        .ok_or_else(|| anyhow!("No checkpoint sequence found for epoch {}", epoch))?;
1089
1090    let epoch_start_cp = if epoch == 0 {
1091        0
1092    } else {
1093        end_of_epoch_checkpoint_seq_nums
1094            .get(epoch as usize - 1)
1095            .map(|cp| cp + 1)
1096            .unwrap_or(0)
1097    };
1098    let msg = format!(
1099        "Beginning transaction digest backfill for epoch: {:?}, backfilling from: {:?}..{:?}",
1100        epoch, epoch_start_cp, epoch_last_cp_seq
1101    );
1102    m.println(&msg).ok();
1103    info!("{}", msg);
1104
1105    let checkpoints_to_fetch: Vec<_> = (epoch_start_cp..=*epoch_last_cp_seq).collect();
1106    let num_checkpoints = checkpoints_to_fetch.len();
1107
1108    let progress_bar = m.add(
1109        ProgressBar::new(num_checkpoints as u64).with_style(
1110            ProgressStyle::with_template(
1111                "[{elapsed_precise}] {wide_bar} {pos}/{len} transactions backfilled ({msg})",
1112            )
1113            .unwrap(),
1114        ),
1115    );
1116
1117    let client = build_object_store(&ingestion_url, vec![]);
1118    let checkpoint_counter = Arc::new(AtomicU64::new(0));
1119    let tx_counter = Arc::new(AtomicU64::new(0));
1120    let cloned_checkpoint_counter = checkpoint_counter.clone();
1121    let cloned_progress_bar = progress_bar.clone();
1122    let start_instant = Instant::now();
1123
1124    tokio::spawn(async move {
1125        loop {
1126            if cloned_progress_bar.is_finished() {
1127                break;
1128            }
1129            let num_checkpoints_processed = cloned_checkpoint_counter.load(Ordering::Relaxed);
1130            let elapsed = start_instant.elapsed().as_secs_f64();
1131            let chkpts_per_sec = if elapsed > 0.0 {
1132                num_checkpoints_processed as f64 / elapsed
1133            } else {
1134                0.0
1135            };
1136            cloned_progress_bar.set_position(num_checkpoints_processed);
1137            cloned_progress_bar.set_message(format!("{:.1} chkpts/sec", chkpts_per_sec));
1138            tokio::time::sleep(Duration::from_millis(100)).await;
1139        }
1140    });
1141
1142    futures::stream::iter(checkpoints_to_fetch)
1143        .map(|sq| {
1144            let client = client.clone();
1145            async move {
1146                fetch_checkpoint(&client, sq)
1147                    .await
1148                    .map(|c| Arc::new(CheckpointData::from(c)))
1149            }
1150        })
1151        .buffer_unordered(concurrency)
1152        .try_for_each(|checkpoint| {
1153            let perpetual_db = perpetual_db.clone();
1154            let tx_counter = tx_counter.clone();
1155            let checkpoint_counter = checkpoint_counter.clone();
1156            let checkpoint_data = checkpoint;
1157
1158            async move {
1159                let tx_digests: Vec<_> = checkpoint_data
1160                    .transactions
1161                    .iter()
1162                    .map(|tx_data| *tx_data.transaction.digest())
1163                    .collect();
1164                let num_txs = tx_digests.len();
1165                perpetual_db
1166                    .insert_executed_transaction_digests_batch(epoch, tx_digests.into_iter())?;
1167                tx_counter.fetch_add(num_txs as u64, Ordering::Relaxed);
1168                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
1169                Ok::<(), anyhow::Error>(())
1170            }
1171        })
1172        .await?;
1173
1174    let tx_count = tx_counter.load(Ordering::Relaxed);
1175    progress_bar.finish_with_message(format!(
1176        "Backfill complete: {} transactions from {} checkpoints",
1177        tx_count, num_checkpoints
1178    ));
1179    info!(
1180        "Backfill complete: {} transactions from {} checkpoints",
1181        tx_counter.load(Ordering::Relaxed),
1182        checkpoint_counter.load(Ordering::Relaxed)
1183    );
1184
1185    Ok(())
1186}