sui_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::authority::shared_object_version_manager::Schedulable;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::EndpointId;
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

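/// Handles and metrics for the subsystems that only run while this node is a
/// validator: the gRPC validator service, consensus manager/adapter, consensus
/// store pruner, and the optional overload monitor.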
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
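
/// Handles returned by `create_p2p_network`: the anemo `Network` itself, the
/// discovery, state-sync, and randomness services started on top of it, and the
/// statically known peers used for connection monitoring.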
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

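/// Cap on the number of JWKs accepted from a single provider fetch; anything
/// beyond this is dropped to guard against a provider returning an excessively
/// large key set.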
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

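    /// Spawns one background task per supported zkLogin OIDC provider. Each task
    /// periodically fetches the provider's JWKs, drops invalid, oversized,
    /// already-active, or already-seen keys, and submits the remainder to
    /// consensus. Tasks are scoped to the current epoch via `within_alive_epoch`.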
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // metrics is:
        //  pub struct SuiNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

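    /// Full node startup: loads genesis and the committee, opens the perpetual
    /// and per-epoch stores, builds the execution cache, starts p2p, RPC, and
    /// (when this node is in the committee) the validator components, then
    /// spawns the reconfiguration monitor before returning the node handle.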
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(registry_service.clone());

        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        // Initialize Mysten metrics.
        mysten_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of static variable) and unnecessary in simtests.
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // the database is empty at genesis time
        if is_genesis {
            info!("checking SUI conservation at genesis");
            // When we are opening the db table, the only time when it's safe to
            // check SUI conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the SUI conservation check will fail. This also initializes
            // the expected_network_sui_amount table.
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        // Create network
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        let endpoint_manager = EndpointManager::new(discovery_handle.clone());

        // Send initial peer addresses to the p2p network.
        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
        // ensure genesis txn was executed
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        // Start the loop that receives new randomness and generates transactions for it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    sui_node_metrics.clone(),
                    checkpoint_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

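    /// Starts the state snapshot uploader if a remote object store is configured
    /// in `state_snapshot_write_config`; returns its handle, or `None` when
    /// snapshot uploads are disabled.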
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

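    /// Resolves the effective `DBCheckpointConfig` (forcing end-of-epoch db
    /// checkpoints when state snapshots are enabled) and starts the
    /// `DBCheckpointHandler` unless neither a remote object store nor state
    /// snapshots are configured.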
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If db checkpoint config object store not specified but
            // state snapshot object store is specified, create handler
            // anyway for marking db checkpoints as completed so that they
            // can be uploaded as state snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

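    /// Builds the anemo p2p network: registers the discovery, state-sync, and
    /// randomness RPC routes, applies QUIC tuning defaults, binds the listener,
    /// and returns the started service handles as `P2pComponents`.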
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new()
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
            // staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

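    /// Constructs the validator-side components: consensus adapter and manager,
    /// consensus store pruner, the gRPC validator service, and the optional
    /// overload monitor, then delegates to
    /// `start_epoch_specific_validator_components` for the per-epoch pieces.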
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

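    /// Per-epoch validator wiring: builds the checkpoint service, initializes the
    /// randomness manager and execution-time observer, hooks throughput profiling
    /// into the consensus adapter, and starts consensus for this epoch on a
    /// separate task so other components are not blocked.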
1321    async fn start_epoch_specific_validator_components(
1322        config: &NodeConfig,
1323        state: Arc<AuthorityState>,
1324        consensus_adapter: Arc<ConsensusAdapter>,
1325        checkpoint_store: Arc<CheckpointStore>,
1326        epoch_store: Arc<AuthorityPerEpochStore>,
1327        state_sync_handle: state_sync::Handle,
1328        randomness_handle: randomness::Handle,
1329        consensus_manager: Arc<ConsensusManager>,
1330        consensus_store_pruner: ConsensusStorePruner,
1331        state_hasher: Weak<GlobalStateHasher>,
1332        backpressure_manager: Arc<BackpressureManager>,
1333        validator_server_handle: SpawnOnce,
1334        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1335        checkpoint_metrics: Arc<CheckpointMetrics>,
1336        sui_node_metrics: Arc<SuiNodeMetrics>,
1337        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1338    ) -> Result<ValidatorComponents> {
1339        let checkpoint_service = Self::build_checkpoint_service(
1340            config,
1341            consensus_adapter.clone(),
1342            checkpoint_store.clone(),
1343            epoch_store.clone(),
1344            state.clone(),
1345            state_sync_handle,
1346            state_hasher,
1347            checkpoint_metrics.clone(),
1348        );
1349
1350        // Create a new map that is injected into both the consensus handler and the consensus adapter.
1351        // The consensus handler writes values forwarded from consensus, and the consensus adapter
1352        // reads them to decide which validator submits a transaction to consensus.
1353        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1354
1355        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1356
1357        if epoch_store.randomness_state_enabled() {
1358            let randomness_manager = RandomnessManager::try_new(
1359                Arc::downgrade(&epoch_store),
1360                Box::new(consensus_adapter.clone()),
1361                randomness_handle,
1362                config.protocol_key_pair(),
1363            )
1364            .await;
1365            if let Some(randomness_manager) = randomness_manager {
1366                epoch_store
1367                    .set_randomness_manager(randomness_manager)
1368                    .await?;
1369            }
1370        }
1371
1372        ExecutionTimeObserver::spawn(
1373            epoch_store.clone(),
1374            Box::new(consensus_adapter.clone()),
1375            config
1376                .execution_time_observer_config
1377                .clone()
1378                .unwrap_or_default(),
1379        );
1380
1381        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1382            None,
1383            state.metrics.clone(),
1384        ));
1385
1386        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1387            throughput_calculator.clone(),
1388            None,
1389            None,
1390            state.metrics.clone(),
1391            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1392        ));
1393
1394        consensus_adapter.swap_throughput_profiler(throughput_profiler);
1395
1396        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1397            state.clone(),
1398            checkpoint_service.clone(),
1399            epoch_store.clone(),
1400            consensus_adapter.clone(),
1401            low_scoring_authorities,
1402            throughput_calculator,
1403            backpressure_manager,
1404        );
1405
1406        info!("Starting consensus manager asynchronously");
1407
1408        // Spawn consensus startup asynchronously to avoid blocking other components
1409        tokio::spawn({
1410            let config = config.clone();
1411            let epoch_store = epoch_store.clone();
1412            let sui_tx_validator = SuiTxValidator::new(
1413                state.clone(),
1414                epoch_store.clone(),
1415                checkpoint_service.clone(),
1416                sui_tx_validator_metrics.clone(),
1417            );
1418            let consensus_manager = consensus_manager.clone();
1419            async move {
1420                consensus_manager
1421                    .start(
1422                        &config,
1423                        epoch_store,
1424                        consensus_handler_initializer,
1425                        sui_tx_validator,
1426                    )
1427                    .await;
1428            }
1429        });
1430        let replay_waiter = consensus_manager.replay_waiter();
1431
1432        info!("Spawning checkpoint service");
1433        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1434            None
1435        } else {
1436            Some(replay_waiter)
1437        };
1438        checkpoint_service
1439            .spawn(epoch_store.clone(), replay_waiter)
1440            .await;
1441
1442        if epoch_store.authenticator_state_enabled() {
1443            Self::start_jwk_updater(
1444                config,
1445                sui_node_metrics,
1446                state.name,
1447                epoch_store.clone(),
1448                consensus_adapter.clone(),
1449            );
1450        }
1451
1452        Ok(ValidatorComponents {
1453            validator_server_handle,
1454            validator_overload_monitor_handle,
1455            consensus_manager,
1456            consensus_store_pruner,
1457            consensus_adapter,
1458            checkpoint_metrics,
1459            sui_tx_validator_metrics,
1460        })
1461    }
1462
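    /// Builds the `CheckpointService` for the current epoch, wiring checkpoint submission to
    /// consensus and certified checkpoint forwarding to state sync, with per-checkpoint limits
    /// taken from the protocol config.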
1463    fn build_checkpoint_service(
1464        config: &NodeConfig,
1465        consensus_adapter: Arc<ConsensusAdapter>,
1466        checkpoint_store: Arc<CheckpointStore>,
1467        epoch_store: Arc<AuthorityPerEpochStore>,
1468        state: Arc<AuthorityState>,
1469        state_sync_handle: state_sync::Handle,
1470        state_hasher: Weak<GlobalStateHasher>,
1471        checkpoint_metrics: Arc<CheckpointMetrics>,
1472    ) -> Arc<CheckpointService> {
1473        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1474        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1475
1476        debug!(
1477            "Starting checkpoint service with epoch start timestamp {} \
1478            and epoch duration {}",
1479            epoch_start_timestamp_ms, epoch_duration_ms
1480        );
1481
1482        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1483            sender: consensus_adapter,
1484            signer: state.secret.clone(),
1485            authority: config.protocol_public_key(),
1486            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1487                .checked_add(epoch_duration_ms)
1488                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1489            metrics: checkpoint_metrics.clone(),
1490        });
1491
1492        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1493        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1494        let max_checkpoint_size_bytes =
1495            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1496
1497        CheckpointService::build(
1498            state.clone(),
1499            checkpoint_store,
1500            epoch_store,
1501            state.get_transaction_cache_reader().clone(),
1502            state_hasher,
1503            checkpoint_output,
1504            Box::new(certified_checkpoint_output),
1505            checkpoint_metrics,
1506            max_tx_per_checkpoint,
1507            max_checkpoint_size_bytes,
1508        )
1509    }
1510
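    /// Constructs the `ConsensusAdapter`, which the authority uses to submit transactions to
    /// consensus, with pending-transaction limits derived from the consensus config and the
    /// committee size.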
1511    fn construct_consensus_adapter(
1512        committee: &Committee,
1513        consensus_config: &ConsensusConfig,
1514        authority: AuthorityName,
1515        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1516        prometheus_registry: &Registry,
1517        protocol_config: ProtocolConfig,
1518        consensus_client: Arc<dyn ConsensusClient>,
1519        checkpoint_store: Arc<CheckpointStore>,
1520    ) -> ConsensusAdapter {
1521        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1522        // The consensus adapter allows the authority to send user certificates through consensus.
1523
1524        ConsensusAdapter::new(
1525            consensus_client,
1526            checkpoint_store,
1527            authority,
1528            connection_monitor_status,
1529            consensus_config.max_pending_transactions(),
1530            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1531            consensus_config.max_submit_position,
1532            consensus_config.submit_delay_step_override(),
1533            ca_metrics,
1534            protocol_config,
1535        )
1536    }
1537
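    /// Builds the validator gRPC service and prepares a TLS server bound to the configured
    /// network address. The returned `SpawnOnce` does not begin serving until `start()` is
    /// called on it.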
1538    async fn start_grpc_validator_service(
1539        config: &NodeConfig,
1540        state: Arc<AuthorityState>,
1541        consensus_adapter: Arc<ConsensusAdapter>,
1542        prometheus_registry: &Registry,
1543    ) -> Result<SpawnOnce> {
1544        let validator_service = ValidatorService::new(
1545            state.clone(),
1546            consensus_adapter,
1547            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1548            config.policy_config.clone().map(|p| p.client_id_source),
1549        );
1550
1551        let mut server_conf = mysten_network::config::Config::new();
1552        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1553        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1554        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1555        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1556        server_conf.load_shed = config.grpc_load_shed;
1557        let mut server_builder =
1558            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1559
1560        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1561
1562        let tls_config = sui_tls::create_rustls_server_config(
1563            config.network_key_pair().copy().private(),
1564            SUI_TLS_SERVER_NAME.to_string(),
1565        );
1566
1567        let network_address = config.network_address().clone();
1568
1569        let (ready_tx, ready_rx) = oneshot::channel();
1570
1571        Ok(SpawnOnce::new(ready_rx, async move {
1572            let server = server_builder
1573                .bind(&network_address, Some(tls_config))
1574                .await
1575                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1576            let local_addr = server.local_addr();
1577            info!("Listening to traffic on {local_addr}");
1578            ready_tx.send(()).unwrap();
1579            if let Err(err) = server.serve().await {
1580                info!("Server stopped: {err}");
1581            }
1582            info!("Server stopped");
1583        }))
1584    }
1585
1586    /// Re-executes pending consensus certificates, which may not have been committed to disk
1587    /// before the node restarted. This is necessary for the following reasons:
1588    ///
1589    /// 1. For any transaction for which we returned signed effects to a client, we must ensure
1590    ///    that we have re-executed the transaction before we begin accepting grpc requests.
1591    ///    Otherwise we would appear to have forgotten about the transaction.
1592    /// 2. While this is running, we are concurrently waiting for all previously built checkpoints
1593    ///    to be rebuilt. Since there may be dependencies in either direction (from checkpointed
1594    ///    consensus transactions to pending consensus transactions, or vice versa), we must
1595    ///    re-execute pending consensus transactions to ensure that both processes can complete.
1596    /// 3. Also note that for any pending consensus transactions for which we wrote a signed effects
1597    ///    digest to disk, we must re-execute using that digest as the expected effects digest,
1598    ///    to ensure that we cannot arrive at different effects than what we previously signed.
1599    async fn reexecute_pending_consensus_certs(
1600        epoch_store: &Arc<AuthorityPerEpochStore>,
1601        state: &Arc<AuthorityState>,
1602    ) {
1603        let mut pending_consensus_certificates = Vec::new();
1604        let mut additional_certs = Vec::new();
1605
1606        for tx in epoch_store.get_all_pending_consensus_transactions() {
1607            match tx.kind {
1608                // Shared object txns cannot be re-executed at this point, because we must wait for
1609                // consensus replay to assign shared object versions.
1610                // Similarly, when preconsensus locking is disabled, owned object transactions
1611                // must go through consensus to determine execution order.
1612                ConsensusTransactionKind::CertifiedTransaction(tx)
1613                    if !tx.is_consensus_tx()
1614                        && !epoch_store.protocol_config().disable_preconsensus_locking() =>
1615                {
1616                    let tx = *tx;
1617                    // new_unchecked is safe because we never submit a transaction to consensus
1618                    // without verifying it
1619                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1620                        VerifiedCertificate::new_unchecked(tx),
1621                    );
1622                    // we only need to re-execute if we previously signed the effects (which indicates we
1623                    // returned the effects to a client).
1624                    if let Some(fx_digest) = epoch_store
1625                        .get_signed_effects_digest(tx.digest())
1626                        .expect("db error")
1627                    {
1628                        pending_consensus_certificates.push((
1629                            Schedulable::Transaction(tx),
1630                            ExecutionEnv::new().with_expected_effects_digest(fx_digest),
1631                        ));
1632                    } else {
1633                        additional_certs.push((
1634                            Schedulable::Transaction(tx),
1635                            ExecutionEnv::new()
1636                                .with_scheduling_source(SchedulingSource::NonFastPath),
1637                        ));
1638                    }
1639                }
1640                _ => (),
1641            }
1642        }
1643
1644        let digests = pending_consensus_certificates
1645            .iter()
1646            // unwrap_digest okay because only user certs are in pending_consensus_certificates
1647            .map(|(tx, _)| *tx.key().unwrap_digest())
1648            .collect::<Vec<_>>();
1649
1650        info!(
1651            "reexecuting {} pending consensus certificates: {:?}",
1652            digests.len(),
1653            digests
1654        );
1655
1656        state
1657            .execution_scheduler()
1658            .enqueue(pending_consensus_certificates, epoch_store);
1659        state
1660            .execution_scheduler()
1661            .enqueue(additional_certs, epoch_store);
1662
1663        // If this times out, the validator will still almost certainly start up fine. But, it is
1664        // possible that it may temporarily "forget" about transactions that it had previously
1665        // executed. This could confuse clients in some circumstances. However, the transactions
1666        // are still in pending_consensus_certificates, so we cannot lose any finality guarantees.
1667        let timeout = if cfg!(msim) { 120 } else { 60 };
1668        if tokio::time::timeout(
1669            std::time::Duration::from_secs(timeout),
1670            state
1671                .get_transaction_cache_reader()
1672                .notify_read_executed_effects_digests(
1673                    "SuiNode::notify_read_executed_effects_digests",
1674                    &digests,
1675                ),
1676        )
1677        .await
1678        .is_err()
1679        {
1680            // Log all the digests that were not executed to help debugging.
1681            let executed_effects_digests = state
1682                .get_transaction_cache_reader()
1683                .multi_get_executed_effects_digests(&digests);
1684            let pending_digests = digests
1685                .iter()
1686                .zip(executed_effects_digests.iter())
1687                .filter_map(|(digest, executed_effects_digest)| {
1688                    if executed_effects_digest.is_none() {
1689                        Some(digest)
1690                    } else {
1691                        None
1692                    }
1693                })
1694                .collect::<Vec<_>>();
1695            debug_fatal!(
1696                "Timed out waiting for effects digests to be executed: {:?}",
1697                pending_digests
1698            );
1699        }
1700    }
1701
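    /// Returns a handle to this node's `AuthorityState`.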
1702    pub fn state(&self) -> Arc<AuthorityState> {
1703        self.state.clone()
1704    }
1705
1706    // Only used for testing because of how epoch store is loaded.
1707    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1708        self.state.reference_gas_price_for_testing()
1709    }
1710
1711    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1712        self.state.committee_store().clone()
1713    }
1714
1715    /*
1716    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1717        self.state.db()
1718    }
1719    */
1720
1721    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1722    /// After reconfig, the Transaction Driver builds a new AuthorityAggregator, so the caller
1723    /// of this function will most likely want to call it again
1724    /// to get a fresh one.
1725    pub fn clone_authority_aggregator(
1726        &self,
1727    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1728        self.transaction_orchestrator
1729            .as_ref()
1730            .map(|to| to.clone_authority_aggregator())
1731    }
1732
1733    pub fn transaction_orchestrator(
1734        &self,
1735    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1736        self.transaction_orchestrator.clone()
1737    }
1738
1739    /// This function awaits the completion of checkpoint execution of the current epoch,
1740    /// after which it initiates reconfiguration of the entire system.
1741    pub async fn monitor_reconfiguration(
1742        self: Arc<Self>,
1743        mut epoch_store: Arc<AuthorityPerEpochStore>,
1744    ) -> Result<()> {
1745        let checkpoint_executor_metrics =
1746            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1747
1748        loop {
1749            let mut hasher_guard = self.global_state_hasher.lock().await;
1750            let hasher = hasher_guard.take().unwrap();
1751            info!(
1752                "Creating checkpoint executor for epoch {}",
1753                epoch_store.epoch()
1754            );
1755            let checkpoint_executor = CheckpointExecutor::new(
1756                epoch_store.clone(),
1757                self.checkpoint_store.clone(),
1758                self.state.clone(),
1759                hasher.clone(),
1760                self.backpressure_manager.clone(),
1761                self.config.checkpoint_executor_config.clone(),
1762                checkpoint_executor_metrics.clone(),
1763                self.subscription_service_checkpoint_sender.clone(),
1764            );
1765
1766            let run_with_range = self.config.run_with_range;
1767
1768            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1769
1770            // Update the current protocol version metric.
1771            self.metrics
1772                .current_protocol_version
1773                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1774
1775            // Advertise capabilities to committee, if we are a validator.
1776            if let Some(components) = &*self.validator_components.lock().await {
1777                // TODO: without this sleep, the consensus message is not delivered reliably.
1778                tokio::time::sleep(Duration::from_millis(1)).await;
1779
1780                let config = cur_epoch_store.protocol_config();
1781                let mut supported_protocol_versions = self
1782                    .config
1783                    .supported_protocol_versions
1784                    .expect("Supported versions should be populated")
1785                    // no need to send digests of versions less than the current version
1786                    .truncate_below(config.version);
1787
1788                while supported_protocol_versions.max > config.version {
1789                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1790                        supported_protocol_versions.max,
1791                        cur_epoch_store.get_chain(),
1792                    );
1793
1794                    if proposed_protocol_config.enable_accumulators()
1795                        && !epoch_store.accumulator_root_exists()
1796                    {
1797                        error!(
1798                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1799                            supported_protocol_versions.max
1800                        );
1801                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1802                    } else {
1803                        break;
1804                    }
1805                }
1806
1807                let binary_config = config.binary_config(None);
1808                let transaction = ConsensusTransaction::new_capability_notification_v2(
1809                    AuthorityCapabilitiesV2::new(
1810                        self.state.name,
1811                        cur_epoch_store.get_chain_identifier().chain(),
1812                        supported_protocol_versions,
1813                        self.state
1814                            .get_available_system_packages(&binary_config)
1815                            .await,
1816                    ),
1817                );
1818                info!(?transaction, "submitting capabilities to consensus");
1819                components.consensus_adapter.submit(
1820                    transaction,
1821                    None,
1822                    &cur_epoch_store,
1823                    None,
1824                    None,
1825                )?;
1826            }
1827
1828            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1829
1830            if stop_condition == StopReason::RunWithRangeCondition {
1831                SuiNode::shutdown(&self).await;
1832                self.shutdown_channel_tx
1833                    .send(run_with_range)
1834                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1835                return Ok(());
1836            }
1837
1838            // Safe to call because we are in the middle of reconfiguration.
1839            let latest_system_state = self
1840                .state
1841                .get_object_cache_reader()
1842                .get_sui_system_state_object_unsafe()
1843                .expect("Read Sui System State object cannot fail");
1844
1845            #[cfg(msim)]
1846            if !self
1847                .sim_state
1848                .sim_safe_mode_expected
1849                .load(Ordering::Relaxed)
1850            {
1851                debug_assert!(!latest_system_state.safe_mode());
1852            }
1853
1854            #[cfg(not(msim))]
1855            debug_assert!(!latest_system_state.safe_mode());
1856
1857            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1858                && self.state.is_fullnode(&cur_epoch_store)
1859            {
1860                warn!(
1861                    "Failed to send end of epoch notification to subscriber: {:?}",
1862                    err
1863                );
1864            }
1865
1866            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1867            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1868
1869            self.auth_agg.store(Arc::new(
1870                self.auth_agg
1871                    .load()
1872                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1873            ));
1874
1875            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1876            let next_epoch = next_epoch_committee.epoch();
1877            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1878
1879            info!(
1880                next_epoch,
1881                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1882            );
1883
1884            fail_point_async!("reconfig_delay");
1885
1886            // We save the connection monitor status map regardless of validator / fullnode status
1887            // so that we don't need to restart the connection monitor every epoch.
1888            // Update the mappings that will be used by the consensus adapter if it exists or is
1889            // about to be created.
1890            let authority_names_to_peer_ids =
1891                new_epoch_start_state.get_authority_names_to_peer_ids();
1892            self.connection_monitor_status
1893                .update_mapping_for_epoch(authority_names_to_peer_ids);
1894
1895            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1896
1897            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1898
1899            let mut validator_components_lock_guard = self.validator_components.lock().await;
1900
1901            // The following code handles 4 different cases, depending on whether the node
1902            // was a validator in the previous epoch, and whether the node is a validator
1903            // in the new epoch.
1904            let new_epoch_store = self
1905                .reconfigure_state(
1906                    &self.state,
1907                    &cur_epoch_store,
1908                    next_epoch_committee.clone(),
1909                    new_epoch_start_state,
1910                    hasher.clone(),
1911                )
1912                .await;
1913
1914            let new_validator_components = if let Some(ValidatorComponents {
1915                validator_server_handle,
1916                validator_overload_monitor_handle,
1917                consensus_manager,
1918                consensus_store_pruner,
1919                consensus_adapter,
1920                checkpoint_metrics,
1921                sui_tx_validator_metrics,
1922            }) = validator_components_lock_guard.take()
1923            {
1924                info!("Reconfiguring the validator.");
1925
1926                consensus_manager.shutdown().await;
1927                info!("Consensus has shut down.");
1928
1929                info!("Epoch store finished reconfiguration.");
1930
1931                // No other components should be holding a strong reference to state hasher
1932                // at this point. Confirm here before we swap in the new hasher.
1933                let global_state_hasher_metrics = Arc::into_inner(hasher)
1934                    .expect("Object state hasher should have no other references at this point")
1935                    .metrics();
1936                let new_hasher = Arc::new(GlobalStateHasher::new(
1937                    self.state.get_global_state_hash_store().clone(),
1938                    global_state_hasher_metrics,
1939                ));
1940                let weak_hasher = Arc::downgrade(&new_hasher);
1941                *hasher_guard = Some(new_hasher);
1942
1943                consensus_store_pruner.prune(next_epoch).await;
1944
1945                if self.state.is_validator(&new_epoch_store) {
1946                    // Only restart consensus if this node is still a validator in the new epoch.
1947                    Some(
1948                        Self::start_epoch_specific_validator_components(
1949                            &self.config,
1950                            self.state.clone(),
1951                            consensus_adapter,
1952                            self.checkpoint_store.clone(),
1953                            new_epoch_store.clone(),
1954                            self.state_sync_handle.clone(),
1955                            self.randomness_handle.clone(),
1956                            consensus_manager,
1957                            consensus_store_pruner,
1958                            weak_hasher,
1959                            self.backpressure_manager.clone(),
1960                            validator_server_handle,
1961                            validator_overload_monitor_handle,
1962                            checkpoint_metrics,
1963                            self.metrics.clone(),
1964                            sui_tx_validator_metrics,
1965                        )
1966                        .await?,
1967                    )
1968                } else {
1969                    info!("This node is no longer a validator after reconfiguration");
1970                    None
1971                }
1972            } else {
1973                // No other components should be holding a strong reference to state hasher
1974                // at this point. Confirm here before we swap in the new hasher.
1975                let global_state_hasher_metrics = Arc::into_inner(hasher)
1976                    .expect("Object state hasher should have no other references at this point")
1977                    .metrics();
1978                let new_hasher = Arc::new(GlobalStateHasher::new(
1979                    self.state.get_global_state_hash_store().clone(),
1980                    global_state_hasher_metrics,
1981                ));
1982                let weak_hasher = Arc::downgrade(&new_hasher);
1983                *hasher_guard = Some(new_hasher);
1984
1985                if self.state.is_validator(&new_epoch_store) {
1986                    info!("Promoting the node from fullnode to validator, starting grpc server");
1987
1988                    let mut components = Self::construct_validator_components(
1989                        self.config.clone(),
1990                        self.state.clone(),
1991                        Arc::new(next_epoch_committee.clone()),
1992                        new_epoch_store.clone(),
1993                        self.checkpoint_store.clone(),
1994                        self.state_sync_handle.clone(),
1995                        self.randomness_handle.clone(),
1996                        weak_hasher,
1997                        self.backpressure_manager.clone(),
1998                        self.connection_monitor_status.clone(),
1999                        &self.registry_service,
2000                        self.metrics.clone(),
2001                        self.checkpoint_metrics.clone(),
2002                    )
2003                    .await?;
2004
2005                    components.validator_server_handle =
2006                        components.validator_server_handle.start().await;
2007
2008                    Some(components)
2009                } else {
2010                    None
2011                }
2012            };
2013            *validator_components_lock_guard = new_validator_components;
2014
2015            // Force releasing current epoch store DB handle, because the
2016            // Arc<AuthorityPerEpochStore> may linger.
2017            cur_epoch_store.release_db_handles();
2018
2019            if cfg!(msim)
2020                && !matches!(
2021                    self.config
2022                        .authority_store_pruning_config
2023                        .num_epochs_to_retain_for_checkpoints(),
2024                    None | Some(u64::MAX) | Some(0)
2025                )
2026            {
2027                self.state
2028                    .prune_checkpoints_for_eligible_epochs_for_testing(
2029                        self.config.clone(),
2030                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2031                    )
2032                    .await?;
2033            }
2034
2035            epoch_store = new_epoch_store;
2036            info!("Reconfiguration finished");
2037        }
2038    }
2039
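    /// Shuts down validator-only components; currently this only stops the consensus manager.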
2040    async fn shutdown(&self) {
2041        if let Some(validator_components) = &*self.validator_components.lock().await {
2042            validator_components.consensus_manager.shutdown().await;
2043        }
2044    }
2045
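    /// Reconfigures the authority state for the next epoch: verifies that the last checkpoint
    /// of the current epoch has been executed, builds the next `EpochStartConfiguration`, and
    /// returns the new epoch store.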
2046    async fn reconfigure_state(
2047        &self,
2048        state: &Arc<AuthorityState>,
2049        cur_epoch_store: &AuthorityPerEpochStore,
2050        next_epoch_committee: Committee,
2051        next_epoch_start_system_state: EpochStartSystemState,
2052        global_state_hasher: Arc<GlobalStateHasher>,
2053    ) -> Arc<AuthorityPerEpochStore> {
2054        let next_epoch = next_epoch_committee.epoch();
2055
2056        let last_checkpoint = self
2057            .checkpoint_store
2058            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2059            .expect("Error loading last checkpoint for current epoch")
2060            .expect("Could not load last checkpoint for current epoch");
2061
2062        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2063
2064        assert_eq!(
2065            Some(last_checkpoint_seq),
2066            self.checkpoint_store
2067                .get_highest_executed_checkpoint_seq_number()
2068                .expect("Error loading highest executed checkpoint sequence number")
2069        );
2070
2071        let epoch_start_configuration = EpochStartConfiguration::new(
2072            next_epoch_start_system_state,
2073            *last_checkpoint.digest(),
2074            state.get_object_store().as_ref(),
2075            EpochFlag::default_flags_for_new_epoch(&state.config),
2076        )
2077        .expect("EpochStartConfiguration construction cannot fail");
2078
2079        let new_epoch_store = self
2080            .state
2081            .reconfigure(
2082                cur_epoch_store,
2083                self.config.supported_protocol_versions.unwrap(),
2084                next_epoch_committee,
2085                epoch_start_configuration,
2086                global_state_hasher,
2087                &self.config.expensive_safety_check_config,
2088                last_checkpoint_seq,
2089            )
2090            .await
2091            .expect("Reconfigure authority state cannot fail");
2092        info!(next_epoch, "Node State has been reconfigured");
2093        assert_eq!(next_epoch, new_epoch_store.epoch());
2094        self.state.get_reconfig_api().update_epoch_flags_metrics(
2095            cur_epoch_store.epoch_start_config().flags(),
2096            new_epoch_store.epoch_start_config().flags(),
2097        );
2098
2099        new_epoch_store
2100    }
2101
2102    pub fn get_config(&self) -> &NodeConfig {
2103        &self.config
2104    }
2105
2106    pub fn randomness_handle(&self) -> randomness::Handle {
2107        self.randomness_handle.clone()
2108    }
2109
2110    /// Get a short prefix of a digest for metric labels
2111    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2112        let digest_str = digest.to_string();
2113        if digest_str.len() >= 8 {
2114            digest_str[0..8].to_string()
2115        } else {
2116            digest_str
2117        }
2118    }
2119
2120    /// Check for previously detected forks and handle them appropriately.
2121    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2122    /// For all other cases, block node startup if a fork is detected.
2123    async fn check_and_recover_forks(
2124        checkpoint_store: &CheckpointStore,
2125        checkpoint_metrics: &CheckpointMetrics,
2126        is_validator: bool,
2127        fork_recovery: Option<&ForkRecoveryConfig>,
2128    ) -> Result<()> {
2129        // Fork detection and recovery are only relevant for validators.
2130        // Fullnodes sync from validators and don't need fork checking.
2131        if !is_validator {
2132            return Ok(());
2133        }
2134
2135        // Try to recover from forks if recovery config is provided
2136        if let Some(recovery) = fork_recovery {
2137            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2138            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2139        }
2140
2141        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2142            .get_checkpoint_fork_detected()
2143            .map_err(|e| {
2144                error!("Failed to check for checkpoint fork: {:?}", e);
2145                e
2146            })?
2147        {
2148            Self::handle_checkpoint_fork(
2149                checkpoint_seq,
2150                checkpoint_digest,
2151                checkpoint_metrics,
2152                fork_recovery,
2153            )
2154            .await?;
2155        }
2156        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2157            .get_transaction_fork_detected()
2158            .map_err(|e| {
2159                error!("Failed to check for transaction fork: {:?}", e);
2160                e
2161            })?
2162        {
2163            Self::handle_transaction_fork(
2164                tx_digest,
2165                expected_effects,
2166                actual_effects,
2167                checkpoint_metrics,
2168                fork_recovery,
2169            )
2170            .await?;
2171        }
2172
2173        Ok(())
2174    }
2175
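    /// Applies configured checkpoint overrides: clears locally computed checkpoints whose
    /// digests mismatch an override, and clears the checkpoint fork marker when the forked
    /// sequence number is covered by an override.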
2176    fn try_recover_checkpoint_fork(
2177        checkpoint_store: &CheckpointStore,
2178        recovery: &ForkRecoveryConfig,
2179    ) -> Result<()> {
2180        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2181        // clear locally computed checkpoints from that sequence (inclusive).
2182        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2183            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2184                anyhow::bail!(
2185                    "Invalid checkpoint digest override for seq {}: {}",
2186                    seq,
2187                    expected_digest_str
2188                );
2189            };
2190
2191            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2192                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2193                if local_digest != expected_digest {
2194                    info!(
2195                        seq,
2196                        local = %Self::get_digest_prefix(local_digest),
2197                        expected = %Self::get_digest_prefix(expected_digest),
2198                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2199                        seq
2200                    );
2201                    checkpoint_store
2202                        .clear_locally_computed_checkpoints_from(*seq)
2203                        .context(
2204                            "Failed to clear locally computed checkpoints from override seq",
2205                        )?;
2206                }
2207            }
2208        }
2209
2210        if let Some((checkpoint_seq, checkpoint_digest)) =
2211            checkpoint_store.get_checkpoint_fork_detected()?
2212            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2213        {
2214            info!(
2215                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2216                checkpoint_seq, checkpoint_digest
2217            );
2218            checkpoint_store
2219                .clear_checkpoint_fork_detected()
2220                .expect("Failed to clear checkpoint fork detected marker");
2221        }
2222        Ok(())
2223    }
2224
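    /// Clears the transaction fork marker when the forked transaction digest is covered by a
    /// configured transaction override.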
2225    fn try_recover_transaction_fork(
2226        checkpoint_store: &CheckpointStore,
2227        recovery: &ForkRecoveryConfig,
2228    ) -> Result<()> {
2229        if recovery.transaction_overrides.is_empty() {
2230            return Ok(());
2231        }
2232
2233        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2234            && recovery
2235                .transaction_overrides
2236                .contains_key(&tx_digest.to_string())
2237        {
2238            info!(
2239                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2240                tx_digest
2241            );
2242            checkpoint_store
2243                .clear_transaction_fork_detected()
2244                .expect("Failed to clear transaction fork detected marker");
2245        }
2246        Ok(())
2247    }
2248
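    /// Returns the current UNIX timestamp in seconds, used to label fork metrics.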
2249    fn get_current_timestamp() -> u64 {
2250        std::time::SystemTime::now()
2251            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2252            .unwrap()
2253            .as_secs()
2254    }
2255
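    /// Records the detected checkpoint fork in metrics, then either halts startup indefinitely
    /// or returns an error, depending on the configured `ForkCrashBehavior`.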
2256    async fn handle_checkpoint_fork(
2257        checkpoint_seq: u64,
2258        checkpoint_digest: CheckpointDigest,
2259        checkpoint_metrics: &CheckpointMetrics,
2260        fork_recovery: Option<&ForkRecoveryConfig>,
2261    ) -> Result<()> {
2262        checkpoint_metrics
2263            .checkpoint_fork_crash_mode
2264            .with_label_values(&[
2265                &checkpoint_seq.to_string(),
2266                &Self::get_digest_prefix(checkpoint_digest),
2267                &Self::get_current_timestamp().to_string(),
2268            ])
2269            .set(1);
2270
2271        let behavior = fork_recovery
2272            .map(|fr| fr.fork_crash_behavior)
2273            .unwrap_or_default();
2274
2275        match behavior {
2276            ForkCrashBehavior::AwaitForkRecovery => {
2277                error!(
2278                    checkpoint_seq = checkpoint_seq,
2279                    checkpoint_digest = ?checkpoint_digest,
2280                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2281                );
2282                futures::future::pending::<()>().await;
2283                unreachable!("pending() should never return");
2284            }
2285            ForkCrashBehavior::ReturnError => {
2286                error!(
2287                    checkpoint_seq = checkpoint_seq,
2288                    checkpoint_digest = ?checkpoint_digest,
2289                    "Checkpoint fork detected! Returning error."
2290                );
2291                Err(anyhow::anyhow!(
2292                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2293                    checkpoint_seq,
2294                    checkpoint_digest
2295                ))
2296            }
2297        }
2298    }
2299
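    /// Records the detected transaction fork in metrics, then either halts startup indefinitely
    /// or returns an error, depending on the configured `ForkCrashBehavior`.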
2300    async fn handle_transaction_fork(
2301        tx_digest: TransactionDigest,
2302        expected_effects_digest: TransactionEffectsDigest,
2303        actual_effects_digest: TransactionEffectsDigest,
2304        checkpoint_metrics: &CheckpointMetrics,
2305        fork_recovery: Option<&ForkRecoveryConfig>,
2306    ) -> Result<()> {
2307        checkpoint_metrics
2308            .transaction_fork_crash_mode
2309            .with_label_values(&[
2310                &Self::get_digest_prefix(tx_digest),
2311                &Self::get_digest_prefix(expected_effects_digest),
2312                &Self::get_digest_prefix(actual_effects_digest),
2313                &Self::get_current_timestamp().to_string(),
2314            ])
2315            .set(1);
2316
2317        let behavior = fork_recovery
2318            .map(|fr| fr.fork_crash_behavior)
2319            .unwrap_or_default();
2320
2321        match behavior {
2322            ForkCrashBehavior::AwaitForkRecovery => {
2323                error!(
2324                    tx_digest = ?tx_digest,
2325                    expected_effects_digest = ?expected_effects_digest,
2326                    actual_effects_digest = ?actual_effects_digest,
2327                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2328                );
2329                futures::future::pending::<()>().await;
2330                unreachable!("pending() should never return");
2331            }
2332            ForkCrashBehavior::ReturnError => {
2333                error!(
2334                    tx_digest = ?tx_digest,
2335                    expected_effects_digest = ?expected_effects_digest,
2336                    actual_effects_digest = ?actual_effects_digest,
2337                    "Transaction fork detected! Returning error."
2338                );
2339                Err(anyhow::anyhow!(
2340                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2341                    tx_digest,
2342                    expected_effects_digest,
2343                    actual_effects_digest
2344                ))
2345            }
2346        }
2347    }
2348}
2349
2350#[cfg(not(msim))]
2351impl SuiNode {
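    /// Fetches the provider's current JWKs over the network; any fetch failure is mapped to
    /// `JWKRetrievalError`.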
2352    async fn fetch_jwks(
2353        _authority: AuthorityName,
2354        provider: &OIDCProvider,
2355    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2356        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2357        use sui_types::error::SuiErrorKind;
2358        let client = reqwest::Client::new();
2359        fetch_jwks(provider, &client, true)
2360            .await
2361            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2362    }
2363}
2364
2365#[cfg(msim)]
2366impl SuiNode {
2367    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2368        self.sim_state.sim_node.id()
2369    }
2370
2371    pub fn set_safe_mode_expected(&self, new_value: bool) {
2372        info!("Setting safe mode expected to {}", new_value);
2373        self.sim_state
2374            .sim_safe_mode_expected
2375            .store(new_value, Ordering::Relaxed);
2376    }
2377
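    /// In simulation, JWKs come from the configurable JWK injector rather than a network fetch.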
2378    #[allow(unused_variables)]
2379    async fn fetch_jwks(
2380        authority: AuthorityName,
2381        provider: &OIDCProvider,
2382    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2383        get_jwk_injector()(authority, provider)
2384    }
2385}
2386
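/// A future that is spawned at most once, paired with a readiness signal that `start()` awaits
/// before returning.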
2387enum SpawnOnce {
2388    // Mutex is only needed to make SpawnOnce Send
2389    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2390    #[allow(unused)]
2391    Started(JoinHandle<()>),
2392}
2393
2394impl SpawnOnce {
2395    pub fn new(
2396        ready_rx: oneshot::Receiver<()>,
2397        future: impl Future<Output = ()> + Send + 'static,
2398    ) -> Self {
2399        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2400    }
2401
2402    pub async fn start(self) -> Self {
2403        match self {
2404            Self::Unstarted(ready_rx, future) => {
2405                let future = future.into_inner();
2406                let handle = tokio::spawn(future);
2407                ready_rx.await.unwrap();
2408                Self::Started(handle)
2409            }
2410            Self::Started(_) => self,
2411        }
2412    }
2413}
2414
2415/// Updates trusted peer addresses in the p2p network.
2416fn update_peer_addresses(
2417    config: &NodeConfig,
2418    endpoint_manager: &EndpointManager,
2419    epoch_start_state: &EpochStartSystemState,
2420) {
2421    for (peer_id, address) in
2422        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2423    {
2424        endpoint_manager.update_endpoint(EndpointId::P2p(peer_id), vec![address]);
2425    }
2426}
2427
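/// Builds the transaction key-value store used by the JSON-RPC APIs: a local RocksDB-backed
/// store, with an HTTP key-value store as fallback when a base URL is configured (currently
/// mainnet only).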
2428fn build_kv_store(
2429    state: &Arc<AuthorityState>,
2430    config: &NodeConfig,
2431    registry: &Registry,
2432) -> Result<Arc<TransactionKeyValueStore>> {
2433    let metrics = KeyValueStoreMetrics::new(registry);
2434    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2435
2436    let base_url = &config.transaction_kv_store_read_config.base_url;
2437
2438    if base_url.is_empty() {
2439        info!("no http kv store url provided, using local db only");
2440        return Ok(Arc::new(db_store));
2441    }
2442
2443    let base_url: url::Url = base_url.parse().tap_err(|e| {
2444        error!(
2445            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
2446            base_url, e
2447        )
2448    })?;
2449
2450    let network_str = match state.get_chain_identifier().chain() {
2451        Chain::Mainnet => "/mainnet",
2452        _ => {
2453            info!("using local db only for kv store");
2454            return Ok(Arc::new(db_store));
2455        }
2456    };
2457
2458    let base_url = base_url.join(network_str)?.to_string();
2459    let http_store = HttpKVStore::new_kv(
2460        &base_url,
2461        config.transaction_kv_store_read_config.cache_size,
2462        metrics.clone(),
2463    )?;
2464    info!("using local key-value store with fallback to http key-value store");
2465    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2466        db_store,
2467        http_store,
2468        metrics,
2469        "json_rpc_fallback",
2470    )))
2471}
2472
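/// Builds the JSON-RPC router and the `sui_rpc_api` router and starts the HTTP (and optionally
/// HTTPS) servers. Validators return early with no servers, since they do not expose these APIs.
/// Also returns the sender used to feed executed checkpoints to the subscription service.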
2473async fn build_http_servers(
2474    state: Arc<AuthorityState>,
2475    store: RocksDbStore,
2476    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2477    config: &NodeConfig,
2478    prometheus_registry: &Registry,
2479    server_version: ServerVersion,
2480) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2481    // Validators do not expose these APIs
2482    if config.consensus_config().is_some() {
2483        return Ok((HttpServers::default(), None));
2484    }
2485
2486    let mut router = axum::Router::new();
2487
2488    let json_rpc_router = {
2489        let traffic_controller = state.traffic_controller.clone();
2490        let mut server = JsonRpcServerBuilder::new(
2491            env!("CARGO_PKG_VERSION"),
2492            prometheus_registry,
2493            traffic_controller,
2494            config.policy_config.clone(),
2495        );
2496
2497        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2498
2499        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2500        server.register_module(ReadApi::new(
2501            state.clone(),
2502            kv_store.clone(),
2503            metrics.clone(),
2504        ))?;
2505        server.register_module(CoinReadApi::new(
2506            state.clone(),
2507            kv_store.clone(),
2508            metrics.clone(),
2509        ))?;
2510
2511        // If run_with_range is enabled, we want to prevent any transaction submission;
2512        // run_with_range = None is the normal operating condition.
2513        if config.run_with_range.is_none() {
2514            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2515        }
2516        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2517        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2518
2519        if let Some(transaction_orchestrator) = transaction_orchestrator {
2520            server.register_module(TransactionExecutionApi::new(
2521                state.clone(),
2522                transaction_orchestrator.clone(),
2523                metrics.clone(),
2524            ))?;
2525        }
2526
2527        let name_service_config =
2528            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2529                config.name_service_package_address,
2530                config.name_service_registry_id,
2531                config.name_service_reverse_registry_id,
2532            ) {
2533                sui_name_service::NameServiceConfig::new(
2534                    package_address,
2535                    registry_id,
2536                    reverse_registry_id,
2537                )
2538            } else {
2539                match state.get_chain_identifier().chain() {
2540                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2541                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2542                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2543                }
2544            };
2545
2546        server.register_module(IndexerApi::new(
2547            state.clone(),
2548            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2549            kv_store,
2550            name_service_config,
2551            metrics,
2552            config.indexer_max_subscriptions,
2553        ))?;
2554        server.register_module(MoveUtils::new(state.clone()))?;
2555
2556        let server_type = config.jsonrpc_server_type();
2557
2558        server.to_router(server_type).await?
2559    };
2560
2561    router = router.merge(json_rpc_router);
2562
2563    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2564        SubscriptionService::build(prometheus_registry);
2565    let rpc_router = {
2566        let mut rpc_service =
2567            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2568        rpc_service.with_server_version(server_version);
2569
2570        if let Some(config) = config.rpc.clone() {
2571            rpc_service.with_config(config);
2572        }
2573
2574        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2575        rpc_service.with_subscription_service(subscription_service_handle);
2576
2577        if let Some(transaction_orchestrator) = transaction_orchestrator {
2578            rpc_service.with_executor(transaction_orchestrator.clone())
2579        }
2580
2581        rpc_service.into_router().await
2582    };
2583
2584    let layers = ServiceBuilder::new()
2585        .map_request(|mut request: axum::http::Request<_>| {
2586            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2587                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2588                request.extensions_mut().insert(axum_connect_info);
2589            }
2590            request
2591        })
2592        .layer(axum::middleware::from_fn(server_timing_middleware))
2593        // Set up a permissive CORS policy.
2594        .layer(
2595            tower_http::cors::CorsLayer::new()
2596                .allow_methods([http::Method::GET, http::Method::POST])
2597                .allow_origin(tower_http::cors::Any)
2598                .allow_headers(tower_http::cors::Any),
2599        );
2600
2601    router = router.merge(rpc_router).layer(layers);
2602
2603    let https = if let Some((tls_config, https_address)) = config
2604        .rpc()
2605        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2606    {
2607        let https = sui_http::Builder::new()
2608            .tls_single_cert(tls_config.cert(), tls_config.key())
2609            .and_then(|builder| builder.serve(https_address, router.clone()))
2610            .map_err(|e| anyhow::anyhow!(e))?;
2611
2612        info!(
2613            https_address =? https.local_addr(),
2614            "HTTPS rpc server listening on {}",
2615            https.local_addr()
2616        );
2617
2618        Some(https)
2619    } else {
2620        None
2621    };
2622
2623    let http = sui_http::Builder::new()
2624        .serve(&config.json_rpc_address, router)
2625        .map_err(|e| anyhow::anyhow!(e))?;
2626
2627    info!(
2628        http_address =? http.local_addr(),
2629        "HTTP rpc server listening on {}",
2630        http.local_addr()
2631    );
2632
2633    Ok((
2634        HttpServers {
2635            http: Some(http),
2636            https,
2637        },
2638        Some(subscription_service_checkpoint_sender),
2639    ))
2640}
2641
2642#[cfg(not(test))]
2643fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2644    protocol_config.max_transactions_per_checkpoint() as usize
2645}
2646
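// In tests, cap checkpoints at two transactions instead of the protocol config limit.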
2647#[cfg(test)]
2648fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2649    2
2650}
2651
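/// Handles to the spawned HTTP and HTTPS RPC servers.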
2652#[derive(Default)]
2653struct HttpServers {
2654    #[allow(unused)]
2655    http: Option<sui_http::ServerHandle>,
2656    #[allow(unused)]
2657    https: Option<sui_http::ServerHandle>,
2658}
2659
2660#[cfg(test)]
2661mod tests {
2662    use super::*;
2663    use prometheus::Registry;
2664    use std::collections::BTreeMap;
2665    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2666    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2667    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2668
2669    #[tokio::test]
2670    async fn test_fork_error_and_recovery_both_paths() {
2671        let checkpoint_store = CheckpointStore::new_for_tests();
2672        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2673
2674        // ---------- Checkpoint fork path ----------
2675        let seq_num = 42;
2676        let digest = CheckpointDigest::random();
2677        checkpoint_store
2678            .record_checkpoint_fork_detected(seq_num, digest)
2679            .unwrap();
2680
2681        let fork_recovery = ForkRecoveryConfig {
2682            transaction_overrides: Default::default(),
2683            checkpoint_overrides: Default::default(),
2684            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2685        };
2686
2687        let r = SuiNode::check_and_recover_forks(
2688            &checkpoint_store,
2689            &checkpoint_metrics,
2690            true,
2691            Some(&fork_recovery),
2692        )
2693        .await;
2694        assert!(r.is_err());
2695        assert!(
2696            r.unwrap_err()
2697                .to_string()
2698                .contains("Checkpoint fork detected")
2699        );
2700
2701        let mut checkpoint_overrides = BTreeMap::new();
2702        checkpoint_overrides.insert(seq_num, digest.to_string());
2703        let fork_recovery_with_override = ForkRecoveryConfig {
2704            transaction_overrides: Default::default(),
2705            checkpoint_overrides,
2706            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2707        };
2708        let r = SuiNode::check_and_recover_forks(
2709            &checkpoint_store,
2710            &checkpoint_metrics,
2711            true,
2712            Some(&fork_recovery_with_override),
2713        )
2714        .await;
2715        assert!(r.is_ok());
2716        assert!(
2717            checkpoint_store
2718                .get_checkpoint_fork_detected()
2719                .unwrap()
2720                .is_none()
2721        );
2722
2723        // ---------- Transaction fork path ----------
2724        let tx_digest = TransactionDigest::random();
2725        let expected_effects = TransactionEffectsDigest::random();
2726        let actual_effects = TransactionEffectsDigest::random();
2727        checkpoint_store
2728            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2729            .unwrap();
2730
2731        let fork_recovery = ForkRecoveryConfig {
2732            transaction_overrides: Default::default(),
2733            checkpoint_overrides: Default::default(),
2734            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2735        };
2736        let r = SuiNode::check_and_recover_forks(
2737            &checkpoint_store,
2738            &checkpoint_metrics,
2739            true,
2740            Some(&fork_recovery),
2741        )
2742        .await;
2743        assert!(r.is_err());
2744        assert!(
2745            r.unwrap_err()
2746                .to_string()
2747                .contains("Transaction fork detected")
2748        );
2749
2750        let mut transaction_overrides = BTreeMap::new();
2751        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2752        let fork_recovery_with_override = ForkRecoveryConfig {
2753            transaction_overrides,
2754            checkpoint_overrides: Default::default(),
2755            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2756        };
2757        let r = SuiNode::check_and_recover_forks(
2758            &checkpoint_store,
2759            &checkpoint_metrics,
2760            true,
2761            Some(&fork_recovery_with_override),
2762        )
2763        .await;
2764        assert!(r.is_ok());
2765        assert!(
2766            checkpoint_store
2767                .get_transaction_fork_detected()
2768                .unwrap()
2769                .is_none()
2770        );
2771    }
2772}