sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
// Logging macro for the JWK updater.
//
// The JWK updater is chatty: in test configurations its output adds significant
// log volume, so it is emitted at `debug` there. In production the same volume
// is negligible, so it is emitted at `info` everywhere else.
macro_rules! jwk_log {
    ($($tokens:tt)+) => {{
        if !in_test_configuration() {
            info!($($tokens)+);
        } else {
            debug!($($tokens)+);
        }
    }};
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121    authority::{AuthorityState, AuthorityStore},
122    authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142    http_key_value_store::HttpKVStore,
143    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144    key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161pub mod db_shell;
162mod handle;
163pub mod metrics;
164
/// Validator-specific components owned by a running [`SuiNode`].
///
/// Stored in `SuiNode::validator_components` as a `Mutex<Option<_>>`; presumably
/// `None` when the node is not acting as a validator (NOTE(review): confirm
/// against where this struct is constructed).
///
/// NOTE: struct fields are dropped in declaration order, so reordering the
/// fields below changes the order in which these components are torn down.
pub struct ValidatorComponents {
    /// Handle for the validator server, if one was started.
    /// NOTE(review): `SpawnOnce` is defined outside this view.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle of the validator overload-monitor task, if it was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Manager for this validator's consensus component.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter through which transactions (e.g. fetched JWKs) are submitted to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Checkpoint metrics shared with the checkpoint machinery.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics for the consensus transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Optional admission-queue context.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Peer-to-peer networking pieces returned by `SuiNode::create_p2p_network`
/// and destructured in `SuiNode::start_async`.
pub struct P2pComponents {
    /// The anemo P2P network instance.
    p2p_network: Network,
    /// Known peers keyed by `PeerId`.
    /// NOTE(review): what the `String` value holds (name? address?) is not visible here — confirm.
    known_peers: HashMap<PeerId, String>,
    /// Handle to the peer discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Manager for peer network addresses; `start_async` injects configured
    /// peer address overrides and initial peer addresses through it.
    endpoint_manager: EndpointManager,
}
183
#[cfg(msim)]
mod simulator {
    //! Simulator-only (`msim`) support: per-node simulator state and a
    //! thread-local, test-overridable JWK fetcher.

    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Simulator bookkeeping carried by every `SuiNode` under `msim`.
    ///
    /// The field order is deliberate: fields drop in declaration order, so
    /// `_leak_detector` is dropped last.
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            // Constructed in the same order the fields are declared.
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Signature of a JWK fetcher: given the local authority and an OIDC
    /// provider, yields that provider's `(JwkId, JWK)` pairs.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores both arguments and parses the bundled
    /// `DEFAULT_JWK_BYTES` as a Twitch JWK set, turning any parse failure into
    /// `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;

        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        match parsed {
            Ok(jwks) => Ok(jwks),
            Err(_) => Err(SuiErrorKind::JWKRetrievalError.into()),
        }
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK injector currently installed on this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| {
            let current = slot.borrow();
            Arc::clone(&*current)
        })
    }

    /// Installs `injector` as this thread's JWK fetcher; only later
    /// `get_jwk_injector` calls on the same thread observe it.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            let mut current = slot.borrow_mut();
            *current = injector;
        });
    }
}
237
238#[cfg(msim)]
239pub use simulator::set_jwk_injector;
240#[cfg(msim)]
241use simulator::*;
242use sui_core::authority::authority_store_pruner::PrunerWatermarks;
243use sui_core::{
244    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
245};
246
/// Default gRPC connect timeout (60 seconds).
/// NOTE(review): use sites are outside this view; presumably the fallback when
/// no explicit connect timeout is configured — confirm.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
248
/// A running Sui node: its authority state plus the networking, checkpoint and
/// background-task handles it owns (most of them built in `SuiNode::start_async`).
///
/// NOTE: struct fields are dropped in declaration order, so reordering the
/// fields below changes shutdown order.
pub struct SuiNode {
    /// Node configuration.
    /// NOTE(review): `start_async` adjusts a clone of its input config (e.g. default
    /// supported protocol versions); presumably that clone is what is stored here.
    config: NodeConfig,
    /// Validator-only components; presumably `None` when the node is not a validator.
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    /// Never read directly (hence `#[allow(unused)]`); presumably held so the
    /// servers live as long as the node.
    #[allow(unused)]
    http_servers: HttpServers,

    /// Core authority state of this node.
    state: Arc<AuthorityState>,
    /// Transaction orchestrator, if one was created for this node.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Registry service used for Prometheus metric registration.
    registry_service: RegistryService,
    /// Node-level metrics (includes the JWK updater counters).
    metrics: Arc<SuiNodeMetrics>,
    /// Checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,

    /// Discovery handle; held but not read directly (underscore prefix).
    _discovery: discovery::Handle,
    /// Connection-monitor handle; held but not read directly (underscore prefix).
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Store of checkpoints known to this node.
    checkpoint_store: Arc<CheckpointStore>,
    /// Global state hasher, behind `Mutex<Option<_>>` so it can be swapped or
    /// cleared at runtime (NOTE(review): update sites are not visible here).
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    /// Backpressure manager shared with the execution cache.
    backpressure_manager: Arc<BackpressureManager>,

    /// Presumably the handle returned by `start_db_checkpoint`, if DB checkpointing
    /// was started; held but not read directly (underscore prefix).
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only state.
    #[cfg(msim)]
    sim_state: SimState,

