sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
// JWK logging helper: emits at `debug` level when running in a test
// configuration and at `info` level otherwise. JWK logs are very noisy in
// tests but negligible in production, so production keeps them at `info`.
macro_rules! jwk_log {
    ($($log_args:tt)+) => {{
        if in_test_configuration() {
            debug!($($log_args)+);
        } else {
            info!($($log_args)+);
        }
    }};
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::rpc_store_embed::EmbeddedRpcStore;
118use sui_core::signature_verifier::SignatureVerifierMetrics;
119use sui_core::storage::RocksDbStore;
120use sui_core::storage::RpcStoreReadStore;
121use sui_core::transaction_orchestrator::TransactionOrchestrator;
122use sui_core::{
123    authority::{AuthorityState, AuthorityStore},
124    authority_client::NetworkAuthorityClient,
125};
126use sui_json_rpc::JsonRpcServerBuilder;
127use sui_json_rpc::coin_api::CoinReadApi;
128use sui_json_rpc::governance_api::GovernanceReadApi;
129use sui_json_rpc::indexer_api::IndexerApi;
130use sui_json_rpc::move_utils::MoveUtils;
131use sui_json_rpc::read_api::ReadApi;
132use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
133use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
134use sui_macros::fail_point;
135use sui_macros::{fail_point_async, replay_log};
136use sui_network::api::ValidatorServer;
137use sui_network::discovery;
138use sui_network::endpoint_manager::EndpointManager;
139use sui_network::state_sync;
140use sui_network::validator::server::ServerBuilder;
141use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
142use sui_snapshot::uploader::StateSnapshotUploader;
143use sui_storage::{
144    http_key_value_store::HttpKVStore,
145    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
146    key_value_store_metrics::KeyValueStoreMetrics,
147};
148use sui_types::base_types::{AuthorityName, EpochId};
149use sui_types::committee::Committee;
150use sui_types::crypto::KeypairTraits;
151use sui_types::error::{SuiError, SuiResult};
152use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
153use sui_types::storage::RpcStateReader;
154use sui_types::sui_system_state::SuiSystemStateTrait;
155use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::supported_protocol_versions::SupportedProtocolVersions;
158use typed_store::DBMetrics;
159use typed_store::rocks::default_db_options;
160
161use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
162
163pub mod admin;
164pub mod db_shell;
165mod handle;
166pub mod metrics;
167
/// Validator-side components of a node, grouped so they can be held together.
///
/// `SuiNode` stores this bundle in its `validator_components` field as
/// `Mutex<Option<ValidatorComponents>>`.
pub struct ValidatorComponents {
    /// Handle for the validator server. `SpawnOnce` is declared outside the
    /// part of the file visible here — presumably it defers spawning the
    /// server; TODO confirm.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle of the overload monitor task, when one exists.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Shared consensus manager.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for the consensus store.
    consensus_store_pruner: ConsensusStorePruner,
    /// Shared adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Shared metrics for the Sui transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Admission queue context, when present.
    /// NOTE(review): the condition under which this is `None` is not visible here.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Everything produced by `SuiNode::create_p2p_network`; `start_async`
/// destructures this value right after the call.
pub struct P2pComponents {
    /// The anemo p2p network.
    p2p_network: Network,
    /// Known peers keyed by peer id.
    /// NOTE(review): the meaning of the `String` value is not visible here — confirm.
    known_peers: HashMap<PeerId, String>,
    /// Handle to the discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Endpoint manager; `start_async` uses it to register peer address
    /// overrides and the initial peer addresses.
    endpoint_manager: EndpointManager,
}
186
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Extra per-node state that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node that was current when this state was built.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag starting out `false`.
        /// NOTE(review): presumably set by tests that expect safe mode — confirm.
        pub sim_safe_mode_expected: AtomicBool,
        /// Kept only so the leak detector lives as long as this state.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node and installs a fresh leak detector.
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of the hook that supplies JWKs for a given authority and provider.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores both arguments and parses the built-in
    /// default JWK bytes as Twitch keys, for testing.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;

        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        // Any parse failure is reported as a JWK retrieval error.
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Thread-local injector slot, initialised with `default_fetch_jwks`.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the injector currently installed on this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Replaces the injector installed on the calling thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            let mut current = slot.borrow_mut();
            *current = injector;
        });
    }
}
240
241#[cfg(msim)]
242pub use simulator::set_jwk_injector;
243#[cfg(msim)]
244use simulator::*;
245use sui_core::authority::authority_store_pruner::PrunerWatermarks;
246use sui_core::{
247    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
248};
249
/// Default timeout for establishing a gRPC connection: 60 seconds.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
251
252pub struct SuiNode {
253    config: NodeConfig,
254    validator_components: Mutex<Option<ValidatorComponents>>,
255
256    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
257    #[allow(unused)]
258    http_servers: HttpServers,
259
260    state: Arc<AuthorityState>,
261    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
262    registry_service: RegistryService,
263    metrics: Arc<SuiNodeMetrics>,
264    checkpoint_metrics: Arc<CheckpointMetrics>,
265
266    _discovery: discovery::Handle,
267    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
268    state_sync_handle: state_sync::Handle,
269    randomness_handle: randomness::Handle,
270    checkpoint_store: Arc<CheckpointStore>,
271    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
272
273    /// Broadcast channel to send the starting system state for the next epoch.
274    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
275
276    /// EndpointManager for updating peer network addresses.
277    endpoint_manager: EndpointManager,
278
279    backpressure_manager: Arc<BackpressureManager>,
280
281    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
282
283    #[cfg(msim)]
284    sim_state: SimState,
285
286    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
287    // Channel to allow signaling upstream to shutdown sui-node
288    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
289
290    /// Handle shared with RandomnessManager and the consensus layer.
291    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
292
293    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
294    /// Use ArcSwap so that we could mutate it without taking mut reference.
295    // TODO: Eventually we can make this auth aggregator a shared reference so that this
296    // update will automatically propagate to other uses.
297    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
298
299    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
300
301    /// The embedded `sui-rpc-store`, present when the node is configured
302    /// with `use_experimental_rpc_store`. Held for the node's lifetime
303    /// so its tip indexer keeps running (dropping it aborts the indexer).
304    /// Exposed through [`SuiNode::embedded_rpc_store`] for introspection.
305    embedded_rpc_store: Option<EmbeddedRpcStore>,
306}
307
308impl fmt::Debug for SuiNode {
309    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310        f.debug_struct("SuiNode")
311            .field("name", &self.state.name.concise())
312            .finish()
313    }
314}
315
/// Maximum number of JWKs from a single provider fetch that
/// `start_jwk_updater` submits to consensus; any keys beyond this are dropped.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
317
318impl SuiNode {
319    pub async fn start(
320        config: NodeConfig,
321        registry_service: RegistryService,
322    ) -> Result<Arc<SuiNode>> {
323        Self::start_async(
324            config,
325            registry_service,
326            ServerVersion::new("sui-node", "unknown"),
327        )
328        .await
329    }
330
331    fn start_jwk_updater(
332        config: &NodeConfig,
333        metrics: Arc<SuiNodeMetrics>,
334        authority: AuthorityName,
335        epoch_store: Arc<AuthorityPerEpochStore>,
336        consensus_adapter: Arc<ConsensusAdapter>,
337    ) {
338        let epoch = epoch_store.epoch();
339
340        let supported_providers = config
341            .zklogin_oauth_providers
342            .get(&epoch_store.get_chain_identifier().chain())
343            .unwrap_or(&BTreeSet::new())
344            .iter()
345            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
346            .collect::<Vec<_>>();
347
348        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
349
350        info!(
351            ?fetch_interval,
352            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
353        );
354
355        fn validate_jwk(
356            metrics: &Arc<SuiNodeMetrics>,
357            provider: &OIDCProvider,
358            id: &JwkId,
359            jwk: &JWK,
360        ) -> bool {
361            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
362                warn!(
363                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
364                    id.iss, provider
365                );
366                metrics
367                    .invalid_jwks
368                    .with_label_values(&[&provider.to_string()])
369                    .inc();
370                return false;
371            };
372
373            if iss_provider != *provider {
374                warn!(
375                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
376                    id.iss, provider, iss_provider
377                );
378                metrics
379                    .invalid_jwks
380                    .with_label_values(&[&provider.to_string()])
381                    .inc();
382                return false;
383            }
384
385            if !check_total_jwk_size(id, jwk) {
386                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
387                metrics
388                    .invalid_jwks
389                    .with_label_values(&[&provider.to_string()])
390                    .inc();
391                return false;
392            }
393
394            true
395        }
396
397        // metrics is:
398        //  pub struct SuiNodeMetrics {
399        //      pub jwk_requests: IntCounterVec,
400        //      pub jwk_request_errors: IntCounterVec,
401        //      pub total_jwks: IntCounterVec,
402        //      pub unique_jwks: IntCounterVec,
403        //  }
404
405        for p in supported_providers.into_iter() {
406            let provider_str = p.to_string();
407            let epoch_store = epoch_store.clone();
408            let consensus_adapter = consensus_adapter.clone();
409            let metrics = metrics.clone();
410            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
411                async move {
412                    // note: restart-safe de-duplication happens after consensus, this is
413                    // just best-effort to reduce unneeded submissions.
414                    let mut seen = HashSet::new();
415                    loop {
416                        jwk_log!("fetching JWK for provider {:?}", p);
417                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
418                        match Self::fetch_jwks(authority, &p).await {
419                            Err(e) => {
420                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
421                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
422                                // Retry in 30 seconds
423                                tokio::time::sleep(Duration::from_secs(30)).await;
424                                continue;
425                            }
426                            Ok(mut keys) => {
427                                metrics.total_jwks
428                                    .with_label_values(&[&provider_str])
429                                    .inc_by(keys.len() as u64);
430
431                                keys.retain(|(id, jwk)| {
432                                    validate_jwk(&metrics, &p, id, jwk) &&
433                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
434                                    seen.insert((id.clone(), jwk.clone()))
435                                });
436
437                                metrics.unique_jwks
438                                    .with_label_values(&[&provider_str])
439                                    .inc_by(keys.len() as u64);
440
441                                // prevent oauth providers from sending too many keys,
442                                // inadvertently or otherwise
443                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
444                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
445                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
446                                }
447
448                                for (id, jwk) in keys.into_iter() {
449                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
450
451                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
452                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
453                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
454                                        .ok();
455                                }
456                            }
457                        }
458                        tokio::time::sleep(fetch_interval).await;
459                    }
460                }
461                .instrument(error_span!("jwk_updater_task", epoch)),
462            ));
463        }
464    }
465
466    pub async fn start_async(
467        config: NodeConfig,
468        registry_service: RegistryService,
469        server_version: ServerVersion,
470    ) -> Result<Arc<SuiNode>> {
471        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
472        let mut config = config.clone();
473        if config.supported_protocol_versions.is_none() {
474            info!(
475                "populating config.supported_protocol_versions with default {:?}",
476                SupportedProtocolVersions::SYSTEM_DEFAULT
477            );
478            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
479        }
480
481        let run_with_range = config.run_with_range;
482        let prometheus_registry = registry_service.default_registry();
483        let node_role = config.intended_node_role();
484
485        info!(node =? config.protocol_public_key(),
486            "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
487        );
488
489        // Initialize metrics to track db usage before creating any stores
490        DBMetrics::init(registry_service.clone());
491
492        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
493        typed_store::init_write_sync(config.enable_db_sync_to_disk);
494
495        // Initialize Mysten metrics.
496        mysten_metrics::init_metrics(&prometheus_registry);
497        // Unsupported (because of the use of static variable) and unnecessary in simtests.
498        #[cfg(not(msim))]
499        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
500
501        let genesis = config.genesis()?.clone();
502
503        let secret = Arc::pin(config.protocol_key_pair().copy());
504        let genesis_committee = genesis.committee();
505        let committee_store = Arc::new(CommitteeStore::new(
506            config.db_path().join("epochs"),
507            &genesis_committee,
508            None,
509        ));
510
511        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
512        let checkpoint_store = CheckpointStore::new(
513            &config.db_path().join("checkpoints"),
514            pruner_watermarks.clone(),
515        );
516        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
517
518        if node_role.runs_consensus() {
519            Self::check_and_recover_forks(
520                &checkpoint_store,
521                &checkpoint_metrics,
522                config.fork_recovery.as_ref(),
523            )
524            .await?;
525        }
526
527        // By default, only enable write stall on nodes that run consensus.
528        let enable_write_stall = config
529            .enable_db_write_stall
530            .unwrap_or(node_role.runs_consensus());
531        // The tidehunter objects compactor retains only the latest version per
532        // ObjectID and is mutually exclusive with the object pruner. Enable it
533        // for validators (which always disable the pruner), and also for any
534        // node configured with `num_epochs_to_retain = 0` — that aggressive
535        // setting is what the compactor replaces. The pruner is force-disabled
536        // in `AuthorityStorePruner::new` whenever this is true.
537        let enable_objects_compactor = node_role.is_validator()
538            || config.authority_store_pruning_config.num_epochs_to_retain == 0;
539        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
540            enable_write_stall,
541            enable_objects_compactor,
542        };
543        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
544            &config.db_store_path(),
545            Some(perpetual_tables_options),
546            Some(pruner_watermarks.epoch_id.clone()),
547        ));
548        let is_genesis = perpetual_tables
549            .database_is_empty()
550            .expect("Database read should not fail at init.");
551
552        let backpressure_manager =
553            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
554
555        let store =
556            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
557
558        let cur_epoch = store.get_recovery_epoch_at_restart()?;
559        let committee = committee_store
560            .get_committee(&cur_epoch)?
561            .expect("Committee of the current epoch must exist");
562        let epoch_start_configuration = store
563            .get_epoch_start_configuration()?
564            .expect("EpochStartConfiguration of the current epoch must exist");
565        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
566        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
567
568        let cache_traits = build_execution_cache(
569            &config.execution_cache,
570            &prometheus_registry,
571            &store,
572            backpressure_manager.clone(),
573        );
574
575        let auth_agg = {
576            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
577            Arc::new(ArcSwap::new(Arc::new(
578                AuthorityAggregator::new_from_epoch_start_state(
579                    epoch_start_configuration.epoch_start_state(),
580                    &committee_store,
581                    safe_client_metrics_base,
582                ),
583            )))
584        };
585
586        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
587        let chain = match config.chain_override_for_testing {
588            Some(chain) => chain,
589            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
590        };
591
592        let highest_executed_checkpoint = checkpoint_store
593            .get_highest_executed_checkpoint_seq_number()
594            .expect("checkpoint store read cannot fail")
595            .unwrap_or(0);
596
597        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
598            0
599        } else {
600            checkpoint_store
601                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
602                .expect("checkpoint store read cannot fail")
603                .unwrap_or(highest_executed_checkpoint)
604        };
605
606        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
607        let epoch_store = AuthorityPerEpochStore::new(
608            config.protocol_public_key(),
609            committee.clone(),
610            &config.db_store_path(),
611            Some(epoch_options.options),
612            EpochMetrics::new(&registry_service.default_registry()),
613            epoch_start_configuration,
614            cache_traits.backing_package_store.clone(),
615            cache_traits.object_store.clone(),
616            cache_metrics,
617            signature_verifier_metrics,
618            &config.expensive_safety_check_config,
619            (chain_id, chain),
620            highest_executed_checkpoint,
621            previous_epoch_last_checkpoint,
622            Arc::new(SubmittedTransactionCacheMetrics::new(
623                &registry_service.default_registry(),
624            )),
625            config.fullnode_sync_mode,
626        )?;
627
628        info!("created epoch store");
629
630        replay_log!(
631            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632            epoch_store.epoch(),
633            epoch_store.protocol_config()
634        );
635
636        // the database is empty at genesis time
637        if is_genesis {
638            info!("checking SUI conservation at genesis");
639            // When we are opening the db table, the only time when it's safe to
640            // check SUI conservation is at genesis. Otherwise we may be in the middle of
641            // an epoch and the SUI conservation check will fail. This also initialize
642            // the expected_network_sui_amount table.
643            cache_traits
644                .reconfig_api
645                .expensive_check_sui_conservation(&epoch_store)
646                .expect("SUI conservation check cannot fail at genesis");
647        }
648
649        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650        let default_buffer_stake = epoch_store
651            .protocol_config()
652            .buffer_stake_for_protocol_upgrade_bps();
653        if effective_buffer_stake != default_buffer_stake {
654            warn!(
655                ?effective_buffer_stake,
656                ?default_buffer_stake,
657                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658            );
659        }
660
661        checkpoint_store.insert_genesis_checkpoint(
662            genesis.checkpoint(),
663            genesis.checkpoint_contents().clone(),
664            &epoch_store,
665        );
666
667        info!("creating state sync store");
668        let state_sync_store = RocksDbStore::new(
669            cache_traits.clone(),
670            committee_store.clone(),
671            checkpoint_store.clone(),
672        );
673
674        let index_store =
675            if node_role.should_enable_index_processing() && config.enable_index_processing {
676                info!("creating jsonrpc index store");
677                Some(Arc::new(IndexStore::new(
678                    config.db_path().join("indexes"),
679                    &prometheus_registry,
680                    epoch_store
681                        .protocol_config()
682                        .max_move_identifier_len_as_option(),
683                    config.remove_deprecated_tables,
684                )))
685            } else {
686                None
687            };
688
689        let chain_identifier = epoch_store.get_chain_identifier();
690
691        // The embedded `sui-rpc-store` and the legacy `rpc-index` are
692        // mutually exclusive index backends; selecting the experimental
693        // store skips building the old index and serves the index read
694        // paths from the embedded store instead.
695        let (rpc_index, mut embedded_rpc_store) = if node_role.should_enable_index_processing()
696            && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
697        {
698            if config
699                .rpc()
700                .is_some_and(|rpc| rpc.use_experimental_rpc_store())
701            {
702                info!("creating embedded rpc-store");
703                // The tip indexer pulls checkpoints from the node's local
704                // checkpoint / perpetual stores via a dedicated read handle.
705                let ingestion_source = RocksDbStore::new(
706                    cache_traits.clone(),
707                    committee_store.clone(),
708                    checkpoint_store.clone(),
709                );
710                let embedded_rpc_store = EmbeddedRpcStore::bootstrap(
711                    &config,
712                    &store,
713                    &checkpoint_store,
714                    ingestion_source,
715                    chain_identifier,
716                    &prometheus_registry,
717                )
718                .await?;
719                (None, Some(embedded_rpc_store))
720            } else {
721                info!("creating rpc index store");
722                let rpc_index = Arc::new(
723                    RpcIndexStore::new(
724                        &config.db_path(),
725                        &store,
726                        &checkpoint_store,
727                        &epoch_store,
728                        &cache_traits.backing_package_store,
729                        config.rpc().cloned().unwrap_or_default(),
730                    )
731                    .await,
732                );
733                (Some(rpc_index), None)
734            }
735        } else {
736            (None, None)
737        };
738
739        info!("creating archive reader");
740        // Create network
741        let (randomness_tx, randomness_rx) = mpsc::channel(
742            config
743                .p2p_config
744                .randomness
745                .clone()
746                .unwrap_or_default()
747                .mailbox_capacity(),
748        );
749        let P2pComponents {
750            p2p_network,
751            known_peers,
752            discovery_handle,
753            state_sync_handle,
754            randomness_handle,
755            endpoint_manager,
756        } = Self::create_p2p_network(
757            &config,
758            state_sync_store.clone(),
759            chain_identifier,
760            randomness_tx,
761            &prometheus_registry,
762        )?;
763
764        // Inject configured peer address overrides.
765        for peer in &config.p2p_config.peer_address_overrides {
766            endpoint_manager
767                .update_endpoint(
768                    EndpointId::P2p(peer.peer_id),
769                    AddressSource::Config,
770                    peer.addresses.clone(),
771                )
772                .expect("Updating peer address overrides should not fail");
773        }
774
775        // Send initial peer addresses to the p2p network.
776        update_peer_addresses(
777            &config,
778            &endpoint_manager,
779            epoch_store.epoch_start_state(),
780            None,
781        );
782
783        info!("start snapshot upload");
784        // Start uploading state snapshot to remote store
785        let state_snapshot_handle = Self::start_state_snapshot(
786            &config,
787            &prometheus_registry,
788            checkpoint_store.clone(),
789            chain_identifier,
790        )?;
791
792        // Start uploading db checkpoints to remote store
793        info!("start db checkpoint");
794        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
795            &config,
796            &prometheus_registry,
797            state_snapshot_handle.is_some(),
798        )?;
799
800        if !epoch_store
801            .protocol_config()
802            .simplified_unwrap_then_delete()
803        {
804            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
805            config
806                .authority_store_pruning_config
807                .set_killswitch_tombstone_pruning(true);
808        }
809
810        let authority_name = config.protocol_public_key();
811
812        info!("create authority state");
813        let state = AuthorityState::new(
814            authority_name,
815            secret,
816            config.supported_protocol_versions.unwrap(),
817            store.clone(),
818            cache_traits.clone(),
819            epoch_store.clone(),
820            committee_store.clone(),
821            index_store.clone(),
822            rpc_index,
823            embedded_rpc_store.as_ref().map(|embedded| embedded.store()),
824            checkpoint_store.clone(),
825            &prometheus_registry,
826            genesis.objects(),
827            &db_checkpoint_config,
828            config.clone(),
829            chain_identifier,
830            config.policy_config.clone(),
831            config.firewall_config.clone(),
832            pruner_watermarks,
833        )
834        .await;
835        // ensure genesis txn was executed
836        if epoch_store.epoch() == 0 {
837            let txn = &genesis.transaction();
838            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
839            let transaction =
840                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
841                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
842                        genesis.transaction().data().clone(),
843                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
844                    ),
845                );
846            let _enter = span.enter();
847            state
848                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
849                .unwrap();
850        }
851
852        // Start the loop that receives new randomness and generates transactions for it.
        // The returned handle is long-lived (node lifetime).
