sui_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::authority::ExecutionEnv;
29use sui_core::authority::RandomnessRoundReceiver;
30use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
31use sui_core::authority::backpressure::BackpressureManager;
32use sui_core::authority::epoch_start_configuration::EpochFlag;
33use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
34use sui_core::consensus_adapter::ConsensusClient;
35use sui_core::consensus_manager::UpdatableConsensusClient;
36use sui_core::epoch::randomness::RandomnessManager;
37use sui_core::execution_cache::build_execution_cache;
38use sui_network::endpoint_manager::{AddressSource, EndpointId};
39use sui_network::validator::server::SUI_TLS_SERVER_NAME;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use sui_core::execution_scheduler::SchedulingSource;
43use sui_core::global_state_hasher::GlobalStateHashMetrics;
44use sui_core::storage::RestReadStore;
45use sui_json_rpc::bridge_api::BridgeReadApi;
46use sui_json_rpc_api::JsonRpcMetrics;
47use sui_network::randomness;
48use sui_rpc_api::RpcMetrics;
49use sui_rpc_api::ServerVersion;
50use sui_rpc_api::subscription::SubscriptionService;
51use sui_types::base_types::ConciseableName;
52use sui_types::crypto::RandomnessRound;
53use sui_types::digests::{
54    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
55};
56use sui_types::messages_consensus::AuthorityCapabilitiesV2;
57use sui_types::sui_system_state::SuiSystemState;
58use tap::tap::TapFallible;
59use tokio::sync::oneshot;
60use tokio::sync::{Mutex, broadcast, mpsc};
61use tokio::task::JoinHandle;
62use tower::ServiceBuilder;
63use tracing::{Instrument, error_span, info};
64use tracing::{debug, error, warn};
65
66// Logs at debug level in test configurations and at info level otherwise.
67// JWK logs cause significant volume in tests but are insignificant in prod,
68// so they are kept at info there.
69macro_rules! jwk_log {
70    ($($arg:tt)+) => {
71        if in_test_configuration() {
72            debug!($($arg)+);
73        } else {
74            info!($($arg)+);
75        }
76    };
77}
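// Example call (mirrors the call sites below): `jwk_log!("fetching JWK for provider {:?}", p);`
// expands to `debug!(...)` under a test configuration and to `info!(...)` otherwise.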
78
79use fastcrypto_zkp::bn254::zk_login::JWK;
80pub use handle::SuiNodeHandle;
81use mysten_metrics::{RegistryService, spawn_monitored_task};
82use mysten_service::server_timing::server_timing_middleware;
83use sui_config::node::{DBCheckpointConfig, RunWithRange};
84use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
85use sui_config::node_config_metrics::NodeConfigMetrics;
86use sui_config::{ConsensusConfig, NodeConfig};
87use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
88use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
89use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
90use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
91use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
92use sui_core::authority_aggregator::AuthorityAggregator;
93use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
94use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
95use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
96use sui_core::checkpoints::{
97    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
98    SubmitCheckpointToConsensus,
99};
100use sui_core::consensus_adapter::{
101    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
102};
103use sui_core::consensus_manager::ConsensusManager;
104use sui_core::consensus_throughput_calculator::{
105    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
106};
107use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
108use sui_core::db_checkpoint_handler::DBCheckpointHandler;
109use sui_core::epoch::committee_store::CommitteeStore;
110use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
111use sui_core::epoch::epoch_metrics::EpochMetrics;
112use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
113use sui_core::global_state_hasher::GlobalStateHasher;
114use sui_core::jsonrpc_index::IndexStore;
115use sui_core::module_cache_metrics::ResolverMetrics;
116use sui_core::overload_monitor::overload_monitor;
117use sui_core::rpc_index::RpcIndexStore;
118use sui_core::signature_verifier::SignatureVerifierMetrics;
119use sui_core::storage::RocksDbStore;
120use sui_core::transaction_orchestrator::TransactionOrchestrator;
121use sui_core::{
122    authority::{AuthorityState, AuthorityStore},
123    authority_client::NetworkAuthorityClient,
124};
125use sui_json_rpc::JsonRpcServerBuilder;
126use sui_json_rpc::coin_api::CoinReadApi;
127use sui_json_rpc::governance_api::GovernanceReadApi;
128use sui_json_rpc::indexer_api::IndexerApi;
129use sui_json_rpc::move_utils::MoveUtils;
130use sui_json_rpc::read_api::ReadApi;
131use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
132use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
133use sui_macros::fail_point;
134use sui_macros::{fail_point_async, replay_log};
135use sui_network::api::ValidatorServer;
136use sui_network::discovery;
137use sui_network::endpoint_manager::EndpointManager;
138use sui_network::state_sync;
139use sui_network::validator::server::ServerBuilder;
140use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
141use sui_snapshot::uploader::StateSnapshotUploader;
142use sui_storage::{
143    http_key_value_store::HttpKVStore,
144    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
145    key_value_store_metrics::KeyValueStoreMetrics,
146};
147use sui_types::base_types::{AuthorityName, EpochId};
148use sui_types::committee::Committee;
149use sui_types::crypto::KeypairTraits;
150use sui_types::error::{SuiError, SuiResult};
151use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
152use sui_types::sui_system_state::SuiSystemStateTrait;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
154use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
155use sui_types::supported_protocol_versions::SupportedProtocolVersions;
156use typed_store::DBMetrics;
157use typed_store::rocks::default_db_options;
158
159use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
160
161pub mod admin;
162mod handle;
163pub mod metrics;
164
165pub struct ValidatorComponents {
166    validator_server_handle: SpawnOnce,
167    validator_overload_monitor_handle: Option<JoinHandle<()>>,
168    consensus_manager: Arc<ConsensusManager>,
169    consensus_store_pruner: ConsensusStorePruner,
170    consensus_adapter: Arc<ConsensusAdapter>,
171    checkpoint_metrics: Arc<CheckpointMetrics>,
172    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
173}
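
/// Network and long-lived handles produced once at startup by `create_p2p_network`.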
174pub struct P2pComponents {
175    p2p_network: Network,
176    known_peers: HashMap<PeerId, String>,
177    discovery_handle: discovery::Handle,
178    state_sync_handle: state_sync::Handle,
179    randomness_handle: randomness::Handle,
180}
181
182#[cfg(msim)]
183mod simulator {
184    use std::sync::atomic::AtomicBool;
185    use sui_types::error::SuiErrorKind;
186
187    use super::*;
188    pub(super) struct SimState {
189        pub sim_node: sui_simulator::runtime::NodeHandle,
190        pub sim_safe_mode_expected: AtomicBool,
191        _leak_detector: sui_simulator::NodeLeakDetector,
192    }
193
194    impl Default for SimState {
195        fn default() -> Self {
196            Self {
197                sim_node: sui_simulator::runtime::NodeHandle::current(),
198                sim_safe_mode_expected: AtomicBool::new(false),
199                _leak_detector: sui_simulator::NodeLeakDetector::new(),
200            }
201        }
202    }
203
204    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
205        + Send
206        + Sync
207        + 'static;
208
209    fn default_fetch_jwks(
210        _authority: AuthorityName,
211        _provider: &OIDCProvider,
212    ) -> SuiResult<Vec<(JwkId, JWK)>> {
213        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
214        // Just load a default Twitch jwk for testing.
215        parse_jwks(
216            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
217            &OIDCProvider::Twitch,
218            true,
219        )
220        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
221    }
222
223    thread_local! {
224        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
225    }
226
227    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
228        JWK_INJECTOR.with(|injector| injector.borrow().clone())
229    }
230
231    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
232        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
233    }
234}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
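// In simulation tests the JWK fetch path can be stubbed out, e.g. (sketch only):
//   set_jwk_injector(Arc::new(|_authority, _provider| Ok(vec![])));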
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
245const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
247pub struct SuiNode {
248    config: NodeConfig,
249    validator_components: Mutex<Option<ValidatorComponents>>,
250
251    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
252    #[allow(unused)]
253    http_servers: HttpServers,
254
255    state: Arc<AuthorityState>,
256    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
257    registry_service: RegistryService,
258    metrics: Arc<SuiNodeMetrics>,
259    checkpoint_metrics: Arc<CheckpointMetrics>,
260
261    _discovery: discovery::Handle,
262    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
263    state_sync_handle: state_sync::Handle,
264    randomness_handle: randomness::Handle,
265    checkpoint_store: Arc<CheckpointStore>,
266    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
267    connection_monitor_status: Arc<ConnectionMonitorStatus>,
268
269    /// Broadcast channel to send the starting system state for the next epoch.
270    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
271
272    /// EndpointManager for updating peer network addresses.
273    endpoint_manager: EndpointManager,
274
275    backpressure_manager: Arc<BackpressureManager>,
276
277    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
278
279    #[cfg(msim)]
280    sim_state: SimState,
281
282    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
283    // Channel to allow signaling upstream to shut down sui-node
284    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
285
286    /// AuthorityAggregator of the network, created at startup and at the beginning of each epoch.
287    /// Wrapped in ArcSwap so that it can be mutated without taking a mutable reference.
288    // TODO: Eventually we can make this auth aggregator a shared reference so that this
289    // update will automatically propagate to other uses.
290    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
291
292    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
293}
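
// Typical startup flow (sketch, assuming a loaded `NodeConfig` and a `RegistryService`):
//   let node = SuiNode::start(config, registry_service).await?;
//   let mut epoch_rx = node.subscribe_to_epoch_change();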
294
295impl fmt::Debug for SuiNode {
296    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297        f.debug_struct("SuiNode")
298            .field("name", &self.state.name.concise())
299            .finish()
300    }
301}
302
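// Upper bound on the number of JWKs accepted from a single provider fetch; any
// excess keys are truncated in the updater loop below.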
303static MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306    pub async fn start(
307        config: NodeConfig,
308        registry_service: RegistryService,
309    ) -> Result<Arc<SuiNode>> {
310        Self::start_async(
311            config,
312            registry_service,
313            ServerVersion::new("sui-node", "unknown"),
314        )
315        .await
316    }
317
318    fn start_jwk_updater(
319        config: &NodeConfig,
320        metrics: Arc<SuiNodeMetrics>,
321        authority: AuthorityName,
322        epoch_store: Arc<AuthorityPerEpochStore>,
323        consensus_adapter: Arc<ConsensusAdapter>,
324    ) {
325        let epoch = epoch_store.epoch();
326
327        let supported_providers = config
328            .zklogin_oauth_providers
329            .get(&epoch_store.get_chain_identifier().chain())
330            .unwrap_or(&BTreeSet::new())
331            .iter()
332            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
333            .collect::<Vec<_>>();
334
335        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
336
337        info!(
338            ?fetch_interval,
339            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
340        );
341
342        fn validate_jwk(
343            metrics: &Arc<SuiNodeMetrics>,
344            provider: &OIDCProvider,
345            id: &JwkId,
346            jwk: &JWK,
347        ) -> bool {
348            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
349                warn!(
350                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
351                    id.iss, provider
352                );
353                metrics
354                    .invalid_jwks
355                    .with_label_values(&[&provider.to_string()])
356                    .inc();
357                return false;
358            };
359
360            if iss_provider != *provider {
361                warn!(
362                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
363                    id.iss, provider, iss_provider
364                );
365                metrics
366                    .invalid_jwks
367                    .with_label_values(&[&provider.to_string()])
368                    .inc();
369                return false;
370            }
371
372            if !check_total_jwk_size(id, jwk) {
373                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
374                metrics
375                    .invalid_jwks
376                    .with_label_values(&[&provider.to_string()])
377                    .inc();
378                return false;
379            }
380
381            true
382        }
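        // A key is kept only if its `iss` maps back to the provider it was fetched
        // from and it passes the total-size check; anything else increments
        // `invalid_jwks` and is dropped.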
383
384        // Relevant fields of `metrics` (SuiNodeMetrics) used below:
385        //  pub struct SuiNodeMetrics {
386        //      pub jwk_requests: IntCounterVec,
387        //      pub jwk_request_errors: IntCounterVec,
388        //      pub total_jwks: IntCounterVec,
389        //      pub unique_jwks: IntCounterVec,
390        //  }
391
392        for p in supported_providers.into_iter() {
393            let provider_str = p.to_string();
394            let epoch_store = epoch_store.clone();
395            let consensus_adapter = consensus_adapter.clone();
396            let metrics = metrics.clone();
397            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
398                async move {
399                    // Note: restart-safe de-duplication happens after consensus; this is
400                    // just a best-effort filter to reduce unneeded submissions.
401                    let mut seen = HashSet::new();
402                    loop {
403                        jwk_log!("fetching JWK for provider {:?}", p);
404                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
405                        match Self::fetch_jwks(authority, &p).await {
406                            Err(e) => {
407                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
408                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
409                                // Retry in 30 seconds
410                                tokio::time::sleep(Duration::from_secs(30)).await;
411                                continue;
412                            }
413                            Ok(mut keys) => {
414                                metrics.total_jwks
415                                    .with_label_values(&[&provider_str])
416                                    .inc_by(keys.len() as u64);
417
418                                keys.retain(|(id, jwk)| {
419                                    validate_jwk(&metrics, &p, id, jwk) &&
420                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
421                                    seen.insert((id.clone(), jwk.clone()))
422                                });
423
424                                metrics.unique_jwks
425                                    .with_label_values(&[&provider_str])
426                                    .inc_by(keys.len() as u64);
427
428                                // prevent oauth providers from sending too many keys,
429                                // inadvertently or otherwise
430                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
431                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
432                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
433                                }
434
435                                for (id, jwk) in keys.into_iter() {
436                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
437
438                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
439                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
440                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
441                                        .ok();
442                                }
443                            }
444                        }
445                        tokio::time::sleep(fetch_interval).await;
446                    }
447                }
448                .instrument(error_span!("jwk_updater_task", epoch)),
449            ));
450        }
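        // Each updater task is wrapped in `within_alive_epoch`, so all JWK fetch loops
        // are shut down automatically when the epoch ends.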
451    }
452
453    pub async fn start_async(
454        config: NodeConfig,
455        registry_service: RegistryService,
456        server_version: ServerVersion,
457    ) -> Result<Arc<SuiNode>> {
458        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
459        let mut config = config.clone();
460        if config.supported_protocol_versions.is_none() {
461            info!(
462                "populating config.supported_protocol_versions with default {:?}",
463                SupportedProtocolVersions::SYSTEM_DEFAULT
464            );
465            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466        }
467
468        let run_with_range = config.run_with_range;
469        let is_validator = config.consensus_config().is_some();
470        let is_full_node = !is_validator;
471        let prometheus_registry = registry_service.default_registry();
472
473        info!(node =? config.protocol_public_key(),
474            "Initializing sui-node listening on {}", config.network_address
475        );
476
477        // Initialize metrics to track db usage before creating any stores
478        DBMetrics::init(registry_service.clone());
479
480        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
481        typed_store::init_write_sync(config.enable_db_sync_to_disk);
482
483        // Initialize Mysten metrics.
484        mysten_metrics::init_metrics(&prometheus_registry);
485        // Unsupported (because of the use of a static variable) and unnecessary in simtests.
486        #[cfg(not(msim))]
487        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
488
489        let genesis = config.genesis()?.clone();
490
491        let secret = Arc::pin(config.protocol_key_pair().copy());
492        let genesis_committee = genesis.committee();
493        let committee_store = Arc::new(CommitteeStore::new(
494            config.db_path().join("epochs"),
495            &genesis_committee,
496            None,
497        ));
498
499        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
500        let checkpoint_store = CheckpointStore::new(
501            &config.db_path().join("checkpoints"),
502            pruner_watermarks.clone(),
503        );
504        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
505
506        Self::check_and_recover_forks(
507            &checkpoint_store,
508            &checkpoint_metrics,
509            is_validator,
510            config.fork_recovery.as_ref(),
511        )
512        .await?;
513
514        // By default, write stall for the perpetual db is only enabled on validators.
515        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
516        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
517            enable_write_stall,
518            is_validator,
519        };
520        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
521            &config.db_path().join("store"),
522            Some(perpetual_tables_options),
523            Some(pruner_watermarks.epoch_id.clone()),
524        ));
525        let is_genesis = perpetual_tables
526            .database_is_empty()
527            .expect("Database read should not fail at init.");
528
529        let backpressure_manager =
530            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
531
532        let store =
533            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
534
535        let cur_epoch = store.get_recovery_epoch_at_restart()?;
536        let committee = committee_store
537            .get_committee(&cur_epoch)?
538            .expect("Committee of the current epoch must exist");
539        let epoch_start_configuration = store
540            .get_epoch_start_configuration()?
541            .expect("EpochStartConfiguration of the current epoch must exist");
542        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
543        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
544
545        let cache_traits = build_execution_cache(
546            &config.execution_cache,
547            &prometheus_registry,
548            &store,
549            backpressure_manager.clone(),
550        );
551
552        let auth_agg = {
553            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
554            Arc::new(ArcSwap::new(Arc::new(
555                AuthorityAggregator::new_from_epoch_start_state(
556                    epoch_start_configuration.epoch_start_state(),
557                    &committee_store,
558                    safe_client_metrics_base,
559                ),
560            )))
561        };
562
563        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
564        let chain = match config.chain_override_for_testing {
565            Some(chain) => chain,
566            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
567        };
568
569        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
570        let epoch_store = AuthorityPerEpochStore::new(
571            config.protocol_public_key(),
572            committee.clone(),
573            &config.db_path().join("store"),
574            Some(epoch_options.options),
575            EpochMetrics::new(&registry_service.default_registry()),
576            epoch_start_configuration,
577            cache_traits.backing_package_store.clone(),
578            cache_traits.object_store.clone(),
579            cache_metrics,
580            signature_verifier_metrics,
581            &config.expensive_safety_check_config,
582            (chain_id, chain),
583            checkpoint_store
584                .get_highest_executed_checkpoint_seq_number()
585                .expect("checkpoint store read cannot fail")
586                .unwrap_or(0),
587            Arc::new(SubmittedTransactionCacheMetrics::new(
588                &registry_service.default_registry(),
589            )),
590        )?;
591
592        info!("created epoch store");
593
594        replay_log!(
595            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
596            epoch_store.epoch(),
597            epoch_store.protocol_config()
598        );
599
600        // the database is empty at genesis time
601        if is_genesis {
602            info!("checking SUI conservation at genesis");
603            // When opening the db tables, the only time it is safe to check SUI
604            // conservation is at genesis. Otherwise we may be in the middle of an
605            // epoch and the SUI conservation check would fail. This also initializes
606            // the expected_network_sui_amount table.
607            cache_traits
608                .reconfig_api
609                .expensive_check_sui_conservation(&epoch_store)
610                .expect("SUI conservation check cannot fail at genesis");
611        }
612
613        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
614        let default_buffer_stake = epoch_store
615            .protocol_config()
616            .buffer_stake_for_protocol_upgrade_bps();
617        if effective_buffer_stake != default_buffer_stake {
618            warn!(
619                ?effective_buffer_stake,
620                ?default_buffer_stake,
621                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
622            );
623        }
624
625        checkpoint_store.insert_genesis_checkpoint(
626            genesis.checkpoint(),
627            genesis.checkpoint_contents().clone(),
628            &epoch_store,
629        );
630
631        info!("creating state sync store");
632        let state_sync_store = RocksDbStore::new(
633            cache_traits.clone(),
634            committee_store.clone(),
635            checkpoint_store.clone(),
636        );
637
638        let index_store = if is_full_node && config.enable_index_processing {
639            info!("creating jsonrpc index store");
640            Some(Arc::new(IndexStore::new(
641                config.db_path().join("indexes"),
642                &prometheus_registry,
643                epoch_store
644                    .protocol_config()
645                    .max_move_identifier_len_as_option(),
646                config.remove_deprecated_tables,
647            )))
648        } else {
649            None
650        };
651
652        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
653            info!("creating rpc index store");
654            Some(Arc::new(
655                RpcIndexStore::new(
656                    &config.db_path(),
657                    &store,
658                    &checkpoint_store,
659                    &epoch_store,
660                    &cache_traits.backing_package_store,
661                    pruner_watermarks.checkpoint_id.clone(),
662                    config.rpc().cloned().unwrap_or_default(),
663                )
664                .await,
665            ))
666        } else {
667            None
668        };
669
670        let chain_identifier = epoch_store.get_chain_identifier();
671
672        info!("creating archive reader");
673        // Create network
674        let (randomness_tx, randomness_rx) = mpsc::channel(
675            config
676                .p2p_config
677                .randomness
678                .clone()
679                .unwrap_or_default()
680                .mailbox_capacity(),
681        );
682        let P2pComponents {
683            p2p_network,
684            known_peers,
685            discovery_handle,
686            state_sync_handle,
687            randomness_handle,
688        } = Self::create_p2p_network(
689            &config,
690            state_sync_store.clone(),
691            chain_identifier,
692            randomness_tx,
693            &prometheus_registry,
694        )?;
695
696        let endpoint_manager = EndpointManager::new(discovery_handle.clone());
697
698        // Inject configured peer address overrides.
699        for peer in &config.p2p_config.peer_address_overrides {
700            endpoint_manager
701                .update_endpoint(
702                    EndpointId::P2p(peer.peer_id),
703                    AddressSource::Config,
704                    peer.addresses.clone(),
705                )
706                .expect("Updating peer address overrides should not fail");
707        }
708
709        // Send initial peer addresses to the p2p network.
710        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
711
712        info!("start snapshot upload");
713        // Start uploading state snapshot to remote store
714        let state_snapshot_handle = Self::start_state_snapshot(
715            &config,
716            &prometheus_registry,
717            checkpoint_store.clone(),
718            chain_identifier,
719        )?;
720
721        // Start uploading db checkpoints to remote store
722        info!("start db checkpoint");
723        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
724            &config,
725            &prometheus_registry,
726            state_snapshot_handle.is_some(),
727        )?;
728
729        if !epoch_store
730            .protocol_config()
731            .simplified_unwrap_then_delete()
732        {
733            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
734            config
735                .authority_store_pruning_config
736                .set_killswitch_tombstone_pruning(true);
737        }
738
739        let authority_name = config.protocol_public_key();
740
741        info!("create authority state");
742        let state = AuthorityState::new(
743            authority_name,
744            secret,
745            config.supported_protocol_versions.unwrap(),
746            store.clone(),
747            cache_traits.clone(),
748            epoch_store.clone(),
749            committee_store.clone(),
750            index_store.clone(),
751            rpc_index,
752            checkpoint_store.clone(),
753            &prometheus_registry,
754            genesis.objects(),
755            &db_checkpoint_config,
756            config.clone(),
757            chain_identifier,
758            config.policy_config.clone(),
759            config.firewall_config.clone(),
760            pruner_watermarks,
761        )
762        .await;
763        // ensure genesis txn was executed
764        if epoch_store.epoch() == 0 {
765            let txn = &genesis.transaction();
766            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
767            let transaction =
768                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
769                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
770                        genesis.transaction().data().clone(),
771                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
772                    ),
773                );
774            state
775                .try_execute_immediately(
776                    &transaction,
777                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
778                    &epoch_store,
779                )
780                .instrument(span)
781                .await
782                .unwrap();
783        }
784
785        // Start the loop that receives new randomness and generates transactions for it.
786        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
787
788        if config
789            .expensive_safety_check_config
790            .enable_secondary_index_checks()
791            && let Some(indexes) = state.indexes.clone()
792        {
793            sui_core::verify_indexes::verify_indexes(
794                state.get_global_state_hash_store().as_ref(),
795                indexes,
796            )
797            .expect("secondary indexes are inconsistent");
798        }
799
800        let (end_of_epoch_channel, end_of_epoch_receiver) =
801            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
802
803        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
804            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
805                auth_agg.load_full(),
806                state.clone(),
807                end_of_epoch_receiver,
808                &config.db_path(),
809                &prometheus_registry,
810                &config,
811            )))
812        } else {
813            None
814        };
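        // The transaction orchestrator is only constructed on full nodes that are not
        // running with a `run_with_range` limit; validators never create one here.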
815
816        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
817            state.clone(),
818            state_sync_store,
819            &transaction_orchestrator.clone(),
820            &config,
821            &prometheus_registry,
822            server_version,
823        )
824        .await?;
825
826        let global_state_hasher = Arc::new(GlobalStateHasher::new(
827            cache_traits.global_state_hash_store.clone(),
828            GlobalStateHashMetrics::new(&prometheus_registry),
829        ));
830
831        let authority_names_to_peer_ids = epoch_store
832            .epoch_start_state()
833            .get_authority_names_to_peer_ids();
834
835        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
836            "sui",
837            &registry_service.default_registry(),
838        );
839
840        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
841
842        let connection_monitor_handle =
843            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
844                p2p_network.downgrade(),
845                Arc::new(network_connection_metrics),
846                known_peers,
847            );
848
849        let connection_monitor_status = ConnectionMonitorStatus {
850            connection_statuses: connection_monitor_handle.connection_statuses(),
851            authority_names_to_peer_ids,
852        };
853
854        let connection_monitor_status = Arc::new(connection_monitor_status);
855        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
856
857        sui_node_metrics
858            .binary_max_protocol_version
859            .set(ProtocolVersion::MAX.as_u64() as i64);
860        sui_node_metrics
861            .configured_max_protocol_version
862            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
863
864        let validator_components = if state.is_validator(&epoch_store) {
865            let mut components = Self::construct_validator_components(
866                config.clone(),
867                state.clone(),
868                committee,
869                epoch_store.clone(),
870                checkpoint_store.clone(),
871                state_sync_handle.clone(),
872                randomness_handle.clone(),
873                Arc::downgrade(&global_state_hasher),
874                backpressure_manager.clone(),
875                connection_monitor_status.clone(),
876                &registry_service,
877                sui_node_metrics.clone(),
878                checkpoint_metrics.clone(),
879            )
880            .await?;
881
882            components.consensus_adapter.submit_recovered(&epoch_store);
883
884            // Start the gRPC server
885            components.validator_server_handle = components.validator_server_handle.start().await;
886
887            // Set the consensus address updater so that we can update the consensus peer addresses when requested.
888            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
889
890            Some(components)
891        } else {
892            None
893        };
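        // On a non-validator node this stays `None`; validator components may be
        // constructed later during reconfiguration if the node joins the committee.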
894
895        // setup shutdown channel
896        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
897
898        let node = Self {
899            config,
900            validator_components: Mutex::new(validator_components),
901            http_servers,
902            state,
903            transaction_orchestrator,
904            registry_service,
905            metrics: sui_node_metrics,
906            checkpoint_metrics,
907
908            _discovery: discovery_handle,
909            _connection_monitor_handle: connection_monitor_handle,
910            state_sync_handle,
911            randomness_handle,
912            checkpoint_store,
913            global_state_hasher: Mutex::new(Some(global_state_hasher)),
914            end_of_epoch_channel,
915            connection_monitor_status,
916            endpoint_manager,
917            backpressure_manager,
918
919            _db_checkpoint_handle: db_checkpoint_handle,
920
921            #[cfg(msim)]
922            sim_state: Default::default(),
923
924            _state_snapshot_uploader_handle: state_snapshot_handle,
925            shutdown_channel_tx: shutdown_channel,
926
927            auth_agg,
928            subscription_service_checkpoint_sender,
929        };
930
931        info!("SuiNode started!");
932        let node = Arc::new(node);
933        let node_copy = node.clone();
934        spawn_monitored_task!(async move {
935            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
936            if let Err(error) = result {
937                warn!("Reconfiguration finished with error {:?}", error);
938            }
939        });
940
941        Ok(node)
942    }
943
944    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
945        self.end_of_epoch_channel.subscribe()
946    }
947
948    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
949        self.shutdown_channel_tx.subscribe()
950    }
951
952    pub fn current_epoch_for_testing(&self) -> EpochId {
953        self.state.current_epoch_for_testing()
954    }
955
956    pub fn db_checkpoint_path(&self) -> PathBuf {
957        self.config.db_checkpoint_path()
958    }
959
960    // Init reconfig process by starting to reject user certs
961    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
962        info!("close_epoch (current epoch = {})", epoch_store.epoch());
963        self.validator_components
964            .lock()
965            .await
966            .as_ref()
967            .ok_or_else(|| SuiError::from("Node is not a validator"))?
968            .consensus_adapter
969            .close_epoch(epoch_store);
970        Ok(())
971    }
972
973    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
974        self.state
975            .clear_override_protocol_upgrade_buffer_stake(epoch)
976    }
977
978    pub fn set_override_protocol_upgrade_buffer_stake(
979        &self,
980        epoch: EpochId,
981        buffer_stake_bps: u64,
982    ) -> SuiResult {
983        self.state
984            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
985    }
986
987    // Testing-only API to start epoch close process.
988    // For production code, please use the non-testing version.
989    pub async fn close_epoch_for_testing(&self) -> SuiResult {
990        let epoch_store = self.state.epoch_store_for_testing();
991        self.close_epoch(&epoch_store).await
992    }
993
994    fn start_state_snapshot(
995        config: &NodeConfig,
996        prometheus_registry: &Registry,
997        checkpoint_store: Arc<CheckpointStore>,
998        chain_identifier: ChainIdentifier,
999    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1000        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1001            let snapshot_uploader = StateSnapshotUploader::new(
1002                &config.db_checkpoint_path(),
1003                &config.snapshot_path(),
1004                remote_store_config.clone(),
1005                60,
1006                prometheus_registry,
1007                checkpoint_store,
1008                chain_identifier,
1009                config.state_snapshot_write_config.archive_interval_epochs,
1010            )?;
1011            Ok(Some(snapshot_uploader.start()))
1012        } else {
1013            Ok(None)
1014        }
1015    }
1016
1017    fn start_db_checkpoint(
1018        config: &NodeConfig,
1019        prometheus_registry: &Registry,
1020        state_snapshot_enabled: bool,
1021    ) -> Result<(
1022        DBCheckpointConfig,
1023        Option<tokio::sync::broadcast::Sender<()>>,
1024    )> {
1025        let checkpoint_path = Some(
1026            config
1027                .db_checkpoint_config
1028                .checkpoint_path
1029                .clone()
1030                .unwrap_or_else(|| config.db_checkpoint_path()),
1031        );
1032        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1033            DBCheckpointConfig {
1034                checkpoint_path,
1035                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1036                    true
1037                } else {
1038                    config
1039                        .db_checkpoint_config
1040                        .perform_db_checkpoints_at_epoch_end
1041                },
1042                ..config.db_checkpoint_config.clone()
1043            }
1044        } else {
1045            config.db_checkpoint_config.clone()
1046        };
1047
1048        match (
1049            db_checkpoint_config.object_store_config.as_ref(),
1050            state_snapshot_enabled,
1051        ) {
1052            // If the db checkpoint object store is not specified but the state
1053            // snapshot object store is, create the handler anyway so that db
1054            // checkpoints are marked as completed and can then be uploaded as
1055            // state snapshots.
1056            (None, false) => Ok((db_checkpoint_config, None)),
1057            (_, _) => {
1058                let handler = DBCheckpointHandler::new(
1059                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1060                    db_checkpoint_config.object_store_config.as_ref(),
1061                    60,
1062                    db_checkpoint_config
1063                        .prune_and_compact_before_upload
1064                        .unwrap_or(true),
1065                    config.authority_store_pruning_config.clone(),
1066                    prometheus_registry,
1067                    state_snapshot_enabled,
1068                )?;
1069                Ok((
1070                    db_checkpoint_config,
1071                    Some(DBCheckpointHandler::start(handler)),
1072                ))
1073            }
1074        }
1075    }
1076
1077    fn create_p2p_network(
1078        config: &NodeConfig,
1079        state_sync_store: RocksDbStore,
1080        chain_identifier: ChainIdentifier,
1081        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1082        prometheus_registry: &Registry,
1083    ) -> Result<P2pComponents> {
1084        let (state_sync, state_sync_router) = state_sync::Builder::new()
1085            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1086            .store(state_sync_store)
1087            .archive_config(config.archive_reader_config())
1088            .with_metrics(prometheus_registry)
1089            .build();
1090
1091        let (discovery, discovery_server) = discovery::Builder::new()
1092            .config(config.p2p_config.clone())
1093            .build();
1094
1095        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1096        let known_peers: HashMap<PeerId, String> = discovery_config
1097            .allowlisted_peers
1098            .clone()
1099            .into_iter()
1100            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1101            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1102                peer.peer_id
1103                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1104            }))
1105            .collect();
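        // These labels ("allowlisted_peer" / "seed_peer") are passed along with the
        // peer ids to the anemo connection monitor spawned at startup.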
1106
1107        let (randomness, randomness_router) =
1108            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1109                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1110                .with_metrics(prometheus_registry)
1111                .build();
1112
1113        let p2p_network = {
1114            let routes = anemo::Router::new()
1115                .add_rpc_service(discovery_server)
1116                .merge(state_sync_router);
1117            let routes = routes.merge(randomness_router);
1118
1119            let inbound_network_metrics =
1120                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1121            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1122                "sui",
1123                "outbound",
1124                prometheus_registry,
1125            );
1126
1127            let service = ServiceBuilder::new()
1128                .layer(
1129                    TraceLayer::new_for_server_errors()
1130                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1131                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1132                )
1133                .layer(CallbackLayer::new(
1134                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1135                        Arc::new(inbound_network_metrics),
1136                        config.p2p_config.excessive_message_size(),
1137                    ),
1138                ))
1139                .service(routes);
1140
1141            let outbound_layer = ServiceBuilder::new()
1142                .layer(
1143                    TraceLayer::new_for_client_and_server_errors()
1144                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1145                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1146                )
1147                .layer(CallbackLayer::new(
1148                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1149                        Arc::new(outbound_network_metrics),
1150                        config.p2p_config.excessive_message_size(),
1151                    ),
1152                ))
1153                .into_inner();
1154
1155            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1156            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
1157            // staking events in the epoch change txn.
1158            anemo_config.max_frame_size = Some(1 << 30);
1159
1160            // Set a higher default value for socket send/receive buffers if not already
1161            // configured.
1162            let mut quic_config = anemo_config.quic.unwrap_or_default();
1163            if quic_config.socket_send_buffer_size.is_none() {
1164                quic_config.socket_send_buffer_size = Some(20 << 20);
1165            }
1166            if quic_config.socket_receive_buffer_size.is_none() {
1167                quic_config.socket_receive_buffer_size = Some(20 << 20);
1168            }
1169            quic_config.allow_failed_socket_buffer_size_setting = true;
1170
1171            // Set high-performance defaults for quinn transport.
1172            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1173            if quic_config.max_concurrent_bidi_streams.is_none() {
1174                quic_config.max_concurrent_bidi_streams = Some(500);
1175            }
1176            if quic_config.max_concurrent_uni_streams.is_none() {
1177                quic_config.max_concurrent_uni_streams = Some(500);
1178            }
1179            if quic_config.stream_receive_window.is_none() {
1180                quic_config.stream_receive_window = Some(100 << 20);
1181            }
1182            if quic_config.receive_window.is_none() {
1183                quic_config.receive_window = Some(200 << 20);
1184            }
1185            if quic_config.send_window.is_none() {
1186                quic_config.send_window = Some(200 << 20);
1187            }
1188            if quic_config.crypto_buffer_size.is_none() {
1189                quic_config.crypto_buffer_size = Some(1 << 20);
1190            }
1191            if quic_config.max_idle_timeout_ms.is_none() {
1192                quic_config.max_idle_timeout_ms = Some(10_000);
1193            }
1194            if quic_config.keep_alive_interval_ms.is_none() {
1195                quic_config.keep_alive_interval_ms = Some(5_000);
1196            }
1197            anemo_config.quic = Some(quic_config);
1198
1199            let server_name = format!("sui-{}", chain_identifier);
1200            let network = Network::bind(config.p2p_config.listen_address)
1201                .server_name(&server_name)
1202                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1203                .config(anemo_config)
1204                .outbound_request_layer(outbound_layer)
1205                .start(service)?;
1206            info!(
1207                server_name = server_name,
1208                "P2p network started on {}",
1209                network.local_addr()
1210            );
1211
1212            network
1213        };
1214
1215        let discovery_handle =
1216            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1217        let state_sync_handle = state_sync.start(p2p_network.clone());
1218        let randomness_handle = randomness.start(p2p_network.clone());
1219
1220        Ok(P2pComponents {
1221            p2p_network,
1222            known_peers,
1223            discovery_handle,
1224            state_sync_handle,
1225            randomness_handle,
1226        })
1227    }
1228
1229    async fn construct_validator_components(
1230        config: NodeConfig,
1231        state: Arc<AuthorityState>,
1232        committee: Arc<Committee>,
1233        epoch_store: Arc<AuthorityPerEpochStore>,
1234        checkpoint_store: Arc<CheckpointStore>,
1235        state_sync_handle: state_sync::Handle,
1236        randomness_handle: randomness::Handle,
1237        global_state_hasher: Weak<GlobalStateHasher>,
1238        backpressure_manager: Arc<BackpressureManager>,
1239        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1240        registry_service: &RegistryService,
1241        sui_node_metrics: Arc<SuiNodeMetrics>,
1242        checkpoint_metrics: Arc<CheckpointMetrics>,
1243    ) -> Result<ValidatorComponents> {
1244        let mut config_clone = config.clone();
1245        let consensus_config = config_clone
1246            .consensus_config
1247            .as_mut()
1248            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1249
1250        let client = Arc::new(UpdatableConsensusClient::new());
1251        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1252            &committee,
1253            consensus_config,
1254            state.name,
1255            connection_monitor_status.clone(),
1256            &registry_service.default_registry(),
1257            epoch_store.protocol_config().clone(),
1258            client.clone(),
1259            checkpoint_store.clone(),
1260        ));
1261        let consensus_manager = Arc::new(ConsensusManager::new(
1262            &config,
1263            consensus_config,
1264            registry_service,
1265            client,
1266        ));
1267
1268        // This only gets started up once, not on every epoch. (A pruning call is made each epoch to remove old data.)
1269        let consensus_store_pruner = ConsensusStorePruner::new(
1270            consensus_manager.get_storage_base_path(),
1271            consensus_config.db_retention_epochs(),
1272            consensus_config.db_pruner_period(),
1273            &registry_service.default_registry(),
1274        );
1275
1276        let sui_tx_validator_metrics =
1277            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1278
1279        let validator_server_handle = Self::start_grpc_validator_service(
1280            &config,
1281            state.clone(),
1282            consensus_adapter.clone(),
1283            &registry_service.default_registry(),
1284        )
1285        .await?;
1286
1287        // Starts an overload monitor that monitors the execution of the authority.
1288        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1289        let validator_overload_monitor_handle = if config
1290            .authority_overload_config
1291            .max_load_shedding_percentage
1292            > 0
1293        {
1294            let authority_state = Arc::downgrade(&state);
1295            let overload_config = config.authority_overload_config.clone();
1296            fail_point!("starting_overload_monitor");
1297            Some(spawn_monitored_task!(overload_monitor(
1298                authority_state,
1299                overload_config,
1300            )))
1301        } else {
1302            None
1303        };
1304
1305        Self::start_epoch_specific_validator_components(
1306            &config,
1307            state.clone(),
1308            consensus_adapter,
1309            checkpoint_store,
1310            epoch_store,
1311            state_sync_handle,
1312            randomness_handle,
1313            consensus_manager,
1314            consensus_store_pruner,
1315            global_state_hasher,
1316            backpressure_manager,
1317            validator_server_handle,
1318            validator_overload_monitor_handle,
1319            checkpoint_metrics,
1320            sui_node_metrics,
1321            sui_tx_validator_metrics,
1322        )
1323        .await
1324    }
1325
1326    async fn start_epoch_specific_validator_components(
1327        config: &NodeConfig,
1328        state: Arc<AuthorityState>,
1329        consensus_adapter: Arc<ConsensusAdapter>,
1330        checkpoint_store: Arc<CheckpointStore>,
1331        epoch_store: Arc<AuthorityPerEpochStore>,
1332        state_sync_handle: state_sync::Handle,
1333        randomness_handle: randomness::Handle,
1334        consensus_manager: Arc<ConsensusManager>,
1335        consensus_store_pruner: ConsensusStorePruner,
1336        state_hasher: Weak<GlobalStateHasher>,
1337        backpressure_manager: Arc<BackpressureManager>,
1338        validator_server_handle: SpawnOnce,
1339        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1340        checkpoint_metrics: Arc<CheckpointMetrics>,
1341        sui_node_metrics: Arc<SuiNodeMetrics>,
1342        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1343    ) -> Result<ValidatorComponents> {
1344        let checkpoint_service = Self::build_checkpoint_service(
1345            config,
1346            consensus_adapter.clone(),
1347            checkpoint_store.clone(),
1348            epoch_store.clone(),
1349            state.clone(),
1350            state_sync_handle,
1351            state_hasher,
1352            checkpoint_metrics.clone(),
1353        );
1354
1355        // Create a new map that gets injected into both the consensus handler and the consensus adapter.
1356        // The consensus handler writes values forwarded from consensus, and the consensus adapter
1357        // reads them to decide which validator submits a transaction to consensus.
1358        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1359
1360        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
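        // ArcSwap semantics, as a comment-only sketch (illustrative, not executed here): readers
        // take a cheap snapshot of the whole map while the consensus handler publishes
        // replacement maps atomically.
        //
        //     let snapshot = low_scoring_authorities.load_full();      // Arc<HashMap<_, _>>
        //     low_scoring_authorities.store(Arc::new(HashMap::new())); // publish a new map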
1361
1362        if epoch_store.randomness_state_enabled() {
1363            let randomness_manager = RandomnessManager::try_new(
1364                Arc::downgrade(&epoch_store),
1365                Box::new(consensus_adapter.clone()),
1366                randomness_handle,
1367                config.protocol_key_pair(),
1368            )
1369            .await;
1370            if let Some(randomness_manager) = randomness_manager {
1371                epoch_store
1372                    .set_randomness_manager(randomness_manager)
1373                    .await?;
1374            }
1375        }
1376
1377        ExecutionTimeObserver::spawn(
1378            epoch_store.clone(),
1379            Box::new(consensus_adapter.clone()),
1380            config
1381                .execution_time_observer_config
1382                .clone()
1383                .unwrap_or_default(),
1384        );
1385
1386        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1387            None,
1388            state.metrics.clone(),
1389        ));
1390
1391        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1392            throughput_calculator.clone(),
1393            None,
1394            None,
1395            state.metrics.clone(),
1396            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1397        ));
1398
1399        consensus_adapter.swap_throughput_profiler(throughput_profiler);
1400
1401        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1402            state.clone(),
1403            checkpoint_service.clone(),
1404            epoch_store.clone(),
1405            consensus_adapter.clone(),
1406            low_scoring_authorities,
1407            throughput_calculator,
1408            backpressure_manager,
1409            config.congestion_log.clone(),
1410        );
1411
1412        info!("Starting consensus manager asynchronously");
1413
1414        // Spawn consensus startup asynchronously to avoid blocking other components
1415        tokio::spawn({
1416            let config = config.clone();
1417            let epoch_store = epoch_store.clone();
1418            let sui_tx_validator = SuiTxValidator::new(
1419                state.clone(),
1420                epoch_store.clone(),
1421                checkpoint_service.clone(),
1422                sui_tx_validator_metrics.clone(),
1423            );
1424            let consensus_manager = consensus_manager.clone();
1425            async move {
1426                consensus_manager
1427                    .start(
1428                        &config,
1429                        epoch_store,
1430                        consensus_handler_initializer,
1431                        sui_tx_validator,
1432                    )
1433                    .await;
1434            }
1435        });
1436        let replay_waiter = consensus_manager.replay_waiter();
1437
1438        info!("Spawning checkpoint service");
1439        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1440            None
1441        } else {
1442            Some(replay_waiter)
1443        };
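        // Operator usage sketch (an assumption about deployment, not enforced by this code):
        // starting the node with `DISABLE_REPLAY_WAITER=1` makes the branch above pass `None`,
        // so the checkpoint service does not wait for consensus replay before spawning.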
1444        checkpoint_service
1445            .spawn(epoch_store.clone(), replay_waiter)
1446            .await;
1447
1448        if epoch_store.authenticator_state_enabled() {
1449            Self::start_jwk_updater(
1450                config,
1451                sui_node_metrics,
1452                state.name,
1453                epoch_store.clone(),
1454                consensus_adapter.clone(),
1455            );
1456        }
1457
1458        Ok(ValidatorComponents {
1459            validator_server_handle,
1460            validator_overload_monitor_handle,
1461            consensus_manager,
1462            consensus_store_pruner,
1463            consensus_adapter,
1464            checkpoint_metrics,
1465            sui_tx_validator_metrics,
1466        })
1467    }
1468
1469    fn build_checkpoint_service(
1470        config: &NodeConfig,
1471        consensus_adapter: Arc<ConsensusAdapter>,
1472        checkpoint_store: Arc<CheckpointStore>,
1473        epoch_store: Arc<AuthorityPerEpochStore>,
1474        state: Arc<AuthorityState>,
1475        state_sync_handle: state_sync::Handle,
1476        state_hasher: Weak<GlobalStateHasher>,
1477        checkpoint_metrics: Arc<CheckpointMetrics>,
1478    ) -> Arc<CheckpointService> {
1479        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1480        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1481
1482        debug!(
1483            "Starting checkpoint service with epoch start timestamp {} \
1484            and epoch duration {}",
1485            epoch_start_timestamp_ms, epoch_duration_ms
1486        );
1487
1488        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1489            sender: consensus_adapter,
1490            signer: state.secret.clone(),
1491            authority: config.protocol_public_key(),
1492            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1493                .checked_add(epoch_duration_ms)
1494                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1495            metrics: checkpoint_metrics.clone(),
1496        });
1497
1498        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1499        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1500        let max_checkpoint_size_bytes =
1501            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1502
1503        CheckpointService::build(
1504            state.clone(),
1505            checkpoint_store,
1506            epoch_store,
1507            state.get_transaction_cache_reader().clone(),
1508            state_hasher,
1509            checkpoint_output,
1510            Box::new(certified_checkpoint_output),
1511            checkpoint_metrics,
1512            max_tx_per_checkpoint,
1513            max_checkpoint_size_bytes,
1514        )
1515    }
1516
1517    fn construct_consensus_adapter(
1518        committee: &Committee,
1519        consensus_config: &ConsensusConfig,
1520        authority: AuthorityName,
1521        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1522        prometheus_registry: &Registry,
1523        protocol_config: ProtocolConfig,
1524        consensus_client: Arc<dyn ConsensusClient>,
1525        checkpoint_store: Arc<CheckpointStore>,
1526    ) -> ConsensusAdapter {
1527        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1528
1529        // The consensus adapter allows the authority to send user certificates through consensus.
1530        ConsensusAdapter::new(
1531            consensus_client,
1532            checkpoint_store,
1533            authority,
1534            connection_monitor_status,
1535            consensus_config.max_pending_transactions(),
1536            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1537            consensus_config.max_submit_position,
1538            consensus_config.submit_delay_step_override(),
1539            ca_metrics,
1540            protocol_config,
1541        )
1542    }
1543
1544    async fn start_grpc_validator_service(
1545        config: &NodeConfig,
1546        state: Arc<AuthorityState>,
1547        consensus_adapter: Arc<ConsensusAdapter>,
1548        prometheus_registry: &Registry,
1549    ) -> Result<SpawnOnce> {
1550        let validator_service = ValidatorService::new(
1551            state.clone(),
1552            consensus_adapter,
1553            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1554            config.policy_config.clone().map(|p| p.client_id_source),
1555        );
1556
1557        let mut server_conf = mysten_network::config::Config::new();
1558        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1559        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1560        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1561        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1562        server_conf.load_shed = config.grpc_load_shed;
1563        let mut server_builder =
1564            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1565
1566        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1567
1568        let tls_config = sui_tls::create_rustls_server_config(
1569            config.network_key_pair().copy().private(),
1570            SUI_TLS_SERVER_NAME.to_string(),
1571        );
1572
1573        let network_address = config.network_address().clone();
1574
1575        let (ready_tx, ready_rx) = oneshot::channel();
1576
1577        Ok(SpawnOnce::new(ready_rx, async move {
1578            let server = server_builder
1579                .bind(&network_address, Some(tls_config))
1580                .await
1581                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1582            let local_addr = server.local_addr();
1583            info!("Listening to traffic on {local_addr}");
1584            ready_tx.send(()).unwrap();
1585            if let Err(err) = server.serve().await {
1586                info!("Server stopped: {err}");
1587            }
1588            info!("Server stopped");
1589        }))
1590    }
1591
1592    pub fn state(&self) -> Arc<AuthorityState> {
1593        self.state.clone()
1594    }
1595
1596    // Only used for testing because of how epoch store is loaded.
1597    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1598        self.state.reference_gas_price_for_testing()
1599    }
1600
1601    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1602        self.state.committee_store().clone()
1603    }
1604
1605    /*
1606    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1607        self.state.db()
1608    }
1609    */
1610
1611    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1612    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1613        /// of this function will most likely want to call this again
1614    /// to get a fresh one.
1615    pub fn clone_authority_aggregator(
1616        &self,
1617    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1618        self.transaction_orchestrator
1619            .as_ref()
1620            .map(|to| to.clone_authority_aggregator())
1621    }
1622
1623    pub fn transaction_orchestrator(
1624        &self,
1625    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1626        self.transaction_orchestrator.clone()
1627    }
1628
1629    /// This function awaits the completion of checkpoint execution of the current epoch,
1630    /// after which it initiates reconfiguration of the entire system.
1631    pub async fn monitor_reconfiguration(
1632        self: Arc<Self>,
1633        mut epoch_store: Arc<AuthorityPerEpochStore>,
1634    ) -> Result<()> {
1635        let checkpoint_executor_metrics =
1636            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1637
1638        loop {
1639            let mut hasher_guard = self.global_state_hasher.lock().await;
1640            let hasher = hasher_guard.take().unwrap();
1641            info!(
1642                "Creating checkpoint executor for epoch {}",
1643                epoch_store.epoch()
1644            );
1645            let checkpoint_executor = CheckpointExecutor::new(
1646                epoch_store.clone(),
1647                self.checkpoint_store.clone(),
1648                self.state.clone(),
1649                hasher.clone(),
1650                self.backpressure_manager.clone(),
1651                self.config.checkpoint_executor_config.clone(),
1652                checkpoint_executor_metrics.clone(),
1653                self.subscription_service_checkpoint_sender.clone(),
1654            );
1655
1656            let run_with_range = self.config.run_with_range;
1657
1658            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1659
1660            // Update the current protocol version metric.
1661            self.metrics
1662                .current_protocol_version
1663                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1664
1665            // Advertise capabilities to committee, if we are a validator.
1666            if let Some(components) = &*self.validator_components.lock().await {
1667                // TODO: without this sleep, the consensus message is not delivered reliably.
1668                tokio::time::sleep(Duration::from_millis(1)).await;
1669
1670                let config = cur_epoch_store.protocol_config();
1671                let mut supported_protocol_versions = self
1672                    .config
1673                    .supported_protocol_versions
1674                    .expect("Supported versions should be populated")
1675                    // no need to send digests of versions less than the current version
1676                    .truncate_below(config.version);
1677
1678                while supported_protocol_versions.max > config.version {
1679                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1680                        supported_protocol_versions.max,
1681                        cur_epoch_store.get_chain(),
1682                    );
1683
1684                    if proposed_protocol_config.enable_accumulators()
1685                        && !epoch_store.accumulator_root_exists()
1686                    {
1687                        error!(
1688                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1689                            supported_protocol_versions.max
1690                        );
1691                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1692                    } else {
1693                        break;
1694                    }
1695                }
1696
1697                let binary_config = config.binary_config(None);
1698                let transaction = ConsensusTransaction::new_capability_notification_v2(
1699                    AuthorityCapabilitiesV2::new(
1700                        self.state.name,
1701                        cur_epoch_store.get_chain_identifier().chain(),
1702                        supported_protocol_versions,
1703                        self.state
1704                            .get_available_system_packages(&binary_config)
1705                            .await,
1706                    ),
1707                );
1708                info!(?transaction, "submitting capabilities to consensus");
1709                components.consensus_adapter.submit(
1710                    transaction,
1711                    None,
1712                    &cur_epoch_store,
1713                    None,
1714                    None,
1715                )?;
1716            }
1717
1718            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1719
1720            if stop_condition == StopReason::RunWithRangeCondition {
1721                SuiNode::shutdown(&self).await;
1722                self.shutdown_channel_tx
1723                    .send(run_with_range)
1724                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1725                return Ok(());
1726            }
1727
1728            // Safe to call because we are in the middle of reconfiguration.
1729            let latest_system_state = self
1730                .state
1731                .get_object_cache_reader()
1732                .get_sui_system_state_object_unsafe()
1733                .expect("Read Sui System State object cannot fail");
1734
1735            #[cfg(msim)]
1736            if !self
1737                .sim_state
1738                .sim_safe_mode_expected
1739                .load(Ordering::Relaxed)
1740            {
1741                debug_assert!(!latest_system_state.safe_mode());
1742            }
1743
1744            #[cfg(not(msim))]
1745            debug_assert!(!latest_system_state.safe_mode());
1746
1747            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1748                && self.state.is_fullnode(&cur_epoch_store)
1749            {
1750                warn!(
1751                    "Failed to send end of epoch notification to subscriber: {:?}",
1752                    err
1753                );
1754            }
1755
1756            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1757            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1758
1759            self.auth_agg.store(Arc::new(
1760                self.auth_agg
1761                    .load()
1762                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1763            ));
1764
1765            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1766            let next_epoch = next_epoch_committee.epoch();
1767            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1768
1769            info!(
1770                next_epoch,
1771                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1772            );
1773
1774            fail_point_async!("reconfig_delay");
1775
1776            // We save the connection monitor status map regardless of validator / fullnode status
1777            // so that we don't need to restart the connection monitor every epoch.
1778            // Update the mappings that will be used by the consensus adapter if it exists or is
1779            // about to be created.
1780            let authority_names_to_peer_ids =
1781                new_epoch_start_state.get_authority_names_to_peer_ids();
1782            self.connection_monitor_status
1783                .update_mapping_for_epoch(authority_names_to_peer_ids);
1784
1785            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1786
1787            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1788
1789            let mut validator_components_lock_guard = self.validator_components.lock().await;
1790
1791            // The following code handles 4 different cases, depending on whether the node
1792            // was a validator in the previous epoch, and whether the node is a validator
1793            // in the new epoch.
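            // Roughly:
            //   1. validator -> validator: restart the epoch-specific components below.
            //   2. validator -> fullnode: shut consensus down and drop the components.
            //   3. fullnode  -> validator: construct validator components from scratch.
            //   4. fullnode  -> fullnode: nothing to do.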
1794            let new_epoch_store = self
1795                .reconfigure_state(
1796                    &self.state,
1797                    &cur_epoch_store,
1798                    next_epoch_committee.clone(),
1799                    new_epoch_start_state,
1800                    hasher.clone(),
1801                )
1802                .await;
1803
1804            let new_validator_components = if let Some(ValidatorComponents {
1805                validator_server_handle,
1806                validator_overload_monitor_handle,
1807                consensus_manager,
1808                consensus_store_pruner,
1809                consensus_adapter,
1810                checkpoint_metrics,
1811                sui_tx_validator_metrics,
1812            }) = validator_components_lock_guard.take()
1813            {
1814                info!("Reconfiguring the validator.");
1815
1816                consensus_manager.shutdown().await;
1817                info!("Consensus has shut down.");
1818
1819                info!("Epoch store finished reconfiguration.");
1820
1821                // No other components should be holding a strong reference to state hasher
1822                // at this point. Confirm here before we swap in the new hasher.
1823                let global_state_hasher_metrics = Arc::into_inner(hasher)
1824                    .expect("Object state hasher should have no other references at this point")
1825                    .metrics();
1826                let new_hasher = Arc::new(GlobalStateHasher::new(
1827                    self.state.get_global_state_hash_store().clone(),
1828                    global_state_hasher_metrics,
1829                ));
1830                let weak_hasher = Arc::downgrade(&new_hasher);
1831                *hasher_guard = Some(new_hasher);
1832
1833                consensus_store_pruner.prune(next_epoch).await;
1834
1835                if self.state.is_validator(&new_epoch_store) {
1836                    // Only restart consensus if this node is still a validator in the new epoch.
1837                    Some(
1838                        Self::start_epoch_specific_validator_components(
1839                            &self.config,
1840                            self.state.clone(),
1841                            consensus_adapter,
1842                            self.checkpoint_store.clone(),
1843                            new_epoch_store.clone(),
1844                            self.state_sync_handle.clone(),
1845                            self.randomness_handle.clone(),
1846                            consensus_manager,
1847                            consensus_store_pruner,
1848                            weak_hasher,
1849                            self.backpressure_manager.clone(),
1850                            validator_server_handle,
1851                            validator_overload_monitor_handle,
1852                            checkpoint_metrics,
1853                            self.metrics.clone(),
1854                            sui_tx_validator_metrics,
1855                        )
1856                        .await?,
1857                    )
1858                } else {
1859                    info!("This node is no longer a validator after reconfiguration");
1860                    None
1861                }
1862            } else {
1863                // No other components should be holding a strong reference to state hasher
1864                // at this point. Confirm here before we swap in the new hasher.
1865                let global_state_hasher_metrics = Arc::into_inner(hasher)
1866                    .expect("Object state hasher should have no other references at this point")
1867                    .metrics();
1868                let new_hasher = Arc::new(GlobalStateHasher::new(
1869                    self.state.get_global_state_hash_store().clone(),
1870                    global_state_hasher_metrics,
1871                ));
1872                let weak_hasher = Arc::downgrade(&new_hasher);
1873                *hasher_guard = Some(new_hasher);
1874
1875                if self.state.is_validator(&new_epoch_store) {
1876                    info!("Promoting the node from fullnode to validator, starting grpc server");
1877
1878                    let mut components = Self::construct_validator_components(
1879                        self.config.clone(),
1880                        self.state.clone(),
1881                        Arc::new(next_epoch_committee.clone()),
1882                        new_epoch_store.clone(),
1883                        self.checkpoint_store.clone(),
1884                        self.state_sync_handle.clone(),
1885                        self.randomness_handle.clone(),
1886                        weak_hasher,
1887                        self.backpressure_manager.clone(),
1888                        self.connection_monitor_status.clone(),
1889                        &self.registry_service,
1890                        self.metrics.clone(),
1891                        self.checkpoint_metrics.clone(),
1892                    )
1893                    .await?;
1894
1895                    components.validator_server_handle =
1896                        components.validator_server_handle.start().await;
1897
1898                    // Set the consensus address updater now that the fullnode has been promoted to a validator.
1899                    self.endpoint_manager
1900                        .set_consensus_address_updater(components.consensus_manager.clone());
1901
1902                    Some(components)
1903                } else {
1904                    None
1905                }
1906            };
1907            *validator_components_lock_guard = new_validator_components;
1908
1909            // Force releasing current epoch store DB handle, because the
1910            // Arc<AuthorityPerEpochStore> may linger.
1911            cur_epoch_store.release_db_handles();
1912
1913            if cfg!(msim)
1914                && !matches!(
1915                    self.config
1916                        .authority_store_pruning_config
1917                        .num_epochs_to_retain_for_checkpoints(),
1918                    None | Some(u64::MAX) | Some(0)
1919                )
1920            {
1921                self.state
1922                    .prune_checkpoints_for_eligible_epochs_for_testing(
1923                        self.config.clone(),
1924                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1925                    )
1926                    .await?;
1927            }
1928
1929            epoch_store = new_epoch_store;
1930            info!("Reconfiguration finished");
1931        }
1932    }
1933
1934    async fn shutdown(&self) {
1935        if let Some(validator_components) = &*self.validator_components.lock().await {
1936            validator_components.consensus_manager.shutdown().await;
1937        }
1938    }
1939
1940    async fn reconfigure_state(
1941        &self,
1942        state: &Arc<AuthorityState>,
1943        cur_epoch_store: &AuthorityPerEpochStore,
1944        next_epoch_committee: Committee,
1945        next_epoch_start_system_state: EpochStartSystemState,
1946        global_state_hasher: Arc<GlobalStateHasher>,
1947    ) -> Arc<AuthorityPerEpochStore> {
1948        let next_epoch = next_epoch_committee.epoch();
1949
1950        let last_checkpoint = self
1951            .checkpoint_store
1952            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1953            .expect("Error loading last checkpoint for current epoch")
1954            .expect("Could not load last checkpoint for current epoch");
1955
1956        let last_checkpoint_seq = *last_checkpoint.sequence_number();
1957
1958        assert_eq!(
1959            Some(last_checkpoint_seq),
1960            self.checkpoint_store
1961                .get_highest_executed_checkpoint_seq_number()
1962                .expect("Error loading highest executed checkpoint sequence number")
1963        );
1964
1965        let epoch_start_configuration = EpochStartConfiguration::new(
1966            next_epoch_start_system_state,
1967            *last_checkpoint.digest(),
1968            state.get_object_store().as_ref(),
1969            EpochFlag::default_flags_for_new_epoch(&state.config),
1970        )
1971        .expect("EpochStartConfiguration construction cannot fail");
1972
1973        let new_epoch_store = self
1974            .state
1975            .reconfigure(
1976                cur_epoch_store,
1977                self.config.supported_protocol_versions.unwrap(),
1978                next_epoch_committee,
1979                epoch_start_configuration,
1980                global_state_hasher,
1981                &self.config.expensive_safety_check_config,
1982                last_checkpoint_seq,
1983            )
1984            .await
1985            .expect("Reconfigure authority state cannot fail");
1986        info!(next_epoch, "Node State has been reconfigured");
1987        assert_eq!(next_epoch, new_epoch_store.epoch());
1988        self.state.get_reconfig_api().update_epoch_flags_metrics(
1989            cur_epoch_store.epoch_start_config().flags(),
1990            new_epoch_store.epoch_start_config().flags(),
1991        );
1992
1993        new_epoch_store
1994    }
1995
1996    pub fn get_config(&self) -> &NodeConfig {
1997        &self.config
1998    }
1999
2000    pub fn randomness_handle(&self) -> randomness::Handle {
2001        self.randomness_handle.clone()
2002    }
2003
2004    pub fn endpoint_manager(&self) -> &EndpointManager {
2005        &self.endpoint_manager
2006    }
2007
2008    /// Get a short prefix of a digest for metric labels
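    ///
    /// Illustrative doc sketch (assumes the digest's `Display` form is at least 8 characters,
    /// which holds for the base58 digests used here):
    ///
    /// ```ignore
    /// let digest = CheckpointDigest::random();
    /// let label = Self::get_digest_prefix(digest);
    /// assert_eq!(label.len(), 8); // keeps metric label cardinality bounded
    /// ```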
2009    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2010        let digest_str = digest.to_string();
2011        if digest_str.len() >= 8 {
2012            digest_str[0..8].to_string()
2013        } else {
2014            digest_str
2015        }
2016    }
2017
2018    /// Check for previously detected forks and handle them appropriately.
2019    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2020    /// For all other cases, block node startup if a fork is detected.
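    ///
    /// Illustrative recovery config, as a sketch using the field names exercised by the tests
    /// below (the values are hypothetical):
    ///
    /// ```ignore
    /// let recovery = ForkRecoveryConfig {
    ///     checkpoint_overrides: BTreeMap::from([(42u64, "<expected checkpoint digest>".to_string())]),
    ///     transaction_overrides: Default::default(),
    ///     fork_crash_behavior: ForkCrashBehavior::ReturnError,
    /// };
    /// ```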
2021    async fn check_and_recover_forks(
2022        checkpoint_store: &CheckpointStore,
2023        checkpoint_metrics: &CheckpointMetrics,
2024        is_validator: bool,
2025        fork_recovery: Option<&ForkRecoveryConfig>,
2026    ) -> Result<()> {
2027        // Fork detection and recovery is only relevant for validators.
2028        // Fullnodes should sync from validators and don't need fork checking.
2029        if !is_validator {
2030            return Ok(());
2031        }
2032
2033        // Try to recover from forks if recovery config is provided
2034        if let Some(recovery) = fork_recovery {
2035            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2036            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2037        }
2038
2039        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2040            .get_checkpoint_fork_detected()
2041            .map_err(|e| {
2042                error!("Failed to check for checkpoint fork: {:?}", e);
2043                e
2044            })?
2045        {
2046            Self::handle_checkpoint_fork(
2047                checkpoint_seq,
2048                checkpoint_digest,
2049                checkpoint_metrics,
2050                fork_recovery,
2051            )
2052            .await?;
2053        }
2054        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2055            .get_transaction_fork_detected()
2056            .map_err(|e| {
2057                error!("Failed to check for transaction fork: {:?}", e);
2058                e
2059            })?
2060        {
2061            Self::handle_transaction_fork(
2062                tx_digest,
2063                expected_effects,
2064                actual_effects,
2065                checkpoint_metrics,
2066                fork_recovery,
2067            )
2068            .await?;
2069        }
2070
2071        Ok(())
2072    }
2073
2074    fn try_recover_checkpoint_fork(
2075        checkpoint_store: &CheckpointStore,
2076        recovery: &ForkRecoveryConfig,
2077    ) -> Result<()> {
2078        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2079        // clear locally computed checkpoints from that sequence (inclusive).
2080        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2081            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2082                anyhow::bail!(
2083                    "Invalid checkpoint digest override for seq {}: {}",
2084                    seq,
2085                    expected_digest_str
2086                );
2087            };
2088
2089            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2090                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2091                if local_digest != expected_digest {
2092                    info!(
2093                        seq,
2094                        local = %Self::get_digest_prefix(local_digest),
2095                        expected = %Self::get_digest_prefix(expected_digest),
2096                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2097                        seq
2098                    );
2099                    checkpoint_store
2100                        .clear_locally_computed_checkpoints_from(*seq)
2101                        .context(
2102                            "Failed to clear locally computed checkpoints from override seq",
2103                        )?;
2104                }
2105            }
2106        }
2107
2108        if let Some((checkpoint_seq, checkpoint_digest)) =
2109            checkpoint_store.get_checkpoint_fork_detected()?
2110            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2111        {
2112            info!(
2113                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2114                checkpoint_seq, checkpoint_digest
2115            );
2116            checkpoint_store
2117                .clear_checkpoint_fork_detected()
2118                .expect("Failed to clear checkpoint fork detected marker");
2119        }
2120        Ok(())
2121    }
2122
2123    fn try_recover_transaction_fork(
2124        checkpoint_store: &CheckpointStore,
2125        recovery: &ForkRecoveryConfig,
2126    ) -> Result<()> {
2127        if recovery.transaction_overrides.is_empty() {
2128            return Ok(());
2129        }
2130
2131        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2132            && recovery
2133                .transaction_overrides
2134                .contains_key(&tx_digest.to_string())
2135        {
2136            info!(
2137                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2138                tx_digest
2139            );
2140            checkpoint_store
2141                .clear_transaction_fork_detected()
2142                .expect("Failed to clear transaction fork detected marker");
2143        }
2144        Ok(())
2145    }
2146
2147    fn get_current_timestamp() -> u64 {
2148        std::time::SystemTime::now()
2149            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2150            .unwrap()
2151            .as_secs()
2152    }
2153
2154    async fn handle_checkpoint_fork(
2155        checkpoint_seq: u64,
2156        checkpoint_digest: CheckpointDigest,
2157        checkpoint_metrics: &CheckpointMetrics,
2158        fork_recovery: Option<&ForkRecoveryConfig>,
2159    ) -> Result<()> {
2160        checkpoint_metrics
2161            .checkpoint_fork_crash_mode
2162            .with_label_values(&[
2163                &checkpoint_seq.to_string(),
2164                &Self::get_digest_prefix(checkpoint_digest),
2165                &Self::get_current_timestamp().to_string(),
2166            ])
2167            .set(1);
2168
2169        let behavior = fork_recovery
2170            .map(|fr| fr.fork_crash_behavior)
2171            .unwrap_or_default();
2172
2173        match behavior {
2174            ForkCrashBehavior::AwaitForkRecovery => {
2175                error!(
2176                    checkpoint_seq = checkpoint_seq,
2177                    checkpoint_digest = ?checkpoint_digest,
2178                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2179                );
2180                futures::future::pending::<()>().await;
2181                unreachable!("pending() should never return");
2182            }
2183            ForkCrashBehavior::ReturnError => {
2184                error!(
2185                    checkpoint_seq = checkpoint_seq,
2186                    checkpoint_digest = ?checkpoint_digest,
2187                    "Checkpoint fork detected! Returning error."
2188                );
2189                Err(anyhow::anyhow!(
2190                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2191                    checkpoint_seq,
2192                    checkpoint_digest
2193                ))
2194            }
2195        }
2196    }
2197
2198    async fn handle_transaction_fork(
2199        tx_digest: TransactionDigest,
2200        expected_effects_digest: TransactionEffectsDigest,
2201        actual_effects_digest: TransactionEffectsDigest,
2202        checkpoint_metrics: &CheckpointMetrics,
2203        fork_recovery: Option<&ForkRecoveryConfig>,
2204    ) -> Result<()> {
2205        checkpoint_metrics
2206            .transaction_fork_crash_mode
2207            .with_label_values(&[
2208                &Self::get_digest_prefix(tx_digest),
2209                &Self::get_digest_prefix(expected_effects_digest),
2210                &Self::get_digest_prefix(actual_effects_digest),
2211                &Self::get_current_timestamp().to_string(),
2212            ])
2213            .set(1);
2214
2215        let behavior = fork_recovery
2216            .map(|fr| fr.fork_crash_behavior)
2217            .unwrap_or_default();
2218
2219        match behavior {
2220            ForkCrashBehavior::AwaitForkRecovery => {
2221                error!(
2222                    tx_digest = ?tx_digest,
2223                    expected_effects_digest = ?expected_effects_digest,
2224                    actual_effects_digest = ?actual_effects_digest,
2225                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2226                );
2227                futures::future::pending::<()>().await;
2228                unreachable!("pending() should never return");
2229            }
2230            ForkCrashBehavior::ReturnError => {
2231                error!(
2232                    tx_digest = ?tx_digest,
2233                    expected_effects_digest = ?expected_effects_digest,
2234                    actual_effects_digest = ?actual_effects_digest,
2235                    "Transaction fork detected! Returning error."
2236                );
2237                Err(anyhow::anyhow!(
2238                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2239                    tx_digest,
2240                    expected_effects_digest,
2241                    actual_effects_digest
2242                ))
2243            }
2244        }
2245    }
2246}
2247
2248#[cfg(not(msim))]
2249impl SuiNode {
2250    async fn fetch_jwks(
2251        _authority: AuthorityName,
2252        provider: &OIDCProvider,
2253    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2254        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2255        use sui_types::error::SuiErrorKind;
2256        let client = reqwest::Client::new();
2257        fetch_jwks(provider, &client, true)
2258            .await
2259            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2260    }
2261}
2262
2263#[cfg(msim)]
2264impl SuiNode {
2265    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2266        self.sim_state.sim_node.id()
2267    }
2268
2269    pub fn set_safe_mode_expected(&self, new_value: bool) {
2270        info!("Setting safe mode expected to {}", new_value);
2271        self.sim_state
2272            .sim_safe_mode_expected
2273            .store(new_value, Ordering::Relaxed);
2274    }
2275
2276    #[allow(unused_variables)]
2277    async fn fetch_jwks(
2278        authority: AuthorityName,
2279        provider: &OIDCProvider,
2280    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2281        get_jwk_injector()(authority, provider)
2282    }
2283}
2284
2285enum SpawnOnce {
2286    // Mutex is only needed to make SpawnOnce Send
2287    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2288    #[allow(unused)]
2289    Started(JoinHandle<()>),
2290}
2291
2292impl SpawnOnce {
2293    pub fn new(
2294        ready_rx: oneshot::Receiver<()>,
2295        future: impl Future<Output = ()> + Send + 'static,
2296    ) -> Self {
2297        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2298    }
2299
2300    pub async fn start(self) -> Self {
2301        match self {
2302            Self::Unstarted(ready_rx, future) => {
2303                let future = future.into_inner();
2304                let handle = tokio::spawn(future);
2305                ready_rx.await.unwrap();
2306                Self::Started(handle)
2307            }
2308            Self::Started(_) => self,
2309        }
2310    }
2311}
2312
2313/// Updates trusted peer addresses in the p2p network.
2314fn update_peer_addresses(
2315    config: &NodeConfig,
2316    endpoint_manager: &EndpointManager,
2317    epoch_start_state: &EpochStartSystemState,
2318) {
2319    for (peer_id, address) in
2320        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2321    {
2322        endpoint_manager
2323            .update_endpoint(
2324                EndpointId::P2p(peer_id),
2325                AddressSource::Committee,
2326                vec![address],
2327            )
2328            .expect("Updating peer addresses should not fail");
2329    }
2330}
2331
2332fn build_kv_store(
2333    state: &Arc<AuthorityState>,
2334    config: &NodeConfig,
2335    registry: &Registry,
2336) -> Result<Arc<TransactionKeyValueStore>> {
2337    let metrics = KeyValueStoreMetrics::new(registry);
2338    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2339
2340    let base_url = &config.transaction_kv_store_read_config.base_url;
2341
2342    if base_url.is_empty() {
2343        info!("no http kv store url provided, using local db only");
2344        return Ok(Arc::new(db_store));
2345    }
2346
2347    let base_url: url::Url = base_url.parse().tap_err(|e| {
2348        error!(
2349            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2350            base_url, e
2351        )
2352    })?;
2353
2354    let network_str = match state.get_chain_identifier().chain() {
2355        Chain::Mainnet => "/mainnet",
2356        _ => {
2357            info!("using local db only for kv store");
2358            return Ok(Arc::new(db_store));
2359        }
2360    };
2361
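    // Join semantics sketch (hypothetical base URL): because `network_str` starts with '/',
    // `Url::join` replaces any existing path, e.g.
    // "https://kv.example.com/store".join("/mainnet") -> "https://kv.example.com/mainnet".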
2362    let base_url = base_url.join(network_str)?.to_string();
2363    let http_store = HttpKVStore::new_kv(
2364        &base_url,
2365        config.transaction_kv_store_read_config.cache_size,
2366        metrics.clone(),
2367    )?;
2368    info!("using local key-value store with fallback to http key-value store");
2369    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2370        db_store,
2371        http_store,
2372        metrics,
2373        "json_rpc_fallback",
2374    )))
2375}
2376
2377async fn build_http_servers(
2378    state: Arc<AuthorityState>,
2379    store: RocksDbStore,
2380    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2381    config: &NodeConfig,
2382    prometheus_registry: &Registry,
2383    server_version: ServerVersion,
2384) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2385    // Validators do not expose these APIs
2386    if config.consensus_config().is_some() {
2387        return Ok((HttpServers::default(), None));
2388    }
2389
2390    info!("starting rpc service with config: {:?}", config.rpc);
2391
2392    let mut router = axum::Router::new();
2393
2394    let json_rpc_router = {
2395        let traffic_controller = state.traffic_controller.clone();
2396        let mut server = JsonRpcServerBuilder::new(
2397            env!("CARGO_PKG_VERSION"),
2398            prometheus_registry,
2399            traffic_controller,
2400            config.policy_config.clone(),
2401        );
2402
2403        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2404
2405        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2406        server.register_module(ReadApi::new(
2407            state.clone(),
2408            kv_store.clone(),
2409            metrics.clone(),
2410        ))?;
2411        server.register_module(CoinReadApi::new(
2412            state.clone(),
2413            kv_store.clone(),
2414            metrics.clone(),
2415        ))?;
2416
2417        // If run_with_range is enabled, we want to prevent any transactions from being submitted;
2418        // run_with_range = None is the normal operating condition.
2419        if config.run_with_range.is_none() {
2420            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2421        }
2422        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2423        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2424
2425        if let Some(transaction_orchestrator) = transaction_orchestrator {
2426            server.register_module(TransactionExecutionApi::new(
2427                state.clone(),
2428                transaction_orchestrator.clone(),
2429                metrics.clone(),
2430            ))?;
2431        }
2432
2433        let name_service_config =
2434            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2435                config.name_service_package_address,
2436                config.name_service_registry_id,
2437                config.name_service_reverse_registry_id,
2438            ) {
2439                sui_name_service::NameServiceConfig::new(
2440                    package_address,
2441                    registry_id,
2442                    reverse_registry_id,
2443                )
2444            } else {
2445                match state.get_chain_identifier().chain() {
2446                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2447                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2448                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2449                }
2450            };
2451
2452        server.register_module(IndexerApi::new(
2453            state.clone(),
2454            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2455            kv_store,
2456            name_service_config,
2457            metrics,
2458            config.indexer_max_subscriptions,
2459        ))?;
2460        server.register_module(MoveUtils::new(state.clone()))?;
2461
2462        let server_type = config.jsonrpc_server_type();
2463
2464        server.to_router(server_type).await?
2465    };
2466
2467    router = router.merge(json_rpc_router);
2468
2469    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2470        SubscriptionService::build(prometheus_registry);
2471    let rpc_router = {
2472        let mut rpc_service =
2473            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2474        rpc_service.with_server_version(server_version);
2475
2476        if let Some(config) = config.rpc.clone() {
2477            rpc_service.with_config(config);
2478        }
2479
2480        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2481        rpc_service.with_subscription_service(subscription_service_handle);
2482
2483        if let Some(transaction_orchestrator) = transaction_orchestrator {
2484            rpc_service.with_executor(transaction_orchestrator.clone())
2485        }
2486
2487        rpc_service.into_router().await
2488    };
2489
2490    let layers = ServiceBuilder::new()
2491        .map_request(|mut request: axum::http::Request<_>| {
2492            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2493                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2494                request.extensions_mut().insert(axum_connect_info);
2495            }
2496            request
2497        })
2498        .layer(axum::middleware::from_fn(server_timing_middleware))
2499        // Set up a permissive CORS policy
2500        .layer(
2501            tower_http::cors::CorsLayer::new()
2502                .allow_methods([http::Method::GET, http::Method::POST])
2503                .allow_origin(tower_http::cors::Any)
2504                .allow_headers(tower_http::cors::Any)
2505                .expose_headers(tower_http::cors::Any),
2506        );
2507
2508    router = router.merge(rpc_router).layer(layers);
2509
2510    let https = if let Some((tls_config, https_address)) = config
2511        .rpc()
2512        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2513    {
2514        let https = sui_http::Builder::new()
2515            .tls_single_cert(tls_config.cert(), tls_config.key())
2516            .and_then(|builder| builder.serve(https_address, router.clone()))
2517            .map_err(|e| anyhow::anyhow!(e))?;
2518
2519        info!(
2520            https_address =? https.local_addr(),
2521            "HTTPS rpc server listening on {}",
2522            https.local_addr()
2523        );
2524
2525        Some(https)
2526    } else {
2527        None
2528    };
2529
2530    let http = sui_http::Builder::new()
2531        .serve(&config.json_rpc_address, router)
2532        .map_err(|e| anyhow::anyhow!(e))?;
2533
2534    info!(
2535        http_address =? http.local_addr(),
2536        "HTTP rpc server listening on {}",
2537        http.local_addr()
2538    );
2539
2540    Ok((
2541        HttpServers {
2542            http: Some(http),
2543            https,
2544        },
2545        Some(subscription_service_checkpoint_sender),
2546    ))
2547}
2548
2549#[cfg(not(test))]
2550fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2551    protocol_config.max_transactions_per_checkpoint() as usize
2552}
2553
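// In tests the per-checkpoint cap is presumably kept tiny so that checkpoint-splitting behavior
// is exercised with only a handful of transactions (an assumption; the code itself does not
// state the reason).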
2554#[cfg(test)]
2555fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2556    2
2557}
2558
2559#[derive(Default)]
2560struct HttpServers {
2561    #[allow(unused)]
2562    http: Option<sui_http::ServerHandle>,
2563    #[allow(unused)]
2564    https: Option<sui_http::ServerHandle>,
2565}
2566
2567#[cfg(test)]
2568mod tests {
2569    use super::*;
2570    use prometheus::Registry;
2571    use std::collections::BTreeMap;
2572    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2573    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2574    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2575
2576    #[tokio::test]
2577    async fn test_fork_error_and_recovery_both_paths() {
2578        let checkpoint_store = CheckpointStore::new_for_tests();
2579        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2580
2581        // ---------- Checkpoint fork path ----------
2582        let seq_num = 42;
2583        let digest = CheckpointDigest::random();
2584        checkpoint_store
2585            .record_checkpoint_fork_detected(seq_num, digest)
2586            .unwrap();
2587
2588        let fork_recovery = ForkRecoveryConfig {
2589            transaction_overrides: Default::default(),
2590            checkpoint_overrides: Default::default(),
2591            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2592        };
2593
2594        let r = SuiNode::check_and_recover_forks(
2595            &checkpoint_store,
2596            &checkpoint_metrics,
2597            true,
2598            Some(&fork_recovery),
2599        )
2600        .await;
2601        assert!(r.is_err());
2602        assert!(
2603            r.unwrap_err()
2604                .to_string()
2605                .contains("Checkpoint fork detected")
2606        );
2607
2608        let mut checkpoint_overrides = BTreeMap::new();
2609        checkpoint_overrides.insert(seq_num, digest.to_string());
2610        let fork_recovery_with_override = ForkRecoveryConfig {
2611            transaction_overrides: Default::default(),
2612            checkpoint_overrides,
2613            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2614        };
2615        let r = SuiNode::check_and_recover_forks(
2616            &checkpoint_store,
2617            &checkpoint_metrics,
2618            true,
2619            Some(&fork_recovery_with_override),
2620        )
2621        .await;
2622        assert!(r.is_ok());
2623        assert!(
2624            checkpoint_store
2625                .get_checkpoint_fork_detected()
2626                .unwrap()
2627                .is_none()
2628        );
2629
2630        // ---------- Transaction fork path ----------
2631        let tx_digest = TransactionDigest::random();
2632        let expected_effects = TransactionEffectsDigest::random();
2633        let actual_effects = TransactionEffectsDigest::random();
2634        checkpoint_store
2635            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2636            .unwrap();
2637
2638        let fork_recovery = ForkRecoveryConfig {
2639            transaction_overrides: Default::default(),
2640            checkpoint_overrides: Default::default(),
2641            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2642        };
2643        let r = SuiNode::check_and_recover_forks(
2644            &checkpoint_store,
2645            &checkpoint_metrics,
2646            true,
2647            Some(&fork_recovery),
2648        )
2649        .await;
2650        assert!(r.is_err());
2651        assert!(
2652            r.unwrap_err()
2653                .to_string()
2654                .contains("Transaction fork detected")
2655        );
2656
2657        let mut transaction_overrides = BTreeMap::new();
2658        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2659        let fork_recovery_with_override = ForkRecoveryConfig {
2660            transaction_overrides,
2661            checkpoint_overrides: Default::default(),
2662            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2663        };
2664        let r = SuiNode::check_and_recover_forks(
2665            &checkpoint_store,
2666            &checkpoint_metrics,
2667            true,
2668            Some(&fork_recovery_with_override),
2669        )
2670        .await;
2671        assert!(r.is_ok());
2672        assert!(
2673            checkpoint_store
2674                .get_transaction_fork_detected()
2675                .unwrap()
2676                .is_none()
2677        );
2678    }
2679}