    /// Presumably the handle returned by `start_state_snapshot`, if snapshot uploads
    /// were started; held but not read directly (underscore prefix).
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle shared with RandomnessManager and the consensus layer.
    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Sender used to forward checkpoints to the subscription service, if present.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
298
299impl fmt::Debug for SuiNode {
300    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
301        f.debug_struct("SuiNode")
302            .field("name", &self.state.name.concise())
303            .finish()
304    }
305}
306
/// Maximum number of new JWKs from a single provider fetch that the JWK updater
/// submits to consensus; any extra keys are dropped with a warning.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
308
309impl SuiNode {
310    pub async fn start(
311        config: NodeConfig,
312        registry_service: RegistryService,
313    ) -> Result<Arc<SuiNode>> {
314        Self::start_async(
315            config,
316            registry_service,
317            ServerVersion::new("sui-node", "unknown"),
318        )
319        .await
320    }
321
322    fn start_jwk_updater(
323        config: &NodeConfig,
324        metrics: Arc<SuiNodeMetrics>,
325        authority: AuthorityName,
326        epoch_store: Arc<AuthorityPerEpochStore>,
327        consensus_adapter: Arc<ConsensusAdapter>,
328    ) {
329        let epoch = epoch_store.epoch();
330
331        let supported_providers = config
332            .zklogin_oauth_providers
333            .get(&epoch_store.get_chain_identifier().chain())
334            .unwrap_or(&BTreeSet::new())
335            .iter()
336            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
337            .collect::<Vec<_>>();
338
339        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
340
341        info!(
342            ?fetch_interval,
343            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
344        );
345
346        fn validate_jwk(
347            metrics: &Arc<SuiNodeMetrics>,
348            provider: &OIDCProvider,
349            id: &JwkId,
350            jwk: &JWK,
351        ) -> bool {
352            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
353                warn!(
354                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
355                    id.iss, provider
356                );
357                metrics
358                    .invalid_jwks
359                    .with_label_values(&[&provider.to_string()])
360                    .inc();
361                return false;
362            };
363
364            if iss_provider != *provider {
365                warn!(
366                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
367                    id.iss, provider, iss_provider
368                );
369                metrics
370                    .invalid_jwks
371                    .with_label_values(&[&provider.to_string()])
372                    .inc();
373                return false;
374            }
375
376            if !check_total_jwk_size(id, jwk) {
377                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
378                metrics
379                    .invalid_jwks
380                    .with_label_values(&[&provider.to_string()])
381                    .inc();
382                return false;
383            }
384
385            true
386        }
387
388        // metrics is:
389        //  pub struct SuiNodeMetrics {
390        //      pub jwk_requests: IntCounterVec,
391        //      pub jwk_request_errors: IntCounterVec,
392        //      pub total_jwks: IntCounterVec,
393        //      pub unique_jwks: IntCounterVec,
394        //  }
395
396        for p in supported_providers.into_iter() {
397            let provider_str = p.to_string();
398            let epoch_store = epoch_store.clone();
399            let consensus_adapter = consensus_adapter.clone();
400            let metrics = metrics.clone();
401            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
402                async move {
403                    // note: restart-safe de-duplication happens after consensus, this is
404                    // just best-effort to reduce unneeded submissions.
405                    let mut seen = HashSet::new();
406                    loop {
407                        jwk_log!("fetching JWK for provider {:?}", p);
408                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
409                        match Self::fetch_jwks(authority, &p).await {
410                            Err(e) => {
411                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
412                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
413                                // Retry in 30 seconds
414                                tokio::time::sleep(Duration::from_secs(30)).await;
415                                continue;
416                            }
417                            Ok(mut keys) => {
418                                metrics.total_jwks
419                                    .with_label_values(&[&provider_str])
420                                    .inc_by(keys.len() as u64);
421
422                                keys.retain(|(id, jwk)| {
423                                    validate_jwk(&metrics, &p, id, jwk) &&
424                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
425                                    seen.insert((id.clone(), jwk.clone()))
426                                });
427
428                                metrics.unique_jwks
429                                    .with_label_values(&[&provider_str])
430                                    .inc_by(keys.len() as u64);
431
432                                // prevent oauth providers from sending too many keys,
433                                // inadvertently or otherwise
434                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
435                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
436                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
437                                }
438
439                                for (id, jwk) in keys.into_iter() {
440                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
441
442                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
443                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
444                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
445                                        .ok();
446                                }
447                            }
448                        }
449                        tokio::time::sleep(fetch_interval).await;
450                    }
451                }
452                .instrument(error_span!("jwk_updater_task", epoch)),
453            ));
454        }
455    }
456
457    pub async fn start_async(
458        config: NodeConfig,
459        registry_service: RegistryService,
460        server_version: ServerVersion,
461    ) -> Result<Arc<SuiNode>> {
462        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
463        let mut config = config.clone();
464        if config.supported_protocol_versions.is_none() {
465            info!(
466                "populating config.supported_protocol_versions with default {:?}",
467                SupportedProtocolVersions::SYSTEM_DEFAULT
468            );
469            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
470        }
471
472        let run_with_range = config.run_with_range;
473        let prometheus_registry = registry_service.default_registry();
474        let node_role = config.intended_node_role();
475
476        info!(node =? config.protocol_public_key(),
477            "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
478        );
479
480        // Initialize metrics to track db usage before creating any stores
481        DBMetrics::init(registry_service.clone());
482
483        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
484        typed_store::init_write_sync(config.enable_db_sync_to_disk);
485
486        // Initialize Mysten metrics.
487        mysten_metrics::init_metrics(&prometheus_registry);
488        // Unsupported (because of the use of static variable) and unnecessary in simtests.
489        #[cfg(not(msim))]
490        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
491
492        let genesis = config.genesis()?.clone();
493
494        let secret = Arc::pin(config.protocol_key_pair().copy());
495        let genesis_committee = genesis.committee();
496        let committee_store = Arc::new(CommitteeStore::new(
497            config.db_path().join("epochs"),
498            &genesis_committee,
499            None,
500        ));
501
502        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
503        let checkpoint_store = CheckpointStore::new(
504            &config.db_path().join("checkpoints"),
505            pruner_watermarks.clone(),
506        );
507        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
508
509        if node_role.runs_consensus() {
510            Self::check_and_recover_forks(
511                &checkpoint_store,
512                &checkpoint_metrics,
513                config.fork_recovery.as_ref(),
514            )
515            .await?;
516        }
517
518        // By default, only enable write stall on nodes that run consensus.
519        let enable_write_stall = config
520            .enable_db_write_stall
521            .unwrap_or(node_role.runs_consensus());
522        // The tidehunter objects compactor retains only the latest version per
523        // ObjectID and is mutually exclusive with the object pruner. Enable it
524        // for validators (which always disable the pruner), and also for any
525        // node configured with `num_epochs_to_retain = 0` — that aggressive
526        // setting is what the compactor replaces. The pruner is force-disabled
527        // in `AuthorityStorePruner::new` whenever this is true.
528        let enable_objects_compactor = node_role.is_validator()
529            || config.authority_store_pruning_config.num_epochs_to_retain == 0;
530        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
531            enable_write_stall,
532            enable_objects_compactor,
533        };
534        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
535            &config.db_store_path(),
536            Some(perpetual_tables_options),
537            Some(pruner_watermarks.epoch_id.clone()),
538        ));
539        let is_genesis = perpetual_tables
540            .database_is_empty()
541            .expect("Database read should not fail at init.");
542
543        let backpressure_manager =
544            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
545
546        let store =
547            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
548
549        let cur_epoch = store.get_recovery_epoch_at_restart()?;
550        let committee = committee_store
551            .get_committee(&cur_epoch)?
552            .expect("Committee of the current epoch must exist");
553        let epoch_start_configuration = store
554            .get_epoch_start_configuration()?
555            .expect("EpochStartConfiguration of the current epoch must exist");
556        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
557        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
558
559        let cache_traits = build_execution_cache(
560            &config.execution_cache,
561            &prometheus_registry,
562            &store,
563            backpressure_manager.clone(),
564        );
565
566        let auth_agg = {
567            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
568            Arc::new(ArcSwap::new(Arc::new(
569                AuthorityAggregator::new_from_epoch_start_state(
570                    epoch_start_configuration.epoch_start_state(),
571                    &committee_store,
572                    safe_client_metrics_base,
573                ),
574            )))
575        };
576
577        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
578        let chain = match config.chain_override_for_testing {
579            Some(chain) => chain,
580            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
581        };
582
583        let highest_executed_checkpoint = checkpoint_store
584            .get_highest_executed_checkpoint_seq_number()
585            .expect("checkpoint store read cannot fail")
586            .unwrap_or(0);
587
588        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
589            0
590        } else {
591            checkpoint_store
592                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
593                .expect("checkpoint store read cannot fail")
594                .unwrap_or(highest_executed_checkpoint)
595        };
596
597        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
598        let epoch_store = AuthorityPerEpochStore::new(
599            config.protocol_public_key(),
600            committee.clone(),
601            &config.db_store_path(),
602            Some(epoch_options.options),
603            EpochMetrics::new(&registry_service.default_registry()),
604            epoch_start_configuration,
605            cache_traits.backing_package_store.clone(),
606            cache_traits.object_store.clone(),
607            cache_metrics,
608            signature_verifier_metrics,
609            &config.expensive_safety_check_config,
610            (chain_id, chain),
611            highest_executed_checkpoint,
612            previous_epoch_last_checkpoint,
613            Arc::new(SubmittedTransactionCacheMetrics::new(
614                &registry_service.default_registry(),
615            )),
616            config.fullnode_sync_mode,
617        )?;
618
619        info!("created epoch store");
620
621        replay_log!(
622            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
623            epoch_store.epoch(),
624            epoch_store.protocol_config()
625        );
626
627        // the database is empty at genesis time
628        if is_genesis {
629            info!("checking SUI conservation at genesis");
630            // When we are opening the db table, the only time when it's safe to
631            // check SUI conservation is at genesis. Otherwise we may be in the middle of
632            // an epoch and the SUI conservation check will fail. This also initialize
633            // the expected_network_sui_amount table.
634            cache_traits
635                .reconfig_api
636                .expensive_check_sui_conservation(&epoch_store)
637                .expect("SUI conservation check cannot fail at genesis");
638        }
639
640        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
641        let default_buffer_stake = epoch_store
642            .protocol_config()
643            .buffer_stake_for_protocol_upgrade_bps();
644        if effective_buffer_stake != default_buffer_stake {
645            warn!(
646                ?effective_buffer_stake,
647                ?default_buffer_stake,
648                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
649            );
650        }
651
652        checkpoint_store.insert_genesis_checkpoint(
653            genesis.checkpoint(),
654            genesis.checkpoint_contents().clone(),
655            &epoch_store,
656        );
657
658        info!("creating state sync store");
659        let state_sync_store = RocksDbStore::new(
660            cache_traits.clone(),
661            committee_store.clone(),
662            checkpoint_store.clone(),
663        );
664
665        let index_store =
666            if node_role.should_enable_index_processing() && config.enable_index_processing {
667                info!("creating jsonrpc index store");
668                Some(Arc::new(IndexStore::new(
669                    config.db_path().join("indexes"),
670                    &prometheus_registry,
671                    epoch_store
672                        .protocol_config()
673                        .max_move_identifier_len_as_option(),
674                    config.remove_deprecated_tables,
675                )))
676            } else {
677                None
678            };
679
680        let rpc_index = if node_role.should_enable_index_processing()
681            && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
682        {
683            info!("creating rpc index store");
684            Some(Arc::new(
685                RpcIndexStore::new(
686                    &config.db_path(),
687                    &store,
688                    &checkpoint_store,
689                    &epoch_store,
690                    &cache_traits.backing_package_store,
691                    config.rpc().cloned().unwrap_or_default(),
692                )
693                .await,
694            ))
695        } else {
696            None
697        };
698
699        let chain_identifier = epoch_store.get_chain_identifier();
700
701        info!("creating archive reader");
702        // Create network
703        let (randomness_tx, randomness_rx) = mpsc::channel(
704            config
705                .p2p_config
706                .randomness
707                .clone()
708                .unwrap_or_default()
709                .mailbox_capacity(),
710        );
711        let P2pComponents {
712            p2p_network,
713            known_peers,
714            discovery_handle,
715            state_sync_handle,
716            randomness_handle,
717            endpoint_manager,
718        } = Self::create_p2p_network(
719            &config,
720            state_sync_store.clone(),
721            chain_identifier,
722            randomness_tx,
723            &prometheus_registry,
724        )?;
725
726        // Inject configured peer address overrides.
727        for peer in &config.p2p_config.peer_address_overrides {
728            endpoint_manager
729                .update_endpoint(
730                    EndpointId::P2p(peer.peer_id),
731                    AddressSource::Config,
732                    peer.addresses.clone(),
733                )
734                .expect("Updating peer address overrides should not fail");
735        }
736
737        // Send initial peer addresses to the p2p network.
738        update_peer_addresses(
739            &config,
740            &endpoint_manager,
741            epoch_store.epoch_start_state(),
742            None,
743        );
744
745        info!("start snapshot upload");
746        // Start uploading state snapshot to remote store
747        let state_snapshot_handle = Self::start_state_snapshot(
748            &config,
749            &prometheus_registry,
750            checkpoint_store.clone(),
751            chain_identifier,
752        )?;
753
754        // Start uploading db checkpoints to remote store
755        info!("start db checkpoint");
756        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
757            &config,
758            &prometheus_registry,
759            state_snapshot_handle.is_some(),
760        )?;
761
762        if !epoch_store
763            .protocol_config()
764            .simplified_unwrap_then_delete()
765        {
766            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
767            config
768                .authority_store_pruning_config
769                .set_killswitch_tombstone_pruning(true);
770        }
771
772        let authority_name = config.protocol_public_key();
773
774        info!("create authority state");
775        let state = AuthorityState::new(
776            authority_name,
777            secret,
778            config.supported_protocol_versions.unwrap(),
779            store.clone(),
780            cache_traits.clone(),
781            epoch_store.clone(),
782            committee_store.clone(),
783            index_store.clone(),
784            rpc_index,
785            checkpoint_store.clone(),
786            &prometheus_registry,
787            genesis.objects(),
788            &db_checkpoint_config,
789            config.clone(),
790            chain_identifier,
791            config.policy_config.clone(),
792            config.firewall_config.clone(),
793            pruner_watermarks,
794        )
795        .await;
796        // ensure genesis txn was executed
797        if epoch_store.epoch() == 0 {
798            let txn = &genesis.transaction();
799            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
800            let transaction =
801                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
802                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
803                        genesis.transaction().data().clone(),
804                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
805                    ),
806                );
807            let _enter = span.enter();
808            state
809                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
810                .unwrap();
811        }
812
813        // Start the loop that receives new randomness and generates transactions for it.
814        // The returned is long-lived (node lifetime).
815        let randomness_receiver_handle =
816            RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
817
818        let (end_of_epoch_channel, end_of_epoch_receiver) =
819            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
820
821        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
822            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
823                auth_agg.load_full(),
824                state.clone(),
825                end_of_epoch_receiver,
826                &config.db_path(),
827                &prometheus_registry,
828                &config,
829            )))
830        } else {
831            None
832        };
833
834        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
835            state.clone(),
836            state_sync_store,
837            &transaction_orchestrator.clone(),
838            &config,
839            &prometheus_registry,
840            server_version,
841            node_role,
842        )
843        .await?;
844
845        let global_state_hasher = Arc::new(GlobalStateHasher::new(
846            cache_traits.global_state_hash_store.clone(),
847            GlobalStateHashMetrics::new(&prometheus_registry),
848        ));
849
850        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
851            "sui",
852            &registry_service.default_registry(),
853        );
854
855        let connection_monitor_handle =
856            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
857                p2p_network.downgrade(),
858                Arc::new(network_connection_metrics),
859                known_peers,
860            );
861
862        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
863
864        sui_node_metrics
865            .binary_max_protocol_version
866            .set(ProtocolVersion::MAX.as_u64() as i64);
867        sui_node_metrics
868            .configured_max_protocol_version
869            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
870
871        let node_role = epoch_store.node_role();
872        let validator_components = if node_role.runs_consensus() {
873            let mut components = Self::construct_validator_components(
874                config.clone(),
875                state.clone(),
876                committee,
877                epoch_store.clone(),
878                checkpoint_store.clone(),
879                state_sync_handle.clone(),
880                randomness_handle.clone(),
881                Arc::downgrade(&global_state_hasher),
882                backpressure_manager.clone(),
883                &registry_service,
884                sui_node_metrics.clone(),
885                checkpoint_metrics.clone(),
886                node_role,
887                randomness_receiver_handle.clone(),
888            )
889            .await?;
890
891            if node_role.is_validator() {
892                components
893                    .consensus_adapter
894                    .recover_end_of_publish(&epoch_store);
895
896                // Start the gRPC server
897                components.validator_server_handle = Some(
898                    components
899                        .validator_server_handle
900                        .take()
901                        .unwrap()
902                        .start()
903                        .await,
904                );
905
906                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
907                endpoint_manager
908                    .set_consensus_address_updater(components.consensus_manager.clone());
909            } else {
910                info!("Starting node as Observer — connecting to configured peers");
911            }
912
913            Some(components)
914        } else {
915            None
916        };
917
918        // setup shutdown channel
919        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
920
921        let node = Self {
922            config,
923            validator_components: Mutex::new(validator_components),
924            http_servers,
925            state,
926            transaction_orchestrator,
927            registry_service,
928            metrics: sui_node_metrics,
929            checkpoint_metrics,
930
931            _discovery: discovery_handle,
932            _connection_monitor_handle: connection_monitor_handle,
933            state_sync_handle,
934            randomness_handle,
935            checkpoint_store,
936            global_state_hasher: Mutex::new(Some(global_state_hasher)),
937            end_of_epoch_channel,
938            endpoint_manager,
939            backpressure_manager,
940
941            _db_checkpoint_handle: db_checkpoint_handle,
942
943            #[cfg(msim)]
944            sim_state: Default::default(),
945
946            _state_snapshot_uploader_handle: state_snapshot_handle,
947            shutdown_channel_tx: shutdown_channel,
948            randomness_receiver_handle,
949
950            auth_agg,
951            subscription_service_checkpoint_sender,
952        };
953
954        info!("SuiNode started!");
955        let node = Arc::new(node);
956        let node_copy = node.clone();
957        spawn_monitored_task!(async move {
958            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
959            if let Err(error) = result {
960                warn!("Reconfiguration finished with error {:?}", error);
961            }
962        });
963
964        Ok(node)
965    }
966
967    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
968        self.end_of_epoch_channel.subscribe()
969    }
970
971    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
972        self.shutdown_channel_tx.subscribe()
973    }
974
975    pub fn current_epoch_for_testing(&self) -> EpochId {
976        self.state.current_epoch_for_testing()
977    }
978
979    pub fn db_checkpoint_path(&self) -> PathBuf {
980        self.config.db_checkpoint_path()
981    }
982
983    // Init reconfig process by starting to reject user certs
984    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
985        info!("close_epoch (current epoch = {})", epoch_store.epoch());
986        self.validator_components
987            .lock()
988            .await
989            .as_ref()
990            .ok_or_else(|| SuiError::from("Node is not a validator"))?
991            .consensus_adapter
992            .close_epoch(epoch_store);
993        Ok(())
994    }
995
996    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
997        self.state
998            .clear_override_protocol_upgrade_buffer_stake(epoch)
999    }
1000
1001    pub fn set_override_protocol_upgrade_buffer_stake(
1002        &self,
1003        epoch: EpochId,
1004        buffer_stake_bps: u64,
1005    ) -> SuiResult {
1006        self.state
1007            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1008    }
1009
1010    // Testing-only API to start epoch close process.
1011    // For production code, please use the non-testing version.
1012    pub async fn close_epoch_for_testing(&self) -> SuiResult {
1013        let epoch_store = self.state.epoch_store_for_testing();
1014        self.close_epoch(&epoch_store).await
1015    }
1016
1017    fn start_state_snapshot(
1018        config: &NodeConfig,
1019        prometheus_registry: &Registry,
1020        checkpoint_store: Arc<CheckpointStore>,
1021        chain_identifier: ChainIdentifier,
1022    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1023        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1024            let snapshot_uploader = StateSnapshotUploader::new(
1025                &config.db_checkpoint_path(),
1026                &config.snapshot_path(),
1027                remote_store_config.clone(),
1028                60,
1029                prometheus_registry,
1030                checkpoint_store,
1031                chain_identifier,
1032                config.state_snapshot_write_config.archive_interval_epochs,
1033            )?;
1034            Ok(Some(snapshot_uploader.start()))
1035        } else {
1036            Ok(None)
1037        }
1038    }
1039
1040    fn start_db_checkpoint(
1041        config: &NodeConfig,
1042        prometheus_registry: &Registry,
1043        state_snapshot_enabled: bool,
1044    ) -> Result<(
1045        DBCheckpointConfig,
1046        Option<tokio::sync::broadcast::Sender<()>>,
1047    )> {
1048        let checkpoint_path = Some(
1049            config
1050                .db_checkpoint_config
1051                .checkpoint_path
1052                .clone()
1053                .unwrap_or_else(|| config.db_checkpoint_path()),
1054        );
1055        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1056            DBCheckpointConfig {
1057                checkpoint_path,
1058                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1059                    true
1060                } else {
1061                    config
1062                        .db_checkpoint_config
1063                        .perform_db_checkpoints_at_epoch_end
1064                },
1065                ..config.db_checkpoint_config.clone()
1066            }
1067        } else {
1068            config.db_checkpoint_config.clone()
1069        };
1070
1071        match (
1072            db_checkpoint_config.object_store_config.as_ref(),
1073            state_snapshot_enabled,
1074        ) {
1075            // If db checkpoint config object store not specified but
1076            // state snapshot object store is specified, create handler
1077            // anyway for marking db checkpoints as completed so that they
1078            // can be uploaded as state snapshots.
1079            (None, false) => Ok((db_checkpoint_config, None)),
1080            (_, _) => {
1081                let handler = DBCheckpointHandler::new(
1082                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1083                    db_checkpoint_config.object_store_config.as_ref(),
1084                    60,
1085                    db_checkpoint_config
1086                        .prune_and_compact_before_upload
1087                        .unwrap_or(true),
1088                    config.authority_store_pruning_config.clone(),
1089                    prometheus_registry,
1090                    state_snapshot_enabled,
1091                )?;
1092                Ok((
1093                    db_checkpoint_config,
1094                    Some(DBCheckpointHandler::start(handler)),
1095                ))
1096            }
1097        }
1098    }
1099
    /// Builds and starts the anemo P2P network together with the discovery, state-sync and
    /// randomness services that run on top of it.
    ///
    /// * `config` - node config. `p2p_config` supplies the listen address, anemo/QUIC
    ///   settings, seed and allowlisted peers, and the per-service configs.
    /// * `state_sync_store` - store handed to the state-sync builder.
    /// * `chain_identifier` - used to derive the network server name `sui-{chain_identifier}`.
    /// * `randomness_tx` - sender passed to the randomness builder; the caller keeps the
    ///   matching receiver.
    /// * `prometheus_registry` - registry for state-sync, randomness and network metrics.
    ///
    /// Returns the started network, a map from known peer ids (allowlisted and seed peers) to
    /// a label, the discovery/state-sync/randomness handles, and the endpoint manager
    /// produced by the discovery builder.
    ///
    /// # Errors
    /// Fails if the network cannot be started (`Network::bind(..).start(..)`).
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        // Work on a copy so the discovery peer-address cache path can be defaulted without
        // mutating `config`.
        let mut p2p_config = config.p2p_config.clone();
        {
            // Default the discovery peer-address cache to `<db_path>/discovery_peer_cache.yaml`
            // when none is configured.
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        // If this node has a consensus config, hand its consensus address to discovery:
        // the external address when set, otherwise the listen address.
        if let Some(consensus_config) = &config.consensus_config {
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Reads the original (not the locally defaulted) discovery config. The local copy
        // differs only in `peer_addr_store_path`, so `allowlisted_peers` is the same.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        // Label every allowlisted peer and every seed peer that has a known peer id.
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // Single router serving discovery, state-sync and randomness RPCs.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound stack: trace server errors (INFO spans, WARN on failure) and record
            // inbound metrics around the merged routes.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound request layers: the same tracing/metrics pattern for requests this
            // node sends.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Inbound requests on this network are small (signatures, queries, summaries).
            // Cap request frames at 1 MiB.
            // NOTE: both frame-size caps below are assigned unconditionally and override any
            // value set in `p2p_config.anemo_config`.
            anemo_config.max_request_frame_size = Some(1 << 20);
            // Responses can be larger (checkpoint contents).
            // Cap response frames at 128 MiB.
            anemo_config.max_response_frame_size = Some(128 << 20);

            // Set a higher default value (20 MiB) for socket send/receive buffers if not
            // already configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            // Set unconditionally, regardless of config. Presumably this keeps startup from
            // failing when the OS rejects the requested buffer sizes; confirm in anemo.
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport; each applies only when the
            // field is unset.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name embeds the chain identifier.
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start the services only after the network is up, since each one is given a handle
        // to it.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1273
    /// Builds the long-lived components for a node that runs consensus (a validator or an
    /// observer), then delegates to `start_epoch_specific_validator_components` for the
    /// per-epoch pieces and returns the assembled `ValidatorComponents`.
    ///
    /// Validator-only parts are the gRPC validator service (returned unstarted as a
    /// `SpawnOnce`, together with its optional admission queue) and the overload monitor.
    ///
    /// # Errors
    /// * `"Node is missing consensus config"` if `config.consensus_config` is `None`.
    /// * Any error from `start_grpc_validator_service` or
    ///   `start_epoch_specific_validator_components`.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        node_role: NodeRole,
        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
    ) -> Result<ValidatorComponents> {
        // A copy of the whole config is taken only to get `&mut` access to its consensus
        // config. NOTE(review): if the callees below only need `&ConsensusConfig`, this
        // clone could become `config.consensus_config.as_ref()`; confirm their signatures.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;

        // Both the adapter and the manager share this client.
        let client = Arc::new(UpdatableConsensusClient::new());
        // Shared between the consensus adapter and the gRPC validator service (validators
        // only).
        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            &registry_service.default_registry(),
            client.clone(),
            checkpoint_store.clone(),
            inflight_slot_freed_notify.clone(),
        ));

        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
            node_role,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        // Only validators serve the gRPC validator API. The returned handle is not started
        // here; the caller starts it.
        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
            let (handle, queue) = Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                epoch_store.clone(),
                &registry_service.default_registry(),
                inflight_slot_freed_notify,
            )
            .await?;
            (Some(handle), queue)
        } else {
            (None, None)
        };

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0, or on
        // non-validators. The monitor holds only a weak reference to the authority state.
        let validator_overload_monitor_handle = if node_role.is_validator()
            && config
                .authority_overload_config
                .max_load_shedding_percentage
                > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            randomness_receiver_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
            admission_queue,
            node_role,
        )
        .await
    }
