sui_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::in_test_configuration;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::{AddressSource, EndpointId};
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

// Logs at debug level in test configuration and at info level otherwise.
// JWK logs cause significant volume in tests but are insignificant in prod,
// so they stay at info outside of test configurations.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

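/// Handles for the validator-only subsystems (consensus, checkpointing, transaction
/// validation and the validator gRPC service). These are only constructed when the node
/// runs as a validator.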
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
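
/// Handles returned by `create_p2p_network`: the anemo network itself plus the
/// discovery, state-sync and randomness services running on top of it.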
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

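// Simulation-test (msim) only: allows tests to override how JWKs are fetched for a
// given OIDC provider via `set_jwk_injector`.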
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

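/// The top-level node object. It owns the authority state, the P2P/state-sync handles,
/// the RPC servers and, when running as a validator, the consensus components.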
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down sui-node.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at startup and at the beginning of each epoch.
    /// Wrapped in ArcSwap so that it can be replaced without taking a mutable reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

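// Cap on the number of JWKs accepted from a single provider fetch; any extra keys are
// dropped to prevent a provider from flooding consensus with key submissions.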
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
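    /// Starts a node from `config`, reporting metrics to `registry_service` and using a
    /// default "unknown" server version. See [`Self::start_async`] for the full startup
    /// sequence.
    ///
    /// A minimal usage sketch (assumes a `NodeConfig` and a prometheus `RegistryService`
    /// have already been constructed elsewhere):
    ///
    /// ```ignore
    /// let node = SuiNode::start(config, registry_service).await?;
    /// let epoch = node.current_epoch_for_testing();
    /// ```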
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

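    /// Spawns one background task per zkLogin OIDC provider configured for the current
    /// chain. Each task periodically fetches the provider's JWKs, validates them, and
    /// submits previously unseen keys to consensus; the tasks exit when the epoch ends.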
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // metrics is:
        //  pub struct SuiNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

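    /// Full node startup: opens the stores, builds the execution cache and per-epoch
    /// store, starts the P2P network (discovery, state sync, randomness), the HTTP/RPC
    /// servers and, on validators, the consensus components, then spawns the
    /// reconfiguration monitor task before returning the constructed node.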
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(registry_service.clone());

        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        // Initialize Mysten metrics.
        mysten_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of static variable) and unnecessary in simtests.
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee();
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
            is_validator,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // The database is only empty at genesis time.
        if is_genesis {
            info!("checking SUI conservation at genesis");
            // When opening the db tables, the only time it is safe to check SUI
            // conservation is at genesis. Otherwise we may be in the middle of an epoch
            // and the SUI conservation check will fail. This also initializes the
            // expected_network_sui_amount table.
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        // Create network
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        let endpoint_manager = EndpointManager::new(discovery_handle.clone());

        // Inject configured peer address overrides.
        for peer in &config.p2p_config.peer_address_overrides {
            endpoint_manager
                .update_endpoint(
                    EndpointId::P2p(peer.peer_id),
                    AddressSource::Config,
                    peer.addresses.clone(),
                )
                .expect("Updating peer address overrides should not fail");
        }

        // Send initial peer addresses to the p2p network.
        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
        // ensure genesis txn was executed
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        // Start the loop that receives new randomness and generates transactions for it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let mut components = Self::construct_validator_components(
                config.clone(),
                state.clone(),
                committee,
                epoch_store.clone(),
                checkpoint_store.clone(),
                state_sync_handle.clone(),
                randomness_handle.clone(),
                Arc::downgrade(&global_state_hasher),
                backpressure_manager.clone(),
                connection_monitor_status.clone(),
                &registry_service,
                sui_node_metrics.clone(),
                checkpoint_metrics.clone(),
            )
            .await?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            // Set the consensus address updater so that we can update the consensus peer addresses when requested.
            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

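    /// Returns a receiver on the broadcast channel that carries the starting system
    /// state for each new epoch.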
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

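    /// Starts the state snapshot uploader when an object store is configured in
    /// `state_snapshot_write_config`; returns the kill-switch sender used to stop it,
    /// or `None` when snapshot uploads are disabled.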
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

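    /// Resolves the db checkpoint configuration (forcing end-of-epoch checkpoints when
    /// state snapshots are enabled) and, if needed, starts the `DBCheckpointHandler`;
    /// returns the resolved config together with the handler's kill-switch sender.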
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If db checkpoint config object store not specified but
            // state snapshot object store is specified, create handler
            // anyway for marking db checkpoints as completed so that they
            // can be uploaded as state snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

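    /// Builds the anemo P2P network and starts the discovery, state-sync and randomness
    /// services on it, returning their handles along with the set of known peers.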
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new()
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
            // staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

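    /// Builds the validator-only components that live for the lifetime of the process:
    /// the consensus adapter and manager, the consensus store pruner, the validator gRPC
    /// service and (optionally) the overload monitor, then delegates to
    /// `start_epoch_specific_validator_components` for the per-epoch pieces.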
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

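    /// Sets up the per-epoch validator machinery: the checkpoint service, randomness
    /// manager, execution time observer, consensus throughput profiling and the
    /// consensus handler, then starts consensus for the current epoch.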
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

1371        // Create a new map that is injected into both the consensus handler and the consensus adapter:
1372        // the consensus handler writes values forwarded from consensus, and the consensus adapter
1373        // reads them to decide which validator submits a transaction to consensus.
1374        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1375
1376        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1377
1378        if epoch_store.randomness_state_enabled() {
1379            let randomness_manager = RandomnessManager::try_new(
1380                Arc::downgrade(&epoch_store),
1381                Box::new(consensus_adapter.clone()),
1382                randomness_handle,
1383                config.protocol_key_pair(),
1384            )
1385            .await;
1386            if let Some(randomness_manager) = randomness_manager {
1387                epoch_store
1388                    .set_randomness_manager(randomness_manager)
1389                    .await?;
1390            }
1391        }
1392
1393        ExecutionTimeObserver::spawn(
1394            epoch_store.clone(),
1395            Box::new(consensus_adapter.clone()),
1396            config
1397                .execution_time_observer_config
1398                .clone()
1399                .unwrap_or_default(),
1400        );
1401
1402        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1403            None,
1404            state.metrics.clone(),
1405        ));
1406
1407        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1408            throughput_calculator.clone(),
1409            None,
1410            None,
1411            state.metrics.clone(),
1412            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1413        ));
1414
1415        consensus_adapter.swap_throughput_profiler(throughput_profiler);
1416
1417        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1418            state.clone(),
1419            checkpoint_service.clone(),
1420            epoch_store.clone(),
1421            consensus_adapter.clone(),
1422            low_scoring_authorities,
1423            throughput_calculator,
1424            backpressure_manager,
1425        );
1426
1427        info!("Starting consensus manager asynchronously");
1428
1429        // Spawn consensus startup asynchronously to avoid blocking other components
1430        tokio::spawn({
1431            let config = config.clone();
1432            let epoch_store = epoch_store.clone();
1433            let sui_tx_validator = SuiTxValidator::new(
1434                state.clone(),
1435                epoch_store.clone(),
1436                checkpoint_service.clone(),
1437                sui_tx_validator_metrics.clone(),
1438            );
1439            let consensus_manager = consensus_manager.clone();
1440            async move {
1441                consensus_manager
1442                    .start(
1443                        &config,
1444                        epoch_store,
1445                        consensus_handler_initializer,
1446                        sui_tx_validator,
1447                    )
1448                    .await;
1449            }
1450        });
1451        let replay_waiter = consensus_manager.replay_waiter();
1452
1453        info!("Spawning checkpoint service");
1454        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1455            None
1456        } else {
1457            Some(replay_waiter)
1458        };
1459        checkpoint_service
1460            .spawn(epoch_store.clone(), replay_waiter)
1461            .await;
1462
1463        if epoch_store.authenticator_state_enabled() {
1464            Self::start_jwk_updater(
1465                config,
1466                sui_node_metrics,
1467                state.name,
1468                epoch_store.clone(),
1469                consensus_adapter.clone(),
1470            );
1471        }
1472
1473        Ok(ValidatorComponents {
1474            validator_server_handle,
1475            validator_overload_monitor_handle,
1476            consensus_manager,
1477            consensus_store_pruner,
1478            consensus_adapter,
1479            checkpoint_metrics,
1480            sui_tx_validator_metrics,
1481        })
1482    }
1483
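    /// Builds the checkpoint service, which assembles locally computed checkpoints, submits
    /// checkpoint signatures to consensus, and forwards certified checkpoints to state sync.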
1484    fn build_checkpoint_service(
1485        config: &NodeConfig,
1486        consensus_adapter: Arc<ConsensusAdapter>,
1487        checkpoint_store: Arc<CheckpointStore>,
1488        epoch_store: Arc<AuthorityPerEpochStore>,
1489        state: Arc<AuthorityState>,
1490        state_sync_handle: state_sync::Handle,
1491        state_hasher: Weak<GlobalStateHasher>,
1492        checkpoint_metrics: Arc<CheckpointMetrics>,
1493    ) -> Arc<CheckpointService> {
1494        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1495        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1496
1497        debug!(
1498            "Starting checkpoint service with epoch start timestamp {} \
1499            and epoch duration {}",
1500            epoch_start_timestamp_ms, epoch_duration_ms
1501        );
1502
1503        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1504            sender: consensus_adapter,
1505            signer: state.secret.clone(),
1506            authority: config.protocol_public_key(),
1507            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1508                .checked_add(epoch_duration_ms)
1509                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1510            metrics: checkpoint_metrics.clone(),
1511        });
1512
1513        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1514        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1515        let max_checkpoint_size_bytes =
1516            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1517
1518        CheckpointService::build(
1519            state.clone(),
1520            checkpoint_store,
1521            epoch_store,
1522            state.get_transaction_cache_reader().clone(),
1523            state_hasher,
1524            checkpoint_output,
1525            Box::new(certified_checkpoint_output),
1526            checkpoint_metrics,
1527            max_tx_per_checkpoint,
1528            max_checkpoint_size_bytes,
1529        )
1530    }
1531
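    /// Constructs the consensus adapter through which this authority submits transactions
    /// and checkpoint signatures to consensus. The per-authority pending-transaction limit
    /// is derived from the configured maximum and the committee size.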
1532    fn construct_consensus_adapter(
1533        committee: &Committee,
1534        consensus_config: &ConsensusConfig,
1535        authority: AuthorityName,
1536        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1537        prometheus_registry: &Registry,
1538        protocol_config: ProtocolConfig,
1539        consensus_client: Arc<dyn ConsensusClient>,
1540        checkpoint_store: Arc<CheckpointStore>,
1541    ) -> ConsensusAdapter {
1542        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1543        // The consensus adapter allows the authority to send user certificates through consensus.
1544
1545        ConsensusAdapter::new(
1546            consensus_client,
1547            checkpoint_store,
1548            authority,
1549            connection_monitor_status,
1550            consensus_config.max_pending_transactions(),
1551            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1552            consensus_config.max_submit_position,
1553            consensus_config.submit_delay_step_override(),
1554            ca_metrics,
1555            protocol_config,
1556        )
1557    }
1558
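    /// Builds the validator gRPC server and returns it as a `SpawnOnce`: the listener is
    /// only bound and the server task only spawned once `start()` is awaited.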
1559    async fn start_grpc_validator_service(
1560        config: &NodeConfig,
1561        state: Arc<AuthorityState>,
1562        consensus_adapter: Arc<ConsensusAdapter>,
1563        prometheus_registry: &Registry,
1564    ) -> Result<SpawnOnce> {
1565        let validator_service = ValidatorService::new(
1566            state.clone(),
1567            consensus_adapter,
1568            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1569            config.policy_config.clone().map(|p| p.client_id_source),
1570        );
1571
1572        let mut server_conf = mysten_network::config::Config::new();
1573        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1574        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1575        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1576        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1577        server_conf.load_shed = config.grpc_load_shed;
1578        let mut server_builder =
1579            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1580
1581        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1582
1583        let tls_config = sui_tls::create_rustls_server_config(
1584            config.network_key_pair().copy().private(),
1585            SUI_TLS_SERVER_NAME.to_string(),
1586        );
1587
1588        let network_address = config.network_address().clone();
1589
1590        let (ready_tx, ready_rx) = oneshot::channel();
1591
1592        Ok(SpawnOnce::new(ready_rx, async move {
1593            let server = server_builder
1594                .bind(&network_address, Some(tls_config))
1595                .await
1596                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1597            let local_addr = server.local_addr();
1598            info!("Listening to traffic on {local_addr}");
1599            ready_tx.send(()).unwrap();
1600            if let Err(err) = server.serve().await {
1601                info!("Server stopped: {err}");
1602            }
1603            info!("Server stopped");
1604        }))
1605    }
1606
1607    pub fn state(&self) -> Arc<AuthorityState> {
1608        self.state.clone()
1609    }
1610
1611    // Only used for testing because of how epoch store is loaded.
1612    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1613        self.state.reference_gas_price_for_testing()
1614    }
1615
1616    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1617        self.state.committee_store().clone()
1618    }
1619
1620    /*
1621    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1622        self.state.db()
1623    }
1624    */
1625
1626    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1627    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1628    /// of this function will most likely want to call this again
1629    /// to get a fresh one.
1630    pub fn clone_authority_aggregator(
1631        &self,
1632    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1633        self.transaction_orchestrator
1634            .as_ref()
1635            .map(|to| to.clone_authority_aggregator())
1636    }
1637
1638    pub fn transaction_orchestrator(
1639        &self,
1640    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1641        self.transaction_orchestrator.clone()
1642    }
1643
1644    /// This function awaits the completion of checkpoint execution of the current epoch,
1645    /// after which it initiates reconfiguration of the entire system.
1646    pub async fn monitor_reconfiguration(
1647        self: Arc<Self>,
1648        mut epoch_store: Arc<AuthorityPerEpochStore>,
1649    ) -> Result<()> {
1650        let checkpoint_executor_metrics =
1651            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1652
1653        loop {
1654            let mut hasher_guard = self.global_state_hasher.lock().await;
1655            let hasher = hasher_guard.take().unwrap();
1656            info!(
1657                "Creating checkpoint executor for epoch {}",
1658                epoch_store.epoch()
1659            );
1660            let checkpoint_executor = CheckpointExecutor::new(
1661                epoch_store.clone(),
1662                self.checkpoint_store.clone(),
1663                self.state.clone(),
1664                hasher.clone(),
1665                self.backpressure_manager.clone(),
1666                self.config.checkpoint_executor_config.clone(),
1667                checkpoint_executor_metrics.clone(),
1668                self.subscription_service_checkpoint_sender.clone(),
1669            );
1670
1671            let run_with_range = self.config.run_with_range;
1672
1673            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1674
1675            // Update the current protocol version metric.
1676            self.metrics
1677                .current_protocol_version
1678                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1679
1680            // Advertise capabilities to committee, if we are a validator.
1681            if let Some(components) = &*self.validator_components.lock().await {
1682                // TODO: without this sleep, the consensus message is not delivered reliably.
1683                tokio::time::sleep(Duration::from_millis(1)).await;
1684
1685                let config = cur_epoch_store.protocol_config();
1686                let mut supported_protocol_versions = self
1687                    .config
1688                    .supported_protocol_versions
1689                    .expect("Supported versions should be populated")
1690                    // no need to send digests of versions less than the current version
1691                    .truncate_below(config.version);
1692
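                // Walk the advertised max protocol version down until the corresponding
                // protocol config's requirements (e.g. accumulators) are satisfied by this
                // node's on-chain state.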
1693                while supported_protocol_versions.max > config.version {
1694                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1695                        supported_protocol_versions.max,
1696                        cur_epoch_store.get_chain(),
1697                    );
1698
1699                    if proposed_protocol_config.enable_accumulators()
1700                        && !epoch_store.accumulator_root_exists()
1701                    {
1702                        error!(
1703                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1704                            supported_protocol_versions.max
1705                        );
1706                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1707                    } else {
1708                        break;
1709                    }
1710                }
1711
1712                let binary_config = config.binary_config(None);
1713                let transaction = ConsensusTransaction::new_capability_notification_v2(
1714                    AuthorityCapabilitiesV2::new(
1715                        self.state.name,
1716                        cur_epoch_store.get_chain_identifier().chain(),
1717                        supported_protocol_versions,
1718                        self.state
1719                            .get_available_system_packages(&binary_config)
1720                            .await,
1721                    ),
1722                );
1723                info!(?transaction, "submitting capabilities to consensus");
1724                components.consensus_adapter.submit(
1725                    transaction,
1726                    None,
1727                    &cur_epoch_store,
1728                    None,
1729                    None,
1730                )?;
1731            }
1732
1733            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1734
1735            if stop_condition == StopReason::RunWithRangeCondition {
1736                SuiNode::shutdown(&self).await;
1737                self.shutdown_channel_tx
1738                    .send(run_with_range)
1739                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1740                return Ok(());
1741            }
1742
1743            // Safe to call because we are in the middle of reconfiguration.
1744            let latest_system_state = self
1745                .state
1746                .get_object_cache_reader()
1747                .get_sui_system_state_object_unsafe()
1748                .expect("Read Sui System State object cannot fail");
1749
1750            #[cfg(msim)]
1751            if !self
1752                .sim_state
1753                .sim_safe_mode_expected
1754                .load(Ordering::Relaxed)
1755            {
1756                debug_assert!(!latest_system_state.safe_mode());
1757            }
1758
1759            #[cfg(not(msim))]
1760            debug_assert!(!latest_system_state.safe_mode());
1761
1762            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1763                && self.state.is_fullnode(&cur_epoch_store)
1764            {
1765                warn!(
1766                    "Failed to send end of epoch notification to subscriber: {:?}",
1767                    err
1768                );
1769            }
1770
1771            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1772            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1773
1774            self.auth_agg.store(Arc::new(
1775                self.auth_agg
1776                    .load()
1777                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1778            ));
1779
1780            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1781            let next_epoch = next_epoch_committee.epoch();
1782            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1783
1784            info!(
1785                next_epoch,
1786                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1787            );
1788
1789            fail_point_async!("reconfig_delay");
1790
1791            // We save the connection monitor status map regardless of validator / fullnode status
1792            // so that we don't need to restart the connection monitor every epoch.
1793            // Update the mappings that will be used by the consensus adapter if it exists or is
1794            // about to be created.
1795            let authority_names_to_peer_ids =
1796                new_epoch_start_state.get_authority_names_to_peer_ids();
1797            self.connection_monitor_status
1798                .update_mapping_for_epoch(authority_names_to_peer_ids);
1799
1800            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1801
1802            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1803
1804            let mut validator_components_lock_guard = self.validator_components.lock().await;
1805
1806            // The following code handles 4 different cases, depending on whether the node
1807            // was a validator in the previous epoch, and whether the node is a validator
1808            // in the new epoch.
1809            let new_epoch_store = self
1810                .reconfigure_state(
1811                    &self.state,
1812                    &cur_epoch_store,
1813                    next_epoch_committee.clone(),
1814                    new_epoch_start_state,
1815                    hasher.clone(),
1816                )
1817                .await;
1818
1819            let new_validator_components = if let Some(ValidatorComponents {
1820                validator_server_handle,
1821                validator_overload_monitor_handle,
1822                consensus_manager,
1823                consensus_store_pruner,
1824                consensus_adapter,
1825                checkpoint_metrics,
1826                sui_tx_validator_metrics,
1827            }) = validator_components_lock_guard.take()
1828            {
1829                info!("Reconfiguring the validator.");
1830
1831                consensus_manager.shutdown().await;
1832                info!("Consensus has shut down.");
1833
1834                info!("Epoch store finished reconfiguration.");
1835
1836                // No other components should be holding a strong reference to state hasher
1837                // at this point. Confirm here before we swap in the new hasher.
1838                let global_state_hasher_metrics = Arc::into_inner(hasher)
1839                    .expect("Object state hasher should have no other references at this point")
1840                    .metrics();
1841                let new_hasher = Arc::new(GlobalStateHasher::new(
1842                    self.state.get_global_state_hash_store().clone(),
1843                    global_state_hasher_metrics,
1844                ));
1845                let weak_hasher = Arc::downgrade(&new_hasher);
1846                *hasher_guard = Some(new_hasher);
1847
1848                consensus_store_pruner.prune(next_epoch).await;
1849
1850                if self.state.is_validator(&new_epoch_store) {
1851                    // Only restart consensus if this node is still a validator in the new epoch.
1852                    Some(
1853                        Self::start_epoch_specific_validator_components(
1854                            &self.config,
1855                            self.state.clone(),
1856                            consensus_adapter,
1857                            self.checkpoint_store.clone(),
1858                            new_epoch_store.clone(),
1859                            self.state_sync_handle.clone(),
1860                            self.randomness_handle.clone(),
1861                            consensus_manager,
1862                            consensus_store_pruner,
1863                            weak_hasher,
1864                            self.backpressure_manager.clone(),
1865                            validator_server_handle,
1866                            validator_overload_monitor_handle,
1867                            checkpoint_metrics,
1868                            self.metrics.clone(),
1869                            sui_tx_validator_metrics,
1870                        )
1871                        .await?,
1872                    )
1873                } else {
1874                    info!("This node is no longer a validator after reconfiguration");
1875                    None
1876                }
1877            } else {
1878                // No other components should be holding a strong reference to state hasher
1879                // at this point. Confirm here before we swap in the new hasher.
1880                let global_state_hasher_metrics = Arc::into_inner(hasher)
1881                    .expect("Object state hasher should have no other references at this point")
1882                    .metrics();
1883                let new_hasher = Arc::new(GlobalStateHasher::new(
1884                    self.state.get_global_state_hash_store().clone(),
1885                    global_state_hasher_metrics,
1886                ));
1887                let weak_hasher = Arc::downgrade(&new_hasher);
1888                *hasher_guard = Some(new_hasher);
1889
1890                if self.state.is_validator(&new_epoch_store) {
1891                    info!("Promoting the node from fullnode to validator, starting grpc server");
1892
1893                    let mut components = Self::construct_validator_components(
1894                        self.config.clone(),
1895                        self.state.clone(),
1896                        Arc::new(next_epoch_committee.clone()),
1897                        new_epoch_store.clone(),
1898                        self.checkpoint_store.clone(),
1899                        self.state_sync_handle.clone(),
1900                        self.randomness_handle.clone(),
1901                        weak_hasher,
1902                        self.backpressure_manager.clone(),
1903                        self.connection_monitor_status.clone(),
1904                        &self.registry_service,
1905                        self.metrics.clone(),
1906                        self.checkpoint_metrics.clone(),
1907                    )
1908                    .await?;
1909
1910                    components.validator_server_handle =
1911                        components.validator_server_handle.start().await;
1912
1913                    // Set the consensus address updater now that this fullnode has been promoted to a validator.
1914                    self.endpoint_manager
1915                        .set_consensus_address_updater(components.consensus_manager.clone());
1916
1917                    Some(components)
1918                } else {
1919                    None
1920                }
1921            };
1922            *validator_components_lock_guard = new_validator_components;
1923
1924            // Force release of the current epoch store's DB handles, because the
1925            // Arc<AuthorityPerEpochStore> may linger.
1926            cur_epoch_store.release_db_handles();
1927
1928            if cfg!(msim)
1929                && !matches!(
1930                    self.config
1931                        .authority_store_pruning_config
1932                        .num_epochs_to_retain_for_checkpoints(),
1933                    None | Some(u64::MAX) | Some(0)
1934                )
1935            {
1936                self.state
1937                    .prune_checkpoints_for_eligible_epochs_for_testing(
1938                        self.config.clone(),
1939                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1940                    )
1941                    .await?;
1942            }
1943
1944            epoch_store = new_epoch_store;
1945            info!("Reconfiguration finished");
1946        }
1947    }
1948
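    /// Shuts down consensus if validator components are currently running.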
1949    async fn shutdown(&self) {
1950        if let Some(validator_components) = &*self.validator_components.lock().await {
1951            validator_components.consensus_manager.shutdown().await;
1952        }
1953    }
1954
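    /// Transitions `AuthorityState` into the next epoch: asserts that the last checkpoint of
    /// the current epoch has been executed, builds the new `EpochStartConfiguration`, and
    /// returns the newly opened per-epoch store.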
1955    async fn reconfigure_state(
1956        &self,
1957        state: &Arc<AuthorityState>,
1958        cur_epoch_store: &AuthorityPerEpochStore,
1959        next_epoch_committee: Committee,
1960        next_epoch_start_system_state: EpochStartSystemState,
1961        global_state_hasher: Arc<GlobalStateHasher>,
1962    ) -> Arc<AuthorityPerEpochStore> {
1963        let next_epoch = next_epoch_committee.epoch();
1964
1965        let last_checkpoint = self
1966            .checkpoint_store
1967            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1968            .expect("Error loading last checkpoint for current epoch")
1969            .expect("Could not load last checkpoint for current epoch");
1970
1971        let last_checkpoint_seq = *last_checkpoint.sequence_number();
1972
1973        assert_eq!(
1974            Some(last_checkpoint_seq),
1975            self.checkpoint_store
1976                .get_highest_executed_checkpoint_seq_number()
1977                .expect("Error loading highest executed checkpoint sequence number")
1978        );
1979
1980        let epoch_start_configuration = EpochStartConfiguration::new(
1981            next_epoch_start_system_state,
1982            *last_checkpoint.digest(),
1983            state.get_object_store().as_ref(),
1984            EpochFlag::default_flags_for_new_epoch(&state.config),
1985        )
1986        .expect("EpochStartConfiguration construction cannot fail");
1987
1988        let new_epoch_store = self
1989            .state
1990            .reconfigure(
1991                cur_epoch_store,
1992                self.config.supported_protocol_versions.unwrap(),
1993                next_epoch_committee,
1994                epoch_start_configuration,
1995                global_state_hasher,
1996                &self.config.expensive_safety_check_config,
1997                last_checkpoint_seq,
1998            )
1999            .await
2000            .expect("Reconfigure authority state cannot fail");
2001        info!(next_epoch, "Node State has been reconfigured");
2002        assert_eq!(next_epoch, new_epoch_store.epoch());
2003        self.state.get_reconfig_api().update_epoch_flags_metrics(
2004            cur_epoch_store.epoch_start_config().flags(),
2005            new_epoch_store.epoch_start_config().flags(),
2006        );
2007
2008        new_epoch_store
2009    }
2010
2011    pub fn get_config(&self) -> &NodeConfig {
2012        &self.config
2013    }
2014
2015    pub fn randomness_handle(&self) -> randomness::Handle {
2016        self.randomness_handle.clone()
2017    }
2018
2019    pub fn endpoint_manager(&self) -> &EndpointManager {
2020        &self.endpoint_manager
2021    }
2022
2023    /// Get a short prefix of a digest for metric labels
2024    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2025        let digest_str = digest.to_string();
2026        if digest_str.len() >= 8 {
2027            digest_str[0..8].to_string()
2028        } else {
2029            digest_str
2030        }
2031    }
2032
2033    /// Check for previously detected forks and handle them appropriately.
2034    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2035    /// For all other cases, block node startup if a fork is detected.
2036    async fn check_and_recover_forks(
2037        checkpoint_store: &CheckpointStore,
2038        checkpoint_metrics: &CheckpointMetrics,
2039        is_validator: bool,
2040        fork_recovery: Option<&ForkRecoveryConfig>,
2041    ) -> Result<()> {
2042        // Fork detection and recovery is only relevant for validators.
2043        // Fullnodes should sync from validators and don't need fork checking.
2044        if !is_validator {
2045            return Ok(());
2046        }
2047
2048        // Try to recover from forks if recovery config is provided
2049        if let Some(recovery) = fork_recovery {
2050            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2051            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2052        }
2053
2054        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2055            .get_checkpoint_fork_detected()
2056            .map_err(|e| {
2057                error!("Failed to check for checkpoint fork: {:?}", e);
2058                e
2059            })?
2060        {
2061            Self::handle_checkpoint_fork(
2062                checkpoint_seq,
2063                checkpoint_digest,
2064                checkpoint_metrics,
2065                fork_recovery,
2066            )
2067            .await?;
2068        }
2069        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2070            .get_transaction_fork_detected()
2071            .map_err(|e| {
2072                error!("Failed to check for transaction fork: {:?}", e);
2073                e
2074            })?
2075        {
2076            Self::handle_transaction_fork(
2077                tx_digest,
2078                expected_effects,
2079                actual_effects,
2080                checkpoint_metrics,
2081                fork_recovery,
2082            )
2083            .await?;
2084        }
2085
2086        Ok(())
2087    }
2088
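    /// Applies configured checkpoint overrides: clears locally computed checkpoints whose
    /// digest mismatches an override (from that sequence number on), and clears the
    /// checkpoint fork marker if the forked sequence number is covered by an override.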
2089    fn try_recover_checkpoint_fork(
2090        checkpoint_store: &CheckpointStore,
2091        recovery: &ForkRecoveryConfig,
2092    ) -> Result<()> {
2093        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2094        // clear locally computed checkpoints from that sequence (inclusive).
2095        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2096            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2097                anyhow::bail!(
2098                    "Invalid checkpoint digest override for seq {}: {}",
2099                    seq,
2100                    expected_digest_str
2101                );
2102            };
2103
2104            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2105                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2106                if local_digest != expected_digest {
2107                    info!(
2108                        seq,
2109                        local = %Self::get_digest_prefix(local_digest),
2110                        expected = %Self::get_digest_prefix(expected_digest),
2111                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2112                        seq
2113                    );
2114                    checkpoint_store
2115                        .clear_locally_computed_checkpoints_from(*seq)
2116                        .context(
2117                            "Failed to clear locally computed checkpoints from override seq",
2118                        )?;
2119                }
2120            }
2121        }
2122
2123        if let Some((checkpoint_seq, checkpoint_digest)) =
2124            checkpoint_store.get_checkpoint_fork_detected()?
2125            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2126        {
2127            info!(
2128                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2129                checkpoint_seq, checkpoint_digest
2130            );
2131            checkpoint_store
2132                .clear_checkpoint_fork_detected()
2133                .expect("Failed to clear checkpoint fork detected marker");
2134        }
2135        Ok(())
2136    }
2137
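    /// Clears the transaction fork marker if the forked transaction digest is covered by a
    /// configured transaction override.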
2138    fn try_recover_transaction_fork(
2139        checkpoint_store: &CheckpointStore,
2140        recovery: &ForkRecoveryConfig,
2141    ) -> Result<()> {
2142        if recovery.transaction_overrides.is_empty() {
2143            return Ok(());
2144        }
2145
2146        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2147            && recovery
2148                .transaction_overrides
2149                .contains_key(&tx_digest.to_string())
2150        {
2151            info!(
2152                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2153                tx_digest
2154            );
2155            checkpoint_store
2156                .clear_transaction_fork_detected()
2157                .expect("Failed to clear transaction fork detected marker");
2158        }
2159        Ok(())
2160    }
2161
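    /// Current Unix timestamp in seconds, used to label fork metrics.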
2162    fn get_current_timestamp() -> u64 {
2163        std::time::SystemTime::now()
2164            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2165            .unwrap()
2166            .as_secs()
2167    }
2168
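    /// Reports a detected checkpoint fork via metrics, then either parks the task forever
    /// (awaiting operator-driven fork recovery) or returns an error, depending on the
    /// configured crash behavior.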
2169    async fn handle_checkpoint_fork(
2170        checkpoint_seq: u64,
2171        checkpoint_digest: CheckpointDigest,
2172        checkpoint_metrics: &CheckpointMetrics,
2173        fork_recovery: Option<&ForkRecoveryConfig>,
2174    ) -> Result<()> {
2175        checkpoint_metrics
2176            .checkpoint_fork_crash_mode
2177            .with_label_values(&[
2178                &checkpoint_seq.to_string(),
2179                &Self::get_digest_prefix(checkpoint_digest),
2180                &Self::get_current_timestamp().to_string(),
2181            ])
2182            .set(1);
2183
2184        let behavior = fork_recovery
2185            .map(|fr| fr.fork_crash_behavior)
2186            .unwrap_or_default();
2187
2188        match behavior {
2189            ForkCrashBehavior::AwaitForkRecovery => {
2190                error!(
2191                    checkpoint_seq = checkpoint_seq,
2192                    checkpoint_digest = ?checkpoint_digest,
2193                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2194                );
2195                futures::future::pending::<()>().await;
2196                unreachable!("pending() should never return");
2197            }
2198            ForkCrashBehavior::ReturnError => {
2199                error!(
2200                    checkpoint_seq = checkpoint_seq,
2201                    checkpoint_digest = ?checkpoint_digest,
2202                    "Checkpoint fork detected! Returning error."
2203                );
2204                Err(anyhow::anyhow!(
2205                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2206                    checkpoint_seq,
2207                    checkpoint_digest
2208                ))
2209            }
2210        }
2211    }
2212
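    /// Same as [`Self::handle_checkpoint_fork`], but for a transaction whose actual effects
    /// digest diverged from the expected effects digest.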
2213    async fn handle_transaction_fork(
2214        tx_digest: TransactionDigest,
2215        expected_effects_digest: TransactionEffectsDigest,
2216        actual_effects_digest: TransactionEffectsDigest,
2217        checkpoint_metrics: &CheckpointMetrics,
2218        fork_recovery: Option<&ForkRecoveryConfig>,
2219    ) -> Result<()> {
2220        checkpoint_metrics
2221            .transaction_fork_crash_mode
2222            .with_label_values(&[
2223                &Self::get_digest_prefix(tx_digest),
2224                &Self::get_digest_prefix(expected_effects_digest),
2225                &Self::get_digest_prefix(actual_effects_digest),
2226                &Self::get_current_timestamp().to_string(),
2227            ])
2228            .set(1);
2229
2230        let behavior = fork_recovery
2231            .map(|fr| fr.fork_crash_behavior)
2232            .unwrap_or_default();
2233
2234        match behavior {
2235            ForkCrashBehavior::AwaitForkRecovery => {
2236                error!(
2237                    tx_digest = ?tx_digest,
2238                    expected_effects_digest = ?expected_effects_digest,
2239                    actual_effects_digest = ?actual_effects_digest,
2240                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2241                );
2242                futures::future::pending::<()>().await;
2243                unreachable!("pending() should never return");
2244            }
2245            ForkCrashBehavior::ReturnError => {
2246                error!(
2247                    tx_digest = ?tx_digest,
2248                    expected_effects_digest = ?expected_effects_digest,
2249                    actual_effects_digest = ?actual_effects_digest,
2250                    "Transaction fork detected! Returning error."
2251                );
2252                Err(anyhow::anyhow!(
2253                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2254                    tx_digest,
2255                    expected_effects_digest,
2256                    actual_effects_digest
2257                ))
2258            }
2259        }
2260    }
2261}
2262
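// In regular builds, JWKs are fetched from the OIDC provider over HTTP. Under the msim
// simulator, a test-injectable fetcher is used instead (see the msim impl below).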
2263#[cfg(not(msim))]
2264impl SuiNode {
2265    async fn fetch_jwks(
2266        _authority: AuthorityName,
2267        provider: &OIDCProvider,
2268    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2269        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2270        use sui_types::error::SuiErrorKind;
2271        let client = reqwest::Client::new();
2272        fetch_jwks(provider, &client, true)
2273            .await
2274            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2275    }
2276}
2277
2278#[cfg(msim)]
2279impl SuiNode {
2280    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2281        self.sim_state.sim_node.id()
2282    }
2283
2284    pub fn set_safe_mode_expected(&self, new_value: bool) {
2285        info!("Setting safe mode expected to {}", new_value);
2286        self.sim_state
2287            .sim_safe_mode_expected
2288            .store(new_value, Ordering::Relaxed);
2289    }
2290
2291    #[allow(unused_variables)]
2292    async fn fetch_jwks(
2293        authority: AuthorityName,
2294        provider: &OIDCProvider,
2295    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2296        get_jwk_injector()(authority, provider)
2297    }
2298}
2299
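/// A future that is spawned at most once. `start()` spawns the wrapped future on the tokio
/// runtime and waits for its readiness signal before returning the `Started` variant.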
2300enum SpawnOnce {
2301    // Mutex is only needed to make SpawnOnce Send
2302    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2303    #[allow(unused)]
2304    Started(JoinHandle<()>),
2305}
2306
2307impl SpawnOnce {
2308    pub fn new(
2309        ready_rx: oneshot::Receiver<()>,
2310        future: impl Future<Output = ()> + Send + 'static,
2311    ) -> Self {
2312        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2313    }
2314
2315    pub async fn start(self) -> Self {
2316        match self {
2317            Self::Unstarted(ready_rx, future) => {
2318                let future = future.into_inner();
2319                let handle = tokio::spawn(future);
2320                ready_rx.await.unwrap();
2321                Self::Started(handle)
2322            }
2323            Self::Started(_) => self,
2324        }
2325    }
2326}
2327
2328/// Updates trusted peer addresses in the p2p network.
2329fn update_peer_addresses(
2330    config: &NodeConfig,
2331    endpoint_manager: &EndpointManager,
2332    epoch_start_state: &EpochStartSystemState,
2333) {
2334    for (peer_id, address) in
2335        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2336    {
2337        endpoint_manager
2338            .update_endpoint(
2339                EndpointId::P2p(peer_id),
2340                AddressSource::Committee,
2341                vec![address],
2342            )
2343            .expect("Updating peer addresses should not fail");
2344    }
2345}
2346
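/// Builds the transaction key-value store used by the JSON-RPC read APIs: the local
/// RocksDB-backed store, falling back to an HTTP key-value store when a base URL is
/// configured (currently mainnet only).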
2347fn build_kv_store(
2348    state: &Arc<AuthorityState>,
2349    config: &NodeConfig,
2350    registry: &Registry,
2351) -> Result<Arc<TransactionKeyValueStore>> {
2352    let metrics = KeyValueStoreMetrics::new(registry);
2353    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2354
2355    let base_url = &config.transaction_kv_store_read_config.base_url;
2356
2357    if base_url.is_empty() {
2358        info!("no http kv store url provided, using local db only");
2359        return Ok(Arc::new(db_store));
2360    }
2361
2362    let base_url: url::Url = base_url.parse().tap_err(|e| {
2363        error!(
2364            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2365            base_url, e
2366        )
2367    })?;
2368
2369    let network_str = match state.get_chain_identifier().chain() {
2370        Chain::Mainnet => "/mainnet",
2371        _ => {
2372            info!("using local db only for kv store");
2373            return Ok(Arc::new(db_store));
2374        }
2375    };
2376
2377    let base_url = base_url.join(network_str)?.to_string();
2378    let http_store = HttpKVStore::new_kv(
2379        &base_url,
2380        config.transaction_kv_store_read_config.cache_size,
2381        metrics.clone(),
2382    )?;
2383    info!("using local key-value store with fallback to http key-value store");
2384    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2385        db_store,
2386        http_store,
2387        metrics,
2388        "json_rpc_fallback",
2389    )))
2390}
2391
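/// Builds the fullnode HTTP/HTTPS servers hosting the JSON-RPC modules and the
/// `sui_rpc_api` service, and returns the sender used to feed executed checkpoints to the
/// RPC subscription service. Validators (nodes with a consensus config) skip this entirely.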
2392async fn build_http_servers(
2393    state: Arc<AuthorityState>,
2394    store: RocksDbStore,
2395    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2396    config: &NodeConfig,
2397    prometheus_registry: &Registry,
2398    server_version: ServerVersion,
2399) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2400    // Validators do not expose these APIs
2401    if config.consensus_config().is_some() {
2402        return Ok((HttpServers::default(), None));
2403    }
2404
2405    let mut router = axum::Router::new();
2406
2407    let json_rpc_router = {
2408        let traffic_controller = state.traffic_controller.clone();
2409        let mut server = JsonRpcServerBuilder::new(
2410            env!("CARGO_PKG_VERSION"),
2411            prometheus_registry,
2412            traffic_controller,
2413            config.policy_config.clone(),
2414        );
2415
2416        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2417
2418        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2419        server.register_module(ReadApi::new(
2420            state.clone(),
2421            kv_store.clone(),
2422            metrics.clone(),
2423        ))?;
2424        server.register_module(CoinReadApi::new(
2425            state.clone(),
2426            kv_store.clone(),
2427            metrics.clone(),
2428        ))?;
2429
2430        // If run_with_range is set, we prevent new transactions by not registering the
2431        // transaction builder; run_with_range = None is the normal operating mode.
2432        if config.run_with_range.is_none() {
2433            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2434        }
2435        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2436        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2437
2438        if let Some(transaction_orchestrator) = transaction_orchestrator {
2439            server.register_module(TransactionExecutionApi::new(
2440                state.clone(),
2441                transaction_orchestrator.clone(),
2442                metrics.clone(),
2443            ))?;
2444        }
2445
2446        let name_service_config =
2447            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2448                config.name_service_package_address,
2449                config.name_service_registry_id,
2450                config.name_service_reverse_registry_id,
2451            ) {
2452                sui_name_service::NameServiceConfig::new(
2453                    package_address,
2454                    registry_id,
2455                    reverse_registry_id,
2456                )
2457            } else {
2458                match state.get_chain_identifier().chain() {
2459                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2460                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2461                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2462                }
2463            };
2464
2465        server.register_module(IndexerApi::new(
2466            state.clone(),
2467            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2468            kv_store,
2469            name_service_config,
2470            metrics,
2471            config.indexer_max_subscriptions,
2472        ))?;
2473        server.register_module(MoveUtils::new(state.clone()))?;
2474
2475        let server_type = config.jsonrpc_server_type();
2476
2477        server.to_router(server_type).await?
2478    };
2479
2480    router = router.merge(json_rpc_router);
2481
2482    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2483        SubscriptionService::build(prometheus_registry);
2484    let rpc_router = {
2485        let mut rpc_service =
2486            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2487        rpc_service.with_server_version(server_version);
2488
2489        if let Some(config) = config.rpc.clone() {
2490            rpc_service.with_config(config);
2491        }
2492
2493        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2494        rpc_service.with_subscription_service(subscription_service_handle);
2495
2496        if let Some(transaction_orchestrator) = transaction_orchestrator {
2497            rpc_service.with_executor(transaction_orchestrator.clone())
2498        }
2499
2500        rpc_service.into_router().await
2501    };
2502
2503    let layers = ServiceBuilder::new()
2504        .map_request(|mut request: axum::http::Request<_>| {
2505            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2506                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2507                request.extensions_mut().insert(axum_connect_info);
2508            }
2509            request
2510        })
2511        .layer(axum::middleware::from_fn(server_timing_middleware))
2512        // Set up a permissive CORS policy
2513        .layer(
2514            tower_http::cors::CorsLayer::new()
2515                .allow_methods([http::Method::GET, http::Method::POST])
2516                .allow_origin(tower_http::cors::Any)
2517                .allow_headers(tower_http::cors::Any)
2518                .expose_headers(tower_http::cors::Any),
2519        );
2520
2521    router = router.merge(rpc_router).layer(layers);
2522
2523    let https = if let Some((tls_config, https_address)) = config
2524        .rpc()
2525        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2526    {
2527        let https = sui_http::Builder::new()
2528            .tls_single_cert(tls_config.cert(), tls_config.key())
2529            .and_then(|builder| builder.serve(https_address, router.clone()))
2530            .map_err(|e| anyhow::anyhow!(e))?;
2531
2532        info!(
2533            https_address =? https.local_addr(),
2534            "HTTPS rpc server listening on {}",
2535            https.local_addr()
2536        );
2537
2538        Some(https)
2539    } else {
2540        None
2541    };
2542
2543    let http = sui_http::Builder::new()
2544        .serve(&config.json_rpc_address, router)
2545        .map_err(|e| anyhow::anyhow!(e))?;
2546
2547    info!(
2548        http_address =? http.local_addr(),
2549        "HTTP rpc server listening on {}",
2550        http.local_addr()
2551    );
2552
2553    Ok((
2554        HttpServers {
2555            http: Some(http),
2556            https,
2557        },
2558        Some(subscription_service_checkpoint_sender),
2559    ))
2560}
2561
2562#[cfg(not(test))]
2563fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2564    protocol_config.max_transactions_per_checkpoint() as usize
2565}
2566
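// In tests, checkpoints are capped at 2 transactions so that even small workloads produce
// multiple checkpoints.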
2567#[cfg(test)]
2568fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2569    2
2570}
2571
2572#[derive(Default)]
2573struct HttpServers {
2574    #[allow(unused)]
2575    http: Option<sui_http::ServerHandle>,
2576    #[allow(unused)]
2577    https: Option<sui_http::ServerHandle>,
2578}
2579
2580#[cfg(test)]
2581mod tests {
2582    use super::*;
2583    use prometheus::Registry;
2584    use std::collections::BTreeMap;
2585    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2586    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2587    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2588
2589    #[tokio::test]
2590    async fn test_fork_error_and_recovery_both_paths() {
2591        let checkpoint_store = CheckpointStore::new_for_tests();
2592        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2593
2594        // ---------- Checkpoint fork path ----------
2595        let seq_num = 42;
2596        let digest = CheckpointDigest::random();
2597        checkpoint_store
2598            .record_checkpoint_fork_detected(seq_num, digest)
2599            .unwrap();
2600
2601        let fork_recovery = ForkRecoveryConfig {
2602            transaction_overrides: Default::default(),
2603            checkpoint_overrides: Default::default(),
2604            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2605        };
2606
2607        let r = SuiNode::check_and_recover_forks(
2608            &checkpoint_store,
2609            &checkpoint_metrics,
2610            true,
2611            Some(&fork_recovery),
2612        )
2613        .await;
2614        assert!(r.is_err());
2615        assert!(
2616            r.unwrap_err()
2617                .to_string()
2618                .contains("Checkpoint fork detected")
2619        );
2620
2621        let mut checkpoint_overrides = BTreeMap::new();
2622        checkpoint_overrides.insert(seq_num, digest.to_string());
2623        let fork_recovery_with_override = ForkRecoveryConfig {
2624            transaction_overrides: Default::default(),
2625            checkpoint_overrides,
2626            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2627        };
2628        let r = SuiNode::check_and_recover_forks(
2629            &checkpoint_store,
2630            &checkpoint_metrics,
2631            true,
2632            Some(&fork_recovery_with_override),
2633        )
2634        .await;
2635        assert!(r.is_ok());
2636        assert!(
2637            checkpoint_store
2638                .get_checkpoint_fork_detected()
2639                .unwrap()
2640                .is_none()
2641        );
2642
2643        // ---------- Transaction fork path ----------
2644        let tx_digest = TransactionDigest::random();
2645        let expected_effects = TransactionEffectsDigest::random();
2646        let actual_effects = TransactionEffectsDigest::random();
2647        checkpoint_store
2648            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2649            .unwrap();
2650
2651        let fork_recovery = ForkRecoveryConfig {
2652            transaction_overrides: Default::default(),
2653            checkpoint_overrides: Default::default(),
2654            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2655        };
2656        let r = SuiNode::check_and_recover_forks(
2657            &checkpoint_store,
2658            &checkpoint_metrics,
2659            true,
2660            Some(&fork_recovery),
2661        )
2662        .await;
2663        assert!(r.is_err());
2664        assert!(
2665            r.unwrap_err()
2666                .to_string()
2667                .contains("Transaction fork detected")
2668        );
2669
2670        let mut transaction_overrides = BTreeMap::new();
2671        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2672        let fork_recovery_with_override = ForkRecoveryConfig {
2673            transaction_overrides,
2674            checkpoint_overrides: Default::default(),
2675            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2676        };
2677        let r = SuiNode::check_and_recover_forks(
2678            &checkpoint_store,
2679            &checkpoint_metrics,
2680            true,
2681            Some(&fork_recovery_with_override),
2682        )
2683        .await;
2684        assert!(r.is_ok());
2685        assert!(
2686            checkpoint_store
2687                .get_transaction_fork_detected()
2688                .unwrap()
2689                .is_none()
2690        );
2691    }
2692}