sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
// Emits a JWK-related log line at debug level when running in a test
// configuration and at info level otherwise.
// JWK logging is very noisy in tests but negligible in production, so
// production keeps it at info level.
macro_rules! jwk_log {
    ($($log_args:tt)+) => {
        if in_test_configuration() {
            debug!($($log_args)+);
        } else {
            info!($($log_args)+);
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::rpc_store_embed::EmbeddedRpcStore;
118use sui_core::signature_verifier::SignatureVerifierMetrics;
119use sui_core::storage::RocksDbStore;
120use sui_core::storage::RpcStoreReadStore;
121use sui_core::transaction_orchestrator::TransactionOrchestrator;
122use sui_core::{
123    authority::{AuthorityState, AuthorityStore},
124    authority_client::NetworkAuthorityClient,
125};
126use sui_json_rpc::JsonRpcServerBuilder;
127use sui_json_rpc::coin_api::CoinReadApi;
128use sui_json_rpc::governance_api::GovernanceReadApi;
129use sui_json_rpc::indexer_api::IndexerApi;
130use sui_json_rpc::move_utils::MoveUtils;
131use sui_json_rpc::read_api::ReadApi;
132use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
133use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
134use sui_macros::fail_point;
135use sui_macros::{fail_point_async, replay_log};
136use sui_network::api::ValidatorServer;
137use sui_network::discovery;
138use sui_network::endpoint_manager::EndpointManager;
139use sui_network::state_sync;
140use sui_network::validator::server::ServerBuilder;
141use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
142use sui_snapshot::uploader::StateSnapshotUploader;
143use sui_storage::{
144    http_key_value_store::HttpKVStore,
145    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
146    key_value_store_metrics::KeyValueStoreMetrics,
147};
148use sui_types::base_types::{AuthorityName, EpochId};
149use sui_types::committee::Committee;
150use sui_types::crypto::KeypairTraits;
151use sui_types::error::{SuiError, SuiResult};
152use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
153use sui_types::storage::RpcStateReader;
154use sui_types::sui_system_state::SuiSystemStateTrait;
155use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::supported_protocol_versions::SupportedProtocolVersions;
158use typed_store::DBMetrics;
159use typed_store::rocks::default_db_options;
160
161use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
162
163pub mod admin;
164pub mod db_shell;
165mod handle;
166pub mod metrics;
167
168pub struct ValidatorComponents {
169    validator_server_handle: Option<SpawnOnce>,
170    validator_overload_monitor_handle: Option<JoinHandle<()>>,
171    consensus_manager: Arc<ConsensusManager>,
172    consensus_store_pruner: ConsensusStorePruner,
173    consensus_adapter: Arc<ConsensusAdapter>,
174    checkpoint_metrics: Arc<CheckpointMetrics>,
175    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
176    admission_queue: Option<AdmissionQueueContext>,
177}
178pub struct P2pComponents {
179    p2p_network: Network,
180    known_peers: HashMap<PeerId, String>,
181    discovery_handle: discovery::Handle,
182    state_sync_handle: state_sync::Handle,
183    randomness_handle: randomness::Handle,
184    endpoint_manager: EndpointManager,
185}
186
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Simulator-only state attached to a node in `msim` builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this `SuiNode` runs on.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag recording whether safe mode is expected; starts out `false`.
        pub sim_safe_mode_expected: AtomicBool,
        /// Kept alive only for its leak-detection side effect.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node handle and a fresh leak detector.
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Signature of a JWK fetch hook: given the authority and the provider,
    /// return the `(JwkId, JWK)` pairs to use.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default hook: ignores both arguments and returns the built-in Twitch
    /// JWK set used for testing.
    ///
    /// Any parse failure is reported as `SuiErrorKind::JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let parsed = fastcrypto_zkp::bn254::zk_login::parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Per-thread hook; initialised to `default_fetch_jwks`.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK hook currently installed on this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Replaces the JWK hook installed on this thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            *slot.borrow_mut() = injector;
        });
    }
}
240
241#[cfg(msim)]
242pub use simulator::set_jwk_injector;
243#[cfg(msim)]
244use simulator::*;
245use sui_core::authority::authority_store_pruner::PrunerWatermarks;
246use sui_core::{
247    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
248};
249
/// Sixty-second duration; by its name, the default timeout for establishing
/// gRPC connections (its use sites are outside this section; confirm there).
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
251
252pub struct SuiNode {
253    config: NodeConfig,
254    validator_components: Mutex<Option<ValidatorComponents>>,
255
256    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
257    #[allow(unused)]
258    http_servers: HttpServers,
259
260    state: Arc<AuthorityState>,
261    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
262    registry_service: RegistryService,
263    metrics: Arc<SuiNodeMetrics>,
264    checkpoint_metrics: Arc<CheckpointMetrics>,
265
266    _discovery: discovery::Handle,
267    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
268    state_sync_handle: state_sync::Handle,
269    randomness_handle: randomness::Handle,
270    checkpoint_store: Arc<CheckpointStore>,
271    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
272
273    /// Broadcast channel to send the starting system state for the next epoch.
274    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
275
276    /// EndpointManager for updating peer network addresses.
277    endpoint_manager: EndpointManager,
278
279    backpressure_manager: Arc<BackpressureManager>,
280
281    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
282
283    #[cfg(msim)]
284    sim_state: SimState,
285
286    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
287    // Channel to allow signaling upstream to shutdown sui-node
288    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
289
290    /// Handle shared with RandomnessManager and the consensus layer.
291    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
292
293    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
294    /// Use ArcSwap so that we could mutate it without taking mut reference.
295    // TODO: Eventually we can make this auth aggregator a shared reference so that this
296    // update will automatically propagate to other uses.
297    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
298
299    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
300
301    /// The embedded `sui-rpc-store`, present when the node is configured
302    /// with `use_experimental_rpc_store`. Held for the node's lifetime
303    /// so its tip indexer keeps running (dropping it aborts the indexer).
304    /// Exposed through [`SuiNode::embedded_rpc_store`] for introspection.
305    embedded_rpc_store: Option<EmbeddedRpcStore>,
306}
307
308impl fmt::Debug for SuiNode {
309    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310        f.debug_struct("SuiNode")
311            .field("name", &self.state.name.concise())
312            .finish()
313    }
314}
315
/// Upper bound on how many JWKs from a single provider fetch are submitted to
/// consensus; anything beyond it is dropped by the JWK updater.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
317
318impl SuiNode {
319    pub async fn start(
320        config: NodeConfig,
321        registry_service: RegistryService,
322    ) -> Result<Arc<SuiNode>> {
323        Self::start_async(
324            config,
325            registry_service,
326            ServerVersion::new("sui-node", "unknown"),
327        )
328        .await
329    }
330
331    fn start_jwk_updater(
332        config: &NodeConfig,
333        metrics: Arc<SuiNodeMetrics>,
334        authority: AuthorityName,
335        epoch_store: Arc<AuthorityPerEpochStore>,
336        consensus_adapter: Arc<ConsensusAdapter>,
337    ) {
338        let epoch = epoch_store.epoch();
339
340        let supported_providers = config
341            .zklogin_oauth_providers
342            .get(&epoch_store.get_chain_identifier().chain())
343            .unwrap_or(&BTreeSet::new())
344            .iter()
345            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
346            .collect::<Vec<_>>();
347
348        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
349
350        info!(
351            ?fetch_interval,
352            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
353        );
354
355        fn validate_jwk(
356            metrics: &Arc<SuiNodeMetrics>,
357            provider: &OIDCProvider,
358            id: &JwkId,
359            jwk: &JWK,
360        ) -> bool {
361            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
362                warn!(
363                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
364                    id.iss, provider
365                );
366                metrics
367                    .invalid_jwks
368                    .with_label_values(&[&provider.to_string()])
369                    .inc();
370                return false;
371            };
372
373            if iss_provider != *provider {
374                warn!(
375                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
376                    id.iss, provider, iss_provider
377                );
378                metrics
379                    .invalid_jwks
380                    .with_label_values(&[&provider.to_string()])
381                    .inc();
382                return false;
383            }
384
385            if !check_total_jwk_size(id, jwk) {
386                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
387                metrics
388                    .invalid_jwks
389                    .with_label_values(&[&provider.to_string()])
390                    .inc();
391                return false;
392            }
393
394            true
395        }
396
397        // metrics is:
398        //  pub struct SuiNodeMetrics {
399        //      pub jwk_requests: IntCounterVec,
400        //      pub jwk_request_errors: IntCounterVec,
401        //      pub total_jwks: IntCounterVec,
402        //      pub unique_jwks: IntCounterVec,
403        //  }
404
405        for p in supported_providers.into_iter() {
406            let provider_str = p.to_string();
407            let epoch_store = epoch_store.clone();
408            let consensus_adapter = consensus_adapter.clone();
409            let metrics = metrics.clone();
410            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
411                async move {
412                    // note: restart-safe de-duplication happens after consensus, this is
413                    // just best-effort to reduce unneeded submissions.
414                    let mut seen = HashSet::new();
415                    loop {
416                        jwk_log!("fetching JWK for provider {:?}", p);
417                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
418                        match Self::fetch_jwks(authority, &p).await {
419                            Err(e) => {
420                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
421                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
422                                // Retry in 30 seconds
423                                tokio::time::sleep(Duration::from_secs(30)).await;
424                                continue;
425                            }
426                            Ok(mut keys) => {
427                                metrics.total_jwks
428                                    .with_label_values(&[&provider_str])
429                                    .inc_by(keys.len() as u64);
430
431                                keys.retain(|(id, jwk)| {
432                                    validate_jwk(&metrics, &p, id, jwk) &&
433                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
434                                    seen.insert((id.clone(), jwk.clone()))
435                                });
436
437                                metrics.unique_jwks
438                                    .with_label_values(&[&provider_str])
439                                    .inc_by(keys.len() as u64);
440
441                                // prevent oauth providers from sending too many keys,
442                                // inadvertently or otherwise
443                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
444                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
445                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
446                                }
447
448                                for (id, jwk) in keys.into_iter() {
449                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
450
451                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
452                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
453                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
454                                        .ok();
455                                }
456                            }
457                        }
458                        tokio::time::sleep(fetch_interval).await;
459                    }
460                }
461                .instrument(error_span!("jwk_updater_task", epoch)),
462            ));
463        }
464    }
465
466    pub async fn start_async(
467        config: NodeConfig,
468        registry_service: RegistryService,
469        server_version: ServerVersion,
470    ) -> Result<Arc<SuiNode>> {
471        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
472        let mut config = config.clone();
473        if config.supported_protocol_versions.is_none() {
474            info!(
475                "populating config.supported_protocol_versions with default {:?}",
476                SupportedProtocolVersions::SYSTEM_DEFAULT
477            );
478            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
479        }
480
481        let run_with_range = config.run_with_range;
482        let prometheus_registry = registry_service.default_registry();
483        let node_role = config.intended_node_role();
484
485        info!(node =? config.protocol_public_key(),
486            "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
487        );
488
489        // Initialize metrics to track db usage before creating any stores
490        DBMetrics::init(registry_service.clone());
491
492        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
493        typed_store::init_write_sync(config.enable_db_sync_to_disk);
494
495        // Initialize Mysten metrics.
496        mysten_metrics::init_metrics(&prometheus_registry);
497        // Unsupported (because of the use of static variable) and unnecessary in simtests.
498        #[cfg(not(msim))]
499        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
500
501        let genesis = config.genesis()?.clone();
502
503        let secret = Arc::pin(config.protocol_key_pair().copy());
504        let genesis_committee = genesis.committee();
505        let committee_store = Arc::new(CommitteeStore::new(
506            config.db_path().join("epochs"),
507            &genesis_committee,
508            None,
509        ));
510
511        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
512        let checkpoint_store = CheckpointStore::new(
513            &config.db_path().join("checkpoints"),
514            pruner_watermarks.clone(),
515        );
516        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
517
518        if node_role.runs_consensus() {
519            Self::check_and_recover_forks(
520                &checkpoint_store,
521                &checkpoint_metrics,
522                config.fork_recovery.as_ref(),
523            )
524            .await?;
525        }
526
527        // By default, only enable write stall on nodes that run consensus.
528        let enable_write_stall = config
529            .enable_db_write_stall
530            .unwrap_or(node_role.runs_consensus());
531        // The tidehunter objects compactor retains only the latest version per
532        // ObjectID and is mutually exclusive with the object pruner. Enable it
533        // for validators (which always disable the pruner), and also for any
534        // node configured with `num_epochs_to_retain = 0` — that aggressive
535        // setting is what the compactor replaces. The pruner is force-disabled
536        // in `AuthorityStorePruner::new` whenever this is true.
537        let enable_objects_compactor = node_role.is_validator()
538            || config.authority_store_pruning_config.num_epochs_to_retain == 0;
539        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
540            enable_write_stall,
541            enable_objects_compactor,
542        };
543        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
544            &config.db_store_path(),
545            Some(perpetual_tables_options),
546            Some(pruner_watermarks.epoch_id.clone()),
547        ));
548        let is_genesis = perpetual_tables
549            .database_is_empty()
550            .expect("Database read should not fail at init.");
551
552        let backpressure_manager =
553            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
554
555        let store =
556            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
557
558        let cur_epoch = store.get_recovery_epoch_at_restart()?;
559        let committee = committee_store
560            .get_committee(&cur_epoch)?
561            .expect("Committee of the current epoch must exist");
562        let epoch_start_configuration = store
563            .get_epoch_start_configuration()?
564            .expect("EpochStartConfiguration of the current epoch must exist");
565        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
566        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
567
568        let cache_traits = build_execution_cache(
569            &config.execution_cache,
570            &prometheus_registry,
571            &store,
572            backpressure_manager.clone(),
573        );
574
575        let auth_agg = {
576            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
577            Arc::new(ArcSwap::new(Arc::new(
578                AuthorityAggregator::new_from_epoch_start_state(
579                    epoch_start_configuration.epoch_start_state(),
580                    &committee_store,
581                    safe_client_metrics_base,
582                ),
583            )))
584        };
585
586        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
587        let chain = match config.chain_override_for_testing {
588            Some(chain) => chain,
589            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
590        };
591
592        let highest_executed_checkpoint = checkpoint_store
593            .get_highest_executed_checkpoint_seq_number()
594            .expect("checkpoint store read cannot fail")
595            .unwrap_or(0);
596
597        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
598            0
599        } else {
600            checkpoint_store
601                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
602                .expect("checkpoint store read cannot fail")
603                .unwrap_or(highest_executed_checkpoint)
604        };
605
606        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
607        let epoch_store = AuthorityPerEpochStore::new(
608            config.protocol_public_key(),
609            committee.clone(),
610            &config.db_store_path(),
611            Some(epoch_options.options),
612            EpochMetrics::new(&registry_service.default_registry()),
613            epoch_start_configuration,
614            cache_traits.backing_package_store.clone(),
615            cache_traits.object_store.clone(),
616            cache_metrics,
617            signature_verifier_metrics,
618            &config.expensive_safety_check_config,
619            (chain_id, chain),
620            highest_executed_checkpoint,
621            previous_epoch_last_checkpoint,
622            Arc::new(SubmittedTransactionCacheMetrics::new(
623                &registry_service.default_registry(),
624            )),
625            config.fullnode_sync_mode,
626        )?;
627
628        info!("created epoch store");
629
630        replay_log!(
631            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632            epoch_store.epoch(),
633            epoch_store.protocol_config()
634        );
635
636        // the database is empty at genesis time
637        if is_genesis {
638            info!("checking SUI conservation at genesis");
639            // When we are opening the db table, the only time when it's safe to
640            // check SUI conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the SUI conservation check will fail. This also initializes
642            // the expected_network_sui_amount table.
643            cache_traits
644                .reconfig_api
645                .expensive_check_sui_conservation(&epoch_store)
646                .expect("SUI conservation check cannot fail at genesis");
647        }
648
649        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650        let default_buffer_stake = epoch_store
651            .protocol_config()
652            .buffer_stake_for_protocol_upgrade_bps();
653        if effective_buffer_stake != default_buffer_stake {
654            warn!(
655                ?effective_buffer_stake,
656                ?default_buffer_stake,
657                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658            );
659        }
660
661        checkpoint_store.insert_genesis_checkpoint(
662            genesis.checkpoint(),
663            genesis.checkpoint_contents().clone(),
664            &epoch_store,
665        );
666
667        info!("creating state sync store");
668        let state_sync_store = RocksDbStore::new(
669            cache_traits.clone(),
670            committee_store.clone(),
671            checkpoint_store.clone(),
672        );
673
674        let index_store = if node_role.is_fullnode() && config.enable_index_processing {
675            info!("creating jsonrpc index store");
676            Some(Arc::new(IndexStore::new(
677                config.db_path().join("indexes"),
678                &prometheus_registry,
679                epoch_store
680                    .protocol_config()
681                    .max_move_identifier_len_as_option(),
682                config.remove_deprecated_tables,
683            )))
684        } else {
685            None
686        };
687
688        let chain_identifier = epoch_store.get_chain_identifier();
689
690        // The embedded `sui-rpc-store` and the legacy `rpc-index` are
691        // mutually exclusive index backends; selecting the experimental
692        // store skips building the old index and serves the index read
693        // paths from the embedded store instead.
694        let (rpc_index, mut embedded_rpc_store) =
695            if node_role.is_fullnode() && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
696                if config
697                    .rpc()
698                    .is_some_and(|rpc| rpc.use_experimental_rpc_store())
699                {
700                    info!("creating embedded rpc-store");
701                    // The tip indexer pulls checkpoints from the node's local
702                    // checkpoint / perpetual stores via a dedicated read handle.
703                    let ingestion_source = RocksDbStore::new(
704                        cache_traits.clone(),
705                        committee_store.clone(),
706                        checkpoint_store.clone(),
707                    );
708                    let embedded_rpc_store = EmbeddedRpcStore::bootstrap(
709                        &config,
710                        &store,
711                        &checkpoint_store,
712                        ingestion_source,
713                        chain_identifier,
714                        &prometheus_registry,
715                    )
716                    .await?;
717                    (None, Some(embedded_rpc_store))
718                } else {
719                    info!("creating rpc index store");
720                    let rpc_index = Arc::new(
721                        RpcIndexStore::new(
722                            &config.db_path(),
723                            &store,
724                            &checkpoint_store,
725                            &epoch_store,
726                            &cache_traits.backing_package_store,
727                            config.rpc().cloned().unwrap_or_default(),
728                        )
729                        .await,
730                    );
731                    (Some(rpc_index), None)
732                }
733            } else {
734                (None, None)
735            };
736
737        info!("creating archive reader");
738        // Create network
739        let (randomness_tx, randomness_rx) = mpsc::channel(
740            config
741                .p2p_config
742                .randomness
743                .clone()
744                .unwrap_or_default()
745                .mailbox_capacity(),
746        );
747        let P2pComponents {
748            p2p_network,
749            known_peers,
750            discovery_handle,
751            state_sync_handle,
752            randomness_handle,
753            endpoint_manager,
754        } = Self::create_p2p_network(
755            &config,
756            state_sync_store.clone(),
757            chain_identifier,
758            randomness_tx,
759            &prometheus_registry,
760        )?;
761
762        // Inject configured peer address overrides.
763        for peer in &config.p2p_config.peer_address_overrides {
764            endpoint_manager
765                .update_endpoint(
766                    EndpointId::P2p(peer.peer_id),
767                    AddressSource::Config,
768                    peer.addresses.clone(),
769                )
770                .expect("Updating peer address overrides should not fail");
771        }
772
773        // Send initial peer addresses to the p2p network.
774        update_peer_addresses(
775            &config,
776            &endpoint_manager,
777            epoch_store.epoch_start_state(),
778            None,
779        );
780
781        info!("start snapshot upload");
782        // Start uploading state snapshot to remote store
783        let state_snapshot_handle = Self::start_state_snapshot(
784            &config,
785            &prometheus_registry,
786            checkpoint_store.clone(),
787            chain_identifier,
788        )?;
789
790        // Start uploading db checkpoints to remote store
791        info!("start db checkpoint");
792        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
793            &config,
794            &prometheus_registry,
795            state_snapshot_handle.is_some(),
796        )?;
797
798        if !epoch_store
799            .protocol_config()
800            .simplified_unwrap_then_delete()
801        {
802            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
803            config
804                .authority_store_pruning_config
805                .set_killswitch_tombstone_pruning(true);
806        }
807
808        let authority_name = config.protocol_public_key();
809
810        info!("create authority state");
811        let state = AuthorityState::new(
812            authority_name,
813            secret,
814            config.supported_protocol_versions.unwrap(),
815            store.clone(),
816            cache_traits.clone(),
817            epoch_store.clone(),
818            committee_store.clone(),
819            index_store.clone(),
820            rpc_index,
821            embedded_rpc_store.as_ref().map(|embedded| embedded.store()),
822            checkpoint_store.clone(),
823            &prometheus_registry,
824            genesis.objects(),
825            &db_checkpoint_config,
826            config.clone(),
827            chain_identifier,
828            config.policy_config.clone(),
829            config.firewall_config.clone(),
830            pruner_watermarks,
831        )
832        .await;
833        // ensure genesis txn was executed
834        if epoch_store.epoch() == 0 {
835            let txn = &genesis.transaction();
836            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
837            let transaction =
838                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
839                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
840                        genesis.transaction().data().clone(),
841                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
842                    ),
843                );
844            let _enter = span.enter();
845            state
846                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
847                .unwrap();
848        }
849
850        // Start the loop that receives new randomness and generates transactions for it.
851        // The returned is long-lived (node lifetime).
852        let randomness_receiver_handle =
853            RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
854
855        let (end_of_epoch_channel, end_of_epoch_receiver) =
856            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
857
858        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
859            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
860                auth_agg.load_full(),
861                state.clone(),
862                end_of_epoch_receiver,
863                &config.db_path(),
864                &prometheus_registry,
865                &config,
866            )))
867        } else {
868            None
869        };
870
871        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
872            state.clone(),
873            state_sync_store,
874            &transaction_orchestrator.clone(),
875            &config,
876            &prometheus_registry,
877            server_version,
878            node_role,
879            embedded_rpc_store.as_ref(),
880        )
881        .await?;
882
883        // Start the embedded rpc-store's tip indexer. It follows the tip
884        // via the checkpoint executor's broadcast stream and backfills
885        // any gap from the perpetual store. Spawned on a background task
886        // (see `spawn_indexer`) so node startup does not block on the
887        // first checkpoint, which the executor only produces after this
888        // function returns.
889        if let Some(embedded) = embedded_rpc_store.as_mut() {
890            embedded.spawn_indexer(
891                subscription_service_checkpoint_sender.clone(),
892                prometheus_registry.clone(),
893            );
894        }
895
896        let global_state_hasher = Arc::new(GlobalStateHasher::new(
897            cache_traits.global_state_hash_store.clone(),
898            GlobalStateHashMetrics::new(&prometheus_registry),
899        ));
900
901        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
902            "sui",
903            &registry_service.default_registry(),
904        );
905
906        let connection_monitor_handle =
907            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
908                p2p_network.downgrade(),
909                Arc::new(network_connection_metrics),
910                known_peers,
911            );
912
913        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
914
915        sui_node_metrics
916            .binary_max_protocol_version
917            .set(ProtocolVersion::MAX.as_u64() as i64);
918        sui_node_metrics
919            .configured_max_protocol_version
920            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
921
922        let node_role = epoch_store.node_role();
923        let validator_components = if node_role.runs_consensus() {
924            let mut components = Self::construct_validator_components(
925                config.clone(),
926                state.clone(),
927                committee,
928                epoch_store.clone(),
929                checkpoint_store.clone(),
930                state_sync_handle.clone(),
931                randomness_handle.clone(),
932                Arc::downgrade(&global_state_hasher),
933                backpressure_manager.clone(),
934                &registry_service,
935                sui_node_metrics.clone(),
936                checkpoint_metrics.clone(),
937                node_role,
938                randomness_receiver_handle.clone(),
939            )
940            .await?;
941
942            if node_role.is_validator() {
943                components
944                    .consensus_adapter
945                    .recover_end_of_publish(&epoch_store);
946
947                // Start the gRPC server
948                components.validator_server_handle = Some(
949                    components
950                        .validator_server_handle
951                        .take()
952                        .unwrap()
953                        .start()
954                        .await,
955                );
956
957                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
958                endpoint_manager
959                    .set_consensus_address_updater(components.consensus_manager.clone());
960            } else {
961                info!("Starting node as Observer — connecting to configured peers");
962            }
963
964            Some(components)
965        } else {
966            None
967        };
968
969        // setup shutdown channel
970        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
971
972        let node = Self {
973            config,
974            validator_components: Mutex::new(validator_components),
975            http_servers,
976            state,
977            transaction_orchestrator,
978            registry_service,
979            metrics: sui_node_metrics,
980            checkpoint_metrics,
981
982            _discovery: discovery_handle,
983            _connection_monitor_handle: connection_monitor_handle,
984            state_sync_handle,
985            randomness_handle,
986            checkpoint_store,
987            global_state_hasher: Mutex::new(Some(global_state_hasher)),
988            end_of_epoch_channel,
989            endpoint_manager,
990            backpressure_manager,
991
992            _db_checkpoint_handle: db_checkpoint_handle,
993
994            #[cfg(msim)]
995            sim_state: Default::default(),
996
997            _state_snapshot_uploader_handle: state_snapshot_handle,
998            shutdown_channel_tx: shutdown_channel,
999            randomness_receiver_handle,
1000
1001            auth_agg,
1002            subscription_service_checkpoint_sender,
1003            embedded_rpc_store,
1004        };
1005
1006        info!("SuiNode started!");
1007        let node = Arc::new(node);
1008        let node_copy = node.clone();
1009        spawn_monitored_task!(async move {
1010            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
1011            if let Err(error) = result {
1012                warn!("Reconfiguration finished with error {:?}", error);
1013            }
1014        });
1015
1016        Ok(node)
1017    }
1018
1019    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
1020        self.end_of_epoch_channel.subscribe()
1021    }
1022
1023    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1024        self.shutdown_channel_tx.subscribe()
1025    }
1026
1027    pub fn current_epoch_for_testing(&self) -> EpochId {
1028        self.state.current_epoch_for_testing()
1029    }
1030
1031    pub fn db_checkpoint_path(&self) -> PathBuf {
1032        self.config.db_checkpoint_path()
1033    }
1034
1035    // Init reconfig process by starting to reject user certs
1036    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
1037        info!("close_epoch (current epoch = {})", epoch_store.epoch());
1038        self.validator_components
1039            .lock()
1040            .await
1041            .as_ref()
1042            .ok_or_else(|| SuiError::from("Node is not a validator"))?
1043            .consensus_adapter
1044            .close_epoch(epoch_store);
1045        Ok(())
1046    }
1047
1048    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
1049        self.state
1050            .clear_override_protocol_upgrade_buffer_stake(epoch)
1051    }
1052
1053    pub fn set_override_protocol_upgrade_buffer_stake(
1054        &self,
1055        epoch: EpochId,
1056        buffer_stake_bps: u64,
1057    ) -> SuiResult {
1058        self.state
1059            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1060    }
1061
1062    // Testing-only API to start epoch close process.
1063    // For production code, please use the non-testing version.
1064    pub async fn close_epoch_for_testing(&self) -> SuiResult {
1065        let epoch_store = self.state.epoch_store_for_testing();
1066        self.close_epoch(&epoch_store).await
1067    }
1068
1069    fn start_state_snapshot(
1070        config: &NodeConfig,
1071        prometheus_registry: &Registry,
1072        checkpoint_store: Arc<CheckpointStore>,
1073        chain_identifier: ChainIdentifier,
1074    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1075        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1076            let snapshot_uploader = StateSnapshotUploader::new(
1077                &config.db_checkpoint_path(),
1078                &config.snapshot_path(),
1079                remote_store_config.clone(),
1080                60,
1081                prometheus_registry,
1082                checkpoint_store,
1083                chain_identifier,
1084                config.state_snapshot_write_config.archive_interval_epochs,
1085            )?;
1086            Ok(Some(snapshot_uploader.start()))
1087        } else {
1088            Ok(None)
1089        }
1090    }
1091
1092    fn start_db_checkpoint(
1093        config: &NodeConfig,
1094        prometheus_registry: &Registry,
1095        state_snapshot_enabled: bool,
1096    ) -> Result<(
1097        DBCheckpointConfig,
1098        Option<tokio::sync::broadcast::Sender<()>>,
1099    )> {
1100        let checkpoint_path = Some(
1101            config
1102                .db_checkpoint_config
1103                .checkpoint_path
1104                .clone()
1105                .unwrap_or_else(|| config.db_checkpoint_path()),
1106        );
1107        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1108            DBCheckpointConfig {
1109                checkpoint_path,
1110                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1111                    true
1112                } else {
1113                    config
1114                        .db_checkpoint_config
1115                        .perform_db_checkpoints_at_epoch_end
1116                },
1117                ..config.db_checkpoint_config.clone()
1118            }
1119        } else {
1120            config.db_checkpoint_config.clone()
1121        };
1122
1123        match (
1124            db_checkpoint_config.object_store_config.as_ref(),
1125            state_snapshot_enabled,
1126        ) {
1127            // If db checkpoint config object store not specified but
1128            // state snapshot object store is specified, create handler
1129            // anyway for marking db checkpoints as completed so that they
1130            // can be uploaded as state snapshots.
1131            (None, false) => Ok((db_checkpoint_config, None)),
1132            (_, _) => {
1133                let handler = DBCheckpointHandler::new(
1134                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1135                    db_checkpoint_config.object_store_config.as_ref(),
1136                    60,
1137                    db_checkpoint_config
1138                        .prune_and_compact_before_upload
1139                        .unwrap_or(true),
1140                    config.authority_store_pruning_config.clone(),
1141                    prometheus_registry,
1142                    state_snapshot_enabled,
1143                )?;
1144                Ok((
1145                    db_checkpoint_config,
1146                    Some(DBCheckpointHandler::start(handler)),
1147                ))
1148            }
1149        }
1150    }
1151
1152    fn create_p2p_network(
1153        config: &NodeConfig,
1154        state_sync_store: RocksDbStore,
1155        chain_identifier: ChainIdentifier,
1156        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1157        prometheus_registry: &Registry,
1158    ) -> Result<P2pComponents> {
1159        let mut p2p_config = config.p2p_config.clone();
1160        {
1161            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1162            if disc.peer_addr_store_path.is_none() {
1163                disc.peer_addr_store_path =
1164                    Some(config.db_path().join("discovery_peer_cache.yaml"));
1165            }
1166        }
1167        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1168        if let Some(consensus_config) = &config.consensus_config {
1169            let effective_addr = consensus_config
1170                .external_address
1171                .as_ref()
1172                .or(consensus_config.listen_address.as_ref());
1173            if let Some(addr) = effective_addr {
1174                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1175            }
1176        }
1177        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1178        let discovery_sender = discovery.sender();
1179
1180        let (state_sync, state_sync_router) = state_sync::Builder::new()
1181            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1182            .store(state_sync_store)
1183            .archive_config(config.archive_reader_config())
1184            .discovery_sender(discovery_sender)
1185            .with_metrics(prometheus_registry)
1186            .build();
1187
1188        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1189        let known_peers: HashMap<PeerId, String> = discovery_config
1190            .allowlisted_peers
1191            .clone()
1192            .into_iter()
1193            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1194            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1195                peer.peer_id
1196                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1197            }))
1198            .collect();
1199
1200        let (randomness, randomness_router) =
1201            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1202                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1203                .with_metrics(prometheus_registry)
1204                .build();
1205
1206        let p2p_network = {
1207            let routes = anemo::Router::new()
1208                .add_rpc_service(discovery_server)
1209                .merge(state_sync_router);
1210            let routes = routes.merge(randomness_router);
1211
1212            let inbound_network_metrics =
1213                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1214            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1215                "sui",
1216                "outbound",
1217                prometheus_registry,
1218            );
1219
1220            let service = ServiceBuilder::new()
1221                .layer(
1222                    TraceLayer::new_for_server_errors()
1223                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1224                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1225                )
1226                .layer(CallbackLayer::new(
1227                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1228                        Arc::new(inbound_network_metrics),
1229                        config.p2p_config.excessive_message_size(),
1230                    ),
1231                ))
1232                .service(routes);
1233
1234            let outbound_layer = ServiceBuilder::new()
1235                .layer(
1236                    TraceLayer::new_for_client_and_server_errors()
1237                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1238                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1239                )
1240                .layer(CallbackLayer::new(
1241                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1242                        Arc::new(outbound_network_metrics),
1243                        config.p2p_config.excessive_message_size(),
1244                    ),
1245                ))
1246                .into_inner();
1247
1248            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1249            // Inbound requests on this network are small (signatures, queries, summaries).
1250            // Cap request frames at 1 MiB.
1251            anemo_config.max_request_frame_size = Some(1 << 20);
1252            // Responses can be larger (checkpoint contents).
1253            // Cap response frames at 128 MiB.
1254            anemo_config.max_response_frame_size = Some(128 << 20);
1255
1256            // Set a higher default value for socket send/receive buffers if not already
1257            // configured.
1258            let mut quic_config = anemo_config.quic.unwrap_or_default();
1259            if quic_config.socket_send_buffer_size.is_none() {
1260                quic_config.socket_send_buffer_size = Some(20 << 20);
1261            }
1262            if quic_config.socket_receive_buffer_size.is_none() {
1263                quic_config.socket_receive_buffer_size = Some(20 << 20);
1264            }
1265            quic_config.allow_failed_socket_buffer_size_setting = true;
1266
1267            // Set high-performance defaults for quinn transport.
1268            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1269            if quic_config.max_concurrent_bidi_streams.is_none() {
1270                quic_config.max_concurrent_bidi_streams = Some(500);
1271            }
1272            if quic_config.max_concurrent_uni_streams.is_none() {
1273                quic_config.max_concurrent_uni_streams = Some(500);
1274            }
1275            if quic_config.stream_receive_window.is_none() {
1276                quic_config.stream_receive_window = Some(100 << 20);
1277            }
1278            if quic_config.receive_window.is_none() {
1279                quic_config.receive_window = Some(200 << 20);
1280            }
1281            if quic_config.send_window.is_none() {
1282                quic_config.send_window = Some(200 << 20);
1283            }
1284            if quic_config.crypto_buffer_size.is_none() {
1285                quic_config.crypto_buffer_size = Some(1 << 20);
1286            }
1287            if quic_config.max_idle_timeout_ms.is_none() {
1288                quic_config.max_idle_timeout_ms = Some(10_000);
1289            }
1290            if quic_config.keep_alive_interval_ms.is_none() {
1291                quic_config.keep_alive_interval_ms = Some(5_000);
1292            }
1293            anemo_config.quic = Some(quic_config);
1294
1295            let server_name = format!("sui-{}", chain_identifier);
1296            let network = Network::bind(config.p2p_config.listen_address)
1297                .server_name(&server_name)
1298                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1299                .config(anemo_config)
1300                .outbound_request_layer(outbound_layer)
1301                .start(service)?;
1302            info!(
1303                server_name = server_name,
1304                "P2p network started on {}",
1305                network.local_addr()
1306            );
1307
1308            network
1309        };
1310
1311        let discovery_handle =
1312            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1313        let state_sync_handle = state_sync.start(p2p_network.clone());
1314        let randomness_handle = randomness.start(p2p_network.clone());
1315
1316        Ok(P2pComponents {
1317            p2p_network,
1318            known_peers,
1319            discovery_handle,
1320            state_sync_handle,
1321            randomness_handle,
1322            endpoint_manager,
1323        })
1324    }
1325
1326    async fn construct_validator_components(
1327        config: NodeConfig,
1328        state: Arc<AuthorityState>,
1329        committee: Arc<Committee>,
1330        epoch_store: Arc<AuthorityPerEpochStore>,
1331        checkpoint_store: Arc<CheckpointStore>,
1332        state_sync_handle: state_sync::Handle,
1333        randomness_handle: randomness::Handle,
1334        global_state_hasher: Weak<GlobalStateHasher>,
1335        backpressure_manager: Arc<BackpressureManager>,
1336        registry_service: &RegistryService,
1337        sui_node_metrics: Arc<SuiNodeMetrics>,
1338        checkpoint_metrics: Arc<CheckpointMetrics>,
1339        node_role: NodeRole,
1340        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1341    ) -> Result<ValidatorComponents> {
1342        let mut config_clone = config.clone();
1343        let consensus_config = config_clone
1344            .consensus_config
1345            .as_mut()
1346            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1347
1348        let client = Arc::new(UpdatableConsensusClient::new());
1349        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1350        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1351            &committee,
1352            consensus_config,
1353            state.name,
1354            &registry_service.default_registry(),
1355            client.clone(),
1356            checkpoint_store.clone(),
1357            inflight_slot_freed_notify.clone(),
1358        ));
1359
1360        let consensus_manager = Arc::new(ConsensusManager::new(
1361            &config,
1362            consensus_config,
1363            registry_service,
1364            client,
1365            node_role,
1366        ));
1367
1368        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
1369        let consensus_store_pruner = ConsensusStorePruner::new(
1370            consensus_manager.get_storage_base_path(),
1371            consensus_config.db_retention_epochs(),
1372            consensus_config.db_pruner_period(),
1373            &registry_service.default_registry(),
1374        );
1375
1376        let sui_tx_validator_metrics =
1377            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1378
1379        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1380            let (handle, queue) = Self::start_grpc_validator_service(
1381                &config,
1382                state.clone(),
1383                consensus_adapter.clone(),
1384                epoch_store.clone(),
1385                &registry_service.default_registry(),
1386                inflight_slot_freed_notify,
1387            )
1388            .await?;
1389            (Some(handle), queue)
1390        } else {
1391            (None, None)
1392        };
1393
1394        // Starts an overload monitor that monitors the execution of the authority.
1395        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1396        let validator_overload_monitor_handle = if node_role.is_validator()
1397            && config
1398                .authority_overload_config
1399                .max_load_shedding_percentage
1400                > 0
1401        {
1402            let authority_state = Arc::downgrade(&state);
1403            let overload_config = config.authority_overload_config.clone();
1404            fail_point!("starting_overload_monitor");
1405            Some(spawn_monitored_task!(overload_monitor(
1406                authority_state,
1407                overload_config,
1408            )))
1409        } else {
1410            None
1411        };
1412
1413        Self::start_epoch_specific_validator_components(
1414            &config,
1415            state.clone(),
1416            consensus_adapter,
1417            checkpoint_store,
1418            epoch_store,
1419            state_sync_handle,
1420            randomness_handle,
1421            randomness_receiver_handle,
1422            consensus_manager,
1423            consensus_store_pruner,
1424            global_state_hasher,
1425            backpressure_manager,
1426            validator_server_handle,
1427            validator_overload_monitor_handle,
1428            checkpoint_metrics,
1429            sui_node_metrics,
1430            sui_tx_validator_metrics,
1431            admission_queue,
1432            node_role,
1433        )
1434        .await
1435    }
1436
1437    async fn start_epoch_specific_validator_components(
1438        config: &NodeConfig,
1439        state: Arc<AuthorityState>,
1440        consensus_adapter: Arc<ConsensusAdapter>,
1441        checkpoint_store: Arc<CheckpointStore>,
1442        epoch_store: Arc<AuthorityPerEpochStore>,
1443        state_sync_handle: state_sync::Handle,
1444        randomness_handle: randomness::Handle,
1445        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1446        consensus_manager: Arc<ConsensusManager>,
1447        consensus_store_pruner: ConsensusStorePruner,
1448        state_hasher: Weak<GlobalStateHasher>,
1449        backpressure_manager: Arc<BackpressureManager>,
1450        validator_server_handle: Option<SpawnOnce>,
1451        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1452        checkpoint_metrics: Arc<CheckpointMetrics>,
1453        sui_node_metrics: Arc<SuiNodeMetrics>,
1454        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1455        admission_queue: Option<AdmissionQueueContext>,
1456        node_role: NodeRole,
1457    ) -> Result<ValidatorComponents> {
1458        let checkpoint_service = Self::build_checkpoint_service(
1459            config,
1460            consensus_adapter.clone(),
1461            checkpoint_store.clone(),
1462            epoch_store.clone(),
1463            state.clone(),
1464            state_sync_handle,
1465            state_hasher,
1466            checkpoint_metrics.clone(),
1467            node_role,
1468        );
1469
1470        // Clear the VSS public key from the previous epoch so any randomness round
1471        // signatures buffer in the channel until the new DKG completes.
1472        randomness_receiver_handle.clear_public_key();
1473
1474        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
1475            let authority_key_pair = if node_role.is_validator() {
1476                Some(config.protocol_key_pair())
1477            } else {
1478                None
1479            };
1480            let randomness_manager = RandomnessManager::try_new(
1481                Arc::downgrade(&epoch_store),
1482                Box::new(consensus_adapter.clone()),
1483                randomness_handle,
1484                authority_key_pair,
1485                randomness_receiver_handle.clone(),
1486            )
1487            .await;
1488            if let Some(randomness_manager) = randomness_manager {
1489                epoch_store
1490                    .set_randomness_manager(randomness_manager)
1491                    .await?;
1492            }
1493        }
1494
1495        if node_role.is_validator() {
1496            ExecutionTimeObserver::spawn(
1497                epoch_store.clone(),
1498                Box::new(consensus_adapter.clone()),
1499                config
1500                    .execution_time_observer_config
1501                    .clone()
1502                    .unwrap_or_default(),
1503            );
1504        }
1505
1506        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1507            None,
1508            state.metrics.clone(),
1509        ));
1510
1511        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1512            state.clone(),
1513            checkpoint_service.clone(),
1514            epoch_store.clone(),
1515            consensus_adapter.clone(),
1516            throughput_calculator,
1517            backpressure_manager,
1518            config.congestion_log.clone(),
1519        );
1520
1521        info!("Starting consensus manager asynchronously");
1522
1523        // Spawn consensus startup asynchronously to avoid blocking other components
1524        tokio::spawn({
1525            let config = config.clone();
1526            let epoch_store = epoch_store.clone();
1527            let sui_tx_validator = SuiTxValidator::new(
1528                state.clone(),
1529                epoch_store.clone(),
1530                checkpoint_service.clone(),
1531                sui_tx_validator_metrics.clone(),
1532            );
1533            let consensus_manager = consensus_manager.clone();
1534            async move {
1535                consensus_manager
1536                    .start(
1537                        &config,
1538                        epoch_store,
1539                        consensus_handler_initializer,
1540                        sui_tx_validator,
1541                        Some(randomness_receiver_handle),
1542                    )
1543                    .await;
1544            }
1545        });
1546        let replay_waiter = consensus_manager.replay_waiter();
1547
1548        info!("Spawning checkpoint service");
1549        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1550            None
1551        } else {
1552            Some(replay_waiter)
1553        };
1554        checkpoint_service
1555            .spawn(epoch_store.clone(), replay_waiter)
1556            .await;
1557
1558        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
1559            Self::start_jwk_updater(
1560                config,
1561                sui_node_metrics,
1562                state.name,
1563                epoch_store.clone(),
1564                consensus_adapter.clone(),
1565            );
1566        }
1567
1568        if let Some(ctx) = &admission_queue {
1569            ctx.rotate_for_epoch(epoch_store);
1570        }
1571
1572        Ok(ValidatorComponents {
1573            validator_server_handle,
1574            validator_overload_monitor_handle,
1575            consensus_manager,
1576            consensus_store_pruner,
1577            consensus_adapter,
1578            checkpoint_metrics,
1579            sui_tx_validator_metrics,
1580            admission_queue,
1581        })
1582    }
1583
1584    fn build_checkpoint_service(
1585        config: &NodeConfig,
1586        consensus_adapter: Arc<ConsensusAdapter>,
1587        checkpoint_store: Arc<CheckpointStore>,
1588        epoch_store: Arc<AuthorityPerEpochStore>,
1589        state: Arc<AuthorityState>,
1590        state_sync_handle: state_sync::Handle,
1591        state_hasher: Weak<GlobalStateHasher>,
1592        checkpoint_metrics: Arc<CheckpointMetrics>,
1593        node_role: NodeRole,
1594    ) -> Arc<CheckpointService> {
1595        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1596        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1597
1598        debug!(
1599            "Starting checkpoint service with epoch start timestamp {}
1600            and epoch duration {}",
1601            epoch_start_timestamp_ms, epoch_duration_ms
1602        );
1603
1604        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1605            Box::new(SubmitCheckpointToConsensus::new(
1606                consensus_adapter,
1607                state.secret.clone(),
1608                config.protocol_public_key(),
1609                epoch_start_timestamp_ms
1610                    .checked_add(epoch_duration_ms)
1611                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1612                checkpoint_metrics.clone(),
1613            ))
1614        } else {
1615            Box::new(LogCheckpointOutput::new(checkpoint_metrics.clone()))
1616        };
1617
1618        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1619
1620        CheckpointService::build(
1621            state.clone(),
1622            checkpoint_store,
1623            epoch_store,
1624            state.get_transaction_cache_reader().clone(),
1625            state_hasher,
1626            checkpoint_output,
1627            Box::new(certified_checkpoint_output),
1628            checkpoint_metrics,
1629        )
1630    }
1631
1632    fn construct_consensus_adapter(
1633        committee: &Committee,
1634        consensus_config: &ConsensusConfig,
1635        authority: AuthorityName,
1636        prometheus_registry: &Registry,
1637        consensus_client: Arc<dyn ConsensusClient>,
1638        checkpoint_store: Arc<CheckpointStore>,
1639        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1640    ) -> ConsensusAdapter {
1641        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1642        // The consensus adapter allows the authority to send user certificates through consensus.
1643
1644        ConsensusAdapter::new(
1645            consensus_client,
1646            checkpoint_store,
1647            authority,
1648            consensus_config.max_pending_transactions(),
1649            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1650            ca_metrics,
1651            inflight_slot_freed_notify,
1652        )
1653    }
1654
1655    async fn start_grpc_validator_service(
1656        config: &NodeConfig,
1657        state: Arc<AuthorityState>,
1658        consensus_adapter: Arc<ConsensusAdapter>,
1659        epoch_store: Arc<AuthorityPerEpochStore>,
1660        prometheus_registry: &Registry,
1661        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1662    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1663        let overload_config = &config.authority_overload_config;
1664        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1665            let manager = Arc::new(AdmissionQueueManager::new(
1666                consensus_adapter.clone(),
1667                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1668                overload_config.admission_queue_capacity_fraction,
1669                overload_config.admission_queue_bypass_fraction,
1670                overload_config.admission_queue_failover_timeout,
1671                inflight_slot_freed_notify,
1672            ));
1673            AdmissionQueueContext::spawn(manager, epoch_store)
1674        });
1675        let validator_service = ValidatorService::new(
1676            state.clone(),
1677            consensus_adapter,
1678            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1679            config.policy_config.clone().map(|p| p.client_id_source),
1680            admission_queue.clone(),
1681        );
1682
1683        let mut server_conf = mysten_network::config::Config::new();
1684        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1685        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1686        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1687        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1688        server_conf.load_shed = config.grpc_load_shed;
1689        let mut server_builder =
1690            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1691
1692        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1693
1694        let tls_config = sui_tls::create_rustls_server_config(
1695            config.network_key_pair().copy().private(),
1696            SUI_TLS_SERVER_NAME.to_string(),
1697        );
1698
1699        let network_address = config.network_address().clone();
1700
1701        let (ready_tx, ready_rx) = oneshot::channel();
1702
1703        let spawn_once = SpawnOnce::new(ready_rx, async move {
1704            let server = server_builder
1705                .bind(&network_address, Some(tls_config))
1706                .await
1707                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1708            let local_addr = server.local_addr();
1709            info!("Listening to traffic on {local_addr}");
1710            ready_tx.send(()).unwrap();
1711            if let Err(err) = server.serve().await {
1712                info!("Server stopped: {err}");
1713            }
1714            info!("Server stopped");
1715        });
1716        Ok((spawn_once, admission_queue))
1717    }
1718
1719    pub fn state(&self) -> Arc<AuthorityState> {
1720        self.state.clone()
1721    }
1722
1723    /// The embedded `sui-rpc-store` index backend, when the node runs
1724    /// with `use_experimental_rpc_store`. Exposes the startup bootstrap
1725    /// decision and per-cohort watermarks for introspection (used by
1726    /// tests to observe restore/resume behavior across restarts without
1727    /// going through the RPC surface).
1728    pub fn embedded_rpc_store(&self) -> Option<&EmbeddedRpcStore> {
1729        self.embedded_rpc_store.as_ref()
1730    }
1731
1732    #[cfg(any(test, msim))]
1733    pub fn connection_monitor_handle_for_testing(
1734        &self,
1735    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
1736        &self._connection_monitor_handle
1737    }
1738
1739    pub fn node_role(&self) -> NodeRole {
1740        self.state.load_epoch_store_one_call_per_task().node_role()
1741    }
1742
1743    // Only used for testing because of how epoch store is loaded.
1744    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1745        self.state.reference_gas_price_for_testing()
1746    }
1747
1748    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1749        self.state.committee_store().clone()
1750    }
1751
1752    pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
1753        self.checkpoint_store.clone()
1754    }
1755
1756    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1757        self.state.authority_store()
1758    }
1759
1760    pub fn clone_consensus_store(
1761        &self,
1762    ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
1763        self.validator_components
1764            .try_lock()
1765            .ok()?
1766            .as_ref()?
1767            .consensus_manager
1768            .consensus_store()
1769    }
1770
1771    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1772    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1773    /// of this function will mostly likely want to call this again
1774    /// to get a fresh one.
1775    pub fn clone_authority_aggregator(
1776        &self,
1777    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1778        self.transaction_orchestrator
1779            .as_ref()
1780            .map(|to| to.clone_authority_aggregator())
1781    }
1782
1783    pub fn transaction_orchestrator(
1784        &self,
1785    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1786        self.transaction_orchestrator.clone()
1787    }
1788
1789    /// This function awaits the completion of checkpoint execution of the current epoch,
1790    /// after which it initiates reconfiguration of the entire system.
1791    pub async fn monitor_reconfiguration(
1792        self: Arc<Self>,
1793        mut epoch_store: Arc<AuthorityPerEpochStore>,
1794    ) -> Result<()> {
1795        let checkpoint_executor_metrics =
1796            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1797
1798        loop {
1799            let mut hasher_guard = self.global_state_hasher.lock().await;
1800            let hasher = hasher_guard.take().unwrap();
1801            info!(
1802                "Creating checkpoint executor for epoch {}",
1803                epoch_store.epoch()
1804            );
1805            let checkpoint_executor = CheckpointExecutor::new(
1806                epoch_store.clone(),
1807                self.checkpoint_store.clone(),
1808                self.state.clone(),
1809                hasher.clone(),
1810                self.backpressure_manager.clone(),
1811                self.config.checkpoint_executor_config.clone(),
1812                checkpoint_executor_metrics.clone(),
1813                self.subscription_service_checkpoint_sender.clone(),
1814            );
1815
1816            let run_with_range = self.config.run_with_range;
1817
1818            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1819
1820            // Update the current protocol version metric.
1821            self.metrics
1822                .current_protocol_version
1823                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1824
1825            // Advertise capabilities to committee, if we are a validator.
1826            // FullNodes that state sync via consensus will also have validator components, by they are not supposed to submit any capabilities.
1827            if let Some(components) = &*self.validator_components.lock().await
1828                && cur_epoch_store.is_validator()
1829            {
1830                // TODO: without this sleep, the consensus message is not delivered reliably.
1831                tokio::time::sleep(Duration::from_millis(1)).await;
1832
1833                let config = cur_epoch_store.protocol_config();
1834                let mut supported_protocol_versions = self
1835                    .config
1836                    .supported_protocol_versions
1837                    .expect("Supported versions should be populated")
1838                    // no need to send digests of versions less than the current version
1839                    .truncate_below(config.version);
1840
1841                while supported_protocol_versions.max > config.version {
1842                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1843                        supported_protocol_versions.max,
1844                        cur_epoch_store.get_chain(),
1845                    );
1846
1847                    if proposed_protocol_config.enable_accumulators()
1848                        && !epoch_store.accumulator_root_exists()
1849                    {
1850                        error!(
1851                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1852                            supported_protocol_versions.max
1853                        );
1854                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1855                    } else {
1856                        break;
1857                    }
1858                }
1859
1860                let binary_config = config.binary_config(None);
1861                let transaction = ConsensusTransaction::new_capability_notification_v2(
1862                    AuthorityCapabilitiesV2::new(
1863                        self.state.name,
1864                        cur_epoch_store.get_chain_identifier().chain(),
1865                        supported_protocol_versions,
1866                        self.state
1867                            .get_available_system_packages(&binary_config)
1868                            .await,
1869                    ),
1870                );
1871                info!(?transaction, "submitting capabilities to consensus");
1872                components.consensus_adapter.submit(
1873                    transaction,
1874                    None,
1875                    &cur_epoch_store,
1876                    None,
1877                    None,
1878                )?;
1879            }
1880
1881            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1882
1883            if stop_condition == StopReason::RunWithRangeCondition {
1884                SuiNode::shutdown(&self).await;
1885                self.shutdown_channel_tx
1886                    .send(run_with_range)
1887                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1888                return Ok(());
1889            }
1890
1891            // Safe to call because we are in the middle of reconfiguration.
1892            let latest_system_state = self
1893                .state
1894                .get_object_cache_reader()
1895                .get_sui_system_state_object_unsafe()
1896                .expect("Read Sui System State object cannot fail");
1897
1898            #[cfg(msim)]
1899            if !self
1900                .sim_state
1901                .sim_safe_mode_expected
1902                .load(Ordering::Relaxed)
1903            {
1904                debug_assert!(!latest_system_state.safe_mode());
1905            }
1906
1907            #[cfg(not(msim))]
1908            debug_assert!(!latest_system_state.safe_mode());
1909
1910            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1911                && self.state.is_fullnode(&cur_epoch_store)
1912            {
1913                warn!(
1914                    "Failed to send end of epoch notification to subscriber: {:?}",
1915                    err
1916                );
1917            }
1918
1919            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1920            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1921
1922            self.auth_agg.store(Arc::new(
1923                self.auth_agg
1924                    .load()
1925                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1926            ));
1927
1928            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1929            let next_epoch = next_epoch_committee.epoch();
1930            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1931
1932            info!(
1933                next_epoch,
1934                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1935            );
1936
1937            fail_point_async!("reconfig_delay");
1938
1939            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1940
1941            update_peer_addresses(
1942                &self.config,
1943                &self.endpoint_manager,
1944                &new_epoch_start_state,
1945                Some(cur_epoch_store.epoch_start_state()),
1946            );
1947
1948            let mut validator_components_lock_guard = self.validator_components.lock().await;
1949
1950            // The following code handles 4 different cases, depending on whether the node
1951            // was a validator in the previous epoch, and whether the node is a validator
1952            // in the new epoch.
1953            let new_epoch_store = self
1954                .reconfigure_state(
1955                    &self.state,
1956                    &cur_epoch_store,
1957                    next_epoch_committee.clone(),
1958                    new_epoch_start_state,
1959                    hasher.clone(),
1960                )
1961                .await;
1962
1963            let new_role = new_epoch_store.node_role();
1964
1965            let new_validator_components = if let Some(ValidatorComponents {
1966                validator_server_handle,
1967                validator_overload_monitor_handle,
1968                consensus_manager,
1969                consensus_store_pruner,
1970                consensus_adapter,
1971                checkpoint_metrics,
1972                sui_tx_validator_metrics,
1973                admission_queue,
1974            }) = validator_components_lock_guard.take()
1975            {
1976                info!("Reconfiguring node (was running consensus).");
1977
1978                consensus_manager.shutdown().await;
1979                info!("Consensus has shut down.");
1980
1981                info!("Epoch store finished reconfiguration.");
1982
1983                // No other components should be holding a strong reference to state hasher
1984                // at this point. Confirm here before we swap in the new hasher.
1985                let global_state_hasher_metrics = Arc::into_inner(hasher)
1986                    .expect("Object state hasher should have no other references at this point")
1987                    .metrics();
1988                let new_hasher = Arc::new(GlobalStateHasher::new(
1989                    self.state.get_global_state_hash_store().clone(),
1990                    global_state_hasher_metrics,
1991                ));
1992                let weak_hasher = Arc::downgrade(&new_hasher);
1993                *hasher_guard = Some(new_hasher);
1994
1995                consensus_store_pruner.prune(next_epoch).await;
1996
1997                if new_role.runs_consensus() {
1998                    info!("Restarting consensus as {new_role}");
1999                    Some(
2000                        Self::start_epoch_specific_validator_components(
2001                            &self.config,
2002                            self.state.clone(),
2003                            consensus_adapter,
2004                            self.checkpoint_store.clone(),
2005                            new_epoch_store.clone(),
2006                            self.state_sync_handle.clone(),
2007                            self.randomness_handle.clone(),
2008                            self.randomness_receiver_handle.clone(),
2009                            consensus_manager,
2010                            consensus_store_pruner,
2011                            weak_hasher,
2012                            self.backpressure_manager.clone(),
2013                            validator_server_handle,
2014                            validator_overload_monitor_handle,
2015                            checkpoint_metrics,
2016                            self.metrics.clone(),
2017                            sui_tx_validator_metrics,
2018                            admission_queue,
2019                            new_role,
2020                        )
2021                        .await?,
2022                    )
2023                } else {
2024                    info!(
2025                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
2026                    );
2027                    None
2028                }
2029            } else {
2030                // No other components should be holding a strong reference to state hasher
2031                // at this point. Confirm here before we swap in the new hasher.
2032                let global_state_hasher_metrics = Arc::into_inner(hasher)
2033                    .expect("Object state hasher should have no other references at this point")
2034                    .metrics();
2035                let new_hasher = Arc::new(GlobalStateHasher::new(
2036                    self.state.get_global_state_hash_store().clone(),
2037                    global_state_hasher_metrics,
2038                ));
2039                let weak_hasher = Arc::downgrade(&new_hasher);
2040                *hasher_guard = Some(new_hasher);
2041
2042                if new_role.runs_consensus() {
2043                    info!("Promoting node to {new_role}, starting consensus components");
2044
2045                    let mut components = Self::construct_validator_components(
2046                        self.config.clone(),
2047                        self.state.clone(),
2048                        Arc::new(next_epoch_committee.clone()),
2049                        new_epoch_store.clone(),
2050                        self.checkpoint_store.clone(),
2051                        self.state_sync_handle.clone(),
2052                        self.randomness_handle.clone(),
2053                        weak_hasher,
2054                        self.backpressure_manager.clone(),
2055                        &self.registry_service,
2056                        self.metrics.clone(),
2057                        self.checkpoint_metrics.clone(),
2058                        new_role,
2059                        self.randomness_receiver_handle.clone(),
2060                    )
2061                    .await?;
2062
2063                    if new_role.is_validator() {
2064                        components.validator_server_handle = Some(
2065                            components
2066                                .validator_server_handle
2067                                .take()
2068                                .unwrap()
2069                                .start()
2070                                .await,
2071                        );
2072
2073                        self.endpoint_manager
2074                            .set_consensus_address_updater(components.consensus_manager.clone());
2075                    }
2076
2077                    Some(components)
2078                } else {
2079                    None
2080                }
2081            };
2082            *validator_components_lock_guard = new_validator_components;
2083
2084            // Force releasing current epoch store DB handle, because the
2085            // Arc<AuthorityPerEpochStore> may linger.
2086            cur_epoch_store.release_db_handles();
2087
2088            if cfg!(msim)
2089                && !matches!(
2090                    self.config
2091                        .authority_store_pruning_config
2092                        .num_epochs_to_retain_for_checkpoints(),
2093                    None | Some(u64::MAX) | Some(0)
2094                )
2095            {
2096                self.state
2097                    .prune_checkpoints_for_eligible_epochs_for_testing(
2098                        self.config.clone(),
2099                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2100                    )
2101                    .await?;
2102            }
2103
2104            epoch_store = new_epoch_store;
2105            info!("Reconfiguration finished");
2106        }
2107    }
2108
2109    async fn shutdown(&self) {
2110        if let Some(validator_components) = &*self.validator_components.lock().await {
2111            validator_components.consensus_manager.shutdown().await;
2112        }
2113    }
2114
2115    async fn reconfigure_state(
2116        &self,
2117        state: &Arc<AuthorityState>,
2118        cur_epoch_store: &AuthorityPerEpochStore,
2119        next_epoch_committee: Committee,
2120        next_epoch_start_system_state: EpochStartSystemState,
2121        global_state_hasher: Arc<GlobalStateHasher>,
2122    ) -> Arc<AuthorityPerEpochStore> {
2123        let next_epoch = next_epoch_committee.epoch();
2124
2125        let last_checkpoint = self
2126            .checkpoint_store
2127            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2128            .expect("Error loading last checkpoint for current epoch")
2129            .expect("Could not load last checkpoint for current epoch");
2130
2131        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2132
2133        assert_eq!(
2134            Some(last_checkpoint_seq),
2135            self.checkpoint_store
2136                .get_highest_executed_checkpoint_seq_number()
2137                .expect("Error loading highest executed checkpoint sequence number")
2138        );
2139
2140        let epoch_start_configuration = EpochStartConfiguration::new(
2141            next_epoch_start_system_state,
2142            *last_checkpoint.digest(),
2143            state.get_object_store().as_ref(),
2144            EpochFlag::default_flags_for_new_epoch(&state.config),
2145        )
2146        .expect("EpochStartConfiguration construction cannot fail");
2147
2148        let new_epoch_store = self
2149            .state
2150            .reconfigure(
2151                cur_epoch_store,
2152                self.config.supported_protocol_versions.unwrap(),
2153                next_epoch_committee,
2154                epoch_start_configuration,
2155                global_state_hasher,
2156                &self.config.expensive_safety_check_config,
2157                last_checkpoint_seq,
2158            )
2159            .await
2160            .expect("Reconfigure authority state cannot fail");
2161        info!(next_epoch, "Node State has been reconfigured");
2162        assert_eq!(next_epoch, new_epoch_store.epoch());
2163        self.state.get_reconfig_api().update_epoch_flags_metrics(
2164            cur_epoch_store.epoch_start_config().flags(),
2165            new_epoch_store.epoch_start_config().flags(),
2166        );
2167
2168        new_epoch_store
2169    }
2170
2171    pub fn get_config(&self) -> &NodeConfig {
2172        &self.config
2173    }
2174
2175    pub fn randomness_handle(&self) -> randomness::Handle {
2176        self.randomness_handle.clone()
2177    }
2178
2179    pub fn state_sync_handle(&self) -> state_sync::Handle {
2180        self.state_sync_handle.clone()
2181    }
2182
2183    pub fn endpoint_manager(&self) -> &EndpointManager {
2184        &self.endpoint_manager
2185    }
2186
2187    /// Get a short prefix of a digest for metric labels
2188    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2189        let digest_str = digest.to_string();
2190        if digest_str.len() >= 8 {
2191            digest_str[0..8].to_string()
2192        } else {
2193            digest_str
2194        }
2195    }
2196
2197    /// Check for previously detected forks and handle them appropriately.
2198    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2199    /// For all other cases, block node startup if a fork is detected.
2200    async fn check_and_recover_forks(
2201        checkpoint_store: &CheckpointStore,
2202        checkpoint_metrics: &CheckpointMetrics,
2203        fork_recovery: Option<&ForkRecoveryConfig>,
2204    ) -> Result<()> {
2205        // Try to recover from forks if recovery config is provided
2206        if let Some(recovery) = fork_recovery {
2207            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2208            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2209        }
2210
2211        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2212            .get_checkpoint_fork_detected()
2213            .map_err(|e| {
2214                error!("Failed to check for checkpoint fork: {:?}", e);
2215                e
2216            })?
2217        {
2218            Self::handle_checkpoint_fork(
2219                checkpoint_seq,
2220                checkpoint_digest,
2221                checkpoint_metrics,
2222                fork_recovery,
2223            )
2224            .await?;
2225        }
2226        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2227            .get_transaction_fork_detected()
2228            .map_err(|e| {
2229                error!("Failed to check for transaction fork: {:?}", e);
2230                e
2231            })?
2232        {
2233            Self::handle_transaction_fork(
2234                tx_digest,
2235                expected_effects,
2236                actual_effects,
2237                checkpoint_metrics,
2238                fork_recovery,
2239            )
2240            .await?;
2241        }
2242
2243        Ok(())
2244    }
2245
2246    fn try_recover_checkpoint_fork(
2247        checkpoint_store: &CheckpointStore,
2248        recovery: &ForkRecoveryConfig,
2249    ) -> Result<()> {
2250        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2251        // clear locally computed checkpoints from that sequence (inclusive).
2252        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2253            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2254                anyhow::bail!(
2255                    "Invalid checkpoint digest override for seq {}: {}",
2256                    seq,
2257                    expected_digest_str
2258                );
2259            };
2260
2261            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2262                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2263                if local_digest != expected_digest {
2264                    info!(
2265                        seq,
2266                        local = %Self::get_digest_prefix(local_digest),
2267                        expected = %Self::get_digest_prefix(expected_digest),
2268                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2269                        seq
2270                    );
2271                    checkpoint_store
2272                        .clear_locally_computed_checkpoints_from(*seq)
2273                        .context(
2274                            "Failed to clear locally computed checkpoints from override seq",
2275                        )?;
2276                }
2277            }
2278        }
2279
2280        if let Some((checkpoint_seq, checkpoint_digest)) =
2281            checkpoint_store.get_checkpoint_fork_detected()?
2282            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2283        {
2284            info!(
2285                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2286                checkpoint_seq, checkpoint_digest
2287            );
2288            checkpoint_store
2289                .clear_checkpoint_fork_detected()
2290                .expect("Failed to clear checkpoint fork detected marker");
2291        }
2292        Ok(())
2293    }
2294
2295    fn try_recover_transaction_fork(
2296        checkpoint_store: &CheckpointStore,
2297        recovery: &ForkRecoveryConfig,
2298    ) -> Result<()> {
2299        if recovery.transaction_overrides.is_empty() {
2300            return Ok(());
2301        }
2302
2303        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2304            && recovery
2305                .transaction_overrides
2306                .contains_key(&tx_digest.to_string())
2307        {
2308            info!(
2309                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2310                tx_digest
2311            );
2312            checkpoint_store
2313                .clear_transaction_fork_detected()
2314                .expect("Failed to clear transaction fork detected marker");
2315        }
2316        Ok(())
2317    }
2318
2319    fn get_current_timestamp() -> u64 {
2320        std::time::SystemTime::now()
2321            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2322            .unwrap()
2323            .as_secs()
2324    }
2325
2326    async fn handle_checkpoint_fork(
2327        checkpoint_seq: u64,
2328        checkpoint_digest: CheckpointDigest,
2329        checkpoint_metrics: &CheckpointMetrics,
2330        fork_recovery: Option<&ForkRecoveryConfig>,
2331    ) -> Result<()> {
2332        checkpoint_metrics
2333            .checkpoint_fork_crash_mode
2334            .with_label_values(&[
2335                &checkpoint_seq.to_string(),
2336                &Self::get_digest_prefix(checkpoint_digest),
2337                &Self::get_current_timestamp().to_string(),
2338            ])
2339            .set(1);
2340
2341        let behavior = fork_recovery
2342            .map(|fr| fr.fork_crash_behavior)
2343            .unwrap_or_default();
2344
2345        match behavior {
2346            ForkCrashBehavior::AwaitForkRecovery => {
2347                error!(
2348                    checkpoint_seq = checkpoint_seq,
2349                    checkpoint_digest = ?checkpoint_digest,
2350                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2351                );
2352                futures::future::pending::<()>().await;
2353                unreachable!("pending() should never return");
2354            }
2355            ForkCrashBehavior::ReturnError => {
2356                error!(
2357                    checkpoint_seq = checkpoint_seq,
2358                    checkpoint_digest = ?checkpoint_digest,
2359                    "Checkpoint fork detected! Returning error."
2360                );
2361                Err(anyhow::anyhow!(
2362                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2363                    checkpoint_seq,
2364                    checkpoint_digest
2365                ))
2366            }
2367        }
2368    }
2369
2370    async fn handle_transaction_fork(
2371        tx_digest: TransactionDigest,
2372        expected_effects_digest: TransactionEffectsDigest,
2373        actual_effects_digest: TransactionEffectsDigest,
2374        checkpoint_metrics: &CheckpointMetrics,
2375        fork_recovery: Option<&ForkRecoveryConfig>,
2376    ) -> Result<()> {
2377        checkpoint_metrics
2378            .transaction_fork_crash_mode
2379            .with_label_values(&[
2380                &Self::get_digest_prefix(tx_digest),
2381                &Self::get_digest_prefix(expected_effects_digest),
2382                &Self::get_digest_prefix(actual_effects_digest),
2383                &Self::get_current_timestamp().to_string(),
2384            ])
2385            .set(1);
2386
2387        let behavior = fork_recovery
2388            .map(|fr| fr.fork_crash_behavior)
2389            .unwrap_or_default();
2390
2391        match behavior {
2392            ForkCrashBehavior::AwaitForkRecovery => {
2393                error!(
2394                    tx_digest = ?tx_digest,
2395                    expected_effects_digest = ?expected_effects_digest,
2396                    actual_effects_digest = ?actual_effects_digest,
2397                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2398                );
2399                futures::future::pending::<()>().await;
2400                unreachable!("pending() should never return");
2401            }
2402            ForkCrashBehavior::ReturnError => {
2403                error!(
2404                    tx_digest = ?tx_digest,
2405                    expected_effects_digest = ?expected_effects_digest,
2406                    actual_effects_digest = ?actual_effects_digest,
2407                    "Transaction fork detected! Returning error."
2408                );
2409                Err(anyhow::anyhow!(
2410                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2411                    tx_digest,
2412                    expected_effects_digest,
2413                    actual_effects_digest
2414                ))
2415            }
2416        }
2417    }
2418}
2419
2420#[cfg(not(msim))]
2421impl SuiNode {
2422    async fn fetch_jwks(
2423        _authority: AuthorityName,
2424        provider: &OIDCProvider,
2425    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2426        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2427        use sui_types::error::SuiErrorKind;
2428        let client = reqwest::Client::new();
2429        fetch_jwks(provider, &client, true)
2430            .await
2431            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2432    }
2433}
2434
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node recorded in this node's simulator state.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Logs and stores whether safe mode is expected, using relaxed atomic ordering.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator counterpart of the JWK fetch: forwards both arguments to the
    /// callable returned by `get_jwk_injector` instead of doing real I/O here.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2456