1384
    /// Starts the epoch-specific components of a node that runs consensus and bundles them,
    /// together with the long-lived pieces passed in, into `ValidatorComponents`.
    ///
    /// Steps, in order:
    /// 1. Build the checkpoint service.
    /// 2. Clear the randomness receiver's public key.
    /// 3. Install a randomness manager if randomness is enabled for this epoch.
    /// 4. Spawn the execution-time observer (validators only).
    /// 5. Start the consensus manager on a detached task.
    /// 6. Spawn the checkpoint service.
    /// 7. Start the JWK updater (validators with authenticator state enabled).
    /// 8. Rotate the admission queue, if any, to this epoch.
    ///
    /// # Errors
    /// Propagates an error from `epoch_store.set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        // Clear the VSS public key from the previous epoch so any randomness round
        // signatures buffer in the channel until the new DKG completes.
        randomness_receiver_handle.clear_public_key();

        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            // Only validators sign with the protocol key; observers get `None`.
            let authority_key_pair = if node_role.is_validator() {
                Some(config.protocol_key_pair())
            } else {
                None
            };
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                authority_key_pair,
                randomness_receiver_handle.clone(),
            )
            .await;
            // `try_new` may yield `None` (conditions not visible here); in that case no
            // randomness manager is installed for this epoch.
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Spawn consensus startup asynchronously to avoid blocking other components.
        // The JoinHandle is dropped, so the task is detached and its completion is not
        // observed here.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                        Some(randomness_receiver_handle),
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // If the DISABLE_REPLAY_WAITER environment variable is set (to any value), the
        // checkpoint service is spawned with `None` instead of the consensus replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        // Point the admission queue (validators only) at the new epoch store.
        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1531
