sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
// Emits a JWK-related log line at `debug` level when `in_test_configuration()`
// returns true, and at `info` level otherwise.
// JWK logs cause significant volume in tests, but are insignificant in prod,
// so we keep them at info there.
//
// Takes the same arguments as `debug!` / `info!`; the tokens are forwarded
// unchanged to whichever of the two macros is selected.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121    authority::{AuthorityState, AuthorityStore},
122    authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142    http_key_value_store::HttpKVStore,
143    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144    key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
/// Bundle of validator-related handles and metrics.
///
/// `SuiNode` stores this as `Mutex<Option<ValidatorComponents>>`, so the bundle may be
/// absent. NOTE(review): presumably it is only populated while the node acts as a
/// validator — confirm against the code that constructs it (outside this excerpt).
pub struct ValidatorComponents {
    /// Optional handle for the validator server (`SpawnOnce` is defined outside this excerpt).
    validator_server_handle: Option<SpawnOnce>,
    /// Optional join handle; by its name, for the overload monitor task — TODO confirm.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Shared consensus manager.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for the consensus store.
    consensus_store_pruner: ConsensusStorePruner,
    /// Shared adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Shared checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Shared metrics for the Sui transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Optional admission queue context.
    admission_queue: Option<AdmissionQueueContext>,
}
/// The pieces produced when the peer-to-peer network is created.
///
/// `SuiNode::start_async` obtains this from `Self::create_p2p_network` and immediately
/// destructures it into its individual fields.
pub struct P2pComponents {
    /// The anemo network instance.
    p2p_network: Network,
    /// Map from peer id to a string per peer (NOTE(review): the meaning of the string is
    /// not visible in this excerpt — presumably a human-readable label; confirm).
    known_peers: HashMap<PeerId, String>,
    /// Handle to the discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Manager used to update peer endpoints/addresses.
    endpoint_manager: EndpointManager,
}
182
#[cfg(msim)]
mod simulator {
    use std::cell::RefCell;
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// State attached to a node only in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this state was created on.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag that starts out `false`; by its name, records whether safe mode is expected.
        pub sim_safe_mode_expected: AtomicBool,
        /// Leak detector kept alive for as long as this state exists.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node, clears the safe-mode flag and creates a
        /// fresh leak detector, in that order.
        fn default() -> Self {
            let current_node = sui_simulator::runtime::NodeHandle::current();
            let safe_mode_flag = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node: current_node,
                sim_safe_mode_expected: safe_mode_flag,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK source used in simulation.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default JWK source: ignores both arguments and parses the bundled default JWK
    /// bytes as Twitch keys. Any parse failure is reported as `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Per-thread injector slot, initialised with `default_fetch_jwks`.
        static JWK_INJECTOR: RefCell<Arc<JwkInjector>> = RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the injector currently installed on this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&slot.borrow()))
    }

    /// Replaces the injector installed on this thread with `injector`.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            *slot.borrow_mut() = injector;
        });
    }
}
236
237#[cfg(msim)]
238pub use simulator::set_jwk_injector;
239#[cfg(msim)]
240use simulator::*;
241use sui_core::authority::authority_store_pruner::PrunerWatermarks;
242use sui_core::{
243    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
244};
245
/// Default timeout of 60 seconds. By its name this bounds gRPC connection establishment;
/// the code that uses it is outside this excerpt.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
247
/// A Sui node: its configuration, authority state, and the handles, stores and channels
/// of the services attached to it.
pub struct SuiNode {
    /// Node configuration.
    config: NodeConfig,
    /// Validator-specific components, if present. NOTE(review): presumably `None` while
    /// the node is not acting as a validator — confirm against the code that sets it.
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    /// Shared authority state; its `name` is what the `Debug` impl prints.
    state: Arc<AuthorityState>,
    /// Optional transaction orchestrator backed by network authority clients.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Registry service used for metrics registration.
    registry_service: RegistryService,
    /// Node-level metrics.
    metrics: Arc<SuiNodeMetrics>,
    /// Checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,

    /// Discovery handle; underscore-prefixed, so presumably held only to keep it alive.
    _discovery: discovery::Handle,
    /// Connection monitor handle; underscore-prefixed, so presumably held only to keep it alive.
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Shared checkpoint store.
    checkpoint_store: Arc<CheckpointStore>,
    /// Optional global state hasher, behind a mutex so it can be replaced through `&self`.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    /// Shared backpressure manager.
    backpressure_manager: Arc<BackpressureManager>,

    /// Optional broadcast sender tied to DB checkpointing; underscore-prefixed, so
    /// presumably held only to keep the channel open — TODO confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only state (present in `msim` builds only).
    #[cfg(msim)]
    sim_state: SimState,