/// A background future that is spawned at most once.
///
/// A value starts as `Unstarted`, holding a oneshot receiver and the boxed future.
/// `SpawnOnce::start` spawns the future, awaits the receiver and yields `Started`
/// with the task's join handle.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Send
    // NOTE(review): `BoxFuture` already carries a `Send` bound; the wrapper more likely
    // exists to make the enum `Sync` — confirm and reword the line above.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // The join handle is stored but not read by `SpawnOnce` itself, hence the allowance.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2463
2464impl SpawnOnce {
2465    pub fn new(
2466        ready_rx: oneshot::Receiver<()>,
2467        future: impl Future<Output = ()> + Send + 'static,
2468    ) -> Self {
2469        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2470    }
2471
2472    pub async fn start(self) -> Self {
2473        match self {
2474            Self::Unstarted(ready_rx, future) => {
2475                let future = future.into_inner();
2476                let handle = tokio::spawn(future);
2477                ready_rx.await.unwrap();
2478                Self::Started(handle)
2479            }
2480            Self::Started(_) => self,
2481        }
2482    }
2483}
2484
2485/// Updates trusted peer addresses in the p2p network (for nodes configured as validators).
2486/// When `prev_epoch_start_state` is provided, validators that are no longer in the committee
2487/// have their Chain addresses cleared.
2488fn update_peer_addresses(
2489    config: &NodeConfig,
2490    endpoint_manager: &EndpointManager,
2491    epoch_start_state: &EpochStartSystemState,
2492    prev_epoch_start_state: Option<&EpochStartSystemState>,
2493) {
2494    if config.consensus_config().is_none() {
2495        return;
2496    }
2497    let new_peers: HashSet<PeerId> = epoch_start_state
2498        .get_validator_as_p2p_peers(config.protocol_public_key())
2499        .into_iter()
2500        .map(|(peer_id, address)| {
2501            endpoint_manager
2502                .update_endpoint(
2503                    EndpointId::P2p(peer_id),
2504                    AddressSource::Chain,
2505                    vec![address],
2506                )
2507                .expect("Updating peer addresses should not fail");
2508            peer_id
2509        })
2510        .collect();
2511
2512    // Clear Chain addresses for validators that left the committee.
2513    if let Some(prev) = prev_epoch_start_state {
2514        for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2515            if !new_peers.contains(&peer_id) {
2516                endpoint_manager
2517                    .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2518                    .expect("Clearing peer addresses should not fail");
2519            }
2520        }
2521    }
2522}
2523
2524fn build_kv_store(
2525    state: &Arc<AuthorityState>,
2526    config: &NodeConfig,
2527    registry: &Registry,
2528) -> Result<Arc<TransactionKeyValueStore>> {
2529    let metrics = KeyValueStoreMetrics::new(registry);
2530    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2531
2532    let base_url = &config.transaction_kv_store_read_config.base_url;
2533
2534    if base_url.is_empty() {
2535        info!("no http kv store url provided, using local db only");
2536        return Ok(Arc::new(db_store));
2537    }
2538
2539    let base_url: url::Url = base_url.parse().tap_err(|e| {
2540        error!(
2541            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2542            base_url, e
2543        )
2544    })?;
2545
2546    let network_str = match state.get_chain_identifier().chain() {
2547        Chain::Mainnet => "/mainnet",
2548        _ => {
2549            info!("using local db only for kv store");
2550            return Ok(Arc::new(db_store));
2551        }
2552    };
2553
2554    let base_url = base_url.join(network_str)?.to_string();
2555    let http_store = HttpKVStore::new_kv(
2556        &base_url,
2557        config.transaction_kv_store_read_config.cache_size,
2558        metrics.clone(),
2559    )?;
2560    info!("using local key-value store with fallback to http key-value store");
2561    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2562        db_store,
2563        http_store,
2564        metrics,
2565        "json_rpc_fallback",
2566    )))
2567}
2568
2569async fn build_json_rpc_router(
2570    state: &Arc<AuthorityState>,
2571    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2572    config: &NodeConfig,
2573    prometheus_registry: &Registry,
2574) -> Result<axum::Router> {
2575    let traffic_controller = state.traffic_controller.clone();
2576    let mut server = JsonRpcServerBuilder::new(
2577        env!("CARGO_PKG_VERSION"),
2578        prometheus_registry,
2579        traffic_controller,
2580        config.policy_config.clone(),
2581    );
2582
2583    let kv_store = build_kv_store(state, config, prometheus_registry)?;
2584
2585    let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2586    server.register_module(ReadApi::new(
2587        state.clone(),
2588        kv_store.clone(),
2589        metrics.clone(),
2590    ))?;
2591    server.register_module(CoinReadApi::new(
2592        state.clone(),
2593        kv_store.clone(),
2594        metrics.clone(),
2595    ))?;
2596
2597    // if run_with_range is enabled we want to prevent any transactions
2598    // run_with_range = None is normal operating conditions
2599    if config.run_with_range.is_none() {
2600        server.register_module(TransactionBuilderApi::new(state.clone()))?;
2601    }
2602    server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2603    server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2604
2605    if let Some(transaction_orchestrator) = transaction_orchestrator {
2606        server.register_module(TransactionExecutionApi::new(
2607            state.clone(),
2608            transaction_orchestrator.clone(),
2609            metrics.clone(),
2610        ))?;
2611    }
2612
2613    let name_service_config = if let (
2614        Some(package_address),
2615        Some(registry_id),
2616        Some(reverse_registry_id),
2617    ) = (
2618        config.name_service_package_address,
2619        config.name_service_registry_id,
2620        config.name_service_reverse_registry_id,
2621    ) {
2622        sui_name_service::NameServiceConfig::new(package_address, registry_id, reverse_registry_id)
2623    } else {
2624        match state.get_chain_identifier().chain() {
2625            Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2626            Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2627            Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2628        }
2629    };
2630
2631    server.register_module(IndexerApi::new(
2632        state.clone(),
2633        ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2634        kv_store,
2635        name_service_config,
2636        metrics,
2637        config.indexer_max_subscriptions,
2638    ))?;
2639    server.register_module(MoveUtils::new(state.clone()))?;
2640
2641    let server_type = config.jsonrpc_server_type();
2642
2643    Ok(server.to_router(server_type).await?)
2644}
2645
2646async fn build_http_servers(
2647    state: Arc<AuthorityState>,
2648    store: RocksDbStore,
2649    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2650    config: &NodeConfig,
2651    prometheus_registry: &Registry,
2652    server_version: ServerVersion,
2653    node_role: NodeRole,
2654    embedded_rpc_store: Option<&EmbeddedRpcStore>,
2655) -> Result<(
2656    HttpServers,
2657    Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
2658)> {
2659    // Validators do not expose these APIs
2660    if !node_role.is_fullnode() {
2661        return Ok((HttpServers::default(), None));
2662    }
2663
2664    info!("starting rpc service with config: {:?}", config.rpc);
2665
2666    let mut router = axum::Router::new();
2667
2668    // The JSON-RPC service can be disabled independently of the gRPC/REST
2669    // service and of JSON-RPC indexing, so that a node can keep indexing
2670    // without exposing the JSON-RPC endpoints.
2671    if config.json_rpc_enabled() {
2672        router = router.merge(
2673            build_json_rpc_router(
2674                &state,
2675                transaction_orchestrator,
2676                config,
2677                prometheus_registry,
2678            )
2679            .await?,
2680        );
2681    } else {
2682        info!("json-rpc service is disabled");
2683    }
2684
2685    // When the embedded rpc-store is active, gate checkpoint delivery on the
2686    // index so a client that waits for a checkpoint can immediately read its
2687    // indexed state (matching the legacy synchronously-committed index).
2688    let indexed_checkpoint = embedded_rpc_store.map(|embedded| embedded.indexed_checkpoint_fn());
2689    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2690        SubscriptionService::build(prometheus_registry, indexed_checkpoint);
2691    let rpc_router = {
2692        // Serve the index read paths from the embedded rpc-store when it
2693        // is enabled, otherwise from the legacy `rpc-index`. Raw chain
2694        // data comes from the perpetual / checkpoint stores either way.
2695        let reader: Arc<dyn RpcStateReader> = match embedded_rpc_store {
2696            Some(embedded) => Arc::new(RpcStoreReadStore::new(
2697                state.clone(),
2698                store,
2699                embedded.reader(),
2700            )),
2701            None => Arc::new(RestReadStore::new(state.clone(), store)),
2702        };
2703        let mut rpc_service = sui_rpc_api::RpcService::new(reader);
2704        rpc_service.with_server_version(server_version);
2705
2706        if let Some(config) = config.rpc.clone() {
2707            config.validate()?;
2708            rpc_service.with_config(config);
2709        }
2710
2711        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2712        rpc_service.with_subscription_service(subscription_service_handle);
2713
2714        if let Some(transaction_orchestrator) = transaction_orchestrator {
2715            rpc_service.with_executor(transaction_orchestrator.clone())
2716        }
2717
2718        rpc_service.into_router().await
2719    };
2720
2721    let layers = ServiceBuilder::new()
2722        .map_request(|mut request: axum::http::Request<_>| {
2723            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2724                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2725                request.extensions_mut().insert(axum_connect_info);
2726            }
2727            request
2728        })
2729        .layer(axum::middleware::from_fn(server_timing_middleware))
2730        // Setup a permissive CORS policy
2731        .layer(
2732            tower_http::cors::CorsLayer::new()
2733                .allow_methods([http::Method::GET, http::Method::POST])
2734                .allow_origin(tower_http::cors::Any)
2735                .allow_headers(tower_http::cors::Any)
2736                .expose_headers(tower_http::cors::Any),
2737        );
2738
2739    router = router.merge(rpc_router).layer(layers);
2740
2741    let https = if let Some((tls_config, https_address)) = config
2742        .rpc()
2743        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2744    {
2745        let https = sui_http::Builder::new()
2746            .tls_single_cert(tls_config.cert(), tls_config.key())
2747            .and_then(|builder| builder.serve(https_address, router.clone()))
2748            .map_err(|e| anyhow::anyhow!(e))?;
2749
2750        info!(
2751            https_address =? https.local_addr(),
2752            "HTTPS rpc server listening on {}",
2753            https.local_addr()
2754        );
2755
2756        Some(https)
2757    } else {
2758        None
2759    };
2760
2761    let http = sui_http::Builder::new()
2762        .serve(&config.json_rpc_address, router)
2763        .map_err(|e| anyhow::anyhow!(e))?;
2764
2765    info!(
2766        http_address =? http.local_addr(),
2767        "HTTP rpc server listening on {}",
2768        http.local_addr()
2769    );
2770
2771    Ok((
2772        HttpServers {
2773            http: Some(http),
2774            https,
2775        },
2776        Some(subscription_service_checkpoint_sender),
2777    ))
2778}
2779
/// Handles of the RPC servers started by `build_http_servers`.
///
/// The derived default leaves both fields `None`, which is what `build_http_servers`
/// returns for nodes that are not fullnodes.
#[derive(Default)]
struct HttpServers {
    // Handle of the plain-HTTP server. Stored but never read in this file;
    // presumably held so the handle is not dropped — TODO confirm.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // Handle of the HTTPS server, present only when a TLS config was supplied.
    // Stored but never read in this file.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2787
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// For both the checkpoint and the transaction fork marker: a recorded fork makes
    /// `check_and_recover_forks` fail under `ReturnError`, and a matching override
    /// makes it succeed and clears the marker.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // Recovery config without any overrides that reports forks as errors.
        let without_overrides = || ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // ---------- Checkpoint fork path ----------
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let plain_config = without_overrides();
        let outcome = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&plain_config),
        )
        .await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Checkpoint fork detected"));

        let checkpoint_override_config = ForkRecoveryConfig {
            checkpoint_overrides: BTreeMap::from([(seq_num, digest.to_string())]),
            ..without_overrides()
        };
        let outcome = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&checkpoint_override_config),
        )
        .await;
        assert!(outcome.is_ok());
        let remaining_checkpoint_fork = checkpoint_store.get_checkpoint_fork_detected().unwrap();
        assert!(remaining_checkpoint_fork.is_none());

        // ---------- Transaction fork path ----------
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let plain_config = without_overrides();
        let outcome = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&plain_config),
        )
        .await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Transaction fork detected"));

        let transaction_override_config = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                tx_digest.to_string(),
                actual_effects.to_string(),
            )]),
            ..without_overrides()
        };
        let outcome = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&transaction_override_config),
        )
        .await;
        assert!(outcome.is_ok());
        let remaining_transaction_fork = checkpoint_store.get_transaction_fork_detected().unwrap();
        assert!(remaining_transaction_fork.is_none());
    }
}