1532    fn build_checkpoint_service(
1533        config: &NodeConfig,
1534        consensus_adapter: Arc<ConsensusAdapter>,
1535        checkpoint_store: Arc<CheckpointStore>,
1536        epoch_store: Arc<AuthorityPerEpochStore>,
1537        state: Arc<AuthorityState>,
1538        state_sync_handle: state_sync::Handle,
1539        state_hasher: Weak<GlobalStateHasher>,
1540        checkpoint_metrics: Arc<CheckpointMetrics>,
1541        node_role: NodeRole,
1542    ) -> Arc<CheckpointService> {
1543        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1544        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1545
1546        debug!(
1547            "Starting checkpoint service with epoch start timestamp {}
1548            and epoch duration {}",
1549            epoch_start_timestamp_ms, epoch_duration_ms
1550        );
1551
1552        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1553            Box::new(SubmitCheckpointToConsensus {
1554                sender: consensus_adapter,
1555                signer: state.secret.clone(),
1556                authority: config.protocol_public_key(),
1557                next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1558                    .checked_add(epoch_duration_ms)
1559                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1560                metrics: checkpoint_metrics.clone(),
1561            })
1562        } else {
1563            LogCheckpointOutput::boxed()
1564        };
1565
1566        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1567
1568        CheckpointService::build(
1569            state.clone(),
1570            checkpoint_store,
1571            epoch_store,
1572            state.get_transaction_cache_reader().clone(),
1573            state_hasher,
1574            checkpoint_output,
1575            Box::new(certified_checkpoint_output),
1576            checkpoint_metrics,
1577        )
1578    }
1579
1580    fn construct_consensus_adapter(
1581        committee: &Committee,
1582        consensus_config: &ConsensusConfig,
1583        authority: AuthorityName,
1584        prometheus_registry: &Registry,
1585        consensus_client: Arc<dyn ConsensusClient>,
1586        checkpoint_store: Arc<CheckpointStore>,
1587        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1588    ) -> ConsensusAdapter {
1589        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1590        // The consensus adapter allows the authority to send user certificates through consensus.
1591
1592        ConsensusAdapter::new(
1593            consensus_client,
1594            checkpoint_store,
1595            authority,
1596            consensus_config.max_pending_transactions(),
1597            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1598            ca_metrics,
1599            inflight_slot_freed_notify,
1600        )
1601    }
1602
1603    async fn start_grpc_validator_service(
1604        config: &NodeConfig,
1605        state: Arc<AuthorityState>,
1606        consensus_adapter: Arc<ConsensusAdapter>,
1607        epoch_store: Arc<AuthorityPerEpochStore>,
1608        prometheus_registry: &Registry,
1609        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1610    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1611        let overload_config = &config.authority_overload_config;
1612        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1613            let manager = Arc::new(AdmissionQueueManager::new(
1614                consensus_adapter.clone(),
1615                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1616                overload_config.admission_queue_capacity_fraction,
1617                overload_config.admission_queue_bypass_fraction,
1618                overload_config.admission_queue_failover_timeout,
1619                inflight_slot_freed_notify,
1620            ));
1621            AdmissionQueueContext::spawn(manager, epoch_store)
1622        });
1623        let validator_service = ValidatorService::new(
1624            state.clone(),
1625            consensus_adapter,
1626            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1627            config.policy_config.clone().map(|p| p.client_id_source),
1628            admission_queue.clone(),
1629        );
1630
1631        let mut server_conf = mysten_network::config::Config::new();
1632        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1633        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1634        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1635        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1636        server_conf.load_shed = config.grpc_load_shed;
1637        let mut server_builder =
1638            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1639
1640        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1641
1642        let tls_config = sui_tls::create_rustls_server_config(
1643            config.network_key_pair().copy().private(),
1644            SUI_TLS_SERVER_NAME.to_string(),
1645        );
1646
1647        let network_address = config.network_address().clone();
1648
1649        let (ready_tx, ready_rx) = oneshot::channel();
1650
1651        let spawn_once = SpawnOnce::new(ready_rx, async move {
1652            let server = server_builder
1653                .bind(&network_address, Some(tls_config))
1654                .await
1655                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1656            let local_addr = server.local_addr();
1657            info!("Listening to traffic on {local_addr}");
1658            ready_tx.send(()).unwrap();
1659            if let Err(err) = server.serve().await {
1660                info!("Server stopped: {err}");
1661            }
1662            info!("Server stopped");
1663        });
1664        Ok((spawn_once, admission_queue))
1665    }
1666
1667    pub fn state(&self) -> Arc<AuthorityState> {
1668        self.state.clone()
1669    }
1670
    /// Returns a reference to the node's anemo connection monitor handle.
    /// Only compiled for tests and simulation (`cfg(any(test, msim))`).
    #[cfg(any(test, msim))]
    pub fn connection_monitor_handle_for_testing(
        &self,
    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
        &self._connection_monitor_handle
    }