    /// Optional broadcast sender tied to the state snapshot uploader; underscore-prefixed,
    /// so presumably held only to keep the channel open — TODO confirm.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle shared with RandomnessManager and the consensus layer.
    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Optional sender of `Checkpoint` values; by its name, feeds the subscription service.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
297
298impl fmt::Debug for SuiNode {
299    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
300        f.debug_struct("SuiNode")
301            .field("name", &self.state.name.concise())
302            .finish()
303    }
304}
305
/// Upper bound on how many JWKs from a single provider fetch are submitted to consensus.
///
/// Declared `const` (like `DEFAULT_GRPC_CONNECT_TIMEOUT`) because it is a plain
/// compile-time value that needs no fixed address.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
307
308impl SuiNode {
309    pub async fn start(
310        config: NodeConfig,
311        registry_service: RegistryService,
312    ) -> Result<Arc<SuiNode>> {
313        Self::start_async(
314            config,
315            registry_service,
316            ServerVersion::new("sui-node", "unknown"),
317        )
318        .await
319    }
320
321    fn start_jwk_updater(
322        config: &NodeConfig,
323        metrics: Arc<SuiNodeMetrics>,
324        authority: AuthorityName,
325        epoch_store: Arc<AuthorityPerEpochStore>,
326        consensus_adapter: Arc<ConsensusAdapter>,
327    ) {
328        let epoch = epoch_store.epoch();
329
330        let supported_providers = config
331            .zklogin_oauth_providers
332            .get(&epoch_store.get_chain_identifier().chain())
333            .unwrap_or(&BTreeSet::new())
334            .iter()
335            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
336            .collect::<Vec<_>>();
337
338        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
339
340        info!(
341            ?fetch_interval,
342            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
343        );
344
345        fn validate_jwk(
346            metrics: &Arc<SuiNodeMetrics>,
347            provider: &OIDCProvider,
348            id: &JwkId,
349            jwk: &JWK,
350        ) -> bool {
351            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
352                warn!(
353                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
354                    id.iss, provider
355                );
356                metrics
357                    .invalid_jwks
358                    .with_label_values(&[&provider.to_string()])
359                    .inc();
360                return false;
361            };
362
363            if iss_provider != *provider {
364                warn!(
365                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
366                    id.iss, provider, iss_provider
367                );
368                metrics
369                    .invalid_jwks
370                    .with_label_values(&[&provider.to_string()])
371                    .inc();
372                return false;
373            }
374
375            if !check_total_jwk_size(id, jwk) {
376                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
377                metrics
378                    .invalid_jwks
379                    .with_label_values(&[&provider.to_string()])
380                    .inc();
381                return false;
382            }
383
384            true
385        }
386
387        // metrics is:
388        //  pub struct SuiNodeMetrics {
389        //      pub jwk_requests: IntCounterVec,
390        //      pub jwk_request_errors: IntCounterVec,
391        //      pub total_jwks: IntCounterVec,
392        //      pub unique_jwks: IntCounterVec,
393        //  }
394
395        for p in supported_providers.into_iter() {
396            let provider_str = p.to_string();
397            let epoch_store = epoch_store.clone();
398            let consensus_adapter = consensus_adapter.clone();
399            let metrics = metrics.clone();
400            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
401                async move {
402                    // note: restart-safe de-duplication happens after consensus, this is
403                    // just best-effort to reduce unneeded submissions.
404                    let mut seen = HashSet::new();
405                    loop {
406                        jwk_log!("fetching JWK for provider {:?}", p);
407                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
408                        match Self::fetch_jwks(authority, &p).await {
409                            Err(e) => {
410                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
411                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
412                                // Retry in 30 seconds
413                                tokio::time::sleep(Duration::from_secs(30)).await;
414                                continue;
415                            }
416                            Ok(mut keys) => {
417                                metrics.total_jwks
418                                    .with_label_values(&[&provider_str])
419                                    .inc_by(keys.len() as u64);
420
421                                keys.retain(|(id, jwk)| {
422                                    validate_jwk(&metrics, &p, id, jwk) &&
423                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
424                                    seen.insert((id.clone(), jwk.clone()))
425                                });
426
427                                metrics.unique_jwks
428                                    .with_label_values(&[&provider_str])
429                                    .inc_by(keys.len() as u64);
430
431                                // prevent oauth providers from sending too many keys,
432                                // inadvertently or otherwise
433                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
434                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
435                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
436                                }
437
438                                for (id, jwk) in keys.into_iter() {
439                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
440
441                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
442                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
443                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
444                                        .ok();
445                                }
446                            }
447                        }
448                        tokio::time::sleep(fetch_interval).await;
449                    }
450                }
451                .instrument(error_span!("jwk_updater_task", epoch)),
452            ));
453        }
454    }
455
456    pub async fn start_async(
457        config: NodeConfig,
458        registry_service: RegistryService,
459        server_version: ServerVersion,
460    ) -> Result<Arc<SuiNode>> {
461        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
462        let mut config = config.clone();
463        if config.supported_protocol_versions.is_none() {
464            info!(
465                "populating config.supported_protocol_versions with default {:?}",
466                SupportedProtocolVersions::SYSTEM_DEFAULT
467            );
468            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
469        }
470
471        let run_with_range = config.run_with_range;
472        let prometheus_registry = registry_service.default_registry();
473        let node_role = config.intended_node_role();
474
475        info!(node =? config.protocol_public_key(),
476            "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
477        );
478
479        // Initialize metrics to track db usage before creating any stores
480        DBMetrics::init(registry_service.clone());
481
482        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
483        typed_store::init_write_sync(config.enable_db_sync_to_disk);
484
485        // Initialize Mysten metrics.
486        mysten_metrics::init_metrics(&prometheus_registry);
487        // Unsupported (because of the use of static variable) and unnecessary in simtests.
488        #[cfg(not(msim))]
489        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
490
491        let genesis = config.genesis()?.clone();
492
493        let secret = Arc::pin(config.protocol_key_pair().copy());
494        let genesis_committee = genesis.committee();
495        let committee_store = Arc::new(CommitteeStore::new(
496            config.db_path().join("epochs"),
497            &genesis_committee,
498            None,
499        ));
500
501        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
502        let checkpoint_store = CheckpointStore::new(
503            &config.db_path().join("checkpoints"),
504            pruner_watermarks.clone(),
505        );
506        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
507
508        if node_role.runs_consensus() {
509            Self::check_and_recover_forks(
510                &checkpoint_store,
511                &checkpoint_metrics,
512                config.fork_recovery.as_ref(),
513            )
514            .await?;
515        }
516
517        // By default, only enable write stall on nodes that run consensus.
518        let enable_write_stall = config
519            .enable_db_write_stall
520            .unwrap_or(node_role.runs_consensus());
521        // The tidehunter objects compactor retains only the latest version per
522        // ObjectID and is mutually exclusive with the object pruner. Enable it
523        // for validators (which always disable the pruner), and also for any
524        // node configured with `num_epochs_to_retain = 0` — that aggressive
525        // setting is what the compactor replaces. The pruner is force-disabled
526        // in `AuthorityStorePruner::new` whenever this is true.
527        let enable_objects_compactor = node_role.is_validator()
528            || config.authority_store_pruning_config.num_epochs_to_retain == 0;
529        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
530            enable_write_stall,
531            enable_objects_compactor,
532        };
533        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
534            &config.db_store_path(),
535            Some(perpetual_tables_options),
536            Some(pruner_watermarks.epoch_id.clone()),
537        ));
538        let is_genesis = perpetual_tables
539            .database_is_empty()
540            .expect("Database read should not fail at init.");
541
542        let backpressure_manager =
543            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
544
545        let store =
546            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
547
548        let cur_epoch = store.get_recovery_epoch_at_restart()?;
549        let committee = committee_store
550            .get_committee(&cur_epoch)?
551            .expect("Committee of the current epoch must exist");
552        let epoch_start_configuration = store
553            .get_epoch_start_configuration()?
554            .expect("EpochStartConfiguration of the current epoch must exist");
555        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
556        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
557
558        let cache_traits = build_execution_cache(
559            &config.execution_cache,
560            &prometheus_registry,
561            &store,
562            backpressure_manager.clone(),
563        );
564
565        let auth_agg = {
566            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
567            Arc::new(ArcSwap::new(Arc::new(
568                AuthorityAggregator::new_from_epoch_start_state(
569                    epoch_start_configuration.epoch_start_state(),
570                    &committee_store,
571                    safe_client_metrics_base,
572                ),
573            )))
574        };
575
576        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
577        let chain = match config.chain_override_for_testing {
578            Some(chain) => chain,
579            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
580        };
581
582        let highest_executed_checkpoint = checkpoint_store
583            .get_highest_executed_checkpoint_seq_number()
584            .expect("checkpoint store read cannot fail")
585            .unwrap_or(0);
586
587        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
588            0
589        } else {
590            checkpoint_store
591                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
592                .expect("checkpoint store read cannot fail")
593                .unwrap_or(highest_executed_checkpoint)
594        };
595
596        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
597        let epoch_store = AuthorityPerEpochStore::new(
598            config.protocol_public_key(),
599            committee.clone(),
600            &config.db_store_path(),
601            Some(epoch_options.options),
602            EpochMetrics::new(&registry_service.default_registry()),
603            epoch_start_configuration,
604            cache_traits.backing_package_store.clone(),
605            cache_traits.object_store.clone(),
606            cache_metrics,
607            signature_verifier_metrics,
608            &config.expensive_safety_check_config,
609            (chain_id, chain),
610            highest_executed_checkpoint,
611            previous_epoch_last_checkpoint,
612            Arc::new(SubmittedTransactionCacheMetrics::new(
613                &registry_service.default_registry(),
614            )),
615            config.fullnode_sync_mode,
616        )?;
617
618        info!("created epoch store");
619
620        replay_log!(
621            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
622            epoch_store.epoch(),
623            epoch_store.protocol_config()
624        );
625
626        // the database is empty at genesis time
627        if is_genesis {
628            info!("checking SUI conservation at genesis");
629            // When we are opening the db table, the only time when it's safe to
630            // check SUI conservation is at genesis. Otherwise we may be in the middle of
631            // an epoch and the SUI conservation check will fail. This also initialize
632            // the expected_network_sui_amount table.
633            cache_traits
634                .reconfig_api
635                .expensive_check_sui_conservation(&epoch_store)
636                .expect("SUI conservation check cannot fail at genesis");
637        }
638
639        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
640        let default_buffer_stake = epoch_store
641            .protocol_config()
642            .buffer_stake_for_protocol_upgrade_bps();
643        if effective_buffer_stake != default_buffer_stake {
644            warn!(
645                ?effective_buffer_stake,
646                ?default_buffer_stake,
647                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
648            );
649        }
650
651        checkpoint_store.insert_genesis_checkpoint(
652            genesis.checkpoint(),
653            genesis.checkpoint_contents().clone(),
654            &epoch_store,
655        );
656
657        info!("creating state sync store");
658        let state_sync_store = RocksDbStore::new(
659            cache_traits.clone(),
660            committee_store.clone(),
661            checkpoint_store.clone(),
662        );
663
664        let index_store =
665            if node_role.should_enable_index_processing() && config.enable_index_processing {
666                info!("creating jsonrpc index store");
667                Some(Arc::new(IndexStore::new(
668                    config.db_path().join("indexes"),
669                    &prometheus_registry,
670                    epoch_store
671                        .protocol_config()
672                        .max_move_identifier_len_as_option(),
673                    config.remove_deprecated_tables,
674                )))
675            } else {
676                None
677            };
678
679        let rpc_index = if node_role.should_enable_index_processing()
680            && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
681        {
682            info!("creating rpc index store");
683            Some(Arc::new(
684                RpcIndexStore::new(
685                    &config.db_path(),
686                    &store,
687                    &checkpoint_store,
688                    &epoch_store,
689                    &cache_traits.backing_package_store,
690                    config.rpc().cloned().unwrap_or_default(),
691                )
692                .await,
693            ))
694        } else {
695            None
696        };
697
698        let chain_identifier = epoch_store.get_chain_identifier();
699
700        info!("creating archive reader");
701        // Create network
702        let (randomness_tx, randomness_rx) = mpsc::channel(
703            config
704                .p2p_config
705                .randomness
706                .clone()
707                .unwrap_or_default()
708                .mailbox_capacity(),
709        );
710        let P2pComponents {
711            p2p_network,
712            known_peers,
713            discovery_handle,
714            state_sync_handle,
715            randomness_handle,
716            endpoint_manager,
717        } = Self::create_p2p_network(
718            &config,
719            state_sync_store.clone(),
720            chain_identifier,
721            randomness_tx,
722            &prometheus_registry,
723        )?;
724
725        // Inject configured peer address overrides.
726        for peer in &config.p2p_config.peer_address_overrides {
727            endpoint_manager
728                .update_endpoint(
729                    EndpointId::P2p(peer.peer_id),
730                    AddressSource::Config,
731                    peer.addresses.clone(),
732                )
733                .expect("Updating peer address overrides should not fail");
734        }
735
736        // Send initial peer addresses to the p2p network.
737        update_peer_addresses(
738            &config,
739            &endpoint_manager,
740            epoch_store.epoch_start_state(),
741            None,
742        );
743
744        info!("start snapshot upload");
745        // Start uploading state snapshot to remote store
746        let state_snapshot_handle = Self::start_state_snapshot(
747            &config,
748            &prometheus_registry,
749            checkpoint_store.clone(),
750            chain_identifier,
751        )?;
752
753        // Start uploading db checkpoints to remote store
754        info!("start db checkpoint");
755        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
756            &config,
757            &prometheus_registry,
758            state_snapshot_handle.is_some(),
759        )?;
760
761        if !epoch_store
762            .protocol_config()
763            .simplified_unwrap_then_delete()
764        {
765            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
766            config
767                .authority_store_pruning_config
768                .set_killswitch_tombstone_pruning(true);
769        }
770
771        let authority_name = config.protocol_public_key();
772
773        info!("create authority state");
774        let state = AuthorityState::new(
775            authority_name,
776            secret,
777            config.supported_protocol_versions.unwrap(),
778            store.clone(),
779            cache_traits.clone(),
780            epoch_store.clone(),
781            committee_store.clone(),
782            index_store.clone(),
783            rpc_index,
784            checkpoint_store.clone(),
785            &prometheus_registry,
786            genesis.objects(),
787            &db_checkpoint_config,
788            config.clone(),
789            chain_identifier,
790            config.policy_config.clone(),
791            config.firewall_config.clone(),
792            pruner_watermarks,
793        )
794        .await;
795        // ensure genesis txn was executed
796        if epoch_store.epoch() == 0 {
797            let txn = &genesis.transaction();
798            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
799            let transaction =
800                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
801                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
802                        genesis.transaction().data().clone(),
803                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
804                    ),
805                );
806            let _enter = span.enter();
807            state
808                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
809                .unwrap();
810        }
811
812        // Start the loop that receives new randomness and generates transactions for it.
813        // The returned handle is long-lived (node lifetime).
814        let randomness_receiver_handle =
815            RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
816
817        let (end_of_epoch_channel, end_of_epoch_receiver) =
818            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
819
820        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
821            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
822                auth_agg.load_full(),
823                state.clone(),
824                end_of_epoch_receiver,
825                &config.db_path(),
826                &prometheus_registry,
827                &config,
828            )))
829        } else {
830            None
831        };
832
833        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
834            state.clone(),
835            state_sync_store,
836            &transaction_orchestrator.clone(),
837            &config,
838            &prometheus_registry,
839            server_version,
840            node_role,
841        )
842        .await?;
843
844        let global_state_hasher = Arc::new(GlobalStateHasher::new(
845            cache_traits.global_state_hash_store.clone(),
846            GlobalStateHashMetrics::new(&prometheus_registry),
847        ));
848
849        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
850            "sui",
851            &registry_service.default_registry(),
852        );
853
854        let connection_monitor_handle =
855            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
856                p2p_network.downgrade(),
857                Arc::new(network_connection_metrics),
858                known_peers,
859            );
860
861        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
862
863        sui_node_metrics
864            .binary_max_protocol_version
865            .set(ProtocolVersion::MAX.as_u64() as i64);
866        sui_node_metrics
867            .configured_max_protocol_version
868            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
869
870        let node_role = epoch_store.node_role();
871        let validator_components = if node_role.runs_consensus() {
872            let mut components = Self::construct_validator_components(
873                config.clone(),
874                state.clone(),
875                committee,
876                epoch_store.clone(),
877                checkpoint_store.clone(),
878                state_sync_handle.clone(),
879                randomness_handle.clone(),
880                Arc::downgrade(&global_state_hasher),
881                backpressure_manager.clone(),
882                &registry_service,
883                sui_node_metrics.clone(),
884                checkpoint_metrics.clone(),
885                node_role,
886                randomness_receiver_handle.clone(),
887            )
888            .await?;
889
890            if node_role.is_validator() {
891                components
892                    .consensus_adapter
893                    .recover_end_of_publish(&epoch_store);
894
895                // Start the gRPC server
896                components.validator_server_handle = Some(
897                    components
898                        .validator_server_handle
899                        .take()
900                        .unwrap()
901                        .start()
902                        .await,
903                );
904
905                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
906                endpoint_manager
907                    .set_consensus_address_updater(components.consensus_manager.clone());
908            } else {
909                info!("Starting node as Observer — connecting to configured peers");
910            }
911
912            Some(components)
913        } else {
914            None
915        };
916
917        // setup shutdown channel
918        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
919
920        let node = Self {
921            config,
922            validator_components: Mutex::new(validator_components),
923            http_servers,
924            state,
925            transaction_orchestrator,
926            registry_service,
927            metrics: sui_node_metrics,
928            checkpoint_metrics,
929
930            _discovery: discovery_handle,
931            _connection_monitor_handle: connection_monitor_handle,
932            state_sync_handle,
933            randomness_handle,
934            checkpoint_store,
935            global_state_hasher: Mutex::new(Some(global_state_hasher)),
936            end_of_epoch_channel,
937            endpoint_manager,
938            backpressure_manager,
939
940            _db_checkpoint_handle: db_checkpoint_handle,
941
942            #[cfg(msim)]
943            sim_state: Default::default(),
944
945            _state_snapshot_uploader_handle: state_snapshot_handle,
946            shutdown_channel_tx: shutdown_channel,
947            randomness_receiver_handle,
948
949            auth_agg,
950            subscription_service_checkpoint_sender,
951        };
952
953        info!("SuiNode started!");
954        let node = Arc::new(node);
955        let node_copy = node.clone();
956        spawn_monitored_task!(async move {
957            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
958            if let Err(error) = result {
959                warn!("Reconfiguration finished with error {:?}", error);
960            }
961        });
962
963        Ok(node)
964    }
965
966    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
967        self.end_of_epoch_channel.subscribe()
968    }
969
970    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
971        self.shutdown_channel_tx.subscribe()
972    }
973
974    pub fn current_epoch_for_testing(&self) -> EpochId {
975        self.state.current_epoch_for_testing()
976    }
977
978    pub fn db_checkpoint_path(&self) -> PathBuf {
979        self.config.db_checkpoint_path()
980    }
981
982    // Init reconfig process by starting to reject user certs
983    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
984        info!("close_epoch (current epoch = {})", epoch_store.epoch());
985        self.validator_components
986            .lock()
987            .await
988            .as_ref()
989            .ok_or_else(|| SuiError::from("Node is not a validator"))?
990            .consensus_adapter
991            .close_epoch(epoch_store);
992        Ok(())
993    }
994
995    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
996        self.state
997            .clear_override_protocol_upgrade_buffer_stake(epoch)
998    }
999
1000    pub fn set_override_protocol_upgrade_buffer_stake(
1001        &self,
1002        epoch: EpochId,
1003        buffer_stake_bps: u64,
1004    ) -> SuiResult {
1005        self.state
1006            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1007    }
1008
1009    // Testing-only API to start epoch close process.
1010    // For production code, please use the non-testing version.
1011    pub async fn close_epoch_for_testing(&self) -> SuiResult {
1012        let epoch_store = self.state.epoch_store_for_testing();
1013        self.close_epoch(&epoch_store).await
1014    }
1015
1016    fn start_state_snapshot(
1017        config: &NodeConfig,
1018        prometheus_registry: &Registry,
1019        checkpoint_store: Arc<CheckpointStore>,
1020        chain_identifier: ChainIdentifier,
1021    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1022        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1023            let snapshot_uploader = StateSnapshotUploader::new(
1024                &config.db_checkpoint_path(),
1025                &config.snapshot_path(),
1026                remote_store_config.clone(),
1027                60,
1028                prometheus_registry,
1029                checkpoint_store,
1030                chain_identifier,
1031                config.state_snapshot_write_config.archive_interval_epochs,
1032            )?;
1033            Ok(Some(snapshot_uploader.start()))
1034        } else {
1035            Ok(None)
1036        }
1037    }
1038
1039    fn start_db_checkpoint(
1040        config: &NodeConfig,
1041        prometheus_registry: &Registry,
1042        state_snapshot_enabled: bool,
1043    ) -> Result<(
1044        DBCheckpointConfig,
1045        Option<tokio::sync::broadcast::Sender<()>>,
1046    )> {
1047        let checkpoint_path = Some(
1048            config
1049                .db_checkpoint_config
1050                .checkpoint_path
1051                .clone()
1052                .unwrap_or_else(|| config.db_checkpoint_path()),
1053        );
1054        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1055            DBCheckpointConfig {
1056                checkpoint_path,
1057                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1058                    true
1059                } else {
1060                    config
1061                        .db_checkpoint_config
1062                        .perform_db_checkpoints_at_epoch_end
1063                },
1064                ..config.db_checkpoint_config.clone()
1065            }
1066        } else {
1067            config.db_checkpoint_config.clone()
1068        };
1069
1070        match (
1071            db_checkpoint_config.object_store_config.as_ref(),
1072            state_snapshot_enabled,
1073        ) {
1074            // If db checkpoint config object store not specified but
1075            // state snapshot object store is specified, create handler
1076            // anyway for marking db checkpoints as completed so that they
1077            // can be uploaded as state snapshots.
1078            (None, false) => Ok((db_checkpoint_config, None)),
1079            (_, _) => {
1080                let handler = DBCheckpointHandler::new(
1081                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1082                    db_checkpoint_config.object_store_config.as_ref(),
1083                    60,
1084                    db_checkpoint_config
1085                        .prune_and_compact_before_upload
1086                        .unwrap_or(true),
1087                    config.authority_store_pruning_config.clone(),
1088                    prometheus_registry,
1089                    state_snapshot_enabled,
1090                )?;
1091                Ok((
1092                    db_checkpoint_config,
1093                    Some(DBCheckpointHandler::start(handler)),
1094                ))
1095            }
1096        }
1097    }
1098
1099    fn create_p2p_network(
1100        config: &NodeConfig,
1101        state_sync_store: RocksDbStore,
1102        chain_identifier: ChainIdentifier,
1103        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1104        prometheus_registry: &Registry,
1105    ) -> Result<P2pComponents> {
1106        let mut p2p_config = config.p2p_config.clone();
1107        {
1108            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1109            if disc.peer_addr_store_path.is_none() {
1110                disc.peer_addr_store_path =
1111                    Some(config.db_path().join("discovery_peer_cache.yaml"));
1112            }
1113        }
1114        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1115        if let Some(consensus_config) = &config.consensus_config {
1116            let effective_addr = consensus_config
1117                .external_address
1118                .as_ref()
1119                .or(consensus_config.listen_address.as_ref());
1120            if let Some(addr) = effective_addr {
1121                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1122            }
1123        }
1124        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1125        let discovery_sender = discovery.sender();
1126
1127        let (state_sync, state_sync_router) = state_sync::Builder::new()
1128            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1129            .store(state_sync_store)
1130            .archive_config(config.archive_reader_config())
1131            .discovery_sender(discovery_sender)
1132            .with_metrics(prometheus_registry)
1133            .build();
1134
1135        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1136        let known_peers: HashMap<PeerId, String> = discovery_config
1137            .allowlisted_peers
1138            .clone()
1139            .into_iter()
1140            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1141            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1142                peer.peer_id
1143                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1144            }))
1145            .collect();
1146
1147        let (randomness, randomness_router) =
1148            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1149                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1150                .with_metrics(prometheus_registry)
1151                .build();
1152
1153        let p2p_network = {
1154            let routes = anemo::Router::new()
1155                .add_rpc_service(discovery_server)
1156                .merge(state_sync_router);
1157            let routes = routes.merge(randomness_router);
1158
1159            let inbound_network_metrics =
1160                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1161            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1162                "sui",
1163                "outbound",
1164                prometheus_registry,
1165            );
1166
1167            let service = ServiceBuilder::new()
1168                .layer(
1169                    TraceLayer::new_for_server_errors()
1170                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1171                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1172                )
1173                .layer(CallbackLayer::new(
1174                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1175                        Arc::new(inbound_network_metrics),
1176                        config.p2p_config.excessive_message_size(),
1177                    ),
1178                ))
1179                .service(routes);
1180
1181            let outbound_layer = ServiceBuilder::new()
1182                .layer(
1183                    TraceLayer::new_for_client_and_server_errors()
1184                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1185                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1186                )
1187                .layer(CallbackLayer::new(
1188                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1189                        Arc::new(outbound_network_metrics),
1190                        config.p2p_config.excessive_message_size(),
1191                    ),
1192                ))
1193                .into_inner();
1194
1195            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1196            // Inbound requests on this network are small (signatures, queries, summaries).
1197            // Cap request frames at 1 MiB.
1198            anemo_config.max_request_frame_size = Some(1 << 20);
1199            // Responses can be larger (checkpoint contents).
1200            // Cap response frames at 128 MiB.
1201            anemo_config.max_response_frame_size = Some(128 << 20);
1202
1203            // Set a higher default value for socket send/receive buffers if not already
1204            // configured.
1205            let mut quic_config = anemo_config.quic.unwrap_or_default();
1206            if quic_config.socket_send_buffer_size.is_none() {
1207                quic_config.socket_send_buffer_size = Some(20 << 20);
1208            }
1209            if quic_config.socket_receive_buffer_size.is_none() {
1210                quic_config.socket_receive_buffer_size = Some(20 << 20);
1211            }
1212            quic_config.allow_failed_socket_buffer_size_setting = true;
1213
1214            // Set high-performance defaults for quinn transport.
1215            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1216            if quic_config.max_concurrent_bidi_streams.is_none() {
1217                quic_config.max_concurrent_bidi_streams = Some(500);
1218            }
1219            if quic_config.max_concurrent_uni_streams.is_none() {
1220                quic_config.max_concurrent_uni_streams = Some(500);
1221            }
1222            if quic_config.stream_receive_window.is_none() {
1223                quic_config.stream_receive_window = Some(100 << 20);
1224            }
1225            if quic_config.receive_window.is_none() {
1226                quic_config.receive_window = Some(200 << 20);
1227            }
1228            if quic_config.send_window.is_none() {
1229                quic_config.send_window = Some(200 << 20);
1230            }
1231            if quic_config.crypto_buffer_size.is_none() {
1232                quic_config.crypto_buffer_size = Some(1 << 20);
1233            }
1234            if quic_config.max_idle_timeout_ms.is_none() {
1235                quic_config.max_idle_timeout_ms = Some(10_000);
1236            }
1237            if quic_config.keep_alive_interval_ms.is_none() {
1238                quic_config.keep_alive_interval_ms = Some(5_000);
1239            }
1240            anemo_config.quic = Some(quic_config);
1241
1242            let server_name = format!("sui-{}", chain_identifier);
1243            let network = Network::bind(config.p2p_config.listen_address)
1244                .server_name(&server_name)
1245                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1246                .config(anemo_config)
1247                .outbound_request_layer(outbound_layer)
1248                .start(service)?;
1249            info!(
1250                server_name = server_name,
1251                "P2p network started on {}",
1252                network.local_addr()
1253            );
1254
1255            network
1256        };
1257
1258        let discovery_handle =
1259            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1260        let state_sync_handle = state_sync.start(p2p_network.clone());
1261        let randomness_handle = randomness.start(p2p_network.clone());
1262
1263        Ok(P2pComponents {
1264            p2p_network,
1265            known_peers,
1266            discovery_handle,
1267            state_sync_handle,
1268            randomness_handle,
1269            endpoint_manager,
1270        })
1271    }
1272
1273    async fn construct_validator_components(
1274        config: NodeConfig,
1275        state: Arc<AuthorityState>,
1276        committee: Arc<Committee>,
1277        epoch_store: Arc<AuthorityPerEpochStore>,
1278        checkpoint_store: Arc<CheckpointStore>,
1279        state_sync_handle: state_sync::Handle,
1280        randomness_handle: randomness::Handle,
1281        global_state_hasher: Weak<GlobalStateHasher>,
1282        backpressure_manager: Arc<BackpressureManager>,
1283        registry_service: &RegistryService,
1284        sui_node_metrics: Arc<SuiNodeMetrics>,
1285        checkpoint_metrics: Arc<CheckpointMetrics>,
1286        node_role: NodeRole,
1287        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1288    ) -> Result<ValidatorComponents> {
1289        let mut config_clone = config.clone();
1290        let consensus_config = config_clone
1291            .consensus_config
1292            .as_mut()
1293            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1294
1295        let client = Arc::new(UpdatableConsensusClient::new());
1296        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1297        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1298            &committee,
1299            consensus_config,
1300            state.name,
1301            &registry_service.default_registry(),
1302            client.clone(),
1303            checkpoint_store.clone(),
1304            inflight_slot_freed_notify.clone(),
1305        ));
1306
1307        let consensus_manager = Arc::new(ConsensusManager::new(
1308            &config,
1309            consensus_config,
1310            registry_service,
1311            client,
1312            node_role,
1313        ));
1314
1315        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
1316        let consensus_store_pruner = ConsensusStorePruner::new(
1317            consensus_manager.get_storage_base_path(),
1318            consensus_config.db_retention_epochs(),
1319            consensus_config.db_pruner_period(),
1320            &registry_service.default_registry(),
1321        );
1322
1323        let sui_tx_validator_metrics =
1324            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1325
1326        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1327            let (handle, queue) = Self::start_grpc_validator_service(
1328                &config,
1329                state.clone(),
1330                consensus_adapter.clone(),
1331                epoch_store.clone(),
1332                &registry_service.default_registry(),
1333                inflight_slot_freed_notify,
1334            )
1335            .await?;
1336            (Some(handle), queue)
1337        } else {
1338            (None, None)
1339        };
1340
1341        // Starts an overload monitor that monitors the execution of the authority.
1342        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1343        let validator_overload_monitor_handle = if node_role.is_validator()
1344            && config
1345                .authority_overload_config
1346                .max_load_shedding_percentage
1347                > 0
1348        {
1349            let authority_state = Arc::downgrade(&state);
1350            let overload_config = config.authority_overload_config.clone();
1351            fail_point!("starting_overload_monitor");
1352            Some(spawn_monitored_task!(overload_monitor(
1353                authority_state,
1354                overload_config,
1355            )))
1356        } else {
1357            None
1358        };
1359
1360        Self::start_epoch_specific_validator_components(
1361            &config,
1362            state.clone(),
1363            consensus_adapter,
1364            checkpoint_store,
1365            epoch_store,
1366            state_sync_handle,
1367            randomness_handle,
1368            randomness_receiver_handle,
1369            consensus_manager,
1370            consensus_store_pruner,
1371            global_state_hasher,
1372            backpressure_manager,
1373            validator_server_handle,
1374            validator_overload_monitor_handle,
1375            checkpoint_metrics,
1376            sui_node_metrics,
1377            sui_tx_validator_metrics,
1378            admission_queue,
1379            node_role,
1380        )
1381        .await
1382    }
1383
1384    async fn start_epoch_specific_validator_components(
1385        config: &NodeConfig,
1386        state: Arc<AuthorityState>,
1387        consensus_adapter: Arc<ConsensusAdapter>,
1388        checkpoint_store: Arc<CheckpointStore>,
1389        epoch_store: Arc<AuthorityPerEpochStore>,
1390        state_sync_handle: state_sync::Handle,
1391        randomness_handle: randomness::Handle,
1392        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1393        consensus_manager: Arc<ConsensusManager>,
1394        consensus_store_pruner: ConsensusStorePruner,
1395        state_hasher: Weak<GlobalStateHasher>,
1396        backpressure_manager: Arc<BackpressureManager>,
1397        validator_server_handle: Option<SpawnOnce>,
1398        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1399        checkpoint_metrics: Arc<CheckpointMetrics>,
1400        sui_node_metrics: Arc<SuiNodeMetrics>,
1401        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1402        admission_queue: Option<AdmissionQueueContext>,
1403        node_role: NodeRole,
1404    ) -> Result<ValidatorComponents> {
1405        let checkpoint_service = Self::build_checkpoint_service(
1406            config,
1407            consensus_adapter.clone(),
1408            checkpoint_store.clone(),
1409            epoch_store.clone(),
1410            state.clone(),
1411            state_sync_handle,
1412            state_hasher,
1413            checkpoint_metrics.clone(),
1414            node_role,
1415        );
1416
1417        // Clear the VSS public key from the previous epoch so any randomness round
1418        // signatures buffer in the channel until the new DKG completes.
1419        randomness_receiver_handle.clear_public_key();
1420
1421        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
1422            let authority_key_pair = if node_role.is_validator() {
1423                Some(config.protocol_key_pair())
1424            } else {
1425                None
1426            };
1427            let randomness_manager = RandomnessManager::try_new(
1428                Arc::downgrade(&epoch_store),
1429                Box::new(consensus_adapter.clone()),
1430                randomness_handle,
1431                authority_key_pair,
1432                randomness_receiver_handle.clone(),
1433            )
1434            .await;
1435            if let Some(randomness_manager) = randomness_manager {
1436                epoch_store
1437                    .set_randomness_manager(randomness_manager)
1438                    .await?;
1439            }
1440        }
1441
1442        if node_role.is_validator() {
1443            ExecutionTimeObserver::spawn(
1444                epoch_store.clone(),
1445                Box::new(consensus_adapter.clone()),
1446                config
1447                    .execution_time_observer_config
1448                    .clone()
1449                    .unwrap_or_default(),
1450            );
1451        }
1452
1453        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1454            None,
1455            state.metrics.clone(),
1456        ));
1457
1458        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1459            state.clone(),
1460            checkpoint_service.clone(),
1461            epoch_store.clone(),
1462            consensus_adapter.clone(),
1463            throughput_calculator,
1464            backpressure_manager,
1465            config.congestion_log.clone(),
1466        );
1467
1468        info!("Starting consensus manager asynchronously");
1469
1470        // Spawn consensus startup asynchronously to avoid blocking other components
1471        tokio::spawn({
1472            let config = config.clone();
1473            let epoch_store = epoch_store.clone();
1474            let sui_tx_validator = SuiTxValidator::new(
1475                state.clone(),
1476                epoch_store.clone(),
1477                checkpoint_service.clone(),
1478                sui_tx_validator_metrics.clone(),
1479            );
1480            let consensus_manager = consensus_manager.clone();
1481            async move {
1482                consensus_manager
1483                    .start(
1484                        &config,
1485                        epoch_store,
1486                        consensus_handler_initializer,
1487                        sui_tx_validator,
1488                        Some(randomness_receiver_handle),
1489                    )
1490                    .await;
1491            }
1492        });
1493        let replay_waiter = consensus_manager.replay_waiter();
1494
1495        info!("Spawning checkpoint service");
1496        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1497            None
1498        } else {
1499            Some(replay_waiter)
1500        };
1501        checkpoint_service
1502            .spawn(epoch_store.clone(), replay_waiter)
1503            .await;
1504
1505        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
1506            Self::start_jwk_updater(
1507                config,
1508                sui_node_metrics,
1509                state.name,
1510                epoch_store.clone(),
1511                consensus_adapter.clone(),
1512            );
1513        }
1514
1515        if let Some(ctx) = &admission_queue {
1516            ctx.rotate_for_epoch(epoch_store);
1517        }
1518
1519        Ok(ValidatorComponents {
1520            validator_server_handle,
1521            validator_overload_monitor_handle,
1522            consensus_manager,
1523            consensus_store_pruner,
1524            consensus_adapter,
1525            checkpoint_metrics,
1526            sui_tx_validator_metrics,
1527            admission_queue,
1528        })
1529    }
1530
1531    fn build_checkpoint_service(
1532        config: &NodeConfig,
1533        consensus_adapter: Arc<ConsensusAdapter>,
1534        checkpoint_store: Arc<CheckpointStore>,
1535        epoch_store: Arc<AuthorityPerEpochStore>,
1536        state: Arc<AuthorityState>,
1537        state_sync_handle: state_sync::Handle,
1538        state_hasher: Weak<GlobalStateHasher>,
1539        checkpoint_metrics: Arc<CheckpointMetrics>,
1540        node_role: NodeRole,
1541    ) -> Arc<CheckpointService> {
1542        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1543        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1544
1545        debug!(
1546            "Starting checkpoint service with epoch start timestamp {}
1547            and epoch duration {}",
1548            epoch_start_timestamp_ms, epoch_duration_ms
1549        );
1550
1551        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1552            Box::new(SubmitCheckpointToConsensus {
1553                sender: consensus_adapter,
1554                signer: state.secret.clone(),
1555                authority: config.protocol_public_key(),
1556                next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1557                    .checked_add(epoch_duration_ms)
1558                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1559                metrics: checkpoint_metrics.clone(),
1560            })
1561        } else {
1562            LogCheckpointOutput::boxed()
1563        };
1564
1565        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1566        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1567        let max_checkpoint_size_bytes =
1568            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1569
1570        CheckpointService::build(
1571            state.clone(),
1572            checkpoint_store,
1573            epoch_store,
1574            state.get_transaction_cache_reader().clone(),
1575            state_hasher,
1576            checkpoint_output,
1577            Box::new(certified_checkpoint_output),
1578            checkpoint_metrics,
1579            max_tx_per_checkpoint,
1580            max_checkpoint_size_bytes,
1581        )
1582    }
1583
1584    fn construct_consensus_adapter(
1585        committee: &Committee,
1586        consensus_config: &ConsensusConfig,
1587        authority: AuthorityName,
1588        prometheus_registry: &Registry,
1589        consensus_client: Arc<dyn ConsensusClient>,
1590        checkpoint_store: Arc<CheckpointStore>,
1591        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1592    ) -> ConsensusAdapter {
1593        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1594        // The consensus adapter allows the authority to send user certificates through consensus.
1595
1596        ConsensusAdapter::new(
1597            consensus_client,
1598            checkpoint_store,
1599            authority,
1600            consensus_config.max_pending_transactions(),
1601            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1602            ca_metrics,
1603            inflight_slot_freed_notify,
1604        )
1605    }
1606
1607    async fn start_grpc_validator_service(
1608        config: &NodeConfig,
1609        state: Arc<AuthorityState>,
1610        consensus_adapter: Arc<ConsensusAdapter>,
1611        epoch_store: Arc<AuthorityPerEpochStore>,
1612        prometheus_registry: &Registry,
1613        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1614    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1615        let overload_config = &config.authority_overload_config;
1616        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1617            let manager = Arc::new(AdmissionQueueManager::new(
1618                consensus_adapter.clone(),
1619                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1620                overload_config.admission_queue_capacity_fraction,
1621                overload_config.admission_queue_bypass_fraction,
1622                overload_config.admission_queue_failover_timeout,
1623                inflight_slot_freed_notify,
1624            ));
1625            AdmissionQueueContext::spawn(manager, epoch_store)
1626        });
1627        let validator_service = ValidatorService::new(
1628            state.clone(),
1629            consensus_adapter,
1630            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1631            config.policy_config.clone().map(|p| p.client_id_source),
1632            admission_queue.clone(),
1633        );
1634
1635        let mut server_conf = mysten_network::config::Config::new();
1636        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1637        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1638        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1639        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1640        server_conf.load_shed = config.grpc_load_shed;
1641        let mut server_builder =
1642            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1643
1644        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1645
1646        let tls_config = sui_tls::create_rustls_server_config(
1647            config.network_key_pair().copy().private(),
1648            SUI_TLS_SERVER_NAME.to_string(),
1649        );
1650
1651        let network_address = config.network_address().clone();
1652
1653        let (ready_tx, ready_rx) = oneshot::channel();
1654
1655        let spawn_once = SpawnOnce::new(ready_rx, async move {
1656            let server = server_builder
1657                .bind(&network_address, Some(tls_config))
1658                .await
1659                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1660            let local_addr = server.local_addr();
1661            info!("Listening to traffic on {local_addr}");
1662            ready_tx.send(()).unwrap();
1663            if let Err(err) = server.serve().await {
1664                info!("Server stopped: {err}");
1665            }
1666            info!("Server stopped");
1667        });
1668        Ok((spawn_once, admission_queue))
1669    }
1670
1671    pub fn state(&self) -> Arc<AuthorityState> {
1672        self.state.clone()
1673    }
1674
1675    #[cfg(any(test, msim))]
1676    pub fn connection_monitor_handle_for_testing(
1677        &self,
1678    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
1679        &self._connection_monitor_handle
1680    }
1681
1682    pub fn node_role(&self) -> NodeRole {
1683        self.state.load_epoch_store_one_call_per_task().node_role()
1684    }
1685
1686    // Only used for testing because of how epoch store is loaded.
1687    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1688        self.state.reference_gas_price_for_testing()
1689    }
1690
1691    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1692        self.state.committee_store().clone()
1693    }
1694
1695    /*
1696    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1697        self.state.db()
1698    }
1699    */
1700
1701    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1702    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1703    /// of this function will mostly likely want to call this again
1704    /// to get a fresh one.
1705    pub fn clone_authority_aggregator(
1706        &self,
1707    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1708        self.transaction_orchestrator
1709            .as_ref()
1710            .map(|to| to.clone_authority_aggregator())
1711    }
1712
1713    pub fn transaction_orchestrator(
1714        &self,
1715    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1716        self.transaction_orchestrator.clone()
1717    }
1718
1719    /// This function awaits the completion of checkpoint execution of the current epoch,
1720    /// after which it initiates reconfiguration of the entire system.
1721    pub async fn monitor_reconfiguration(
1722        self: Arc<Self>,
1723        mut epoch_store: Arc<AuthorityPerEpochStore>,
1724    ) -> Result<()> {
1725        let checkpoint_executor_metrics =
1726            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1727
1728        loop {
1729            let mut hasher_guard = self.global_state_hasher.lock().await;
1730            let hasher = hasher_guard.take().unwrap();
1731            info!(
1732                "Creating checkpoint executor for epoch {}",
1733                epoch_store.epoch()
1734            );
1735            let checkpoint_executor = CheckpointExecutor::new(
1736                epoch_store.clone(),
1737                self.checkpoint_store.clone(),
1738                self.state.clone(),
1739                hasher.clone(),
1740                self.backpressure_manager.clone(),
1741                self.config.checkpoint_executor_config.clone(),
1742                checkpoint_executor_metrics.clone(),
1743                self.subscription_service_checkpoint_sender.clone(),
1744            );
1745
1746            let run_with_range = self.config.run_with_range;
1747
1748            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1749
1750            // Update the current protocol version metric.
1751            self.metrics
1752                .current_protocol_version
1753                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1754
1755            // Advertise capabilities to committee, if we are a validator.
1756            // FullNodes that state sync via consensus will also have validator components, by they are not supposed to submit any capabilities.
1757            if let Some(components) = &*self.validator_components.lock().await
1758                && cur_epoch_store.is_validator()
1759            {
1760                // TODO: without this sleep, the consensus message is not delivered reliably.
1761                tokio::time::sleep(Duration::from_millis(1)).await;
1762
1763                let config = cur_epoch_store.protocol_config();
1764                let mut supported_protocol_versions = self
1765                    .config
1766                    .supported_protocol_versions
1767                    .expect("Supported versions should be populated")
1768                    // no need to send digests of versions less than the current version
1769                    .truncate_below(config.version);
1770
1771                while supported_protocol_versions.max > config.version {
1772                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1773                        supported_protocol_versions.max,
1774                        cur_epoch_store.get_chain(),
1775                    );
1776
1777                    if proposed_protocol_config.enable_accumulators()
1778                        && !epoch_store.accumulator_root_exists()
1779                    {
1780                        error!(
1781                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1782                            supported_protocol_versions.max
1783                        );
1784                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1785                    } else {
1786                        break;
1787                    }
1788                }
1789
1790                let binary_config = config.binary_config(None);
1791                let transaction = ConsensusTransaction::new_capability_notification_v2(
1792                    AuthorityCapabilitiesV2::new(
1793                        self.state.name,
1794                        cur_epoch_store.get_chain_identifier().chain(),
1795                        supported_protocol_versions,
1796                        self.state
1797                            .get_available_system_packages(&binary_config)
1798                            .await,
1799                    ),
1800                );
1801                info!(?transaction, "submitting capabilities to consensus");
1802                components.consensus_adapter.submit(
1803                    transaction,
1804                    None,
1805                    &cur_epoch_store,
1806                    None,
1807                    None,
1808                )?;
1809            }
1810
1811            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1812
1813            if stop_condition == StopReason::RunWithRangeCondition {
1814                SuiNode::shutdown(&self).await;
1815                self.shutdown_channel_tx
1816                    .send(run_with_range)
1817                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1818                return Ok(());
1819            }
1820
1821            // Safe to call because we are in the middle of reconfiguration.
1822            let latest_system_state = self
1823                .state
1824                .get_object_cache_reader()
1825                .get_sui_system_state_object_unsafe()
1826                .expect("Read Sui System State object cannot fail");
1827
1828            #[cfg(msim)]
1829            if !self
1830                .sim_state
1831                .sim_safe_mode_expected
1832                .load(Ordering::Relaxed)
1833            {
1834                debug_assert!(!latest_system_state.safe_mode());
1835            }
1836
1837            #[cfg(not(msim))]
1838            debug_assert!(!latest_system_state.safe_mode());
1839
1840            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1841                && self.state.is_fullnode(&cur_epoch_store)
1842            {
1843                warn!(
1844                    "Failed to send end of epoch notification to subscriber: {:?}",
1845                    err
1846                );
1847            }
1848
1849            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1850            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1851
1852            self.auth_agg.store(Arc::new(
1853                self.auth_agg
1854                    .load()
1855                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1856            ));
1857
1858            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1859            let next_epoch = next_epoch_committee.epoch();
1860            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1861
1862            info!(
1863                next_epoch,
1864                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1865            );
1866
1867            fail_point_async!("reconfig_delay");
1868
1869            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1870
1871            update_peer_addresses(
1872                &self.config,
1873                &self.endpoint_manager,
1874                &new_epoch_start_state,
1875                Some(cur_epoch_store.epoch_start_state()),
1876            );
1877
1878            let mut validator_components_lock_guard = self.validator_components.lock().await;
1879
1880            // The following code handles 4 different cases, depending on whether the node
1881            // was a validator in the previous epoch, and whether the node is a validator
1882            // in the new epoch.
1883            let new_epoch_store = self
1884                .reconfigure_state(
1885                    &self.state,
1886                    &cur_epoch_store,
1887                    next_epoch_committee.clone(),
1888                    new_epoch_start_state,
1889                    hasher.clone(),
1890                )
1891                .await;
1892
1893            let new_role = new_epoch_store.node_role();
1894
1895            let new_validator_components = if let Some(ValidatorComponents {
1896                validator_server_handle,
1897                validator_overload_monitor_handle,
1898                consensus_manager,
1899                consensus_store_pruner,
1900                consensus_adapter,
1901                checkpoint_metrics,
1902                sui_tx_validator_metrics,
1903                admission_queue,
1904            }) = validator_components_lock_guard.take()
1905            {
1906                info!("Reconfiguring node (was running consensus).");
1907
1908                consensus_manager.shutdown().await;
1909                info!("Consensus has shut down.");
1910
1911                info!("Epoch store finished reconfiguration.");
1912
1913                // No other components should be holding a strong reference to state hasher
1914                // at this point. Confirm here before we swap in the new hasher.
1915                let global_state_hasher_metrics = Arc::into_inner(hasher)
1916                    .expect("Object state hasher should have no other references at this point")
1917                    .metrics();
1918                let new_hasher = Arc::new(GlobalStateHasher::new(
1919                    self.state.get_global_state_hash_store().clone(),
1920                    global_state_hasher_metrics,
1921                ));
1922                let weak_hasher = Arc::downgrade(&new_hasher);
1923                *hasher_guard = Some(new_hasher);
1924
1925                consensus_store_pruner.prune(next_epoch).await;
1926
1927                if new_role.runs_consensus() {
1928                    info!("Restarting consensus as {new_role}");
1929                    Some(
1930                        Self::start_epoch_specific_validator_components(
1931                            &self.config,
1932                            self.state.clone(),
1933                            consensus_adapter,
1934                            self.checkpoint_store.clone(),
1935                            new_epoch_store.clone(),
1936                            self.state_sync_handle.clone(),
1937                            self.randomness_handle.clone(),
1938                            self.randomness_receiver_handle.clone(),
1939                            consensus_manager,
1940                            consensus_store_pruner,
1941                            weak_hasher,
1942                            self.backpressure_manager.clone(),
1943                            validator_server_handle,
1944                            validator_overload_monitor_handle,
1945                            checkpoint_metrics,
1946                            self.metrics.clone(),
1947                            sui_tx_validator_metrics,
1948                            admission_queue,
1949                            new_role,
1950                        )
1951                        .await?,
1952                    )
1953                } else {
1954                    info!(
1955                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
1956                    );
1957                    None
1958                }
1959            } else {
1960                // No other components should be holding a strong reference to state hasher
1961                // at this point. Confirm here before we swap in the new hasher.
1962                let global_state_hasher_metrics = Arc::into_inner(hasher)
1963                    .expect("Object state hasher should have no other references at this point")
1964                    .metrics();
1965                let new_hasher = Arc::new(GlobalStateHasher::new(
1966                    self.state.get_global_state_hash_store().clone(),
1967                    global_state_hasher_metrics,
1968                ));
1969                let weak_hasher = Arc::downgrade(&new_hasher);
1970                *hasher_guard = Some(new_hasher);
1971
1972                if new_role.runs_consensus() {
1973                    info!("Promoting node to {new_role}, starting consensus components");
1974
1975                    let mut components = Self::construct_validator_components(
1976                        self.config.clone(),
1977                        self.state.clone(),
1978                        Arc::new(next_epoch_committee.clone()),
1979                        new_epoch_store.clone(),
1980                        self.checkpoint_store.clone(),
1981                        self.state_sync_handle.clone(),
1982                        self.randomness_handle.clone(),
1983                        weak_hasher,
1984                        self.backpressure_manager.clone(),
1985                        &self.registry_service,
1986                        self.metrics.clone(),
1987                        self.checkpoint_metrics.clone(),
1988                        new_role,
1989                        self.randomness_receiver_handle.clone(),
1990                    )
1991                    .await?;
1992
1993                    if new_role.is_validator() {
1994                        components.validator_server_handle = Some(
1995                            components
1996                                .validator_server_handle
1997                                .take()
1998                                .unwrap()
1999                                .start()
2000                                .await,
2001                        );
2002
2003                        self.endpoint_manager
2004                            .set_consensus_address_updater(components.consensus_manager.clone());
2005                    }
2006
2007                    Some(components)
2008                } else {
2009                    None
2010                }
2011            };
2012            *validator_components_lock_guard = new_validator_components;
2013
2014            // Force releasing current epoch store DB handle, because the
2015            // Arc<AuthorityPerEpochStore> may linger.
2016            cur_epoch_store.release_db_handles();
2017
2018            if cfg!(msim)
2019                && !matches!(
2020                    self.config
2021                        .authority_store_pruning_config
2022                        .num_epochs_to_retain_for_checkpoints(),
2023                    None | Some(u64::MAX) | Some(0)
2024                )
2025            {
2026                self.state
2027                    .prune_checkpoints_for_eligible_epochs_for_testing(
2028                        self.config.clone(),
2029                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2030                    )
2031                    .await?;
2032            }
2033
2034            epoch_store = new_epoch_store;
2035            info!("Reconfiguration finished");
2036        }
2037    }
2038
2039    async fn shutdown(&self) {
2040        if let Some(validator_components) = &*self.validator_components.lock().await {
2041            validator_components.consensus_manager.shutdown().await;
2042        }
2043    }
2044
2045    async fn reconfigure_state(
2046        &self,
2047        state: &Arc<AuthorityState>,
2048        cur_epoch_store: &AuthorityPerEpochStore,
2049        next_epoch_committee: Committee,
2050        next_epoch_start_system_state: EpochStartSystemState,
2051        global_state_hasher: Arc<GlobalStateHasher>,
2052    ) -> Arc<AuthorityPerEpochStore> {
2053        let next_epoch = next_epoch_committee.epoch();
2054
2055        let last_checkpoint = self
2056            .checkpoint_store
2057            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2058            .expect("Error loading last checkpoint for current epoch")
2059            .expect("Could not load last checkpoint for current epoch");
2060
2061        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2062
2063        assert_eq!(
2064            Some(last_checkpoint_seq),
2065            self.checkpoint_store
2066                .get_highest_executed_checkpoint_seq_number()
2067                .expect("Error loading highest executed checkpoint sequence number")
2068        );
2069
2070        let epoch_start_configuration = EpochStartConfiguration::new(
2071            next_epoch_start_system_state,
2072            *last_checkpoint.digest(),
2073            state.get_object_store().as_ref(),
2074            EpochFlag::default_flags_for_new_epoch(&state.config),
2075        )
2076        .expect("EpochStartConfiguration construction cannot fail");
2077
2078        let new_epoch_store = self
2079            .state
2080            .reconfigure(
2081                cur_epoch_store,
2082                self.config.supported_protocol_versions.unwrap(),
2083                next_epoch_committee,
2084                epoch_start_configuration,
2085                global_state_hasher,
2086                &self.config.expensive_safety_check_config,
2087                last_checkpoint_seq,
2088            )
2089            .await
2090            .expect("Reconfigure authority state cannot fail");
2091        info!(next_epoch, "Node State has been reconfigured");
2092        assert_eq!(next_epoch, new_epoch_store.epoch());
2093        self.state.get_reconfig_api().update_epoch_flags_metrics(
2094            cur_epoch_store.epoch_start_config().flags(),
2095            new_epoch_store.epoch_start_config().flags(),
2096        );
2097
2098        new_epoch_store
2099    }
2100
2101    pub fn get_config(&self) -> &NodeConfig {
2102        &self.config
2103    }
2104
2105    pub fn randomness_handle(&self) -> randomness::Handle {
2106        self.randomness_handle.clone()
2107    }
2108
2109    pub fn state_sync_handle(&self) -> state_sync::Handle {
2110        self.state_sync_handle.clone()
2111    }
2112
2113    pub fn endpoint_manager(&self) -> &EndpointManager {
2114        &self.endpoint_manager
2115    }
2116
2117    /// Get a short prefix of a digest for metric labels
2118    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2119        let digest_str = digest.to_string();
2120        if digest_str.len() >= 8 {
2121            digest_str[0..8].to_string()
2122        } else {
2123            digest_str
2124        }
2125    }
2126
2127    /// Check for previously detected forks and handle them appropriately.
2128    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2129    /// For all other cases, block node startup if a fork is detected.
2130    async fn check_and_recover_forks(
2131        checkpoint_store: &CheckpointStore,
2132        checkpoint_metrics: &CheckpointMetrics,
2133        fork_recovery: Option<&ForkRecoveryConfig>,
2134    ) -> Result<()> {
2135        // Try to recover from forks if recovery config is provided
2136        if let Some(recovery) = fork_recovery {
2137            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2138            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2139        }
2140
2141        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2142            .get_checkpoint_fork_detected()
2143            .map_err(|e| {
2144                error!("Failed to check for checkpoint fork: {:?}", e);
2145                e
2146            })?
2147        {
2148            Self::handle_checkpoint_fork(
2149                checkpoint_seq,
2150                checkpoint_digest,
2151                checkpoint_metrics,
2152                fork_recovery,
2153            )
2154            .await?;
2155        }
2156        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2157            .get_transaction_fork_detected()
2158            .map_err(|e| {
2159                error!("Failed to check for transaction fork: {:?}", e);
2160                e
2161            })?
2162        {
2163            Self::handle_transaction_fork(
2164                tx_digest,
2165                expected_effects,
2166                actual_effects,
2167                checkpoint_metrics,
2168                fork_recovery,
2169            )
2170            .await?;
2171        }
2172
2173        Ok(())
2174    }
2175
2176    fn try_recover_checkpoint_fork(
2177        checkpoint_store: &CheckpointStore,
2178        recovery: &ForkRecoveryConfig,
2179    ) -> Result<()> {
2180        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2181        // clear locally computed checkpoints from that sequence (inclusive).
2182        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2183            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2184                anyhow::bail!(
2185                    "Invalid checkpoint digest override for seq {}: {}",
2186                    seq,
2187                    expected_digest_str
2188                );
2189            };
2190
2191            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2192                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2193                if local_digest != expected_digest {
2194                    info!(
2195                        seq,
2196                        local = %Self::get_digest_prefix(local_digest),
2197                        expected = %Self::get_digest_prefix(expected_digest),
2198                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2199                        seq
2200                    );
2201                    checkpoint_store
2202                        .clear_locally_computed_checkpoints_from(*seq)
2203                        .context(
2204                            "Failed to clear locally computed checkpoints from override seq",
2205                        )?;
2206                }
2207            }
2208        }
2209
2210        if let Some((checkpoint_seq, checkpoint_digest)) =
2211            checkpoint_store.get_checkpoint_fork_detected()?
2212            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2213        {
2214            info!(
2215                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2216                checkpoint_seq, checkpoint_digest
2217            );
2218            checkpoint_store
2219                .clear_checkpoint_fork_detected()
2220                .expect("Failed to clear checkpoint fork detected marker");
2221        }
2222        Ok(())
2223    }
2224
2225    fn try_recover_transaction_fork(
2226        checkpoint_store: &CheckpointStore,
2227        recovery: &ForkRecoveryConfig,
2228    ) -> Result<()> {
2229        if recovery.transaction_overrides.is_empty() {
2230            return Ok(());
2231        }
2232
2233        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2234            && recovery
2235                .transaction_overrides
2236                .contains_key(&tx_digest.to_string())
2237        {
2238            info!(
2239                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2240                tx_digest
2241            );
2242            checkpoint_store
2243                .clear_transaction_fork_detected()
2244                .expect("Failed to clear transaction fork detected marker");
2245        }
2246        Ok(())
2247    }
2248
2249    fn get_current_timestamp() -> u64 {
2250        std::time::SystemTime::now()
2251            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2252            .unwrap()
2253            .as_secs()
2254    }
2255
2256    async fn handle_checkpoint_fork(
2257        checkpoint_seq: u64,
2258        checkpoint_digest: CheckpointDigest,
2259        checkpoint_metrics: &CheckpointMetrics,
2260        fork_recovery: Option<&ForkRecoveryConfig>,
2261    ) -> Result<()> {
2262        checkpoint_metrics
2263            .checkpoint_fork_crash_mode
2264            .with_label_values(&[
2265                &checkpoint_seq.to_string(),
2266                &Self::get_digest_prefix(checkpoint_digest),
2267                &Self::get_current_timestamp().to_string(),
2268            ])
2269            .set(1);
2270
2271        let behavior = fork_recovery
2272            .map(|fr| fr.fork_crash_behavior)
2273            .unwrap_or_default();
2274
2275        match behavior {
2276            ForkCrashBehavior::AwaitForkRecovery => {
2277                error!(
2278                    checkpoint_seq = checkpoint_seq,
2279                    checkpoint_digest = ?checkpoint_digest,
2280                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2281                );
2282                futures::future::pending::<()>().await;
2283                unreachable!("pending() should never return");
2284            }
2285            ForkCrashBehavior::ReturnError => {
2286                error!(
2287                    checkpoint_seq = checkpoint_seq,
2288                    checkpoint_digest = ?checkpoint_digest,
2289                    "Checkpoint fork detected! Returning error."
2290                );
2291                Err(anyhow::anyhow!(
2292                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2293                    checkpoint_seq,
2294                    checkpoint_digest
2295                ))
2296            }
2297        }
2298    }
2299
2300    async fn handle_transaction_fork(
2301        tx_digest: TransactionDigest,
2302        expected_effects_digest: TransactionEffectsDigest,
2303        actual_effects_digest: TransactionEffectsDigest,
2304        checkpoint_metrics: &CheckpointMetrics,
2305        fork_recovery: Option<&ForkRecoveryConfig>,
2306    ) -> Result<()> {
2307        checkpoint_metrics
2308            .transaction_fork_crash_mode
2309            .with_label_values(&[
2310                &Self::get_digest_prefix(tx_digest),
2311                &Self::get_digest_prefix(expected_effects_digest),
2312                &Self::get_digest_prefix(actual_effects_digest),
2313                &Self::get_current_timestamp().to_string(),
2314            ])
2315            .set(1);
2316
2317        let behavior = fork_recovery
2318            .map(|fr| fr.fork_crash_behavior)
2319            .unwrap_or_default();
2320
2321        match behavior {
2322            ForkCrashBehavior::AwaitForkRecovery => {
2323                error!(
2324                    tx_digest = ?tx_digest,
2325                    expected_effects_digest = ?expected_effects_digest,
2326                    actual_effects_digest = ?actual_effects_digest,
2327                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2328                );
2329                futures::future::pending::<()>().await;
2330                unreachable!("pending() should never return");
2331            }
2332            ForkCrashBehavior::ReturnError => {
2333                error!(
2334                    tx_digest = ?tx_digest,
2335                    expected_effects_digest = ?expected_effects_digest,
2336                    actual_effects_digest = ?actual_effects_digest,
2337                    "Transaction fork detected! Returning error."
2338                );
2339                Err(anyhow::anyhow!(
2340                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2341                    tx_digest,
2342                    expected_effects_digest,
2343                    actual_effects_digest
2344                ))
2345            }
2346        }
2347    }
2348}
2349
2350#[cfg(not(msim))]
2351impl SuiNode {
2352    async fn fetch_jwks(
2353        _authority: AuthorityName,
2354        provider: &OIDCProvider,
2355    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2356        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2357        use sui_types::error::SuiErrorKind;
2358        let client = reqwest::Client::new();
2359        fetch_jwks(provider, &client, true)
2360            .await
2361            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2362    }
2363}
2364
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node held in `sim_state`.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_state = &self.sim_state;
        sim_state.sim_node.id()
    }

    /// Logs and then stores `new_value` into the `sim_safe_mode_expected` flag
    /// (relaxed ordering).
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator variant of `fetch_jwks`: delegates to the callable returned by
    /// `get_jwk_injector()` instead of performing a fetch itself.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let inject = get_jwk_injector();
        inject(authority, provider)
    }
}
2386
/// A background task that is spawned at most once.
///
/// A value starts out as `Unstarted`, holding the future to run together with a readiness
/// receiver, and becomes `Started` once `SpawnOnce::start` has spawned the future.
enum SpawnOnce {
    // The Mutex is only needed to make SpawnOnce Sync: a BoxFuture is Send but not Sync,
    // and a Mutex around a Send value is Sync. (Send already holds without the Mutex.)
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // Holds the handle of the spawned task. The handle is never read, hence the allow.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2393
2394impl SpawnOnce {
2395    pub fn new(
2396        ready_rx: oneshot::Receiver<()>,
2397        future: impl Future<Output = ()> + Send + 'static,
2398    ) -> Self {
2399        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2400    }
2401
2402    pub async fn start(self) -> Self {
2403        match self {
2404            Self::Unstarted(ready_rx, future) => {
2405                let future = future.into_inner();
2406                let handle = tokio::spawn(future);
2407                ready_rx.await.unwrap();
2408                Self::Started(handle)
2409            }
2410            Self::Started(_) => self,
2411        }
2412    }
2413}
2414
2415/// Updates trusted peer addresses in the p2p network (for nodes configured as validators).
2416/// When `prev_epoch_start_state` is provided, validators that are no longer in the committee
2417/// have their Chain addresses cleared.
2418fn update_peer_addresses(
2419    config: &NodeConfig,
2420    endpoint_manager: &EndpointManager,
2421    epoch_start_state: &EpochStartSystemState,
2422    prev_epoch_start_state: Option<&EpochStartSystemState>,
2423) {
2424    if config.consensus_config().is_none() {
2425        return;
2426    }
2427    let new_peers: HashSet<PeerId> = epoch_start_state
2428        .get_validator_as_p2p_peers(config.protocol_public_key())
2429        .into_iter()
2430        .map(|(peer_id, address)| {
2431            endpoint_manager
2432                .update_endpoint(
2433                    EndpointId::P2p(peer_id),
2434                    AddressSource::Chain,
2435                    vec![address],
2436                )
2437                .expect("Updating peer addresses should not fail");
2438            peer_id
2439        })
2440        .collect();
2441
2442    // Clear Chain addresses for validators that left the committee.
2443    if let Some(prev) = prev_epoch_start_state {
2444        for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2445            if !new_peers.contains(&peer_id) {
2446                endpoint_manager
2447                    .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2448                    .expect("Clearing peer addresses should not fail");
2449            }
2450        }
2451    }
2452}
2453
2454fn build_kv_store(
2455    state: &Arc<AuthorityState>,
2456    config: &NodeConfig,
2457    registry: &Registry,
2458) -> Result<Arc<TransactionKeyValueStore>> {
2459    let metrics = KeyValueStoreMetrics::new(registry);
2460    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2461
2462    let base_url = &config.transaction_kv_store_read_config.base_url;
2463
2464    if base_url.is_empty() {
2465        info!("no http kv store url provided, using local db only");
2466        return Ok(Arc::new(db_store));
2467    }
2468
2469    let base_url: url::Url = base_url.parse().tap_err(|e| {
2470        error!(
2471            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2472            base_url, e
2473        )
2474    })?;
2475
2476    let network_str = match state.get_chain_identifier().chain() {
2477        Chain::Mainnet => "/mainnet",
2478        _ => {
2479            info!("using local db only for kv store");
2480            return Ok(Arc::new(db_store));
2481        }
2482    };
2483
2484    let base_url = base_url.join(network_str)?.to_string();
2485    let http_store = HttpKVStore::new_kv(
2486        &base_url,
2487        config.transaction_kv_store_read_config.cache_size,
2488        metrics.clone(),
2489    )?;
2490    info!("using local key-value store with fallback to http key-value store");
2491    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2492        db_store,
2493        http_store,
2494        metrics,
2495        "json_rpc_fallback",
2496    )))
2497}
2498
/// Builds and starts the node's HTTP (and optionally HTTPS) RPC servers.
///
/// When `node_role.should_run_rpc_servers()` is false, nothing is started and the result is
/// a default `HttpServers` with no checkpoint sender. Otherwise a single axum router is
/// assembled from the JSON-RPC modules and the `sui_rpc_api` service, wrapped in shared
/// middleware, and served on `config.json_rpc_address`. The same router is additionally
/// served over TLS when the rpc config provides a TLS config.
///
/// Returns the server handles together with the checkpoint sender produced by
/// `SubscriptionService::build`.
///
/// # Errors
/// Returns an error if the kv store cannot be built, if registering a JSON-RPC module or
/// building the JSON-RPC router fails, if the rpc config fails validation, or if either
/// server fails to start.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Validators do not expose these APIs
    if !node_role.should_run_rpc_servers() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // JSON-RPC: register each API module on the builder, then turn it into a router.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // The execution API is only registered when an orchestrator was supplied.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Use the explicitly configured name-service ids only when all three are present;
        // otherwise pick the config matching the chain this node is running on.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The sender half is returned to the caller; the handle is given to the rpc service.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        // An rpc config is optional; when present it must validate before it is applied.
        if let Some(config) = config.rpc.clone() {
            config.validate()?;
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Copy the remote address from sui_http's ConnectInfo into an axum ConnectInfo
        // extension on the request.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        // Setup a permissive CORS policy
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // HTTPS is optional: it is started only when the rpc config carries a TLS config, and
    // it serves a clone of the same router as the plain HTTP server below.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // The plain HTTP server is always started once this point is reached.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2672
2673#[cfg(not(test))]
2674fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2675    protocol_config.max_transactions_per_checkpoint() as usize
2676}
2677
/// Test-build variant of the per-checkpoint transaction limit: ignores the protocol config
/// and always returns 2.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2682
/// Handles of the RPC servers started by `build_http_servers`.
///
/// Both fields are `None` in the default value, which is what `build_http_servers` returns
/// when the node does not run RPC servers.
// NOTE(review): the handles are never read; presumably they are retained only so the
// servers live as long as this struct — confirm against `sui_http::ServerHandle`.
#[derive(Default)]
struct HttpServers {
    // Handle of the plain HTTP server.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // Handle of the HTTPS server; set only when a TLS config was provided.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2690
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// For both the checkpoint and the transaction fork markers: a recorded fork without a
    /// matching override makes `check_and_recover_forks` fail, while a matching override
    /// makes it succeed and clears the recorded marker.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let store = CheckpointStore::new_for_tests();
        let metrics = CheckpointMetrics::new(&Registry::new());