854        let randomness_receiver_handle =
855            RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
856
857        let (end_of_epoch_channel, end_of_epoch_receiver) =
858            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
859
860        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
861            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
862                auth_agg.load_full(),
863                state.clone(),
864                end_of_epoch_receiver,
865                &config.db_path(),
866                &prometheus_registry,
867                &config,
868            )))
869        } else {
870            None
871        };
872
873        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
874            state.clone(),
875            state_sync_store,
876            &transaction_orchestrator.clone(),
877            &config,
878            &prometheus_registry,
879            server_version,
880            node_role,
881            embedded_rpc_store.as_ref(),
882        )
883        .await?;
884
885        // Start the embedded rpc-store's tip indexer. It follows the tip
886        // via the checkpoint executor's broadcast stream and backfills
887        // any gap from the perpetual store. Spawned on a background task
888        // (see `spawn_indexer`) so node startup does not block on the
889        // first checkpoint, which the executor only produces after this
890        // function returns.
891        if let Some(embedded) = embedded_rpc_store.as_mut() {
892            embedded.spawn_indexer(
893                subscription_service_checkpoint_sender.clone(),
894                prometheus_registry.clone(),
895            );
896        }
897
898        let global_state_hasher = Arc::new(GlobalStateHasher::new(
899            cache_traits.global_state_hash_store.clone(),
900            GlobalStateHashMetrics::new(&prometheus_registry),
901        ));
902
903        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
904            "sui",
905            &registry_service.default_registry(),
906        );
907
908        let connection_monitor_handle =
909            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
910                p2p_network.downgrade(),
911                Arc::new(network_connection_metrics),
912                known_peers,
913            );
914
915        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
916
917        sui_node_metrics
918            .binary_max_protocol_version
919            .set(ProtocolVersion::MAX.as_u64() as i64);
920        sui_node_metrics
921            .configured_max_protocol_version
922            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
923
924        let node_role = epoch_store.node_role();
925        let validator_components = if node_role.runs_consensus() {
926            let mut components = Self::construct_validator_components(
927                config.clone(),
928                state.clone(),
929                committee,
930                epoch_store.clone(),
931                checkpoint_store.clone(),
932                state_sync_handle.clone(),
933                randomness_handle.clone(),
934                Arc::downgrade(&global_state_hasher),
935                backpressure_manager.clone(),
936                &registry_service,
937                sui_node_metrics.clone(),
938                checkpoint_metrics.clone(),
939                node_role,
940                randomness_receiver_handle.clone(),
941            )
942            .await?;
943
944            if node_role.is_validator() {
945                components
946                    .consensus_adapter
947                    .recover_end_of_publish(&epoch_store);
948
949                // Start the gRPC server
950                components.validator_server_handle = Some(
951                    components
952                        .validator_server_handle
953                        .take()
954                        .unwrap()
955                        .start()
956                        .await,
957                );
958
959                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
960                endpoint_manager
961                    .set_consensus_address_updater(components.consensus_manager.clone());
962            } else {
963                info!("Starting node as Observer — connecting to configured peers");
964            }
965
966            Some(components)
967        } else {
968            None
969        };
970
971        // setup shutdown channel
972        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
973
974        let node = Self {
975            config,
976            validator_components: Mutex::new(validator_components),
977            http_servers,
978            state,
979            transaction_orchestrator,
980            registry_service,
981            metrics: sui_node_metrics,
982            checkpoint_metrics,
983
984            _discovery: discovery_handle,
985            _connection_monitor_handle: connection_monitor_handle,
986            state_sync_handle,
987            randomness_handle,
988            checkpoint_store,
989            global_state_hasher: Mutex::new(Some(global_state_hasher)),
990            end_of_epoch_channel,
991            endpoint_manager,
992            backpressure_manager,
993
994            _db_checkpoint_handle: db_checkpoint_handle,
995
996            #[cfg(msim)]
997            sim_state: Default::default(),
998
999            _state_snapshot_uploader_handle: state_snapshot_handle,
1000            shutdown_channel_tx: shutdown_channel,
1001            randomness_receiver_handle,
1002
1003            auth_agg,
1004            subscription_service_checkpoint_sender,
1005            embedded_rpc_store,
1006        };
1007
1008        info!("SuiNode started!");
1009        let node = Arc::new(node);
1010        let node_copy = node.clone();
1011        spawn_monitored_task!(async move {
1012            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
1013            if let Err(error) = result {
1014                warn!("Reconfiguration finished with error {:?}", error);
1015            }
1016        });
1017
1018        Ok(node)
1019    }
1020
1021    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
1022        self.end_of_epoch_channel.subscribe()
1023    }
1024
1025    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1026        self.shutdown_channel_tx.subscribe()
1027    }
1028
1029    pub fn current_epoch_for_testing(&self) -> EpochId {
1030        self.state.current_epoch_for_testing()
1031    }
1032
1033    pub fn db_checkpoint_path(&self) -> PathBuf {
1034        self.config.db_checkpoint_path()
1035    }
1036
1037    // Init reconfig process by starting to reject user certs
1038    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
1039        info!("close_epoch (current epoch = {})", epoch_store.epoch());
1040        self.validator_components
1041            .lock()
1042            .await
1043            .as_ref()
1044            .ok_or_else(|| SuiError::from("Node is not a validator"))?
1045            .consensus_adapter
1046            .close_epoch(epoch_store);
1047        Ok(())
1048    }
1049
1050    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
1051        self.state
1052            .clear_override_protocol_upgrade_buffer_stake(epoch)
1053    }
1054
1055    pub fn set_override_protocol_upgrade_buffer_stake(
1056        &self,
1057        epoch: EpochId,
1058        buffer_stake_bps: u64,
1059    ) -> SuiResult {
1060        self.state
1061            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1062    }
1063
1064    // Testing-only API to start epoch close process.
1065    // For production code, please use the non-testing version.
1066    pub async fn close_epoch_for_testing(&self) -> SuiResult {
1067        let epoch_store = self.state.epoch_store_for_testing();
1068        self.close_epoch(&epoch_store).await
1069    }
1070
1071    fn start_state_snapshot(
1072        config: &NodeConfig,
1073        prometheus_registry: &Registry,
1074        checkpoint_store: Arc<CheckpointStore>,
1075        chain_identifier: ChainIdentifier,
1076    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1077        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1078            let snapshot_uploader = StateSnapshotUploader::new(
1079                &config.db_checkpoint_path(),
1080                &config.snapshot_path(),
1081                remote_store_config.clone(),
1082                60,
1083                prometheus_registry,
1084                checkpoint_store,
1085                chain_identifier,
1086                config.state_snapshot_write_config.archive_interval_epochs,
1087            )?;
1088            Ok(Some(snapshot_uploader.start()))
1089        } else {
1090            Ok(None)
1091        }
1092    }
1093
1094    fn start_db_checkpoint(
1095        config: &NodeConfig,
1096        prometheus_registry: &Registry,
1097        state_snapshot_enabled: bool,
1098    ) -> Result<(
1099        DBCheckpointConfig,
1100        Option<tokio::sync::broadcast::Sender<()>>,
1101    )> {
1102        let checkpoint_path = Some(
1103            config
1104                .db_checkpoint_config
1105                .checkpoint_path
1106                .clone()
1107                .unwrap_or_else(|| config.db_checkpoint_path()),
1108        );
1109        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1110            DBCheckpointConfig {
1111                checkpoint_path,
1112                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1113                    true
1114                } else {
1115                    config
1116                        .db_checkpoint_config
1117                        .perform_db_checkpoints_at_epoch_end
1118                },
1119                ..config.db_checkpoint_config.clone()
1120            }
1121        } else {
1122            config.db_checkpoint_config.clone()
1123        };
1124
1125        match (
1126            db_checkpoint_config.object_store_config.as_ref(),
1127            state_snapshot_enabled,
1128        ) {
1129            // If db checkpoint config object store not specified but
1130            // state snapshot object store is specified, create handler
1131            // anyway for marking db checkpoints as completed so that they
1132            // can be uploaded as state snapshots.
1133            (None, false) => Ok((db_checkpoint_config, None)),
1134            (_, _) => {
1135                let handler = DBCheckpointHandler::new(
1136                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1137                    db_checkpoint_config.object_store_config.as_ref(),
1138                    60,
1139                    db_checkpoint_config
1140                        .prune_and_compact_before_upload
1141                        .unwrap_or(true),
1142                    config.authority_store_pruning_config.clone(),
1143                    prometheus_registry,
1144                    state_snapshot_enabled,
1145                )?;
1146                Ok((
1147                    db_checkpoint_config,
1148                    Some(DBCheckpointHandler::start(handler)),
1149                ))
1150            }
1151        }
1152    }
1153
1154    fn create_p2p_network(
1155        config: &NodeConfig,
1156        state_sync_store: RocksDbStore,
1157        chain_identifier: ChainIdentifier,
1158        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1159        prometheus_registry: &Registry,
1160    ) -> Result<P2pComponents> {
1161        let mut p2p_config = config.p2p_config.clone();
1162        {
1163            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1164            if disc.peer_addr_store_path.is_none() {
1165                disc.peer_addr_store_path =
1166                    Some(config.db_path().join("discovery_peer_cache.yaml"));
1167            }
1168        }
1169        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1170        if let Some(consensus_config) = &config.consensus_config {
1171            let effective_addr = consensus_config
1172                .external_address
1173                .as_ref()
1174                .or(consensus_config.listen_address.as_ref());
1175            if let Some(addr) = effective_addr {
1176                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1177            }
1178        }
1179        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1180        let discovery_sender = discovery.sender();
1181
1182        let (state_sync, state_sync_router) = state_sync::Builder::new()
1183            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1184            .store(state_sync_store)
1185            .archive_config(config.archive_reader_config())
1186            .discovery_sender(discovery_sender)
1187            .with_metrics(prometheus_registry)
1188            .build();
1189
1190        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1191        let known_peers: HashMap<PeerId, String> = discovery_config
1192            .allowlisted_peers
1193            .clone()
1194            .into_iter()
1195            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1196            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1197                peer.peer_id
1198                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1199            }))
1200            .collect();
1201
1202        let (randomness, randomness_router) =
1203            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1204                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1205                .with_metrics(prometheus_registry)
1206                .build();
1207
1208        let p2p_network = {
1209            let routes = anemo::Router::new()
1210                .add_rpc_service(discovery_server)
1211                .merge(state_sync_router);
1212            let routes = routes.merge(randomness_router);
1213
1214            let inbound_network_metrics =
1215                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1216            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1217                "sui",
1218                "outbound",
1219                prometheus_registry,
1220            );
1221
1222            let service = ServiceBuilder::new()
1223                .layer(
1224                    TraceLayer::new_for_server_errors()
1225                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1226                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1227                )
1228                .layer(CallbackLayer::new(
1229                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1230                        Arc::new(inbound_network_metrics),
1231                        config.p2p_config.excessive_message_size(),
1232                    ),
1233                ))
1234                .service(routes);
1235
1236            let outbound_layer = ServiceBuilder::new()
1237                .layer(
1238                    TraceLayer::new_for_client_and_server_errors()
1239                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1240                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1241                )
1242                .layer(CallbackLayer::new(
1243                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1244                        Arc::new(outbound_network_metrics),
1245                        config.p2p_config.excessive_message_size(),
1246                    ),
1247                ))
1248                .into_inner();
1249
1250            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1251            // Inbound requests on this network are small (signatures, queries, summaries).
1252            // Cap request frames at 1 MiB.
1253            anemo_config.max_request_frame_size = Some(1 << 20);
1254            // Responses can be larger (checkpoint contents).
1255            // Cap response frames at 128 MiB.
1256            anemo_config.max_response_frame_size = Some(128 << 20);
1257
1258            // Set a higher default value for socket send/receive buffers if not already
1259            // configured.
1260            let mut quic_config = anemo_config.quic.unwrap_or_default();
1261            if quic_config.socket_send_buffer_size.is_none() {
1262                quic_config.socket_send_buffer_size = Some(20 << 20);
1263            }
1264            if quic_config.socket_receive_buffer_size.is_none() {
1265                quic_config.socket_receive_buffer_size = Some(20 << 20);
1266            }
1267            quic_config.allow_failed_socket_buffer_size_setting = true;
1268
1269            // Set high-performance defaults for quinn transport.
1270            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1271            if quic_config.max_concurrent_bidi_streams.is_none() {
1272                quic_config.max_concurrent_bidi_streams = Some(500);
1273            }
1274            if quic_config.max_concurrent_uni_streams.is_none() {
1275                quic_config.max_concurrent_uni_streams = Some(500);
1276            }
1277            if quic_config.stream_receive_window.is_none() {
1278                quic_config.stream_receive_window = Some(100 << 20);
1279            }
1280            if quic_config.receive_window.is_none() {
1281                quic_config.receive_window = Some(200 << 20);
1282            }
1283            if quic_config.send_window.is_none() {
1284                quic_config.send_window = Some(200 << 20);
1285            }
1286            if quic_config.crypto_buffer_size.is_none() {
1287                quic_config.crypto_buffer_size = Some(1 << 20);
1288            }
1289            if quic_config.max_idle_timeout_ms.is_none() {
1290                quic_config.max_idle_timeout_ms = Some(10_000);
1291            }
1292            if quic_config.keep_alive_interval_ms.is_none() {
1293                quic_config.keep_alive_interval_ms = Some(5_000);
1294            }
1295            anemo_config.quic = Some(quic_config);
1296
1297            let server_name = format!("sui-{}", chain_identifier);
1298            let network = Network::bind(config.p2p_config.listen_address)
1299                .server_name(&server_name)
1300                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1301                .config(anemo_config)
1302                .outbound_request_layer(outbound_layer)
1303                .start(service)?;
1304            info!(
1305                server_name = server_name,
1306                "P2p network started on {}",
1307                network.local_addr()
1308            );
1309
1310            network
1311        };
1312
1313        let discovery_handle =
1314            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1315        let state_sync_handle = state_sync.start(p2p_network.clone());
1316        let randomness_handle = randomness.start(p2p_network.clone());
1317
1318        Ok(P2pComponents {
1319            p2p_network,
1320            known_peers,
1321            discovery_handle,
1322            state_sync_handle,
1323            randomness_handle,
1324            endpoint_manager,
1325        })
1326    }
1327
1328    async fn construct_validator_components(
1329        config: NodeConfig,
1330        state: Arc<AuthorityState>,
1331        committee: Arc<Committee>,
1332        epoch_store: Arc<AuthorityPerEpochStore>,
1333        checkpoint_store: Arc<CheckpointStore>,
1334        state_sync_handle: state_sync::Handle,
1335        randomness_handle: randomness::Handle,
1336        global_state_hasher: Weak<GlobalStateHasher>,
1337        backpressure_manager: Arc<BackpressureManager>,
1338        registry_service: &RegistryService,
1339        sui_node_metrics: Arc<SuiNodeMetrics>,
1340        checkpoint_metrics: Arc<CheckpointMetrics>,
1341        node_role: NodeRole,
1342        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1343    ) -> Result<ValidatorComponents> {
1344        let mut config_clone = config.clone();
1345        let consensus_config = config_clone
1346            .consensus_config
1347            .as_mut()
1348            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1349
1350        let client = Arc::new(UpdatableConsensusClient::new());
1351        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1352        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1353            &committee,
1354            consensus_config,
1355            state.name,
1356            &registry_service.default_registry(),
1357            client.clone(),
1358            checkpoint_store.clone(),
1359            inflight_slot_freed_notify.clone(),
1360        ));
1361
1362        let consensus_manager = Arc::new(ConsensusManager::new(
1363            &config,
1364            consensus_config,
1365            registry_service,
1366            client,
1367            node_role,
1368        ));
1369
1370        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
1371        let consensus_store_pruner = ConsensusStorePruner::new(
1372            consensus_manager.get_storage_base_path(),
1373            consensus_config.db_retention_epochs(),
1374            consensus_config.db_pruner_period(),
1375            &registry_service.default_registry(),
1376        );
1377
1378        let sui_tx_validator_metrics =
1379            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1380
1381        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1382            let (handle, queue) = Self::start_grpc_validator_service(
1383                &config,
1384                state.clone(),
1385                consensus_adapter.clone(),
1386                epoch_store.clone(),
1387                &registry_service.default_registry(),
1388                inflight_slot_freed_notify,
1389            )
1390            .await?;
1391            (Some(handle), queue)
1392        } else {
1393            (None, None)
1394        };
1395
1396        // Starts an overload monitor that monitors the execution of the authority.
1397        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1398        let validator_overload_monitor_handle = if node_role.is_validator()
1399            && config
1400                .authority_overload_config
1401                .max_load_shedding_percentage
1402                > 0
1403        {
1404            let authority_state = Arc::downgrade(&state);
1405            let overload_config = config.authority_overload_config.clone();
1406            fail_point!("starting_overload_monitor");
1407            Some(spawn_monitored_task!(overload_monitor(
1408                authority_state,
1409                overload_config,
1410            )))
1411        } else {
1412            None
1413        };
1414
1415        Self::start_epoch_specific_validator_components(
1416            &config,
1417            state.clone(),
1418            consensus_adapter,
1419            checkpoint_store,
1420            epoch_store,
1421            state_sync_handle,
1422            randomness_handle,
1423            randomness_receiver_handle,
1424            consensus_manager,
1425            consensus_store_pruner,
1426            global_state_hasher,
1427            backpressure_manager,
1428            validator_server_handle,
1429            validator_overload_monitor_handle,
1430            checkpoint_metrics,
1431            sui_node_metrics,
1432            sui_tx_validator_metrics,
1433            admission_queue,
1434            node_role,
1435        )
1436        .await
1437    }
1438
1439    async fn start_epoch_specific_validator_components(
1440        config: &NodeConfig,
1441        state: Arc<AuthorityState>,
1442        consensus_adapter: Arc<ConsensusAdapter>,
1443        checkpoint_store: Arc<CheckpointStore>,
1444        epoch_store: Arc<AuthorityPerEpochStore>,
1445        state_sync_handle: state_sync::Handle,
1446        randomness_handle: randomness::Handle,
1447        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1448        consensus_manager: Arc<ConsensusManager>,
1449        consensus_store_pruner: ConsensusStorePruner,
1450        state_hasher: Weak<GlobalStateHasher>,
1451        backpressure_manager: Arc<BackpressureManager>,
1452        validator_server_handle: Option<SpawnOnce>,
1453        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1454        checkpoint_metrics: Arc<CheckpointMetrics>,
1455        sui_node_metrics: Arc<SuiNodeMetrics>,
1456        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1457        admission_queue: Option<AdmissionQueueContext>,
1458        node_role: NodeRole,
1459    ) -> Result<ValidatorComponents> {
1460        let checkpoint_service = Self::build_checkpoint_service(
1461            config,
1462            consensus_adapter.clone(),
1463            checkpoint_store.clone(),
1464            epoch_store.clone(),
1465            state.clone(),
1466            state_sync_handle,
1467            state_hasher,
1468            checkpoint_metrics.clone(),
1469            node_role,
1470        );
1471
1472        // Clear the VSS public key from the previous epoch so any randomness round
1473        // signatures buffer in the channel until the new DKG completes.
1474        randomness_receiver_handle.clear_public_key();
1475
1476        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
1477            let authority_key_pair = if node_role.is_validator() {
1478                Some(config.protocol_key_pair())
1479            } else {
1480                None
1481            };
1482            let randomness_manager = RandomnessManager::try_new(
1483                Arc::downgrade(&epoch_store),
1484                Box::new(consensus_adapter.clone()),
1485                randomness_handle,
1486                authority_key_pair,
1487                randomness_receiver_handle.clone(),
1488            )
1489            .await;
1490            if let Some(randomness_manager) = randomness_manager {
1491                epoch_store
1492                    .set_randomness_manager(randomness_manager)
1493                    .await?;
1494            }
1495        }
1496
1497        if node_role.is_validator() {
1498            ExecutionTimeObserver::spawn(
1499                epoch_store.clone(),
1500                Box::new(consensus_adapter.clone()),
1501                config
1502                    .execution_time_observer_config
1503                    .clone()
1504                    .unwrap_or_default(),
1505            );
1506        }
1507
1508        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1509            None,
1510            state.metrics.clone(),
1511        ));
1512
1513        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1514            state.clone(),
1515            checkpoint_service.clone(),
1516            epoch_store.clone(),
1517            consensus_adapter.clone(),
1518            throughput_calculator,
1519            backpressure_manager,
1520            config.congestion_log.clone(),
1521        );
1522
1523        info!("Starting consensus manager asynchronously");
1524
1525        // Spawn consensus startup asynchronously to avoid blocking other components
1526        tokio::spawn({
1527            let config = config.clone();
1528            let epoch_store = epoch_store.clone();
1529            let sui_tx_validator = SuiTxValidator::new(
1530                state.clone(),
1531                epoch_store.clone(),
1532                checkpoint_service.clone(),
1533                sui_tx_validator_metrics.clone(),
1534            );
1535            let consensus_manager = consensus_manager.clone();
1536            async move {
1537                consensus_manager
1538                    .start(
1539                        &config,
1540                        epoch_store,
1541                        consensus_handler_initializer,
1542                        sui_tx_validator,
1543                        Some(randomness_receiver_handle),
1544                    )
1545                    .await;
1546            }
1547        });
1548        let replay_waiter = consensus_manager.replay_waiter();
1549
1550        info!("Spawning checkpoint service");
1551        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1552            None
1553        } else {
1554            Some(replay_waiter)
1555        };
1556        checkpoint_service
1557            .spawn(epoch_store.clone(), replay_waiter)
1558            .await;
1559
1560        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
1561            Self::start_jwk_updater(
1562                config,
1563                sui_node_metrics,
1564                state.name,
1565                epoch_store.clone(),
1566                consensus_adapter.clone(),
1567            );
1568        }
1569
1570        if let Some(ctx) = &admission_queue {
1571            ctx.rotate_for_epoch(epoch_store);
1572        }
1573
1574        Ok(ValidatorComponents {
1575            validator_server_handle,
1576            validator_overload_monitor_handle,
1577            consensus_manager,
1578            consensus_store_pruner,
1579            consensus_adapter,
1580            checkpoint_metrics,
1581            sui_tx_validator_metrics,
1582            admission_queue,
1583        })
1584    }
1585
1586    fn build_checkpoint_service(
1587        config: &NodeConfig,
1588        consensus_adapter: Arc<ConsensusAdapter>,
1589        checkpoint_store: Arc<CheckpointStore>,
1590        epoch_store: Arc<AuthorityPerEpochStore>,
1591        state: Arc<AuthorityState>,
1592        state_sync_handle: state_sync::Handle,
1593        state_hasher: Weak<GlobalStateHasher>,
1594        checkpoint_metrics: Arc<CheckpointMetrics>,
1595        node_role: NodeRole,
1596    ) -> Arc<CheckpointService> {
1597        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1598        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1599
1600        debug!(
1601            "Starting checkpoint service with epoch start timestamp {}
1602            and epoch duration {}",
1603            epoch_start_timestamp_ms, epoch_duration_ms
1604        );
1605
1606        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1607            Box::new(SubmitCheckpointToConsensus {
1608                sender: consensus_adapter,
1609                signer: state.secret.clone(),
1610                authority: config.protocol_public_key(),
1611                next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1612                    .checked_add(epoch_duration_ms)
1613                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1614                metrics: checkpoint_metrics.clone(),
1615            })
1616        } else {
1617            LogCheckpointOutput::boxed()
1618        };
1619
1620        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1621
1622        CheckpointService::build(
1623            state.clone(),
1624            checkpoint_store,
1625            epoch_store,
1626            state.get_transaction_cache_reader().clone(),
1627            state_hasher,
1628            checkpoint_output,
1629            Box::new(certified_checkpoint_output),
1630            checkpoint_metrics,
1631        )
1632    }
1633
1634    fn construct_consensus_adapter(
1635        committee: &Committee,
1636        consensus_config: &ConsensusConfig,
1637        authority: AuthorityName,
1638        prometheus_registry: &Registry,
1639        consensus_client: Arc<dyn ConsensusClient>,
1640        checkpoint_store: Arc<CheckpointStore>,
1641        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1642    ) -> ConsensusAdapter {
1643        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1644        // The consensus adapter allows the authority to send user certificates through consensus.
1645
1646        ConsensusAdapter::new(
1647            consensus_client,
1648            checkpoint_store,
1649            authority,
1650            consensus_config.max_pending_transactions(),
1651            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1652            ca_metrics,
1653            inflight_slot_freed_notify,
1654        )
1655    }
1656
1657    async fn start_grpc_validator_service(
1658        config: &NodeConfig,
1659        state: Arc<AuthorityState>,
1660        consensus_adapter: Arc<ConsensusAdapter>,
1661        epoch_store: Arc<AuthorityPerEpochStore>,
1662        prometheus_registry: &Registry,
1663        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1664    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1665        let overload_config = &config.authority_overload_config;
1666        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1667            let manager = Arc::new(AdmissionQueueManager::new(
1668                consensus_adapter.clone(),
1669                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1670                overload_config.admission_queue_capacity_fraction,
1671                overload_config.admission_queue_bypass_fraction,
1672                overload_config.admission_queue_failover_timeout,
1673                inflight_slot_freed_notify,
1674            ));
1675            AdmissionQueueContext::spawn(manager, epoch_store)
1676        });
1677        let validator_service = ValidatorService::new(
1678            state.clone(),
1679            consensus_adapter,
1680            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1681            config.policy_config.clone().map(|p| p.client_id_source),
1682            admission_queue.clone(),
1683        );
1684
1685        let mut server_conf = mysten_network::config::Config::new();
1686        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1687        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1688        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1689        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1690        server_conf.load_shed = config.grpc_load_shed;
1691        let mut server_builder =
1692            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1693
1694        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1695
1696        let tls_config = sui_tls::create_rustls_server_config(
1697            config.network_key_pair().copy().private(),
1698            SUI_TLS_SERVER_NAME.to_string(),
1699        );
1700
1701        let network_address = config.network_address().clone();
1702
1703        let (ready_tx, ready_rx) = oneshot::channel();
1704
1705        let spawn_once = SpawnOnce::new(ready_rx, async move {
1706            let server = server_builder
1707                .bind(&network_address, Some(tls_config))
1708                .await
1709                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1710            let local_addr = server.local_addr();
1711            info!("Listening to traffic on {local_addr}");
1712            ready_tx.send(()).unwrap();
1713            if let Err(err) = server.serve().await {
1714                info!("Server stopped: {err}");
1715            }
1716            info!("Server stopped");
1717        });
1718        Ok((spawn_once, admission_queue))
1719    }
1720
1721    pub fn state(&self) -> Arc<AuthorityState> {
1722        self.state.clone()
1723    }
1724
1725    /// The embedded `sui-rpc-store` index backend, when the node runs
1726    /// with `use_experimental_rpc_store`. Exposes the startup bootstrap
1727    /// decision and per-cohort watermarks for introspection (used by
1728    /// tests to observe restore/resume behavior across restarts without
1729    /// going through the RPC surface).
1730    pub fn embedded_rpc_store(&self) -> Option<&EmbeddedRpcStore> {
1731        self.embedded_rpc_store.as_ref()
1732    }
1733
1734    #[cfg(any(test, msim))]
1735    pub fn connection_monitor_handle_for_testing(
1736        &self,
1737    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
1738        &self._connection_monitor_handle
1739    }
1740
1741    pub fn node_role(&self) -> NodeRole {
1742        self.state.load_epoch_store_one_call_per_task().node_role()
1743    }
1744
1745    // Only used for testing because of how epoch store is loaded.
1746    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1747        self.state.reference_gas_price_for_testing()
1748    }
1749
1750    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1751        self.state.committee_store().clone()
1752    }
1753
1754    pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
1755        self.checkpoint_store.clone()
1756    }
1757
1758    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1759        self.state.authority_store()
1760    }
1761
1762    pub fn clone_consensus_store(
1763        &self,
1764    ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
1765        self.validator_components
1766            .try_lock()
1767            .ok()?
1768            .as_ref()?
1769            .consensus_manager
1770            .consensus_store()
1771    }
1772
1773    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1774    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1775    /// of this function will mostly likely want to call this again
1776    /// to get a fresh one.
1777    pub fn clone_authority_aggregator(
1778        &self,
1779    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1780        self.transaction_orchestrator
1781            .as_ref()
1782            .map(|to| to.clone_authority_aggregator())
1783    }
1784
1785    pub fn transaction_orchestrator(
1786        &self,
1787    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1788        self.transaction_orchestrator.clone()
1789    }
1790
    /// This function awaits the completion of checkpoint execution of the current epoch,
    /// after which it initiates reconfiguration of the entire system.
    ///
    /// It loops once per epoch: build a `CheckpointExecutor`, optionally submit this node's
    /// capabilities to consensus, run the executor until the epoch ends, reconfigure the
    /// authority state, then tear down / restart / construct the validator components
    /// according to the role reported by the new epoch store.
    ///
    /// # Returns
    ///
    /// `Ok(())` only when the executor stops with `StopReason::RunWithRangeCondition`; in that
    /// case the node is shut down and `run_with_range` is sent on `shutdown_channel_tx`.
    /// Otherwise the loop continues with the next epoch.
    ///
    /// # Errors
    ///
    /// Propagates errors from submitting the capability notification, from (re)starting or
    /// constructing the validator components, and from the `msim`-only checkpoint pruning.
    ///
    /// # Panics
    ///
    /// Panics if the global state hasher slot is empty at the start of an iteration, if
    /// `supported_protocol_versions` is unset on a validator, if the system state object cannot
    /// be read, if the next epoch is not exactly `current + 1`, if the old hasher still has
    /// other strong references when it is replaced, or if the shutdown message cannot be sent.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        // Created once and cloned into every per-epoch executor.
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take the hasher out of its shared slot. The guard stays alive for the whole
            // iteration so the replacement hasher can be stored back during reconfiguration.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            // FullNodes that state sync via consensus will also have validator components,
            // but they are not supposed to submit any capabilities.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    // no need to send digests of versions less than the current version
                    .truncate_below(config.version);

                // Lower the advertised maximum version while the proposed version enables
                // accumulators but the accumulator root does not exist; stop at the first
                // version that passes the check or once the maximum reaches the current version.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Runs until the executor reports why it stopped for this epoch.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // A configured run-with-range condition ends the node instead of reconfiguring.
            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // Under `msim`, safe mode is only asserted against when the simulation has not
            // flagged it as expected.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed send is only reported when this node is a fullnode.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an authority aggregator rebuilt from the new epoch start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            // Epochs must advance by exactly one.
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // Held until the new components (or `None`) are written back below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            // Branch 1: validator components existed in the previous epoch. They are taken
            // out of the slot and either restarted for the new epoch or dropped.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                // Only a weak reference is handed to the validator components; the strong
                // one goes back into the shared slot.
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            self.randomness_receiver_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // Branch 2: no validator components existed in the previous epoch.

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                        self.randomness_receiver_handle.clone(),
                    )
                    .await?;

                    // Only validators start the server handle and register the consensus
                    // manager as the endpoint manager's consensus address updater.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Simulator builds only: prune checkpoints when a finite, non-zero retention is
            // configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            // The next iteration builds its executor from the new epoch store.
            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2110