1677
1678    pub fn node_role(&self) -> NodeRole {
1679        self.state.load_epoch_store_one_call_per_task().node_role()
1680    }
1681
    /// Returns the reference gas price by delegating to
    /// `AuthorityState::reference_gas_price_for_testing`.
    ///
    /// Only used for testing because of how the epoch store is loaded.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1686
    /// Returns a cloned handle to the committee store held by `AuthorityState`.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1690
1691    pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
1692        self.checkpoint_store.clone()
1693    }
1694
    /// Returns the `AuthorityStore` handle obtained from `AuthorityState::authority_store`.
    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
        self.state.authority_store()
    }
1698
1699    pub fn clone_consensus_store(
1700        &self,
1701    ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
1702        self.validator_components
1703            .try_lock()
1704            .ok()?
1705            .as_ref()?
1706            .consensus_manager
1707            .consensus_store()
1708    }
1709
1710    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1711    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1712    /// of this function will mostly likely want to call this again
1713    /// to get a fresh one.
1714    pub fn clone_authority_aggregator(
1715        &self,
1716    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1717        self.transaction_orchestrator
1718            .as_ref()
1719            .map(|to| to.clone_authority_aggregator())
1720    }
1721
1722    pub fn transaction_orchestrator(
1723        &self,
1724    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1725        self.transaction_orchestrator.clone()
1726    }
1727
    /// Drives the node across epoch boundaries.
    ///
    /// Each loop iteration does the following:
    /// 1. Takes the current `GlobalStateHasher` out of `self.global_state_hasher` and builds
    ///    a `CheckpointExecutor` for `epoch_store`.
    /// 2. Submits this authority's capabilities to consensus. This only happens when
    ///    validator components exist and the current epoch store reports `is_validator()`.
    /// 3. Runs checkpoint execution until the epoch ends.
    /// 4. Reads the new system state and reconfigures `AuthorityState` into the next epoch.
    /// 5. Installs a fresh state hasher.
    /// 6. Shuts down, restarts or starts consensus components according to the node's new
    ///    role.
    ///
    /// The loop never ends on its own. It returns `Ok(())` only when the checkpoint executor
    /// stops with `StopReason::RunWithRangeCondition`. Before returning, it shuts down
    /// consensus and sends `run_with_range` on `shutdown_channel_tx`.
    ///
    /// # Errors
    /// Propagates errors from:
    /// - submitting the capabilities transaction;
    /// - starting or constructing validator components for the new epoch;
    /// - the msim-only checkpoint pruning step.
    ///
    /// # Panics
    /// Panics if:
    /// - the state hasher slot is empty at the start of an iteration;
    /// - the Sui system state object cannot be read;
    /// - the next committee's epoch is not the current epoch + 1;
    /// - another strong reference to the old hasher still exists when it is replaced.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The guard is held for the whole iteration. The hasher is taken out here and a
            // replacement is written back into the slot during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to the committee, if we are a validator.
            // Fullnodes that state sync via consensus also have validator components, but they
            // are not supposed to submit any capabilities.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    // no need to send digests of versions less than the current version
                    .truncate_below(config.version);

                // Do not advertise a max version that enables accumulators while the
                // accumulator root object does not exist. Step the max down one version at a
                // time until that holds or the max reaches the current version.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Blocks until checkpoint execution for this epoch stops.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed send is only logged on fullnodes; on other nodes it is ignored.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an aggregator rebuilt from the next epoch's start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            // Update peer addresses from the next epoch's start state. The current start
            // state is passed as well, presumably so that only changes are applied (verify
            // in `update_peer_addresses`).
            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            // Case A: consensus components existed in the previous epoch. The steps are:
            // 1. Shut consensus down.
            // 2. Install a fresh hasher.
            // 3. Prune the consensus store.
            // 4. Restart components if the new role still runs consensus.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            self.randomness_receiver_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // Case B: no consensus components in the previous epoch. Install a fresh
                // hasher, and build components from scratch if the new role runs consensus.
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                        self.randomness_receiver_handle.clone(),
                    )
                    .await?;

                    // Only validators start the gRPC validator server and register the
                    // consensus manager as the endpoint manager's consensus address updater.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Simulation only: prune checkpoints for eligible epochs when a finite, non-zero
            // checkpoint retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2047
