sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::authority::ExecutionEnv;
29use sui_core::authority::RandomnessRoundReceiver;
30use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
31use sui_core::authority::backpressure::BackpressureManager;
32use sui_core::authority::epoch_start_configuration::EpochFlag;
33use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
34use sui_core::consensus_adapter::ConsensusClient;
35use sui_core::consensus_manager::UpdatableConsensusClient;
36use sui_core::epoch::randomness::RandomnessManager;
37use sui_core::execution_cache::build_execution_cache;
38use sui_network::endpoint_manager::{AddressSource, EndpointId};
39use sui_network::validator::server::SUI_TLS_SERVER_NAME;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use sui_core::global_state_hasher::GlobalStateHashMetrics;
43use sui_core::storage::RestReadStore;
44use sui_json_rpc::bridge_api::BridgeReadApi;
45use sui_json_rpc_api::JsonRpcMetrics;
46use sui_network::randomness;
47use sui_rpc_api::RpcMetrics;
48use sui_rpc_api::ServerVersion;
49use sui_rpc_api::subscription::SubscriptionService;
50use sui_types::base_types::ConciseableName;
51use sui_types::crypto::RandomnessRound;
52use sui_types::digests::{
53    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
54};
55use sui_types::messages_consensus::AuthorityCapabilitiesV2;
56use sui_types::sui_system_state::SuiSystemState;
57use tap::tap::TapFallible;
58use tokio::sync::oneshot;
59use tokio::sync::{Mutex, broadcast, mpsc};
60use tokio::task::JoinHandle;
61use tower::ServiceBuilder;
62use tracing::{Instrument, error_span, info};
63use tracing::{debug, error, warn};
64
65// Logs at debug level in test configuration, info level otherwise.
66// JWK logs cause significant volume in tests, but are insignificant in prod,
67// so we keep them at info
68macro_rules! jwk_log {
69    ($($arg:tt)+) => {
70        if in_test_configuration() {
71            debug!($($arg)+);
72        } else {
73            info!($($arg)+);
74        }
75    };
76}
77
78use fastcrypto_zkp::bn254::zk_login::JWK;
79pub use handle::SuiNodeHandle;
80use mysten_metrics::{RegistryService, spawn_monitored_task};
81use mysten_service::server_timing::server_timing_middleware;
82use sui_config::node::{DBCheckpointConfig, RunWithRange};
83use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
84use sui_config::node_config_metrics::NodeConfigMetrics;
85use sui_config::{ConsensusConfig, NodeConfig};
86use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
87use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
88use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
89use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
90use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
91use sui_core::authority_aggregator::AuthorityAggregator;
92use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
93use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
94use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
95use sui_core::checkpoints::{
96    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
97    SubmitCheckpointToConsensus,
98};
99use sui_core::consensus_adapter::{
100    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
101};
102use sui_core::consensus_manager::ConsensusManager;
103use sui_core::consensus_throughput_calculator::{
104    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
105};
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121    authority::{AuthorityState, AuthorityStore},
122    authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142    http_key_value_store::HttpKVStore,
143    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144    key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
164pub struct ValidatorComponents {
165    validator_server_handle: SpawnOnce,
166    validator_overload_monitor_handle: Option<JoinHandle<()>>,
167    consensus_manager: Arc<ConsensusManager>,
168    consensus_store_pruner: ConsensusStorePruner,
169    consensus_adapter: Arc<ConsensusAdapter>,
170    checkpoint_metrics: Arc<CheckpointMetrics>,
171    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
172}
173pub struct P2pComponents {
174    p2p_network: Network,
175    known_peers: HashMap<PeerId, String>,
176    discovery_handle: discovery::Handle,
177    state_sync_handle: state_sync::Handle,
178    randomness_handle: randomness::Handle,
179}
180
181#[cfg(msim)]
182mod simulator {
183    use std::sync::atomic::AtomicBool;
184    use sui_types::error::SuiErrorKind;
185
186    use super::*;
187    pub(super) struct SimState {
188        pub sim_node: sui_simulator::runtime::NodeHandle,
189        pub sim_safe_mode_expected: AtomicBool,
190        _leak_detector: sui_simulator::NodeLeakDetector,
191    }
192
193    impl Default for SimState {
194        fn default() -> Self {
195            Self {
196                sim_node: sui_simulator::runtime::NodeHandle::current(),
197                sim_safe_mode_expected: AtomicBool::new(false),
198                _leak_detector: sui_simulator::NodeLeakDetector::new(),
199            }
200        }
201    }
202
203    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
204        + Send
205        + Sync
206        + 'static;
207
208    fn default_fetch_jwks(
209        _authority: AuthorityName,
210        _provider: &OIDCProvider,
211    ) -> SuiResult<Vec<(JwkId, JWK)>> {
212        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
213        // Just load a default Twitch jwk for testing.
214        parse_jwks(
215            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
216            &OIDCProvider::Twitch,
217            true,
218        )
219        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
220    }
221
222    thread_local! {
223        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
224    }
225
226    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
227        JWK_INJECTOR.with(|injector| injector.borrow().clone())
228    }
229
230    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
231        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
232    }
233}
234
235#[cfg(msim)]
236pub use simulator::set_jwk_injector;
237#[cfg(msim)]
238use simulator::*;
239use sui_core::authority::authority_store_pruner::PrunerWatermarks;
240use sui_core::{
241    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
242};
243
244const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
245
246pub struct SuiNode {
247    config: NodeConfig,
248    validator_components: Mutex<Option<ValidatorComponents>>,
249
250    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
251    #[allow(unused)]
252    http_servers: HttpServers,
253
254    state: Arc<AuthorityState>,
255    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
256    registry_service: RegistryService,
257    metrics: Arc<SuiNodeMetrics>,
258    checkpoint_metrics: Arc<CheckpointMetrics>,
259
260    _discovery: discovery::Handle,
261    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
262    state_sync_handle: state_sync::Handle,
263    randomness_handle: randomness::Handle,
264    checkpoint_store: Arc<CheckpointStore>,
265    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
266    connection_monitor_status: Arc<ConnectionMonitorStatus>,
267
268    /// Broadcast channel to send the starting system state for the next epoch.
269    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
270
271    /// EndpointManager for updating peer network addresses.
272    endpoint_manager: EndpointManager,
273
274    backpressure_manager: Arc<BackpressureManager>,
275
276    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
277
278    #[cfg(msim)]
279    sim_state: SimState,
280
281    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
282    // Channel to allow signaling upstream to shutdown sui-node
283    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
284
285    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
286    /// Use ArcSwap so that we could mutate it without taking mut reference.
287    // TODO: Eventually we can make this auth aggregator a shared reference so that this
288    // update will automatically propagate to other uses.
289    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
290
291    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
292}
293
294impl fmt::Debug for SuiNode {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        f.debug_struct("SuiNode")
297            .field("name", &self.state.name.concise())
298            .finish()
299    }
300}
301
302static MAX_JWK_KEYS_PER_FETCH: usize = 100;
303
304impl SuiNode {
305    pub async fn start(
306        config: NodeConfig,
307        registry_service: RegistryService,
308    ) -> Result<Arc<SuiNode>> {
309        Self::start_async(
310            config,
311            registry_service,
312            ServerVersion::new("sui-node", "unknown"),
313        )
314        .await
315    }
316
317    fn start_jwk_updater(
318        config: &NodeConfig,
319        metrics: Arc<SuiNodeMetrics>,
320        authority: AuthorityName,
321        epoch_store: Arc<AuthorityPerEpochStore>,
322        consensus_adapter: Arc<ConsensusAdapter>,
323    ) {
324        let epoch = epoch_store.epoch();
325
326        let supported_providers = config
327            .zklogin_oauth_providers
328            .get(&epoch_store.get_chain_identifier().chain())
329            .unwrap_or(&BTreeSet::new())
330            .iter()
331            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
332            .collect::<Vec<_>>();
333
334        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
335
336        info!(
337            ?fetch_interval,
338            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
339        );
340
341        fn validate_jwk(
342            metrics: &Arc<SuiNodeMetrics>,
343            provider: &OIDCProvider,
344            id: &JwkId,
345            jwk: &JWK,
346        ) -> bool {
347            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
348                warn!(
349                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
350                    id.iss, provider
351                );
352                metrics
353                    .invalid_jwks
354                    .with_label_values(&[&provider.to_string()])
355                    .inc();
356                return false;
357            };
358
359            if iss_provider != *provider {
360                warn!(
361                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
362                    id.iss, provider, iss_provider
363                );
364                metrics
365                    .invalid_jwks
366                    .with_label_values(&[&provider.to_string()])
367                    .inc();
368                return false;
369            }
370
371            if !check_total_jwk_size(id, jwk) {
372                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
373                metrics
374                    .invalid_jwks
375                    .with_label_values(&[&provider.to_string()])
376                    .inc();
377                return false;
378            }
379
380            true
381        }
382
383        // metrics is:
384        //  pub struct SuiNodeMetrics {
385        //      pub jwk_requests: IntCounterVec,
386        //      pub jwk_request_errors: IntCounterVec,
387        //      pub total_jwks: IntCounterVec,
388        //      pub unique_jwks: IntCounterVec,
389        //  }
390
391        for p in supported_providers.into_iter() {
392            let provider_str = p.to_string();
393            let epoch_store = epoch_store.clone();
394            let consensus_adapter = consensus_adapter.clone();
395            let metrics = metrics.clone();
396            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
397                async move {
398                    // note: restart-safe de-duplication happens after consensus, this is
399                    // just best-effort to reduce unneeded submissions.
400                    let mut seen = HashSet::new();
401                    loop {
402                        jwk_log!("fetching JWK for provider {:?}", p);
403                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
404                        match Self::fetch_jwks(authority, &p).await {
405                            Err(e) => {
406                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
407                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
408                                // Retry in 30 seconds
409                                tokio::time::sleep(Duration::from_secs(30)).await;
410                                continue;
411                            }
412                            Ok(mut keys) => {
413                                metrics.total_jwks
414                                    .with_label_values(&[&provider_str])
415                                    .inc_by(keys.len() as u64);
416
417                                keys.retain(|(id, jwk)| {
418                                    validate_jwk(&metrics, &p, id, jwk) &&
419                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
420                                    seen.insert((id.clone(), jwk.clone()))
421                                });
422
423                                metrics.unique_jwks
424                                    .with_label_values(&[&provider_str])
425                                    .inc_by(keys.len() as u64);
426
427                                // prevent oauth providers from sending too many keys,
428                                // inadvertently or otherwise
429                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
430                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
431                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
432                                }
433
434                                for (id, jwk) in keys.into_iter() {
435                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
436
437                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
438                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
439                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
440                                        .ok();
441                                }
442                            }
443                        }
444                        tokio::time::sleep(fetch_interval).await;
445                    }
446                }
447                .instrument(error_span!("jwk_updater_task", epoch)),
448            ));
449        }
450    }
451
452    pub async fn start_async(
453        config: NodeConfig,
454        registry_service: RegistryService,
455        server_version: ServerVersion,
456    ) -> Result<Arc<SuiNode>> {
457        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
458        let mut config = config.clone();
459        if config.supported_protocol_versions.is_none() {
460            info!(
461                "populating config.supported_protocol_versions with default {:?}",
462                SupportedProtocolVersions::SYSTEM_DEFAULT
463            );
464            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
465        }
466
467        let run_with_range = config.run_with_range;
468        let is_validator = config.consensus_config().is_some();
469        let is_full_node = !is_validator;
470        let prometheus_registry = registry_service.default_registry();
471
472        info!(node =? config.protocol_public_key(),
473            "Initializing sui-node listening on {}", config.network_address
474        );
475
476        // Initialize metrics to track db usage before creating any stores
477        DBMetrics::init(registry_service.clone());
478
479        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
480        typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482        // Initialize Mysten metrics.
483        mysten_metrics::init_metrics(&prometheus_registry);
484        // Unsupported (because of the use of static variable) and unnecessary in simtests.
485        #[cfg(not(msim))]
486        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488        let genesis = config.genesis()?.clone();
489
490        let secret = Arc::pin(config.protocol_key_pair().copy());
491        let genesis_committee = genesis.committee();
492        let committee_store = Arc::new(CommitteeStore::new(
493            config.db_path().join("epochs"),
494            &genesis_committee,
495            None,
496        ));
497
498        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499        let checkpoint_store = CheckpointStore::new(
500            &config.db_path().join("checkpoints"),
501            pruner_watermarks.clone(),
502        );
503        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
504
505        Self::check_and_recover_forks(
506            &checkpoint_store,
507            &checkpoint_metrics,
508            is_validator,
509            config.fork_recovery.as_ref(),
510        )
511        .await?;
512
513        // By default, only enable write stall on validators for perpetual db.
514        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
515        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
516            enable_write_stall,
517            is_validator,
518        };
519        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
520            &config.db_path().join("store"),
521            Some(perpetual_tables_options),
522            Some(pruner_watermarks.epoch_id.clone()),
523        ));
524        let is_genesis = perpetual_tables
525            .database_is_empty()
526            .expect("Database read should not fail at init.");
527
528        let backpressure_manager =
529            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
530
531        let store =
532            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
533
534        let cur_epoch = store.get_recovery_epoch_at_restart()?;
535        let committee = committee_store
536            .get_committee(&cur_epoch)?
537            .expect("Committee of the current epoch must exist");
538        let epoch_start_configuration = store
539            .get_epoch_start_configuration()?
540            .expect("EpochStartConfiguration of the current epoch must exist");
541        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
542        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
543
544        let cache_traits = build_execution_cache(
545            &config.execution_cache,
546            &prometheus_registry,
547            &store,
548            backpressure_manager.clone(),
549        );
550
551        let auth_agg = {
552            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
553            Arc::new(ArcSwap::new(Arc::new(
554                AuthorityAggregator::new_from_epoch_start_state(
555                    epoch_start_configuration.epoch_start_state(),
556                    &committee_store,
557                    safe_client_metrics_base,
558                ),
559            )))
560        };
561
562        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
563        let chain = match config.chain_override_for_testing {
564            Some(chain) => chain,
565            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
566        };
567
568        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
569        let epoch_store = AuthorityPerEpochStore::new(
570            config.protocol_public_key(),
571            committee.clone(),
572            &config.db_path().join("store"),
573            Some(epoch_options.options),
574            EpochMetrics::new(&registry_service.default_registry()),
575            epoch_start_configuration,
576            cache_traits.backing_package_store.clone(),
577            cache_traits.object_store.clone(),
578            cache_metrics,
579            signature_verifier_metrics,
580            &config.expensive_safety_check_config,
581            (chain_id, chain),
582            checkpoint_store
583                .get_highest_executed_checkpoint_seq_number()
584                .expect("checkpoint store read cannot fail")
585                .unwrap_or(0),
586            Arc::new(SubmittedTransactionCacheMetrics::new(
587                &registry_service.default_registry(),
588            )),
589        )?;
590
591        info!("created epoch store");
592
593        replay_log!(
594            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
595            epoch_store.epoch(),
596            epoch_store.protocol_config()
597        );
598
599        // the database is empty at genesis time
600        if is_genesis {
601            info!("checking SUI conservation at genesis");
602            // When we are opening the db table, the only time when it's safe to
603            // check SUI conservation is at genesis. Otherwise we may be in the middle of
604            // an epoch and the SUI conservation check will fail. This also initialize
605            // the expected_network_sui_amount table.
606            cache_traits
607                .reconfig_api
608                .expensive_check_sui_conservation(&epoch_store)
609                .expect("SUI conservation check cannot fail at genesis");
610        }
611
612        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
613        let default_buffer_stake = epoch_store
614            .protocol_config()
615            .buffer_stake_for_protocol_upgrade_bps();
616        if effective_buffer_stake != default_buffer_stake {
617            warn!(
618                ?effective_buffer_stake,
619                ?default_buffer_stake,
620                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
621            );
622        }
623
624        checkpoint_store.insert_genesis_checkpoint(
625            genesis.checkpoint(),
626            genesis.checkpoint_contents().clone(),
627            &epoch_store,
628        );
629
630        info!("creating state sync store");
631        let state_sync_store = RocksDbStore::new(
632            cache_traits.clone(),
633            committee_store.clone(),
634            checkpoint_store.clone(),
635        );
636
637        let index_store = if is_full_node && config.enable_index_processing {
638            info!("creating jsonrpc index store");
639            Some(Arc::new(IndexStore::new(
640                config.db_path().join("indexes"),
641                &prometheus_registry,
642                epoch_store
643                    .protocol_config()
644                    .max_move_identifier_len_as_option(),
645                config.remove_deprecated_tables,
646            )))
647        } else {
648            None
649        };
650
651        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
652            info!("creating rpc index store");
653            Some(Arc::new(
654                RpcIndexStore::new(
655                    &config.db_path(),
656                    &store,
657                    &checkpoint_store,
658                    &epoch_store,
659                    &cache_traits.backing_package_store,
660                    pruner_watermarks.checkpoint_id.clone(),
661                    config.rpc().cloned().unwrap_or_default(),
662                )
663                .await,
664            ))
665        } else {
666            None
667        };
668
669        let chain_identifier = epoch_store.get_chain_identifier();
670
671        info!("creating archive reader");
672        // Create network
673        let (randomness_tx, randomness_rx) = mpsc::channel(
674            config
675                .p2p_config
676                .randomness
677                .clone()
678                .unwrap_or_default()
679                .mailbox_capacity(),
680        );
681        let P2pComponents {
682            p2p_network,
683            known_peers,
684            discovery_handle,
685            state_sync_handle,
686            randomness_handle,
687        } = Self::create_p2p_network(
688            &config,
689            state_sync_store.clone(),
690            chain_identifier,
691            randomness_tx,
692            &prometheus_registry,
693        )?;
694
695        let endpoint_manager = EndpointManager::new(discovery_handle.clone());
696
697        // Inject configured peer address overrides.
698        for peer in &config.p2p_config.peer_address_overrides {
699            endpoint_manager
700                .update_endpoint(
701                    EndpointId::P2p(peer.peer_id),
702                    AddressSource::Config,
703                    peer.addresses.clone(),
704                )
705                .expect("Updating peer address overrides should not fail");
706        }
707
708        // Send initial peer addresses to the p2p network.
709        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
710
711        info!("start snapshot upload");
712        // Start uploading state snapshot to remote store
713        let state_snapshot_handle = Self::start_state_snapshot(
714            &config,
715            &prometheus_registry,
716            checkpoint_store.clone(),
717            chain_identifier,
718        )?;
719
720        // Start uploading db checkpoints to remote store
721        info!("start db checkpoint");
722        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
723            &config,
724            &prometheus_registry,
725            state_snapshot_handle.is_some(),
726        )?;
727
728        if !epoch_store
729            .protocol_config()
730            .simplified_unwrap_then_delete()
731        {
732            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
733            config
734                .authority_store_pruning_config
735                .set_killswitch_tombstone_pruning(true);
736        }
737
738        let authority_name = config.protocol_public_key();
739
740        info!("create authority state");
741        let state = AuthorityState::new(
742            authority_name,
743            secret,
744            config.supported_protocol_versions.unwrap(),
745            store.clone(),
746            cache_traits.clone(),
747            epoch_store.clone(),
748            committee_store.clone(),
749            index_store.clone(),
750            rpc_index,
751            checkpoint_store.clone(),
752            &prometheus_registry,
753            genesis.objects(),
754            &db_checkpoint_config,
755            config.clone(),
756            chain_identifier,
757            config.policy_config.clone(),
758            config.firewall_config.clone(),
759            pruner_watermarks,
760        )
761        .await;
762        // ensure genesis txn was executed
763        if epoch_store.epoch() == 0 {
764            let txn = &genesis.transaction();
765            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
766            let transaction =
767                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
768                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
769                        genesis.transaction().data().clone(),
770                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
771                    ),
772                );
773            state
774                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
775                .instrument(span)
776                .await
777                .unwrap();
778        }
779
780        // Start the loop that receives new randomness and generates transactions for it.
781        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
782
783        if config
784            .expensive_safety_check_config
785            .enable_secondary_index_checks()
786            && let Some(indexes) = state.indexes.clone()
787        {
788            sui_core::verify_indexes::verify_indexes(
789                state.get_global_state_hash_store().as_ref(),
790                indexes,
791            )
792            .expect("secondary indexes are inconsistent");
793        }
794
795        let (end_of_epoch_channel, end_of_epoch_receiver) =
796            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
797
798        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
799            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
800                auth_agg.load_full(),
801                state.clone(),
802                end_of_epoch_receiver,
803                &config.db_path(),
804                &prometheus_registry,
805                &config,
806            )))
807        } else {
808            None
809        };
810
811        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
812            state.clone(),
813            state_sync_store,
814            &transaction_orchestrator.clone(),
815            &config,
816            &prometheus_registry,
817            server_version,
818        )
819        .await?;
820
821        let global_state_hasher = Arc::new(GlobalStateHasher::new(
822            cache_traits.global_state_hash_store.clone(),
823            GlobalStateHashMetrics::new(&prometheus_registry),
824        ));
825
826        let authority_names_to_peer_ids = epoch_store
827            .epoch_start_state()
828            .get_authority_names_to_peer_ids();
829
830        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
831            "sui",
832            &registry_service.default_registry(),
833        );
834
835        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
836
837        let connection_monitor_handle =
838            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
839                p2p_network.downgrade(),
840                Arc::new(network_connection_metrics),
841                known_peers,
842            );
843
844        let connection_monitor_status = ConnectionMonitorStatus {
845            connection_statuses: connection_monitor_handle.connection_statuses(),
846            authority_names_to_peer_ids,
847        };
848
849        let connection_monitor_status = Arc::new(connection_monitor_status);
850        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
851
852        sui_node_metrics
853            .binary_max_protocol_version
854            .set(ProtocolVersion::MAX.as_u64() as i64);
855        sui_node_metrics
856            .configured_max_protocol_version
857            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
858
859        let validator_components = if state.is_validator(&epoch_store) {
860            let mut components = Self::construct_validator_components(
861                config.clone(),
862                state.clone(),
863                committee,
864                epoch_store.clone(),
865                checkpoint_store.clone(),
866                state_sync_handle.clone(),
867                randomness_handle.clone(),
868                Arc::downgrade(&global_state_hasher),
869                backpressure_manager.clone(),
870                connection_monitor_status.clone(),
871                &registry_service,
872                sui_node_metrics.clone(),
873                checkpoint_metrics.clone(),
874            )
875            .await?;
876
877            components.consensus_adapter.submit_recovered(&epoch_store);
878
879            // Start the gRPC server
880            components.validator_server_handle = components.validator_server_handle.start().await;
881
882            // Set the consensus address updater so that we can update the consensus peer addresses when requested.
883            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
884
885            Some(components)
886        } else {
887            None
888        };
889
890        // setup shutdown channel
891        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
892
893        let node = Self {
894            config,
895            validator_components: Mutex::new(validator_components),
896            http_servers,
897            state,
898            transaction_orchestrator,
899            registry_service,
900            metrics: sui_node_metrics,
901            checkpoint_metrics,
902
903            _discovery: discovery_handle,
904            _connection_monitor_handle: connection_monitor_handle,
905            state_sync_handle,
906            randomness_handle,
907            checkpoint_store,
908            global_state_hasher: Mutex::new(Some(global_state_hasher)),
909            end_of_epoch_channel,
910            connection_monitor_status,
911            endpoint_manager,
912            backpressure_manager,
913
914            _db_checkpoint_handle: db_checkpoint_handle,
915
916            #[cfg(msim)]
917            sim_state: Default::default(),
918
919            _state_snapshot_uploader_handle: state_snapshot_handle,
920            shutdown_channel_tx: shutdown_channel,
921
922            auth_agg,
923            subscription_service_checkpoint_sender,
924        };
925
926        info!("SuiNode started!");
927        let node = Arc::new(node);
928        let node_copy = node.clone();
929        spawn_monitored_task!(async move {
930            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
931            if let Err(error) = result {
932                warn!("Reconfiguration finished with error {:?}", error);
933            }
934        });
935
936        Ok(node)
937    }
938
939    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
940        self.end_of_epoch_channel.subscribe()
941    }
942
943    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
944        self.shutdown_channel_tx.subscribe()
945    }
946
947    pub fn current_epoch_for_testing(&self) -> EpochId {
948        self.state.current_epoch_for_testing()
949    }
950
951    pub fn db_checkpoint_path(&self) -> PathBuf {
952        self.config.db_checkpoint_path()
953    }
954
955    // Init reconfig process by starting to reject user certs
956    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
957        info!("close_epoch (current epoch = {})", epoch_store.epoch());
958        self.validator_components
959            .lock()
960            .await
961            .as_ref()
962            .ok_or_else(|| SuiError::from("Node is not a validator"))?
963            .consensus_adapter
964            .close_epoch(epoch_store);
965        Ok(())
966    }
967
968    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
969        self.state
970            .clear_override_protocol_upgrade_buffer_stake(epoch)
971    }
972
973    pub fn set_override_protocol_upgrade_buffer_stake(
974        &self,
975        epoch: EpochId,
976        buffer_stake_bps: u64,
977    ) -> SuiResult {
978        self.state
979            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
980    }
981
982    // Testing-only API to start epoch close process.
983    // For production code, please use the non-testing version.
984    pub async fn close_epoch_for_testing(&self) -> SuiResult {
985        let epoch_store = self.state.epoch_store_for_testing();
986        self.close_epoch(&epoch_store).await
987    }
988
989    fn start_state_snapshot(
990        config: &NodeConfig,
991        prometheus_registry: &Registry,
992        checkpoint_store: Arc<CheckpointStore>,
993        chain_identifier: ChainIdentifier,
994    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
995        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
996            let snapshot_uploader = StateSnapshotUploader::new(
997                &config.db_checkpoint_path(),
998                &config.snapshot_path(),
999                remote_store_config.clone(),
1000                60,
1001                prometheus_registry,
1002                checkpoint_store,
1003                chain_identifier,
1004                config.state_snapshot_write_config.archive_interval_epochs,
1005            )?;
1006            Ok(Some(snapshot_uploader.start()))
1007        } else {
1008            Ok(None)
1009        }
1010    }
1011
1012    fn start_db_checkpoint(
1013        config: &NodeConfig,
1014        prometheus_registry: &Registry,
1015        state_snapshot_enabled: bool,
1016    ) -> Result<(
1017        DBCheckpointConfig,
1018        Option<tokio::sync::broadcast::Sender<()>>,
1019    )> {
1020        let checkpoint_path = Some(
1021            config
1022                .db_checkpoint_config
1023                .checkpoint_path
1024                .clone()
1025                .unwrap_or_else(|| config.db_checkpoint_path()),
1026        );
1027        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1028            DBCheckpointConfig {
1029                checkpoint_path,
1030                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1031                    true
1032                } else {
1033                    config
1034                        .db_checkpoint_config
1035                        .perform_db_checkpoints_at_epoch_end
1036                },
1037                ..config.db_checkpoint_config.clone()
1038            }
1039        } else {
1040            config.db_checkpoint_config.clone()
1041        };
1042
1043        match (
1044            db_checkpoint_config.object_store_config.as_ref(),
1045            state_snapshot_enabled,
1046        ) {
1047            // If db checkpoint config object store not specified but
1048            // state snapshot object store is specified, create handler
1049            // anyway for marking db checkpoints as completed so that they
1050            // can be uploaded as state snapshots.
1051            (None, false) => Ok((db_checkpoint_config, None)),
1052            (_, _) => {
1053                let handler = DBCheckpointHandler::new(
1054                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1055                    db_checkpoint_config.object_store_config.as_ref(),
1056                    60,
1057                    db_checkpoint_config
1058                        .prune_and_compact_before_upload
1059                        .unwrap_or(true),
1060                    config.authority_store_pruning_config.clone(),
1061                    prometheus_registry,
1062                    state_snapshot_enabled,
1063                )?;
1064                Ok((
1065                    db_checkpoint_config,
1066                    Some(DBCheckpointHandler::start(handler)),
1067                ))
1068            }
1069        }
1070    }
1071
1072    fn create_p2p_network(
1073        config: &NodeConfig,
1074        state_sync_store: RocksDbStore,
1075        chain_identifier: ChainIdentifier,
1076        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1077        prometheus_registry: &Registry,
1078    ) -> Result<P2pComponents> {
1079        let (state_sync, state_sync_router) = state_sync::Builder::new()
1080            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1081            .store(state_sync_store)
1082            .archive_config(config.archive_reader_config())
1083            .with_metrics(prometheus_registry)
1084            .build();
1085
1086        let (discovery, discovery_server) = discovery::Builder::new()
1087            .config(config.p2p_config.clone())
1088            .build();
1089
1090        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1091        let known_peers: HashMap<PeerId, String> = discovery_config
1092            .allowlisted_peers
1093            .clone()
1094            .into_iter()
1095            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1096            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1097                peer.peer_id
1098                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1099            }))
1100            .collect();
1101
1102        let (randomness, randomness_router) =
1103            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1104                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1105                .with_metrics(prometheus_registry)
1106                .build();
1107
1108        let p2p_network = {
1109            let routes = anemo::Router::new()
1110                .add_rpc_service(discovery_server)
1111                .merge(state_sync_router);
1112            let routes = routes.merge(randomness_router);
1113
1114            let inbound_network_metrics =
1115                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1116            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1117                "sui",
1118                "outbound",
1119                prometheus_registry,
1120            );
1121
1122            let service = ServiceBuilder::new()
1123                .layer(
1124                    TraceLayer::new_for_server_errors()
1125                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1126                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1127                )
1128                .layer(CallbackLayer::new(
1129                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1130                        Arc::new(inbound_network_metrics),
1131                        config.p2p_config.excessive_message_size(),
1132                    ),
1133                ))
1134                .service(routes);
1135
1136            let outbound_layer = ServiceBuilder::new()
1137                .layer(
1138                    TraceLayer::new_for_client_and_server_errors()
1139                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1140                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1141                )
1142                .layer(CallbackLayer::new(
1143                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1144                        Arc::new(outbound_network_metrics),
1145                        config.p2p_config.excessive_message_size(),
1146                    ),
1147                ))
1148                .into_inner();
1149
1150            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1151            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
1152            // staking events in the epoch change txn.
1153            anemo_config.max_frame_size = Some(1 << 30);
1154
1155            // Set a higher default value for socket send/receive buffers if not already
1156            // configured.
1157            let mut quic_config = anemo_config.quic.unwrap_or_default();
1158            if quic_config.socket_send_buffer_size.is_none() {
1159                quic_config.socket_send_buffer_size = Some(20 << 20);
1160            }
1161            if quic_config.socket_receive_buffer_size.is_none() {
1162                quic_config.socket_receive_buffer_size = Some(20 << 20);
1163            }
1164            quic_config.allow_failed_socket_buffer_size_setting = true;
1165
1166            // Set high-performance defaults for quinn transport.
1167            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1168            if quic_config.max_concurrent_bidi_streams.is_none() {
1169                quic_config.max_concurrent_bidi_streams = Some(500);
1170            }
1171            if quic_config.max_concurrent_uni_streams.is_none() {
1172                quic_config.max_concurrent_uni_streams = Some(500);
1173            }
1174            if quic_config.stream_receive_window.is_none() {
1175                quic_config.stream_receive_window = Some(100 << 20);
1176            }
1177            if quic_config.receive_window.is_none() {
1178                quic_config.receive_window = Some(200 << 20);
1179            }
1180            if quic_config.send_window.is_none() {
1181                quic_config.send_window = Some(200 << 20);
1182            }
1183            if quic_config.crypto_buffer_size.is_none() {
1184                quic_config.crypto_buffer_size = Some(1 << 20);
1185            }
1186            if quic_config.max_idle_timeout_ms.is_none() {
1187                quic_config.max_idle_timeout_ms = Some(10_000);
1188            }
1189            if quic_config.keep_alive_interval_ms.is_none() {
1190                quic_config.keep_alive_interval_ms = Some(5_000);
1191            }
1192            anemo_config.quic = Some(quic_config);
1193
1194            let server_name = format!("sui-{}", chain_identifier);
1195            let network = Network::bind(config.p2p_config.listen_address)
1196                .server_name(&server_name)
1197                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1198                .config(anemo_config)
1199                .outbound_request_layer(outbound_layer)
1200                .start(service)?;
1201            info!(
1202                server_name = server_name,
1203                "P2p network started on {}",
1204                network.local_addr()
1205            );
1206
1207            network
1208        };
1209
1210        let discovery_handle =
1211            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1212        let state_sync_handle = state_sync.start(p2p_network.clone());
1213        let randomness_handle = randomness.start(p2p_network.clone());
1214
1215        Ok(P2pComponents {
1216            p2p_network,
1217            known_peers,
1218            discovery_handle,
1219            state_sync_handle,
1220            randomness_handle,
1221        })
1222    }
1223
1224    async fn construct_validator_components(
1225        config: NodeConfig,
1226        state: Arc<AuthorityState>,
1227        committee: Arc<Committee>,
1228        epoch_store: Arc<AuthorityPerEpochStore>,
1229        checkpoint_store: Arc<CheckpointStore>,
1230        state_sync_handle: state_sync::Handle,
1231        randomness_handle: randomness::Handle,
1232        global_state_hasher: Weak<GlobalStateHasher>,
1233        backpressure_manager: Arc<BackpressureManager>,
1234        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1235        registry_service: &RegistryService,
1236        sui_node_metrics: Arc<SuiNodeMetrics>,
1237        checkpoint_metrics: Arc<CheckpointMetrics>,
1238    ) -> Result<ValidatorComponents> {
1239        let mut config_clone = config.clone();
1240        let consensus_config = config_clone
1241            .consensus_config
1242            .as_mut()
1243            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1244
1245        let client = Arc::new(UpdatableConsensusClient::new());
1246        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1247            &committee,
1248            consensus_config,
1249            state.name,
1250            connection_monitor_status.clone(),
1251            &registry_service.default_registry(),
1252            epoch_store.protocol_config().clone(),
1253            client.clone(),
1254            checkpoint_store.clone(),
1255        ));
1256        let consensus_manager = Arc::new(ConsensusManager::new(
1257            &config,
1258            consensus_config,
1259            registry_service,
1260            client,
1261        ));
1262
1263        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
1264        let consensus_store_pruner = ConsensusStorePruner::new(
1265            consensus_manager.get_storage_base_path(),
1266            consensus_config.db_retention_epochs(),
1267            consensus_config.db_pruner_period(),
1268            &registry_service.default_registry(),
1269        );
1270
1271        let sui_tx_validator_metrics =
1272            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1273
1274        let validator_server_handle = Self::start_grpc_validator_service(
1275            &config,
1276            state.clone(),
1277            consensus_adapter.clone(),
1278            &registry_service.default_registry(),
1279        )
1280        .await?;
1281
1282        // Starts an overload monitor that monitors the execution of the authority.
1283        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1284        let validator_overload_monitor_handle = if config
1285            .authority_overload_config
1286            .max_load_shedding_percentage
1287            > 0
1288        {
1289            let authority_state = Arc::downgrade(&state);
1290            let overload_config = config.authority_overload_config.clone();
1291            fail_point!("starting_overload_monitor");
1292            Some(spawn_monitored_task!(overload_monitor(
1293                authority_state,
1294                overload_config,
1295            )))
1296        } else {
1297            None
1298        };
1299
1300        Self::start_epoch_specific_validator_components(
1301            &config,
1302            state.clone(),
1303            consensus_adapter,
1304            checkpoint_store,
1305            epoch_store,
1306            state_sync_handle,
1307            randomness_handle,
1308            consensus_manager,
1309            consensus_store_pruner,
1310            global_state_hasher,
1311            backpressure_manager,
1312            validator_server_handle,
1313            validator_overload_monitor_handle,
1314            checkpoint_metrics,
1315            sui_node_metrics,
1316            sui_tx_validator_metrics,
1317        )
1318        .await
1319    }
1320
1321    async fn start_epoch_specific_validator_components(
1322        config: &NodeConfig,
1323        state: Arc<AuthorityState>,
1324        consensus_adapter: Arc<ConsensusAdapter>,
1325        checkpoint_store: Arc<CheckpointStore>,
1326        epoch_store: Arc<AuthorityPerEpochStore>,
1327        state_sync_handle: state_sync::Handle,
1328        randomness_handle: randomness::Handle,
1329        consensus_manager: Arc<ConsensusManager>,
1330        consensus_store_pruner: ConsensusStorePruner,
1331        state_hasher: Weak<GlobalStateHasher>,
1332        backpressure_manager: Arc<BackpressureManager>,
1333        validator_server_handle: SpawnOnce,
1334        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1335        checkpoint_metrics: Arc<CheckpointMetrics>,
1336        sui_node_metrics: Arc<SuiNodeMetrics>,
1337        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1338    ) -> Result<ValidatorComponents> {
1339        let checkpoint_service = Self::build_checkpoint_service(
1340            config,
1341            consensus_adapter.clone(),
1342            checkpoint_store.clone(),
1343            epoch_store.clone(),
1344            state.clone(),
1345            state_sync_handle,
1346            state_hasher,
1347            checkpoint_metrics.clone(),
1348        );
1349
1350        // Create a new map that gets injected into both the consensus handler and the consensus adapter.
1351        // The consensus handler writes values forwarded from consensus, and the consensus adapter
1352        // reads them to decide which validator submits a transaction to consensus.
1353        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1354
1355        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1356
1357        if epoch_store.randomness_state_enabled() {
1358            let randomness_manager = RandomnessManager::try_new(
1359                Arc::downgrade(&epoch_store),
1360                Box::new(consensus_adapter.clone()),
1361                randomness_handle,
1362                config.protocol_key_pair(),
1363            )
1364            .await;
1365            if let Some(randomness_manager) = randomness_manager {
1366                epoch_store
1367                    .set_randomness_manager(randomness_manager)
1368                    .await?;
1369            }
1370        }
1371
1372        ExecutionTimeObserver::spawn(
1373            epoch_store.clone(),
1374            Box::new(consensus_adapter.clone()),
1375            config
1376                .execution_time_observer_config
1377                .clone()
1378                .unwrap_or_default(),
1379        );
1380
1381        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1382            None,
1383            state.metrics.clone(),
1384        ));
1385
1386        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1387            throughput_calculator.clone(),
1388            None,
1389            None,
1390            state.metrics.clone(),
1391            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1392        ));
1393
1394        consensus_adapter.swap_throughput_profiler(throughput_profiler);
1395
1396        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1397            state.clone(),
1398            checkpoint_service.clone(),
1399            epoch_store.clone(),
1400            consensus_adapter.clone(),
1401            low_scoring_authorities,
1402            throughput_calculator,
1403            backpressure_manager,
1404            config.congestion_log.clone(),
1405        );
1406
1407        info!("Starting consensus manager asynchronously");
1408
1409        // Spawn consensus startup asynchronously to avoid blocking other components
1410        tokio::spawn({
1411            let config = config.clone();
1412            let epoch_store = epoch_store.clone();
1413            let sui_tx_validator = SuiTxValidator::new(
1414                state.clone(),
1415                epoch_store.clone(),
1416                checkpoint_service.clone(),
1417                sui_tx_validator_metrics.clone(),
1418            );
1419            let consensus_manager = consensus_manager.clone();
1420            async move {
1421                consensus_manager
1422                    .start(
1423                        &config,
1424                        epoch_store,
1425                        consensus_handler_initializer,
1426                        sui_tx_validator,
1427                    )
1428                    .await;
1429            }
1430        });
1431        let replay_waiter = consensus_manager.replay_waiter();
1432
1433        info!("Spawning checkpoint service");
1434        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1435            None
1436        } else {
1437            Some(replay_waiter)
1438        };
1439        checkpoint_service
1440            .spawn(epoch_store.clone(), replay_waiter)
1441            .await;
1442
1443        if epoch_store.authenticator_state_enabled() {
1444            Self::start_jwk_updater(
1445                config,
1446                sui_node_metrics,
1447                state.name,
1448                epoch_store.clone(),
1449                consensus_adapter.clone(),
1450            );
1451        }
1452
1453        Ok(ValidatorComponents {
1454            validator_server_handle,
1455            validator_overload_monitor_handle,
1456            consensus_manager,
1457            consensus_store_pruner,
1458            consensus_adapter,
1459            checkpoint_metrics,
1460            sui_tx_validator_metrics,
1461        })
1462    }
1463
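    /// Builds the checkpoint service, wiring locally built checkpoints to consensus for
    /// signing/submission and certified checkpoints to state sync, with per-checkpoint
    /// transaction and size limits derived from the protocol config.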
1464    fn build_checkpoint_service(
1465        config: &NodeConfig,
1466        consensus_adapter: Arc<ConsensusAdapter>,
1467        checkpoint_store: Arc<CheckpointStore>,
1468        epoch_store: Arc<AuthorityPerEpochStore>,
1469        state: Arc<AuthorityState>,
1470        state_sync_handle: state_sync::Handle,
1471        state_hasher: Weak<GlobalStateHasher>,
1472        checkpoint_metrics: Arc<CheckpointMetrics>,
1473    ) -> Arc<CheckpointService> {
1474        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1475        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1476
1477        debug!(
1478            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
1479            epoch_start_timestamp_ms,
1480            epoch_duration_ms
1481        );
1482
1483        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1484            sender: consensus_adapter,
1485            signer: state.secret.clone(),
1486            authority: config.protocol_public_key(),
1487            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1488                .checked_add(epoch_duration_ms)
1489                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1490            metrics: checkpoint_metrics.clone(),
1491        });
1492
1493        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1494        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1495        let max_checkpoint_size_bytes =
1496            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1497
1498        CheckpointService::build(
1499            state.clone(),
1500            checkpoint_store,
1501            epoch_store,
1502            state.get_transaction_cache_reader().clone(),
1503            state_hasher,
1504            checkpoint_output,
1505            Box::new(certified_checkpoint_output),
1506            checkpoint_metrics,
1507            max_tx_per_checkpoint,
1508            max_checkpoint_size_bytes,
1509        )
1510    }
1511
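    /// Constructs the consensus adapter with per-authority pending-transaction limits
    /// derived from the consensus config and committee size.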
1512    fn construct_consensus_adapter(
1513        committee: &Committee,
1514        consensus_config: &ConsensusConfig,
1515        authority: AuthorityName,
1516        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1517        prometheus_registry: &Registry,
1518        protocol_config: ProtocolConfig,
1519        consensus_client: Arc<dyn ConsensusClient>,
1520        checkpoint_store: Arc<CheckpointStore>,
1521    ) -> ConsensusAdapter {
1522        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1523        // The consensus adapter allows the authority to send user certificates through consensus.
1524
1525        ConsensusAdapter::new(
1526            consensus_client,
1527            checkpoint_store,
1528            authority,
1529            connection_monitor_status,
1530            consensus_config.max_pending_transactions(),
1531            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1532            consensus_config.max_submit_position,
1533            consensus_config.submit_delay_step_override(),
1534            ca_metrics,
1535            protocol_config,
1536        )
1537    }
1538
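    /// Builds the validator gRPC service (served over TLS) and returns a `SpawnOnce` that
    /// binds to the configured network address and serves it once started.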
1539    async fn start_grpc_validator_service(
1540        config: &NodeConfig,
1541        state: Arc<AuthorityState>,
1542        consensus_adapter: Arc<ConsensusAdapter>,
1543        prometheus_registry: &Registry,
1544    ) -> Result<SpawnOnce> {
1545        let validator_service = ValidatorService::new(
1546            state.clone(),
1547            consensus_adapter,
1548            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1549            config.policy_config.clone().map(|p| p.client_id_source),
1550        );
1551
1552        let mut server_conf = mysten_network::config::Config::new();
1553        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1554        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1555        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1556        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1557        server_conf.load_shed = config.grpc_load_shed;
1558        let mut server_builder =
1559            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1560
1561        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1562
1563        let tls_config = sui_tls::create_rustls_server_config(
1564            config.network_key_pair().copy().private(),
1565            SUI_TLS_SERVER_NAME.to_string(),
1566        );
1567
1568        let network_address = config.network_address().clone();
1569
1570        let (ready_tx, ready_rx) = oneshot::channel();
1571
1572        Ok(SpawnOnce::new(ready_rx, async move {
1573            let server = server_builder
1574                .bind(&network_address, Some(tls_config))
1575                .await
1576                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1577            let local_addr = server.local_addr();
1578            info!("Listening to traffic on {local_addr}");
1579            ready_tx.send(()).unwrap();
1580            match server.serve().await {
1581                Ok(_) => info!("Server stopped"),
1582                Err(err) => warn!("Server stopped with error: {err}"),
1583            }
1584        }))
1585    }
1586
1587    pub fn state(&self) -> Arc<AuthorityState> {
1588        self.state.clone()
1589    }
1590
1591    // Only used for testing because of how epoch store is loaded.
1592    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1593        self.state.reference_gas_price_for_testing()
1594    }
1595
1596    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1597        self.state.committee_store().clone()
1598    }
1599
1600    /*
1601    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1602        self.state.db()
1603    }
1604    */
1605
1606    /// Clone the AuthorityAggregator currently used in this node, if the node is a fullnode.
1607    /// After reconfig, Transaction Driver builds a new AuthorityAggregator, so the caller
1608    /// of this function will most likely want to call this again
1609    /// to get a fresh one.
1610    pub fn clone_authority_aggregator(
1611        &self,
1612    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1613        self.transaction_orchestrator
1614            .as_ref()
1615            .map(|to| to.clone_authority_aggregator())
1616    }
1617
1618    pub fn transaction_orchestrator(
1619        &self,
1620    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1621        self.transaction_orchestrator.clone()
1622    }
1623
1624    /// This function awaits the completion of checkpoint execution of the current epoch,
1625    /// after which it initiates reconfiguration of the entire system.
1626    pub async fn monitor_reconfiguration(
1627        self: Arc<Self>,
1628        mut epoch_store: Arc<AuthorityPerEpochStore>,
1629    ) -> Result<()> {
1630        let checkpoint_executor_metrics =
1631            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1632
1633        loop {
1634            let mut hasher_guard = self.global_state_hasher.lock().await;
1635            let hasher = hasher_guard.take().unwrap();
1636            info!(
1637                "Creating checkpoint executor for epoch {}",
1638                epoch_store.epoch()
1639            );
1640            let checkpoint_executor = CheckpointExecutor::new(
1641                epoch_store.clone(),
1642                self.checkpoint_store.clone(),
1643                self.state.clone(),
1644                hasher.clone(),
1645                self.backpressure_manager.clone(),
1646                self.config.checkpoint_executor_config.clone(),
1647                checkpoint_executor_metrics.clone(),
1648                self.subscription_service_checkpoint_sender.clone(),
1649            );
1650
1651            let run_with_range = self.config.run_with_range;
1652
1653            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1654
1655            // Update the current protocol version metric.
1656            self.metrics
1657                .current_protocol_version
1658                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1659
1660            // Advertise capabilities to committee, if we are a validator.
1661            if let Some(components) = &*self.validator_components.lock().await {
1662                // TODO: without this sleep, the consensus message is not delivered reliably.
1663                tokio::time::sleep(Duration::from_millis(1)).await;
1664
1665                let config = cur_epoch_store.protocol_config();
1666                let mut supported_protocol_versions = self
1667                    .config
1668                    .supported_protocol_versions
1669                    .expect("Supported versions should be populated")
1670                    // no need to send digests of versions less than the current version
1671                    .truncate_below(config.version);
1672
1673                while supported_protocol_versions.max > config.version {
1674                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1675                        supported_protocol_versions.max,
1676                        cur_epoch_store.get_chain(),
1677                    );
1678
1679                    if proposed_protocol_config.enable_accumulators()
1680                        && !epoch_store.accumulator_root_exists()
1681                    {
1682                        error!(
1683                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1684                            supported_protocol_versions.max
1685                        );
1686                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1687                    } else {
1688                        break;
1689                    }
1690                }
1691
1692                let binary_config = config.binary_config(None);
1693                let transaction = ConsensusTransaction::new_capability_notification_v2(
1694                    AuthorityCapabilitiesV2::new(
1695                        self.state.name,
1696                        cur_epoch_store.get_chain_identifier().chain(),
1697                        supported_protocol_versions,
1698                        self.state
1699                            .get_available_system_packages(&binary_config)
1700                            .await,
1701                    ),
1702                );
1703                info!(?transaction, "submitting capabilities to consensus");
1704                components.consensus_adapter.submit(
1705                    transaction,
1706                    None,
1707                    &cur_epoch_store,
1708                    None,
1709                    None,
1710                )?;
1711            }
1712
1713            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1714
1715            if stop_condition == StopReason::RunWithRangeCondition {
1716                SuiNode::shutdown(&self).await;
1717                self.shutdown_channel_tx
1718                    .send(run_with_range)
1719                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1720                return Ok(());
1721            }
1722
1723            // Safe to call because we are in the middle of reconfiguration.
1724            let latest_system_state = self
1725                .state
1726                .get_object_cache_reader()
1727                .get_sui_system_state_object_unsafe()
1728                .expect("Read Sui System State object cannot fail");
1729
1730            #[cfg(msim)]
1731            if !self
1732                .sim_state
1733                .sim_safe_mode_expected
1734                .load(Ordering::Relaxed)
1735            {
1736                debug_assert!(!latest_system_state.safe_mode());
1737            }
1738
1739            #[cfg(not(msim))]
1740            debug_assert!(!latest_system_state.safe_mode());
1741
1742            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1743                && self.state.is_fullnode(&cur_epoch_store)
1744            {
1745                warn!(
1746                    "Failed to send end of epoch notification to subscriber: {:?}",
1747                    err
1748                );
1749            }
1750
1751            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1752            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1753
1754            self.auth_agg.store(Arc::new(
1755                self.auth_agg
1756                    .load()
1757                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1758            ));
1759
1760            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1761            let next_epoch = next_epoch_committee.epoch();
1762            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1763
1764            info!(
1765                next_epoch,
1766                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1767            );
1768
1769            fail_point_async!("reconfig_delay");
1770
1771            // We save the connection monitor status map regardless of validator / fullnode status
1772            // so that we don't need to restart the connection monitor every epoch.
1773            // Update the mappings that will be used by the consensus adapter if it exists or is
1774            // about to be created.
1775            let authority_names_to_peer_ids =
1776                new_epoch_start_state.get_authority_names_to_peer_ids();
1777            self.connection_monitor_status
1778                .update_mapping_for_epoch(authority_names_to_peer_ids);
1779
1780            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1781
1782            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1783
1784            let mut validator_components_lock_guard = self.validator_components.lock().await;
1785
1786            // The following code handles 4 different cases, depending on whether the node
1787            // was a validator in the previous epoch, and whether the node is a validator
1788            // in the new epoch.
1789            let new_epoch_store = self
1790                .reconfigure_state(
1791                    &self.state,
1792                    &cur_epoch_store,
1793                    next_epoch_committee.clone(),
1794                    new_epoch_start_state,
1795                    hasher.clone(),
1796                )
1797                .await;
1798
1799            let new_validator_components = if let Some(ValidatorComponents {
1800                validator_server_handle,
1801                validator_overload_monitor_handle,
1802                consensus_manager,
1803                consensus_store_pruner,
1804                consensus_adapter,
1805                checkpoint_metrics,
1806                sui_tx_validator_metrics,
1807            }) = validator_components_lock_guard.take()
1808            {
1809                info!("Reconfiguring the validator.");
1810
1811                consensus_manager.shutdown().await;
1812                info!("Consensus has shut down.");
1813
1814                info!("Epoch store finished reconfiguration.");
1815
1816                // No other components should be holding a strong reference to state hasher
1817                // at this point. Confirm here before we swap in the new hasher.
1818                let global_state_hasher_metrics = Arc::into_inner(hasher)
1819                    .expect("Object state hasher should have no other references at this point")
1820                    .metrics();
1821                let new_hasher = Arc::new(GlobalStateHasher::new(
1822                    self.state.get_global_state_hash_store().clone(),
1823                    global_state_hasher_metrics,
1824                ));
1825                let weak_hasher = Arc::downgrade(&new_hasher);
1826                *hasher_guard = Some(new_hasher);
1827
1828                consensus_store_pruner.prune(next_epoch).await;
1829
1830                if self.state.is_validator(&new_epoch_store) {
1831                    // Only restart consensus if this node is still a validator in the new epoch.
1832                    Some(
1833                        Self::start_epoch_specific_validator_components(
1834                            &self.config,
1835                            self.state.clone(),
1836                            consensus_adapter,
1837                            self.checkpoint_store.clone(),
1838                            new_epoch_store.clone(),
1839                            self.state_sync_handle.clone(),
1840                            self.randomness_handle.clone(),
1841                            consensus_manager,
1842                            consensus_store_pruner,
1843                            weak_hasher,
1844                            self.backpressure_manager.clone(),
1845                            validator_server_handle,
1846                            validator_overload_monitor_handle,
1847                            checkpoint_metrics,
1848                            self.metrics.clone(),
1849                            sui_tx_validator_metrics,
1850                        )
1851                        .await?,
1852                    )
1853                } else {
1854                    info!("This node is no longer a validator after reconfiguration");
1855                    None
1856                }
1857            } else {
1858                // No other components should be holding a strong reference to state hasher
1859                // at this point. Confirm here before we swap in the new hasher.
1860                let global_state_hasher_metrics = Arc::into_inner(hasher)
1861                    .expect("Object state hasher should have no other references at this point")
1862                    .metrics();
1863                let new_hasher = Arc::new(GlobalStateHasher::new(
1864                    self.state.get_global_state_hash_store().clone(),
1865                    global_state_hasher_metrics,
1866                ));
1867                let weak_hasher = Arc::downgrade(&new_hasher);
1868                *hasher_guard = Some(new_hasher);
1869
1870                if self.state.is_validator(&new_epoch_store) {
1871                    info!("Promoting the node from fullnode to validator, starting grpc server");
1872
1873                    let mut components = Self::construct_validator_components(
1874                        self.config.clone(),
1875                        self.state.clone(),
1876                        Arc::new(next_epoch_committee.clone()),
1877                        new_epoch_store.clone(),
1878                        self.checkpoint_store.clone(),
1879                        self.state_sync_handle.clone(),
1880                        self.randomness_handle.clone(),
1881                        weak_hasher,
1882                        self.backpressure_manager.clone(),
1883                        self.connection_monitor_status.clone(),
1884                        &self.registry_service,
1885                        self.metrics.clone(),
1886                        self.checkpoint_metrics.clone(),
1887                    )
1888                    .await?;
1889
1890                    components.validator_server_handle =
1891                        components.validator_server_handle.start().await;
1892
1893                    // Set the consensus address updater now that the fullnode has been promoted to a validator.
1894                    self.endpoint_manager
1895                        .set_consensus_address_updater(components.consensus_manager.clone());
1896
1897                    Some(components)
1898                } else {
1899                    None
1900                }
1901            };
1902            *validator_components_lock_guard = new_validator_components;
1903
1904            // Force release of the current epoch store's DB handles, because the
1905            // Arc<AuthorityPerEpochStore> may linger.
1906            cur_epoch_store.release_db_handles();
1907
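            // In simulator tests, exercise checkpoint pruning at the epoch boundary when a
            // finite retention period for checkpoints is configured.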
1908            if cfg!(msim)
1909                && !matches!(
1910                    self.config
1911                        .authority_store_pruning_config
1912                        .num_epochs_to_retain_for_checkpoints(),
1913                    None | Some(u64::MAX) | Some(0)
1914                )
1915            {
1916                self.state
1917                    .prune_checkpoints_for_eligible_epochs_for_testing(
1918                        self.config.clone(),
1919                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1920                    )
1921                    .await?;
1922            }
1923
1924            epoch_store = new_epoch_store;
1925            info!("Reconfiguration finished");
1926        }
1927    }
1928
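    /// Shuts down consensus if validator components are currently running.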
1929    async fn shutdown(&self) {
1930        if let Some(validator_components) = &*self.validator_components.lock().await {
1931            validator_components.consensus_manager.shutdown().await;
1932        }
1933    }
1934
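    /// Reconfigures the authority state for the next epoch: verifies that the last
    /// checkpoint of the current epoch has been executed, builds the new
    /// `EpochStartConfiguration`, and returns the new per-epoch store.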
1935    async fn reconfigure_state(
1936        &self,
1937        state: &Arc<AuthorityState>,
1938        cur_epoch_store: &AuthorityPerEpochStore,
1939        next_epoch_committee: Committee,
1940        next_epoch_start_system_state: EpochStartSystemState,
1941        global_state_hasher: Arc<GlobalStateHasher>,
1942    ) -> Arc<AuthorityPerEpochStore> {
1943        let next_epoch = next_epoch_committee.epoch();
1944
1945        let last_checkpoint = self
1946            .checkpoint_store
1947            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1948            .expect("Error loading last checkpoint for current epoch")
1949            .expect("Could not load last checkpoint for current epoch");
1950
1951        let last_checkpoint_seq = *last_checkpoint.sequence_number();
1952
1953        assert_eq!(
1954            Some(last_checkpoint_seq),
1955            self.checkpoint_store
1956                .get_highest_executed_checkpoint_seq_number()
1957                .expect("Error loading highest executed checkpoint sequence number")
1958        );
1959
1960        let epoch_start_configuration = EpochStartConfiguration::new(
1961            next_epoch_start_system_state,
1962            *last_checkpoint.digest(),
1963            state.get_object_store().as_ref(),
1964            EpochFlag::default_flags_for_new_epoch(&state.config),
1965        )
1966        .expect("EpochStartConfiguration construction cannot fail");
1967
1968        let new_epoch_store = self
1969            .state
1970            .reconfigure(
1971                cur_epoch_store,
1972                self.config.supported_protocol_versions.unwrap(),
1973                next_epoch_committee,
1974                epoch_start_configuration,
1975                global_state_hasher,
1976                &self.config.expensive_safety_check_config,
1977                last_checkpoint_seq,
1978            )
1979            .await
1980            .expect("Reconfigure authority state cannot fail");
1981        info!(next_epoch, "Node State has been reconfigured");
1982        assert_eq!(next_epoch, new_epoch_store.epoch());
1983        self.state.get_reconfig_api().update_epoch_flags_metrics(
1984            cur_epoch_store.epoch_start_config().flags(),
1985            new_epoch_store.epoch_start_config().flags(),
1986        );
1987
1988        new_epoch_store
1989    }
1990
1991    pub fn get_config(&self) -> &NodeConfig {
1992        &self.config
1993    }
1994
1995    pub fn randomness_handle(&self) -> randomness::Handle {
1996        self.randomness_handle.clone()
1997    }
1998
1999    pub fn endpoint_manager(&self) -> &EndpointManager {
2000        &self.endpoint_manager
2001    }
2002
2003    /// Get a short prefix of a digest for metric labels
2004    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2005        let digest_str = digest.to_string();
2006        if digest_str.len() >= 8 {
2007            digest_str[0..8].to_string()
2008        } else {
2009            digest_str
2010        }
2011    }
2012
2013    /// Check for previously detected forks and handle them appropriately.
2014    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2015    /// For all other cases, block node startup if a fork is detected.
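    ///
    /// A minimal sketch of a recovery config that clears a known checkpoint fork and
    /// otherwise returns an error instead of halting. Field names follow
    /// `ForkRecoveryConfig` as exercised in the tests at the bottom of this file; the
    /// on-disk config syntax may differ, and `trusted_digest` below is a hypothetical
    /// `CheckpointDigest` used only for illustration:
    ///
    /// ```ignore
    /// let recovery = ForkRecoveryConfig {
    ///     // checkpoint sequence number -> trusted checkpoint digest (as a string)
    ///     checkpoint_overrides: BTreeMap::from([(42, trusted_digest.to_string())]),
    ///     transaction_overrides: Default::default(),
    ///     fork_crash_behavior: ForkCrashBehavior::ReturnError,
    /// };
    /// ```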
2016    async fn check_and_recover_forks(
2017        checkpoint_store: &CheckpointStore,
2018        checkpoint_metrics: &CheckpointMetrics,
2019        is_validator: bool,
2020        fork_recovery: Option<&ForkRecoveryConfig>,
2021    ) -> Result<()> {
2022        // Fork detection and recovery is only relevant for validators.
2023        // Fullnodes sync from validators and don't need fork checking.
2024        if !is_validator {
2025            return Ok(());
2026        }
2027
2028        // Try to recover from forks if recovery config is provided
2029        if let Some(recovery) = fork_recovery {
2030            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2031            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2032        }
2033
2034        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2035            .get_checkpoint_fork_detected()
2036            .map_err(|e| {
2037                error!("Failed to check for checkpoint fork: {:?}", e);
2038                e
2039            })?
2040        {
2041            Self::handle_checkpoint_fork(
2042                checkpoint_seq,
2043                checkpoint_digest,
2044                checkpoint_metrics,
2045                fork_recovery,
2046            )
2047            .await?;
2048        }
2049        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2050            .get_transaction_fork_detected()
2051            .map_err(|e| {
2052                error!("Failed to check for transaction fork: {:?}", e);
2053                e
2054            })?
2055        {
2056            Self::handle_transaction_fork(
2057                tx_digest,
2058                expected_effects,
2059                actual_effects,
2060                checkpoint_metrics,
2061                fork_recovery,
2062            )
2063            .await?;
2064        }
2065
2066        Ok(())
2067    }
2068
2069    fn try_recover_checkpoint_fork(
2070        checkpoint_store: &CheckpointStore,
2071        recovery: &ForkRecoveryConfig,
2072    ) -> Result<()> {
2073        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2074        // clear locally computed checkpoints from that sequence (inclusive).
2075        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2076            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2077                anyhow::bail!(
2078                    "Invalid checkpoint digest override for seq {}: {}",
2079                    seq,
2080                    expected_digest_str
2081                );
2082            };
2083
2084            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2085                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2086                if local_digest != expected_digest {
2087                    info!(
2088                        seq,
2089                        local = %Self::get_digest_prefix(local_digest),
2090                        expected = %Self::get_digest_prefix(expected_digest),
2091                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2092                        seq
2093                    );
2094                    checkpoint_store
2095                        .clear_locally_computed_checkpoints_from(*seq)
2096                        .context(
2097                            "Failed to clear locally computed checkpoints from override seq",
2098                        )?;
2099                }
2100            }
2101        }
2102
2103        if let Some((checkpoint_seq, checkpoint_digest)) =
2104            checkpoint_store.get_checkpoint_fork_detected()?
2105            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2106        {
2107            info!(
2108                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2109                checkpoint_seq, checkpoint_digest
2110            );
2111            checkpoint_store
2112                .clear_checkpoint_fork_detected()
2113                .expect("Failed to clear checkpoint fork detected marker");
2114        }
2115        Ok(())
2116    }
2117
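    /// Clears a previously recorded transaction fork marker if the forked transaction's
    /// digest appears in the recovery config's transaction overrides.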
2118    fn try_recover_transaction_fork(
2119        checkpoint_store: &CheckpointStore,
2120        recovery: &ForkRecoveryConfig,
2121    ) -> Result<()> {
2122        if recovery.transaction_overrides.is_empty() {
2123            return Ok(());
2124        }
2125
2126        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2127            && recovery
2128                .transaction_overrides
2129                .contains_key(&tx_digest.to_string())
2130        {
2131            info!(
2132                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2133                tx_digest
2134            );
2135            checkpoint_store
2136                .clear_transaction_fork_detected()
2137                .expect("Failed to clear transaction fork detected marker");
2138        }
2139        Ok(())
2140    }
2141
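    /// Current UNIX timestamp in seconds, used for fork-detection metric labels.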
2142    fn get_current_timestamp() -> u64 {
2143        std::time::SystemTime::now()
2144            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2145            .unwrap()
2146            .as_secs()
2147    }
2148
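    /// Records a detected checkpoint fork in metrics, then either sleeps indefinitely
    /// (awaiting operator recovery) or returns an error, per the configured
    /// `ForkCrashBehavior`.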
2149    async fn handle_checkpoint_fork(
2150        checkpoint_seq: u64,
2151        checkpoint_digest: CheckpointDigest,
2152        checkpoint_metrics: &CheckpointMetrics,
2153        fork_recovery: Option<&ForkRecoveryConfig>,
2154    ) -> Result<()> {
2155        checkpoint_metrics
2156            .checkpoint_fork_crash_mode
2157            .with_label_values(&[
2158                &checkpoint_seq.to_string(),
2159                &Self::get_digest_prefix(checkpoint_digest),
2160                &Self::get_current_timestamp().to_string(),
2161            ])
2162            .set(1);
2163
2164        let behavior = fork_recovery
2165            .map(|fr| fr.fork_crash_behavior)
2166            .unwrap_or_default();
2167
2168        match behavior {
2169            ForkCrashBehavior::AwaitForkRecovery => {
2170                error!(
2171                    checkpoint_seq = checkpoint_seq,
2172                    checkpoint_digest = ?checkpoint_digest,
2173                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2174                );
2175                futures::future::pending::<()>().await;
2176                unreachable!("pending() should never return");
2177            }
2178            ForkCrashBehavior::ReturnError => {
2179                error!(
2180                    checkpoint_seq = checkpoint_seq,
2181                    checkpoint_digest = ?checkpoint_digest,
2182                    "Checkpoint fork detected! Returning error."
2183                );
2184                Err(anyhow::anyhow!(
2185                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2186                    checkpoint_seq,
2187                    checkpoint_digest
2188                ))
2189            }
2190        }
2191    }
2192
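    /// Same as `handle_checkpoint_fork`, but for a transaction whose locally computed
    /// effects digest diverges from the expected effects digest.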
2193    async fn handle_transaction_fork(
2194        tx_digest: TransactionDigest,
2195        expected_effects_digest: TransactionEffectsDigest,
2196        actual_effects_digest: TransactionEffectsDigest,
2197        checkpoint_metrics: &CheckpointMetrics,
2198        fork_recovery: Option<&ForkRecoveryConfig>,
2199    ) -> Result<()> {
2200        checkpoint_metrics
2201            .transaction_fork_crash_mode
2202            .with_label_values(&[
2203                &Self::get_digest_prefix(tx_digest),
2204                &Self::get_digest_prefix(expected_effects_digest),
2205                &Self::get_digest_prefix(actual_effects_digest),
2206                &Self::get_current_timestamp().to_string(),
2207            ])
2208            .set(1);
2209
2210        let behavior = fork_recovery
2211            .map(|fr| fr.fork_crash_behavior)
2212            .unwrap_or_default();
2213
2214        match behavior {
2215            ForkCrashBehavior::AwaitForkRecovery => {
2216                error!(
2217                    tx_digest = ?tx_digest,
2218                    expected_effects_digest = ?expected_effects_digest,
2219                    actual_effects_digest = ?actual_effects_digest,
2220                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2221                );
2222                futures::future::pending::<()>().await;
2223                unreachable!("pending() should never return");
2224            }
2225            ForkCrashBehavior::ReturnError => {
2226                error!(
2227                    tx_digest = ?tx_digest,
2228                    expected_effects_digest = ?expected_effects_digest,
2229                    actual_effects_digest = ?actual_effects_digest,
2230                    "Transaction fork detected! Returning error."
2231                );
2232                Err(anyhow::anyhow!(
2233                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2234                    tx_digest,
2235                    expected_effects_digest,
2236                    actual_effects_digest
2237                ))
2238            }
2239        }
2240    }
2241}
2242
2243#[cfg(not(msim))]
2244impl SuiNode {
2245    async fn fetch_jwks(
2246        _authority: AuthorityName,
2247        provider: &OIDCProvider,
2248    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2249        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2250        use sui_types::error::SuiErrorKind;
2251        let client = reqwest::Client::new();
2252        fetch_jwks(provider, &client, true)
2253            .await
2254            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2255    }
2256}
2257
2258#[cfg(msim)]
2259impl SuiNode {
2260    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2261        self.sim_state.sim_node.id()
2262    }
2263
2264    pub fn set_safe_mode_expected(&self, new_value: bool) {
2265        info!("Setting safe mode expected to {}", new_value);
2266        self.sim_state
2267            .sim_safe_mode_expected
2268            .store(new_value, Ordering::Relaxed);
2269    }
2270
2271    #[allow(unused_variables)]
2272    async fn fetch_jwks(
2273        authority: AuthorityName,
2274        provider: &OIDCProvider,
2275    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2276        get_jwk_injector()(authority, provider)
2277    }
2278}
2279
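/// A future (typically a server) that is constructed eagerly but spawned at most once.
/// `start` spawns the future and waits on the ready channel before returning. A minimal
/// usage sketch, mirroring `start_grpc_validator_service` above (`ready_rx` is signalled
/// by the future once its listener is bound):
///
/// ```ignore
/// let handle = SpawnOnce::new(ready_rx, async move { server.serve().await; });
/// let handle = handle.start().await;
/// ```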
2280enum SpawnOnce {
2281    // Mutex is only needed to make SpawnOnce Send
2282    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2283    #[allow(unused)]
2284    Started(JoinHandle<()>),
2285}
2286
2287impl SpawnOnce {
2288    pub fn new(
2289        ready_rx: oneshot::Receiver<()>,
2290        future: impl Future<Output = ()> + Send + 'static,
2291    ) -> Self {
2292        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2293    }
2294
2295    pub async fn start(self) -> Self {
2296        match self {
2297            Self::Unstarted(ready_rx, future) => {
2298                let future = future.into_inner();
2299                let handle = tokio::spawn(future);
2300                ready_rx.await.unwrap();
2301                Self::Started(handle)
2302            }
2303            Self::Started(_) => self,
2304        }
2305    }
2306}
2307
2308/// Updates trusted peer addresses in the p2p network.
2309fn update_peer_addresses(
2310    config: &NodeConfig,
2311    endpoint_manager: &EndpointManager,
2312    epoch_start_state: &EpochStartSystemState,
2313) {
2314    for (peer_id, address) in
2315        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2316    {
2317        endpoint_manager
2318            .update_endpoint(
2319                EndpointId::P2p(peer_id),
2320                AddressSource::Committee,
2321                vec![address],
2322            )
2323            .expect("Updating peer addresses should not fail");
2324    }
2325}
2326
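/// Builds the transaction key-value store backing JSON-RPC reads: the local RocksDB
/// store, with a fallback to the HTTP key-value store when a base URL is configured
/// (currently mainnet only).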
2327fn build_kv_store(
2328    state: &Arc<AuthorityState>,
2329    config: &NodeConfig,
2330    registry: &Registry,
2331) -> Result<Arc<TransactionKeyValueStore>> {
2332    let metrics = KeyValueStoreMetrics::new(registry);
2333    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2334
2335    let base_url = &config.transaction_kv_store_read_config.base_url;
2336
2337    if base_url.is_empty() {
2338        info!("no http kv store url provided, using local db only");
2339        return Ok(Arc::new(db_store));
2340    }
2341
2342    let base_url: url::Url = base_url.parse().tap_err(|e| {
2343        error!(
2344            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2345            base_url, e
2346        )
2347    })?;
2348
2349    let network_str = match state.get_chain_identifier().chain() {
2350        Chain::Mainnet => "/mainnet",
2351        _ => {
2352            info!("using local db only for kv store");
2353            return Ok(Arc::new(db_store));
2354        }
2355    };
2356
2357    let base_url = base_url.join(network_str)?.to_string();
2358    let http_store = HttpKVStore::new_kv(
2359        &base_url,
2360        config.transaction_kv_store_read_config.cache_size,
2361        metrics.clone(),
2362    )?;
2363    info!("using local key-value store with fallback to http key-value store");
2364    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2365        db_store,
2366        http_store,
2367        metrics,
2368        "json_rpc_fallback",
2369    )))
2370}
2371
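/// Builds the JSON-RPC and RPC API (sui_rpc_api) HTTP/HTTPS servers for fullnodes,
/// along with the subscription service checkpoint sender. Validators do not expose
/// these APIs and get empty handles.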
2372async fn build_http_servers(
2373    state: Arc<AuthorityState>,
2374    store: RocksDbStore,
2375    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2376    config: &NodeConfig,
2377    prometheus_registry: &Registry,
2378    server_version: ServerVersion,
2379) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2380    // Validators do not expose these APIs
2381    if config.consensus_config().is_some() {
2382        return Ok((HttpServers::default(), None));
2383    }
2384
2385    info!("starting rpc service with config: {:?}", config.rpc);
2386
2387    let mut router = axum::Router::new();
2388
2389    let json_rpc_router = {
2390        let traffic_controller = state.traffic_controller.clone();
2391        let mut server = JsonRpcServerBuilder::new(
2392            env!("CARGO_PKG_VERSION"),
2393            prometheus_registry,
2394            traffic_controller,
2395            config.policy_config.clone(),
2396        );
2397
2398        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2399
2400        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2401        server.register_module(ReadApi::new(
2402            state.clone(),
2403            kv_store.clone(),
2404            metrics.clone(),
2405        ))?;
2406        server.register_module(CoinReadApi::new(
2407            state.clone(),
2408            kv_store.clone(),
2409            metrics.clone(),
2410        ))?;
2411
2412        // If run_with_range is enabled, we want to prevent any transactions from being submitted.
2413        // run_with_range = None is the normal operating condition.
2414        if config.run_with_range.is_none() {
2415            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2416        }
2417        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2418        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2419
2420        if let Some(transaction_orchestrator) = transaction_orchestrator {
2421            server.register_module(TransactionExecutionApi::new(
2422                state.clone(),
2423                transaction_orchestrator.clone(),
2424                metrics.clone(),
2425            ))?;
2426        }
2427
2428        let name_service_config =
2429            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2430                config.name_service_package_address,
2431                config.name_service_registry_id,
2432                config.name_service_reverse_registry_id,
2433            ) {
2434                sui_name_service::NameServiceConfig::new(
2435                    package_address,
2436                    registry_id,
2437                    reverse_registry_id,
2438                )
2439            } else {
2440                match state.get_chain_identifier().chain() {
2441                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2442                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2443                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2444                }
2445            };
2446
2447        server.register_module(IndexerApi::new(
2448            state.clone(),
2449            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2450            kv_store,
2451            name_service_config,
2452            metrics,
2453            config.indexer_max_subscriptions,
2454        ))?;
2455        server.register_module(MoveUtils::new(state.clone()))?;
2456
2457        let server_type = config.jsonrpc_server_type();
2458
2459        server.to_router(server_type).await?
2460    };
2461
2462    router = router.merge(json_rpc_router);
2463
2464    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2465        SubscriptionService::build(prometheus_registry);
2466    let rpc_router = {
2467        let mut rpc_service =
2468            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2469        rpc_service.with_server_version(server_version);
2470
2471        if let Some(config) = config.rpc.clone() {
2472            rpc_service.with_config(config);
2473        }
2474
2475        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2476        rpc_service.with_subscription_service(subscription_service_handle);
2477
2478        if let Some(transaction_orchestrator) = transaction_orchestrator {
2479            rpc_service.with_executor(transaction_orchestrator.clone())
2480        }
2481
2482        rpc_service.into_router().await
2483    };
2484
2485    let layers = ServiceBuilder::new()
2486        .map_request(|mut request: axum::http::Request<_>| {
2487            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2488                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2489                request.extensions_mut().insert(axum_connect_info);
2490            }
2491            request
2492        })
2493        .layer(axum::middleware::from_fn(server_timing_middleware))
2494        // Set up a permissive CORS policy
2495        .layer(
2496            tower_http::cors::CorsLayer::new()
2497                .allow_methods([http::Method::GET, http::Method::POST])
2498                .allow_origin(tower_http::cors::Any)
2499                .allow_headers(tower_http::cors::Any)
2500                .expose_headers(tower_http::cors::Any),
2501        );
2502
2503    router = router.merge(rpc_router).layer(layers);
2504
2505    let https = if let Some((tls_config, https_address)) = config
2506        .rpc()
2507        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2508    {
2509        let https = sui_http::Builder::new()
2510            .tls_single_cert(tls_config.cert(), tls_config.key())
2511            .and_then(|builder| builder.serve(https_address, router.clone()))
2512            .map_err(|e| anyhow::anyhow!(e))?;
2513
2514        info!(
2515            https_address =? https.local_addr(),
2516            "HTTPS rpc server listening on {}",
2517            https.local_addr()
2518        );
2519
2520        Some(https)
2521    } else {
2522        None
2523    };
2524
2525    let http = sui_http::Builder::new()
2526        .serve(&config.json_rpc_address, router)
2527        .map_err(|e| anyhow::anyhow!(e))?;
2528
2529    info!(
2530        http_address =? http.local_addr(),
2531        "HTTP rpc server listening on {}",
2532        http.local_addr()
2533    );
2534
2535    Ok((
2536        HttpServers {
2537            http: Some(http),
2538            https,
2539        },
2540        Some(subscription_service_checkpoint_sender),
2541    ))
2542}
2543
2544#[cfg(not(test))]
2545fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2546    protocol_config.max_transactions_per_checkpoint() as usize
2547}
2548
2549#[cfg(test)]
2550fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2551    2
2552}
2553
2554#[derive(Default)]
2555struct HttpServers {
2556    #[allow(unused)]
2557    http: Option<sui_http::ServerHandle>,
2558    #[allow(unused)]
2559    https: Option<sui_http::ServerHandle>,
2560}
2561
2562#[cfg(test)]
2563mod tests {
2564    use super::*;
2565    use prometheus::Registry;
2566    use std::collections::BTreeMap;
2567    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2568    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2569    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2570
2571    #[tokio::test]
2572    async fn test_fork_error_and_recovery_both_paths() {
2573        let checkpoint_store = CheckpointStore::new_for_tests();
2574        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2575
2576        // ---------- Checkpoint fork path ----------
2577        let seq_num = 42;
2578        let digest = CheckpointDigest::random();
2579        checkpoint_store
2580            .record_checkpoint_fork_detected(seq_num, digest)
2581            .unwrap();
2582
2583        let fork_recovery = ForkRecoveryConfig {
2584            transaction_overrides: Default::default(),
2585            checkpoint_overrides: Default::default(),
2586            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2587        };
2588
2589        let r = SuiNode::check_and_recover_forks(
2590            &checkpoint_store,
2591            &checkpoint_metrics,
2592            true,
2593            Some(&fork_recovery),
2594        )
2595        .await;
2596        assert!(r.is_err());
2597        assert!(
2598            r.unwrap_err()
2599                .to_string()
2600                .contains("Checkpoint fork detected")
2601        );
2602
2603        let mut checkpoint_overrides = BTreeMap::new();
2604        checkpoint_overrides.insert(seq_num, digest.to_string());
2605        let fork_recovery_with_override = ForkRecoveryConfig {
2606            transaction_overrides: Default::default(),
2607            checkpoint_overrides,
2608            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2609        };
2610        let r = SuiNode::check_and_recover_forks(
2611            &checkpoint_store,
2612            &checkpoint_metrics,
2613            true,
2614            Some(&fork_recovery_with_override),
2615        )
2616        .await;
2617        assert!(r.is_ok());
2618        assert!(
2619            checkpoint_store
2620                .get_checkpoint_fork_detected()
2621                .unwrap()
2622                .is_none()
2623        );
2624
2625        // ---------- Transaction fork path ----------
2626        let tx_digest = TransactionDigest::random();
2627        let expected_effects = TransactionEffectsDigest::random();
2628        let actual_effects = TransactionEffectsDigest::random();
2629        checkpoint_store
2630            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2631            .unwrap();
2632
2633        let fork_recovery = ForkRecoveryConfig {
2634            transaction_overrides: Default::default(),
2635            checkpoint_overrides: Default::default(),
2636            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2637        };
2638        let r = SuiNode::check_and_recover_forks(
2639            &checkpoint_store,
2640            &checkpoint_metrics,
2641            true,
2642            Some(&fork_recovery),
2643        )
2644        .await;
2645        assert!(r.is_err());
2646        assert!(
2647            r.unwrap_err()
2648                .to_string()
2649                .contains("Transaction fork detected")
2650        );
2651
2652        let mut transaction_overrides = BTreeMap::new();
2653        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2654        let fork_recovery_with_override = ForkRecoveryConfig {
2655            transaction_overrides,
2656            checkpoint_overrides: Default::default(),
2657            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2658        };
2659        let r = SuiNode::check_and_recover_forks(
2660            &checkpoint_store,
2661            &checkpoint_metrics,
2662            true,
2663            Some(&fork_recovery_with_override),
2664        )
2665        .await;
2666        assert!(r.is_ok());
2667        assert!(
2668            checkpoint_store
2669                .get_transaction_fork_detected()
2670                .unwrap()
2671                .is_none()
2672        );
2673    }
2674}