2111    async fn shutdown(&self) {
2112        if let Some(validator_components) = &*self.validator_components.lock().await {
2113            validator_components.consensus_manager.shutdown().await;
2114        }
2115    }
2116
2117    async fn reconfigure_state(
2118        &self,
2119        state: &Arc<AuthorityState>,
2120        cur_epoch_store: &AuthorityPerEpochStore,
2121        next_epoch_committee: Committee,
2122        next_epoch_start_system_state: EpochStartSystemState,
2123        global_state_hasher: Arc<GlobalStateHasher>,
2124    ) -> Arc<AuthorityPerEpochStore> {
2125        let next_epoch = next_epoch_committee.epoch();
2126
2127        let last_checkpoint = self
2128            .checkpoint_store
2129            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2130            .expect("Error loading last checkpoint for current epoch")
2131            .expect("Could not load last checkpoint for current epoch");
2132
2133        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2134
2135        assert_eq!(
2136            Some(last_checkpoint_seq),
2137            self.checkpoint_store
2138                .get_highest_executed_checkpoint_seq_number()
2139                .expect("Error loading highest executed checkpoint sequence number")
2140        );
2141
2142        let epoch_start_configuration = EpochStartConfiguration::new(
2143            next_epoch_start_system_state,
2144            *last_checkpoint.digest(),
2145            state.get_object_store().as_ref(),
2146            EpochFlag::default_flags_for_new_epoch(&state.config),
2147        )
2148        .expect("EpochStartConfiguration construction cannot fail");
2149
2150        let new_epoch_store = self
2151            .state
2152            .reconfigure(
2153                cur_epoch_store,
2154                self.config.supported_protocol_versions.unwrap(),
2155                next_epoch_committee,
2156                epoch_start_configuration,
2157                global_state_hasher,
2158                &self.config.expensive_safety_check_config,
2159                last_checkpoint_seq,
2160            )
2161            .await
2162            .expect("Reconfigure authority state cannot fail");
2163        info!(next_epoch, "Node State has been reconfigured");
2164        assert_eq!(next_epoch, new_epoch_store.epoch());
2165        self.state.get_reconfig_api().update_epoch_flags_metrics(
2166            cur_epoch_store.epoch_start_config().flags(),
2167            new_epoch_store.epoch_start_config().flags(),
2168        );
2169
2170        new_epoch_store
2171    }
2172
2173    pub fn get_config(&self) -> &NodeConfig {
2174        &self.config
2175    }
2176
2177    pub fn randomness_handle(&self) -> randomness::Handle {
2178        self.randomness_handle.clone()
2179    }
2180
2181    pub fn state_sync_handle(&self) -> state_sync::Handle {
2182        self.state_sync_handle.clone()
2183    }
2184
2185    pub fn endpoint_manager(&self) -> &EndpointManager {
2186        &self.endpoint_manager
2187    }
2188
2189    /// Get a short prefix of a digest for metric labels
2190    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2191        let digest_str = digest.to_string();
2192        if digest_str.len() >= 8 {
2193            digest_str[0..8].to_string()
2194        } else {
2195            digest_str
2196        }
2197    }
2198
2199    /// Check for previously detected forks and handle them appropriately.
2200    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2201    /// For all other cases, block node startup if a fork is detected.
2202    async fn check_and_recover_forks(
2203        checkpoint_store: &CheckpointStore,
2204        checkpoint_metrics: &CheckpointMetrics,
2205        fork_recovery: Option<&ForkRecoveryConfig>,
2206    ) -> Result<()> {
2207        // Try to recover from forks if recovery config is provided
2208        if let Some(recovery) = fork_recovery {
2209            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2210            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2211        }
2212
2213        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2214            .get_checkpoint_fork_detected()
2215            .map_err(|e| {
2216                error!("Failed to check for checkpoint fork: {:?}", e);
2217                e
2218            })?
2219        {
2220            Self::handle_checkpoint_fork(
2221                checkpoint_seq,
2222                checkpoint_digest,
2223                checkpoint_metrics,
2224                fork_recovery,
2225            )
2226            .await?;
2227        }
2228        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2229            .get_transaction_fork_detected()
2230            .map_err(|e| {
2231                error!("Failed to check for transaction fork: {:?}", e);
2232                e
2233            })?
2234        {
2235            Self::handle_transaction_fork(
2236                tx_digest,
2237                expected_effects,
2238                actual_effects,
2239                checkpoint_metrics,
2240                fork_recovery,
2241            )
2242            .await?;
2243        }
2244
2245        Ok(())
2246    }
2247
2248    fn try_recover_checkpoint_fork(
2249        checkpoint_store: &CheckpointStore,
2250        recovery: &ForkRecoveryConfig,
2251    ) -> Result<()> {
2252        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2253        // clear locally computed checkpoints from that sequence (inclusive).
2254        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2255            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2256                anyhow::bail!(
2257                    "Invalid checkpoint digest override for seq {}: {}",
2258                    seq,
2259                    expected_digest_str
2260                );
2261            };
2262
2263            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2264                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2265                if local_digest != expected_digest {
2266                    info!(
2267                        seq,
2268                        local = %Self::get_digest_prefix(local_digest),
2269                        expected = %Self::get_digest_prefix(expected_digest),
2270                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2271                        seq
2272                    );
2273                    checkpoint_store
2274                        .clear_locally_computed_checkpoints_from(*seq)
2275                        .context(
2276                            "Failed to clear locally computed checkpoints from override seq",
2277                        )?;
2278                }
2279            }
2280        }
2281
2282        if let Some((checkpoint_seq, checkpoint_digest)) =
2283            checkpoint_store.get_checkpoint_fork_detected()?
2284            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2285        {
2286            info!(
2287                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2288                checkpoint_seq, checkpoint_digest
2289            );
2290            checkpoint_store
2291                .clear_checkpoint_fork_detected()
2292                .expect("Failed to clear checkpoint fork detected marker");
2293        }
2294        Ok(())
2295    }
2296
2297    fn try_recover_transaction_fork(
2298        checkpoint_store: &CheckpointStore,
2299        recovery: &ForkRecoveryConfig,
2300    ) -> Result<()> {
2301        if recovery.transaction_overrides.is_empty() {
2302            return Ok(());
2303        }
2304
2305        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2306            && recovery
2307                .transaction_overrides
2308                .contains_key(&tx_digest.to_string())
2309        {
2310            info!(
2311                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2312                tx_digest
2313            );
2314            checkpoint_store
2315                .clear_transaction_fork_detected()
2316                .expect("Failed to clear transaction fork detected marker");
2317        }
2318        Ok(())
2319    }
2320
2321    fn get_current_timestamp() -> u64 {
2322        std::time::SystemTime::now()
2323            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2324            .unwrap()
2325            .as_secs()
2326    }
2327
2328    async fn handle_checkpoint_fork(
2329        checkpoint_seq: u64,
2330        checkpoint_digest: CheckpointDigest,
2331        checkpoint_metrics: &CheckpointMetrics,
2332        fork_recovery: Option<&ForkRecoveryConfig>,
2333    ) -> Result<()> {
2334        checkpoint_metrics
2335            .checkpoint_fork_crash_mode
2336            .with_label_values(&[
2337                &checkpoint_seq.to_string(),
2338                &Self::get_digest_prefix(checkpoint_digest),
2339                &Self::get_current_timestamp().to_string(),
2340            ])
2341            .set(1);
2342
2343        let behavior = fork_recovery
2344            .map(|fr| fr.fork_crash_behavior)
2345            .unwrap_or_default();
2346
2347        match behavior {
2348            ForkCrashBehavior::AwaitForkRecovery => {
2349                error!(
2350                    checkpoint_seq = checkpoint_seq,
2351                    checkpoint_digest = ?checkpoint_digest,
2352                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2353                );
2354                futures::future::pending::<()>().await;
2355                unreachable!("pending() should never return");
2356            }
2357            ForkCrashBehavior::ReturnError => {
2358                error!(
2359                    checkpoint_seq = checkpoint_seq,
2360                    checkpoint_digest = ?checkpoint_digest,
2361                    "Checkpoint fork detected! Returning error."
2362                );
2363                Err(anyhow::anyhow!(
2364                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2365                    checkpoint_seq,
2366                    checkpoint_digest
2367                ))
2368            }
2369        }
2370    }
2371
2372    async fn handle_transaction_fork(
2373        tx_digest: TransactionDigest,
2374        expected_effects_digest: TransactionEffectsDigest,
2375        actual_effects_digest: TransactionEffectsDigest,
2376        checkpoint_metrics: &CheckpointMetrics,
2377        fork_recovery: Option<&ForkRecoveryConfig>,
2378    ) -> Result<()> {
2379        checkpoint_metrics
2380            .transaction_fork_crash_mode
2381            .with_label_values(&[
2382                &Self::get_digest_prefix(tx_digest),
2383                &Self::get_digest_prefix(expected_effects_digest),
2384                &Self::get_digest_prefix(actual_effects_digest),
2385                &Self::get_current_timestamp().to_string(),
2386            ])
2387            .set(1);
2388
2389        let behavior = fork_recovery
2390            .map(|fr| fr.fork_crash_behavior)
2391            .unwrap_or_default();
2392
2393        match behavior {
2394            ForkCrashBehavior::AwaitForkRecovery => {
2395                error!(
2396                    tx_digest = ?tx_digest,
2397                    expected_effects_digest = ?expected_effects_digest,
2398                    actual_effects_digest = ?actual_effects_digest,
2399                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2400                );
2401                futures::future::pending::<()>().await;
2402                unreachable!("pending() should never return");
2403            }
2404            ForkCrashBehavior::ReturnError => {
2405                error!(
2406                    tx_digest = ?tx_digest,
2407                    expected_effects_digest = ?expected_effects_digest,
2408                    actual_effects_digest = ?actual_effects_digest,
2409                    "Transaction fork detected! Returning error."
2410                );
2411                Err(anyhow::anyhow!(
2412                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2413                    tx_digest,
2414                    expected_effects_digest,
2415                    actual_effects_digest
2416                ))
2417            }
2418        }
2419    }
2420}
2421
2422#[cfg(not(msim))]
2423impl SuiNode {
2424    async fn fetch_jwks(
2425        _authority: AuthorityName,
2426        provider: &OIDCProvider,
2427    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2428        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2429        use sui_types::error::SuiErrorKind;
2430        let client = reqwest::Client::new();
2431        fetch_jwks(provider, &client, true)
2432            .await
2433            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2434    }
2435}
2436
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node held in this node's simulator state.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Logs and stores `new_value` into the simulator's "safe mode expected" flag, using
    /// relaxed ordering.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let flag = &self.sim_state.sim_safe_mode_expected;
        flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator variant of JWK fetching: delegates to the callable returned by
    /// `get_jwk_injector()`.
    // NOTE(review): both parameters are used below, so this `allow` looks redundant; confirm
    // before removing.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let inject = get_jwk_injector();
        inject(authority, provider)
    }
}
2458
2459enum SpawnOnce {
2460    // Mutex is only needed to make SpawnOnce Send
2461    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2462    #[allow(unused)]
2463    Started(JoinHandle<()>),
2464}
2465
2466impl SpawnOnce {
2467    pub fn new(
2468        ready_rx: oneshot::Receiver<()>,
2469        future: impl Future<Output = ()> + Send + 'static,
2470    ) -> Self {
2471        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2472    }
2473
2474    pub async fn start(self) -> Self {
2475        match self {
2476            Self::Unstarted(ready_rx, future) => {
2477                let future = future.into_inner();
2478                let handle = tokio::spawn(future);
2479                ready_rx.await.unwrap();
2480                Self::Started(handle)
2481            }
2482            Self::Started(_) => self,
2483        }
2484    }
2485}
2486
2487/// Updates trusted peer addresses in the p2p network (for nodes configured as validators).
2488/// When `prev_epoch_start_state` is provided, validators that are no longer in the committee
2489/// have their Chain addresses cleared.
2490fn update_peer_addresses(
2491    config: &NodeConfig,
2492    endpoint_manager: &EndpointManager,
2493    epoch_start_state: &EpochStartSystemState,
2494    prev_epoch_start_state: Option<&EpochStartSystemState>,
2495) {
2496    if config.consensus_config().is_none() {
2497        return;
2498    }
2499    let new_peers: HashSet<PeerId> = epoch_start_state
2500        .get_validator_as_p2p_peers(config.protocol_public_key())
2501        .into_iter()
2502        .map(|(peer_id, address)| {
2503            endpoint_manager
2504                .update_endpoint(
2505                    EndpointId::P2p(peer_id),
2506                    AddressSource::Chain,
2507                    vec![address],
2508                )
2509                .expect("Updating peer addresses should not fail");
2510            peer_id
2511        })
2512        .collect();
2513
2514    // Clear Chain addresses for validators that left the committee.
2515    if let Some(prev) = prev_epoch_start_state {
2516        for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2517            if !new_peers.contains(&peer_id) {
2518                endpoint_manager
2519                    .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2520                    .expect("Clearing peer addresses should not fail");
2521            }
2522        }
2523    }
2524}
2525
2526fn build_kv_store(
2527    state: &Arc<AuthorityState>,
2528    config: &NodeConfig,
2529    registry: &Registry,
2530) -> Result<Arc<TransactionKeyValueStore>> {
2531    let metrics = KeyValueStoreMetrics::new(registry);
2532    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2533
2534    let base_url = &config.transaction_kv_store_read_config.base_url;
2535
2536    if base_url.is_empty() {
2537        info!("no http kv store url provided, using local db only");
2538        return Ok(Arc::new(db_store));
2539    }
2540
2541    let base_url: url::Url = base_url.parse().tap_err(|e| {
2542        error!(
2543            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2544            base_url, e
2545        )
2546    })?;
2547
2548    let network_str = match state.get_chain_identifier().chain() {
2549        Chain::Mainnet => "/mainnet",
2550        _ => {
2551            info!("using local db only for kv store");
2552            return Ok(Arc::new(db_store));
2553        }
2554    };
2555
2556    let base_url = base_url.join(network_str)?.to_string();
2557    let http_store = HttpKVStore::new_kv(
2558        &base_url,
2559        config.transaction_kv_store_read_config.cache_size,
2560        metrics.clone(),
2561    )?;
2562    info!("using local key-value store with fallback to http key-value store");
2563    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2564        db_store,
2565        http_store,
2566        metrics,
2567        "json_rpc_fallback",
2568    )))
2569}
2570
2571async fn build_json_rpc_router(
2572    state: &Arc<AuthorityState>,
2573    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2574    config: &NodeConfig,
2575    prometheus_registry: &Registry,
2576) -> Result<axum::Router> {
2577    let traffic_controller = state.traffic_controller.clone();
2578    let mut server = JsonRpcServerBuilder::new(
2579        env!("CARGO_PKG_VERSION"),
2580        prometheus_registry,
2581        traffic_controller,
2582        config.policy_config.clone(),
2583    );
2584
2585    let kv_store = build_kv_store(state, config, prometheus_registry)?;
2586
2587    let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2588    server.register_module(ReadApi::new(
2589        state.clone(),
2590        kv_store.clone(),
2591        metrics.clone(),
2592    ))?;
2593    server.register_module(CoinReadApi::new(
2594        state.clone(),
2595        kv_store.clone(),
2596        metrics.clone(),
2597    ))?;
2598
2599    // if run_with_range is enabled we want to prevent any transactions
2600    // run_with_range = None is normal operating conditions
2601    if config.run_with_range.is_none() {
2602        server.register_module(TransactionBuilderApi::new(state.clone()))?;
2603    }
2604    server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2605    server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2606
2607    if let Some(transaction_orchestrator) = transaction_orchestrator {
2608        server.register_module(TransactionExecutionApi::new(
2609            state.clone(),
2610            transaction_orchestrator.clone(),
2611            metrics.clone(),
2612        ))?;
2613    }
2614
2615    let name_service_config = if let (
2616        Some(package_address),
2617        Some(registry_id),
2618        Some(reverse_registry_id),
2619    ) = (
2620        config.name_service_package_address,
2621        config.name_service_registry_id,
2622        config.name_service_reverse_registry_id,
2623    ) {
2624        sui_name_service::NameServiceConfig::new(package_address, registry_id, reverse_registry_id)
2625    } else {
2626        match state.get_chain_identifier().chain() {
2627            Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2628            Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2629            Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2630        }
2631    };
2632
2633    server.register_module(IndexerApi::new(
2634        state.clone(),
2635        ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2636        kv_store,
2637        name_service_config,
2638        metrics,
2639        config.indexer_max_subscriptions,
2640    ))?;
2641    server.register_module(MoveUtils::new(state.clone()))?;
2642
2643    let server_type = config.jsonrpc_server_type();
2644
2645    Ok(server.to_router(server_type).await?)
2646}
2647
2648async fn build_http_servers(
2649    state: Arc<AuthorityState>,
2650    store: RocksDbStore,
2651    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2652    config: &NodeConfig,
2653    prometheus_registry: &Registry,
2654    server_version: ServerVersion,
2655    node_role: NodeRole,
2656    embedded_rpc_store: Option<&EmbeddedRpcStore>,
2657) -> Result<(
2658    HttpServers,
2659    Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
2660)> {
2661    // Validators do not expose these APIs
2662    if !node_role.should_run_rpc_servers() {
2663        return Ok((HttpServers::default(), None));
2664    }
2665
2666    info!("starting rpc service with config: {:?}", config.rpc);
2667
2668    let mut router = axum::Router::new();
2669
2670    // The JSON-RPC service can be disabled independently of the gRPC/REST
2671    // service and of JSON-RPC indexing, so that a node can keep indexing
2672    // without exposing the JSON-RPC endpoints.
2673    if config.json_rpc_enabled() {
2674        router = router.merge(
2675            build_json_rpc_router(
2676                &state,
2677                transaction_orchestrator,
2678                config,
2679                prometheus_registry,
2680            )
2681            .await?,
2682        );
2683    } else {
2684        info!("json-rpc service is disabled");
2685    }
2686
2687    // When the embedded rpc-store is active, gate checkpoint delivery on the
2688    // index so a client that waits for a checkpoint can immediately read its
2689    // indexed state (matching the legacy synchronously-committed index).
2690    let indexed_checkpoint = embedded_rpc_store.map(|embedded| embedded.indexed_checkpoint_fn());
2691    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2692        SubscriptionService::build(prometheus_registry, indexed_checkpoint);
2693    let rpc_router = {
2694        // Serve the index read paths from the embedded rpc-store when it
2695        // is enabled, otherwise from the legacy `rpc-index`. Raw chain
2696        // data comes from the perpetual / checkpoint stores either way.
2697        let reader: Arc<dyn RpcStateReader> = match embedded_rpc_store {
2698            Some(embedded) => Arc::new(RpcStoreReadStore::new(
2699                state.clone(),
2700                store,
2701                embedded.reader(),
2702            )),
2703            None => Arc::new(RestReadStore::new(state.clone(), store)),
2704        };
2705        let mut rpc_service = sui_rpc_api::RpcService::new(reader);
2706        rpc_service.with_server_version(server_version);
2707
2708        if let Some(config) = config.rpc.clone() {
2709            config.validate()?;
2710            rpc_service.with_config(config);
2711        }
2712
2713        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2714        rpc_service.with_subscription_service(subscription_service_handle);
2715
2716        if let Some(transaction_orchestrator) = transaction_orchestrator {
2717            rpc_service.with_executor(transaction_orchestrator.clone())
2718        }
2719
2720        rpc_service.into_router().await
2721    };
2722
2723    let layers = ServiceBuilder::new()
2724        .map_request(|mut request: axum::http::Request<_>| {
2725            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2726                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2727                request.extensions_mut().insert(axum_connect_info);
2728            }
2729            request
2730        })
2731        .layer(axum::middleware::from_fn(server_timing_middleware))
2732        // Setup a permissive CORS policy
2733        .layer(
2734            tower_http::cors::CorsLayer::new()
2735                .allow_methods([http::Method::GET, http::Method::POST])
2736                .allow_origin(tower_http::cors::Any)
2737                .allow_headers(tower_http::cors::Any)
2738                .expose_headers(tower_http::cors::Any),
2739        );
2740
2741    router = router.merge(rpc_router).layer(layers);
2742
2743    let https = if let Some((tls_config, https_address)) = config
2744        .rpc()
2745        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2746    {
2747        let https = sui_http::Builder::new()
2748            .tls_single_cert(tls_config.cert(), tls_config.key())
2749            .and_then(|builder| builder.serve(https_address, router.clone()))
2750            .map_err(|e| anyhow::anyhow!(e))?;
2751
2752        info!(
2753            https_address =? https.local_addr(),
2754            "HTTPS rpc server listening on {}",
2755            https.local_addr()
2756        );
2757
2758        Some(https)
2759    } else {
2760        None
2761    };
2762
2763    let http = sui_http::Builder::new()
2764        .serve(&config.json_rpc_address, router)
2765        .map_err(|e| anyhow::anyhow!(e))?;
2766
2767    info!(
2768        http_address =? http.local_addr(),
2769        "HTTP rpc server listening on {}",
2770        http.local_addr()
2771    );
2772
2773    Ok((
2774        HttpServers {
2775            http: Some(http),
2776            https,
2777        },
2778        Some(subscription_service_checkpoint_sender),
2779    ))
2780}
2781
/// Handles for the RPC servers started by `build_http_servers`.
///
/// The `Default` value (both fields `None`) is what `build_http_servers` returns when the
/// node's role does not run RPC servers.
#[derive(Default)]
struct HttpServers {
    // Handle of the plain-HTTP server. Not read in the code visible here; presumably kept so
    // the server stays alive for as long as this struct does (verify against `sui_http`).
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // Handle of the HTTPS server; `None` when no TLS config is present.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2789
2790#[cfg(test)]
2791mod tests {
2792    use super::*;
2793    use prometheus::Registry;
2794    use std::collections::BTreeMap;
2795    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2796    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2797    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2798
2799    #[tokio::test]
2800    async fn test_fork_error_and_recovery_both_paths() {
2801        let checkpoint_store = CheckpointStore::new_for_tests();
2802        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2803
2804        // ---------- Checkpoint fork path ----------
2805        let seq_num = 42;
2806        let digest = CheckpointDigest::random();
2807        checkpoint_store
2808            .record_checkpoint_fork_detected(seq_num, digest)
2809            .unwrap();
2810
2811        let fork_recovery = ForkRecoveryConfig {
2812            transaction_overrides: Default::default(),
2813            checkpoint_overrides: Default::default(),
2814            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2815        };
2816
2817        let r = SuiNode::check_and_recover_forks(
2818            &checkpoint_store,
2819            &checkpoint_metrics,
2820            Some(&fork_recovery),
2821        )
2822        .await;
2823        assert!(r.is_err());
2824        assert!(
2825            r.unwrap_err()
2826                .to_string()
2827                .contains("Checkpoint fork detected")
2828        );
2829
2830        let mut checkpoint_overrides = BTreeMap::new();
2831        checkpoint_overrides.insert(seq_num, digest.to_string());
2832        let fork_recovery_with_override = ForkRecoveryConfig {
2833            transaction_overrides: Default::default(),
2834            checkpoint_overrides,
2835            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2836        };
2837        let r = SuiNode::check_and_recover_forks(
2838            &checkpoint_store,
2839            &checkpoint_metrics,
2840            Some(&fork_recovery_with_override),
2841        )
2842        .await;
2843        assert!(r.is_ok());
2844        assert!(
2845            checkpoint_store
2846                .get_checkpoint_fork_detected()
2847                .unwrap()
2848                .is_none()
2849        );
2850
2851        // ---------- Transaction fork path ----------
2852        let tx_digest = TransactionDigest::random();
2853        let expected_effects = TransactionEffectsDigest::random();
2854        let actual_effects = TransactionEffectsDigest::random();
2855        checkpoint_store
2856            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2857            .unwrap();
2858
2859        let fork_recovery = ForkRecoveryConfig {
2860            transaction_overrides: Default::default(),
2861            checkpoint_overrides: Default::default(),
2862            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2863        };
2864        let r = SuiNode::check_and_recover_forks(
2865            &checkpoint_store,
2866            &checkpoint_metrics,
2867            Some(&fork_recovery),
2868        )
2869        .await;
2870        assert!(r.is_err());
2871        assert!(
2872            r.unwrap_err()
2873                .to_string()
2874                .contains("Transaction fork detected")
2875        );
2876
2877        let mut transaction_overrides = BTreeMap::new();
2878        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2879        let fork_recovery_with_override = ForkRecoveryConfig {
2880            transaction_overrides,
2881            checkpoint_overrides: Default::default(),
2882            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2883        };
2884        let r = SuiNode::check_and_recover_forks(
2885            &checkpoint_store,
2886            &checkpoint_metrics,
2887            Some(&fork_recovery_with_override),
2888        )
2889        .await;
2890        assert!(r.is_ok());
2891        assert!(
2892            checkpoint_store
2893                .get_transaction_fork_detected()
2894                .unwrap()
2895                .is_none()
2896        );
2897    }
2898}