2048    async fn shutdown(&self) {
2049        if let Some(validator_components) = &*self.validator_components.lock().await {
2050            validator_components.consensus_manager.shutdown().await;
2051        }
2052    }
2053
2054    async fn reconfigure_state(
2055        &self,
2056        state: &Arc<AuthorityState>,
2057        cur_epoch_store: &AuthorityPerEpochStore,
2058        next_epoch_committee: Committee,
2059        next_epoch_start_system_state: EpochStartSystemState,
2060        global_state_hasher: Arc<GlobalStateHasher>,
2061    ) -> Arc<AuthorityPerEpochStore> {
2062        let next_epoch = next_epoch_committee.epoch();
2063
2064        let last_checkpoint = self
2065            .checkpoint_store
2066            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2067            .expect("Error loading last checkpoint for current epoch")
2068            .expect("Could not load last checkpoint for current epoch");
2069
2070        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2071
2072        assert_eq!(
2073            Some(last_checkpoint_seq),
2074            self.checkpoint_store
2075                .get_highest_executed_checkpoint_seq_number()
2076                .expect("Error loading highest executed checkpoint sequence number")
2077        );
2078
2079        let epoch_start_configuration = EpochStartConfiguration::new(
2080            next_epoch_start_system_state,
2081            *last_checkpoint.digest(),
2082            state.get_object_store().as_ref(),
2083            EpochFlag::default_flags_for_new_epoch(&state.config),
2084        )
2085        .expect("EpochStartConfiguration construction cannot fail");
2086
2087        let new_epoch_store = self
2088            .state
2089            .reconfigure(
2090                cur_epoch_store,
2091                self.config.supported_protocol_versions.unwrap(),
2092                next_epoch_committee,
2093                epoch_start_configuration,
2094                global_state_hasher,
2095                &self.config.expensive_safety_check_config,
2096                last_checkpoint_seq,
2097            )
2098            .await
2099            .expect("Reconfigure authority state cannot fail");
2100        info!(next_epoch, "Node State has been reconfigured");
2101        assert_eq!(next_epoch, new_epoch_store.epoch());
2102        self.state.get_reconfig_api().update_epoch_flags_metrics(
2103            cur_epoch_store.epoch_start_config().flags(),
2104            new_epoch_store.epoch_start_config().flags(),
2105        );
2106
2107        new_epoch_store
2108    }
2109
    /// Returns a reference to this node's `NodeConfig`.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2113
    /// Returns a clone of the node's `randomness::Handle`.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2117
    /// Returns a clone of the node's `state_sync::Handle`.
    pub fn state_sync_handle(&self) -> state_sync::Handle {
        self.state_sync_handle.clone()
    }
2121
    /// Returns a reference to the node's `EndpointManager`.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2125