        // Used by both halves: with no overrides, a recorded fork must surface as an error.
        let no_overrides = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // ---------- Checkpoint fork path ----------
        let forked_seq = 42;
        let forked_digest = CheckpointDigest::random();
        store
            .record_checkpoint_fork_detected(forked_seq, forked_digest)
            .unwrap();

        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&no_overrides)).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Checkpoint fork detected"));

        let with_checkpoint_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: BTreeMap::from([(forked_seq, forked_digest.to_string())]),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let outcome = SuiNode::check_and_recover_forks(
            &store,
            &metrics,
            Some(&with_checkpoint_override),
        )
        .await;
        assert!(outcome.is_ok());
        let remaining_checkpoint_fork = store.get_checkpoint_fork_detected().unwrap();
        assert!(remaining_checkpoint_fork.is_none());

        // ---------- Transaction fork path ----------
        let forked_tx = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        store
            .record_transaction_fork_detected(forked_tx, expected_effects, actual_effects)
            .unwrap();

        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&no_overrides)).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Transaction fork detected"));

        let with_transaction_override = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                forked_tx.to_string(),
                actual_effects.to_string(),
            )]),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let outcome = SuiNode::check_and_recover_forks(
            &store,
            &metrics,
            Some(&with_transaction_override),
        )
        .await;
        assert!(outcome.is_ok());
        let remaining_transaction_fork = store.get_transaction_fork_detected().unwrap();
        assert!(remaining_transaction_fork.is_none());
    }
}