2126    /// Get a short prefix of a digest for metric labels
2127    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2128        let digest_str = digest.to_string();
2129        if digest_str.len() >= 8 {
2130            digest_str[0..8].to_string()
2131        } else {
2132            digest_str
2133        }
2134    }
2135
2136    /// Check for previously detected forks and handle them appropriately.
2137    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2138    /// For all other cases, block node startup if a fork is detected.
2139    async fn check_and_recover_forks(
2140        checkpoint_store: &CheckpointStore,
2141        checkpoint_metrics: &CheckpointMetrics,
2142        fork_recovery: Option<&ForkRecoveryConfig>,
2143    ) -> Result<()> {
2144        // Try to recover from forks if recovery config is provided
2145        if let Some(recovery) = fork_recovery {
2146            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2147            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2148        }
2149
2150        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2151            .get_checkpoint_fork_detected()
2152            .map_err(|e| {
2153                error!("Failed to check for checkpoint fork: {:?}", e);
2154                e
2155            })?
2156        {
2157            Self::handle_checkpoint_fork(
2158                checkpoint_seq,
2159                checkpoint_digest,
2160                checkpoint_metrics,
2161                fork_recovery,
2162            )
2163            .await?;
2164        }
2165        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2166            .get_transaction_fork_detected()
2167            .map_err(|e| {
2168                error!("Failed to check for transaction fork: {:?}", e);
2169                e
2170            })?
2171        {
2172            Self::handle_transaction_fork(
2173                tx_digest,
2174                expected_effects,
2175                actual_effects,
2176                checkpoint_metrics,
2177                fork_recovery,
2178            )
2179            .await?;
2180        }
2181
2182        Ok(())
2183    }
2184
2185    fn try_recover_checkpoint_fork(
2186        checkpoint_store: &CheckpointStore,
2187        recovery: &ForkRecoveryConfig,
2188    ) -> Result<()> {
2189        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2190        // clear locally computed checkpoints from that sequence (inclusive).
2191        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2192            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2193                anyhow::bail!(
2194                    "Invalid checkpoint digest override for seq {}: {}",
2195                    seq,
2196                    expected_digest_str
2197                );
2198            };
2199
2200            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2201                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2202                if local_digest != expected_digest {
2203                    info!(
2204                        seq,
2205                        local = %Self::get_digest_prefix(local_digest),
2206                        expected = %Self::get_digest_prefix(expected_digest),
2207                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2208                        seq
2209                    );
2210                    checkpoint_store
2211                        .clear_locally_computed_checkpoints_from(*seq)
2212                        .context(
2213                            "Failed to clear locally computed checkpoints from override seq",
2214                        )?;
2215                }
2216            }
2217        }
2218
2219        if let Some((checkpoint_seq, checkpoint_digest)) =
2220            checkpoint_store.get_checkpoint_fork_detected()?
2221            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2222        {
2223            info!(
2224                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2225                checkpoint_seq, checkpoint_digest
2226            );
2227            checkpoint_store
2228                .clear_checkpoint_fork_detected()
2229                .expect("Failed to clear checkpoint fork detected marker");
2230        }
2231        Ok(())
2232    }
2233
2234    fn try_recover_transaction_fork(
2235        checkpoint_store: &CheckpointStore,
2236        recovery: &ForkRecoveryConfig,
2237    ) -> Result<()> {
2238        if recovery.transaction_overrides.is_empty() {
2239            return Ok(());
2240        }
2241
2242        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2243            && recovery
2244                .transaction_overrides
2245                .contains_key(&tx_digest.to_string())
2246        {
2247            info!(
2248                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2249                tx_digest
2250            );
2251            checkpoint_store
2252                .clear_transaction_fork_detected()
2253                .expect("Failed to clear transaction fork detected marker");
2254        }
2255        Ok(())
2256    }
2257
2258    fn get_current_timestamp() -> u64 {
2259        std::time::SystemTime::now()
2260            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2261            .unwrap()
2262            .as_secs()
2263    }
2264
2265    async fn handle_checkpoint_fork(
2266        checkpoint_seq: u64,
2267        checkpoint_digest: CheckpointDigest,
2268        checkpoint_metrics: &CheckpointMetrics,
2269        fork_recovery: Option<&ForkRecoveryConfig>,
2270    ) -> Result<()> {
2271        checkpoint_metrics
2272            .checkpoint_fork_crash_mode
2273            .with_label_values(&[
2274                &checkpoint_seq.to_string(),
2275                &Self::get_digest_prefix(checkpoint_digest),
2276                &Self::get_current_timestamp().to_string(),
2277            ])
2278            .set(1);
2279
2280        let behavior = fork_recovery
2281            .map(|fr| fr.fork_crash_behavior)
2282            .unwrap_or_default();
2283
2284        match behavior {
2285            ForkCrashBehavior::AwaitForkRecovery => {
2286                error!(
2287                    checkpoint_seq = checkpoint_seq,
2288                    checkpoint_digest = ?checkpoint_digest,
2289                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2290                );
2291                futures::future::pending::<()>().await;
2292                unreachable!("pending() should never return");
2293            }
2294            ForkCrashBehavior::ReturnError => {
2295                error!(
2296                    checkpoint_seq = checkpoint_seq,
2297                    checkpoint_digest = ?checkpoint_digest,
2298                    "Checkpoint fork detected! Returning error."
2299                );
2300                Err(anyhow::anyhow!(
2301                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2302                    checkpoint_seq,
2303                    checkpoint_digest
2304                ))
2305            }
2306        }
2307    }
2308
2309    async fn handle_transaction_fork(
2310        tx_digest: TransactionDigest,
2311        expected_effects_digest: TransactionEffectsDigest,
2312        actual_effects_digest: TransactionEffectsDigest,
2313        checkpoint_metrics: &CheckpointMetrics,
2314        fork_recovery: Option<&ForkRecoveryConfig>,
2315    ) -> Result<()> {
2316        checkpoint_metrics
2317            .transaction_fork_crash_mode
2318            .with_label_values(&[
2319                &Self::get_digest_prefix(tx_digest),
2320                &Self::get_digest_prefix(expected_effects_digest),
2321                &Self::get_digest_prefix(actual_effects_digest),
2322                &Self::get_current_timestamp().to_string(),
2323            ])
2324            .set(1);
2325
2326        let behavior = fork_recovery
2327            .map(|fr| fr.fork_crash_behavior)
2328            .unwrap_or_default();
2329
2330        match behavior {
2331            ForkCrashBehavior::AwaitForkRecovery => {
2332                error!(
2333                    tx_digest = ?tx_digest,
2334                    expected_effects_digest = ?expected_effects_digest,
2335                    actual_effects_digest = ?actual_effects_digest,
2336                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2337                );
2338                futures::future::pending::<()>().await;
2339                unreachable!("pending() should never return");
2340            }
2341            ForkCrashBehavior::ReturnError => {
2342                error!(
2343                    tx_digest = ?tx_digest,
2344                    expected_effects_digest = ?expected_effects_digest,
2345                    actual_effects_digest = ?actual_effects_digest,
2346                    "Transaction fork detected! Returning error."
2347                );
2348                Err(anyhow::anyhow!(
2349                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2350                    tx_digest,
2351                    expected_effects_digest,
2352                    actual_effects_digest
2353                ))
2354            }
2355        }
2356    }
2357}
2358
2359#[cfg(not(msim))]
2360impl SuiNode {
2361    async fn fetch_jwks(
2362        _authority: AuthorityName,
2363        provider: &OIDCProvider,
2364    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2365        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2366        use sui_types::error::SuiErrorKind;
2367        let client = reqwest::Client::new();
2368        fetch_jwks(provider, &client, true)
2369            .await
2370            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2371    }
2372}
2373
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node hosting this `SuiNode`.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Stores whether safe mode is expected for this node in the simulation state.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let flag = &self.sim_state.sim_safe_mode_expected;
        flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulation stand-in for the HTTP JWK fetch. It delegates to whatever
    /// `get_jwk_injector()` returns, passing the authority and provider through.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
2395
2396enum SpawnOnce {
2397    // Mutex is only needed to make SpawnOnce Send
2398    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2399    #[allow(unused)]
2400    Started(JoinHandle<()>),
2401}
2402
2403impl SpawnOnce {
2404    pub fn new(
2405        ready_rx: oneshot::Receiver<()>,
2406        future: impl Future<Output = ()> + Send + 'static,
2407    ) -> Self {
2408        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2409    }
2410
2411    pub async fn start(self) -> Self {
2412        match self {
2413            Self::Unstarted(ready_rx, future) => {
2414                let future = future.into_inner();
2415                let handle = tokio::spawn(future);
2416                ready_rx.await.unwrap();
2417                Self::Started(handle)
2418            }
2419            Self::Started(_) => self,
2420        }
2421    }
2422}
2423
2424/// Updates trusted peer addresses in the p2p network (for nodes configured as validators).
2425/// When `prev_epoch_start_state` is provided, validators that are no longer in the committee
2426/// have their Chain addresses cleared.
2427fn update_peer_addresses(
2428    config: &NodeConfig,
2429    endpoint_manager: &EndpointManager,
2430    epoch_start_state: &EpochStartSystemState,
2431    prev_epoch_start_state: Option<&EpochStartSystemState>,
2432) {
2433    if config.consensus_config().is_none() {
2434        return;
2435    }
2436    let new_peers: HashSet<PeerId> = epoch_start_state
2437        .get_validator_as_p2p_peers(config.protocol_public_key())
2438        .into_iter()
2439        .map(|(peer_id, address)| {
2440            endpoint_manager
2441                .update_endpoint(
2442                    EndpointId::P2p(peer_id),
2443                    AddressSource::Chain,
2444                    vec![address],
2445                )
2446                .expect("Updating peer addresses should not fail");
2447            peer_id
2448        })
2449        .collect();
2450
2451    // Clear Chain addresses for validators that left the committee.
2452    if let Some(prev) = prev_epoch_start_state {
2453        for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2454            if !new_peers.contains(&peer_id) {
2455                endpoint_manager
2456                    .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2457                    .expect("Clearing peer addresses should not fail");
2458            }
2459        }
2460    }
2461}
2462
2463fn build_kv_store(
2464    state: &Arc<AuthorityState>,
2465    config: &NodeConfig,
2466    registry: &Registry,
2467) -> Result<Arc<TransactionKeyValueStore>> {
2468    let metrics = KeyValueStoreMetrics::new(registry);
2469    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2470
2471    let base_url = &config.transaction_kv_store_read_config.base_url;
2472
2473    if base_url.is_empty() {
2474        info!("no http kv store url provided, using local db only");
2475        return Ok(Arc::new(db_store));
2476    }
2477
2478    let base_url: url::Url = base_url.parse().tap_err(|e| {
2479        error!(
2480            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2481            base_url, e
2482        )
2483    })?;
2484
2485    let network_str = match state.get_chain_identifier().chain() {
2486        Chain::Mainnet => "/mainnet",
2487        _ => {
2488            info!("using local db only for kv store");
2489            return Ok(Arc::new(db_store));
2490        }
2491    };
2492
2493    let base_url = base_url.join(network_str)?.to_string();
2494    let http_store = HttpKVStore::new_kv(
2495        &base_url,
2496        config.transaction_kv_store_read_config.cache_size,
2497        metrics.clone(),
2498    )?;
2499    info!("using local key-value store with fallback to http key-value store");
2500    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2501        db_store,
2502        http_store,
2503        metrics,
2504        "json_rpc_fallback",
2505    )))
2506}
2507
2508async fn build_json_rpc_router(
2509    state: &Arc<AuthorityState>,
2510    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2511    config: &NodeConfig,
2512    prometheus_registry: &Registry,
2513) -> Result<axum::Router> {
2514    let traffic_controller = state.traffic_controller.clone();
2515    let mut server = JsonRpcServerBuilder::new(
2516        env!("CARGO_PKG_VERSION"),
2517        prometheus_registry,
2518        traffic_controller,
2519        config.policy_config.clone(),
2520    );
2521
2522    let kv_store = build_kv_store(state, config, prometheus_registry)?;
2523
2524    let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2525    server.register_module(ReadApi::new(
2526        state.clone(),
2527        kv_store.clone(),
2528        metrics.clone(),
2529    ))?;
2530    server.register_module(CoinReadApi::new(
2531        state.clone(),
2532        kv_store.clone(),
2533        metrics.clone(),
2534    ))?;
2535
2536    // if run_with_range is enabled we want to prevent any transactions
2537    // run_with_range = None is normal operating conditions
2538    if config.run_with_range.is_none() {
2539        server.register_module(TransactionBuilderApi::new(state.clone()))?;
2540    }
2541    server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2542    server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2543
2544    if let Some(transaction_orchestrator) = transaction_orchestrator {
2545        server.register_module(TransactionExecutionApi::new(
2546            state.clone(),
2547            transaction_orchestrator.clone(),
2548            metrics.clone(),
2549        ))?;
2550    }
2551
2552    let name_service_config = if let (
2553        Some(package_address),
2554        Some(registry_id),
2555        Some(reverse_registry_id),
2556    ) = (
2557        config.name_service_package_address,
2558        config.name_service_registry_id,
2559        config.name_service_reverse_registry_id,
2560    ) {
2561        sui_name_service::NameServiceConfig::new(package_address, registry_id, reverse_registry_id)
2562    } else {
2563        match state.get_chain_identifier().chain() {
2564            Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2565            Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2566            Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2567        }
2568    };
2569
2570    server.register_module(IndexerApi::new(
2571        state.clone(),
2572        ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2573        kv_store,
2574        name_service_config,
2575        metrics,
2576        config.indexer_max_subscriptions,
2577    ))?;
2578    server.register_module(MoveUtils::new(state.clone()))?;
2579
2580    let server_type = config.jsonrpc_server_type();
2581
2582    Ok(server.to_router(server_type).await?)
2583}
2584
2585async fn build_http_servers(
2586    state: Arc<AuthorityState>,
2587    store: RocksDbStore,
2588    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2589    config: &NodeConfig,
2590    prometheus_registry: &Registry,
2591    server_version: ServerVersion,
2592    node_role: NodeRole,
2593) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2594    // Validators do not expose these APIs
2595    if !node_role.should_run_rpc_servers() {
2596        return Ok((HttpServers::default(), None));
2597    }
2598
2599    info!("starting rpc service with config: {:?}", config.rpc);
2600
2601    let mut router = axum::Router::new();
2602
2603    // The JSON-RPC service can be disabled independently of the gRPC/REST
2604    // service and of JSON-RPC indexing, so that a node can keep indexing
2605    // without exposing the JSON-RPC endpoints.
2606    if config.json_rpc_enabled() {
2607        router = router.merge(
2608            build_json_rpc_router(
2609                &state,
2610                transaction_orchestrator,
2611                config,
2612                prometheus_registry,
2613            )
2614            .await?,
2615        );
2616    } else {
2617        info!("json-rpc service is disabled");
2618    }
2619
2620    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2621        SubscriptionService::build(prometheus_registry);
2622    let rpc_router = {
2623        let mut rpc_service =
2624            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2625        rpc_service.with_server_version(server_version);
2626
2627        if let Some(config) = config.rpc.clone() {
2628            config.validate()?;
2629            rpc_service.with_config(config);
2630        }
2631
2632        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2633        rpc_service.with_subscription_service(subscription_service_handle);
2634
2635        if let Some(transaction_orchestrator) = transaction_orchestrator {
2636            rpc_service.with_executor(transaction_orchestrator.clone())
2637        }
2638
2639        rpc_service.into_router().await
2640    };
2641
2642    let layers = ServiceBuilder::new()
2643        .map_request(|mut request: axum::http::Request<_>| {
2644            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2645                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2646                request.extensions_mut().insert(axum_connect_info);
2647            }
2648            request
2649        })
2650        .layer(axum::middleware::from_fn(server_timing_middleware))
2651        // Setup a permissive CORS policy
2652        .layer(
2653            tower_http::cors::CorsLayer::new()
2654                .allow_methods([http::Method::GET, http::Method::POST])
2655                .allow_origin(tower_http::cors::Any)
2656                .allow_headers(tower_http::cors::Any)
2657                .expose_headers(tower_http::cors::Any),
2658        );
2659
2660    router = router.merge(rpc_router).layer(layers);
2661
2662    let https = if let Some((tls_config, https_address)) = config
2663        .rpc()
2664        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2665    {
2666        let https = sui_http::Builder::new()
2667            .tls_single_cert(tls_config.cert(), tls_config.key())
2668            .and_then(|builder| builder.serve(https_address, router.clone()))
2669            .map_err(|e| anyhow::anyhow!(e))?;
2670
2671        info!(
2672            https_address =? https.local_addr(),
2673            "HTTPS rpc server listening on {}",
2674            https.local_addr()
2675        );
2676
2677        Some(https)
2678    } else {
2679        None
2680    };
2681
2682    let http = sui_http::Builder::new()
2683        .serve(&config.json_rpc_address, router)
2684        .map_err(|e| anyhow::anyhow!(e))?;
2685
2686    info!(
2687        http_address =? http.local_addr(),
2688        "HTTP rpc server listening on {}",
2689        http.local_addr()
2690    );
2691
2692    Ok((
2693        HttpServers {
2694            http: Some(http),
2695            https,
2696        },
2697        Some(subscription_service_checkpoint_sender),
2698    ))
2699}
2700
2701#[derive(Default)]
2702struct HttpServers {
2703    #[allow(unused)]
2704    http: Option<sui_http::ServerHandle>,
2705    #[allow(unused)]
2706    https: Option<sui_http::ServerHandle>,
2707}
2708
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Exercises both fork kinds against a single store.
    ///
    /// Without overrides, the check must fail with a descriptive error. With a matching
    /// override, it must succeed and clear the recorded fork marker.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let store = CheckpointStore::new_for_tests();
        let metrics = CheckpointMetrics::new(&Registry::new());

        // No overrides plus `ReturnError`: any recorded fork must surface as an error.
        let strict = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // ---------- Checkpoint fork path ----------
        let seq_num = 42;
        let cp_digest = CheckpointDigest::random();
        store
            .record_checkpoint_fork_detected(seq_num, cp_digest)
            .unwrap();

        let err = SuiNode::check_and_recover_forks(&store, &metrics, Some(&strict))
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Checkpoint fork detected"));

        let cp_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: BTreeMap::from([(seq_num, cp_digest.to_string())]),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&cp_recovery)).await;
        assert!(outcome.is_ok());
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // ---------- Transaction fork path ----------
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let err = SuiNode::check_and_recover_forks(&store, &metrics, Some(&strict))
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Transaction fork detected"));

        let tx_recovery = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                tx_digest.to_string(),
                actual_effects.to_string(),
            )]),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&tx_recovery)).await;
        assert!(outcome.is_ok());
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
}