sui_node/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::RandomnessRoundReceiver;
33use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
34use sui_core::authority::backpressure::BackpressureManager;
35use sui_core::authority::epoch_start_configuration::EpochFlag;
36use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
37use sui_core::consensus_adapter::ConsensusClient;
38use sui_core::consensus_manager::UpdatableConsensusClient;
39use sui_core::epoch::randomness::RandomnessManager;
40use sui_core::execution_cache::build_execution_cache;
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
// JWK logging is very noisy in tests but low-volume in production, so this emits at
// `debug` level when running under a test configuration and at `info` level otherwise.
macro_rules! jwk_log {
    ($($tokens:tt)+) => {{
        if !in_test_configuration() {
            info!($($tokens)+);
        } else {
            debug!($($tokens)+);
        }
    }};
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121    authority::{AuthorityState, AuthorityStore},
122    authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142    http_key_value_store::HttpKVStore,
143    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144    key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
/// Validator-side components owned by [`SuiNode`], which stores them in its
/// `validator_components` field as `Mutex<Option<ValidatorComponents>>`.
///
/// NOTE(review): fields are dropped in declaration order; keep that in mind before
/// reordering them.
pub struct ValidatorComponents {
    /// Handle for the validator server. `SpawnOnce` is defined elsewhere in this crate;
    /// see it for the start semantics.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle of the validator overload-monitor task, if one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Manager of the consensus instance.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics of the consensus transaction validator (`SuiTxValidator`).
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Admission queue context, if an admission queue is in use.
    admission_queue: Option<AdmissionQueueContext>,
}
/// The p2p network plus handles to the services running on it, as returned by
/// `SuiNode::create_p2p_network` and destructured in `SuiNode::start_async`.
pub struct P2pComponents {
    /// The anemo p2p network.
    p2p_network: Network,
    /// Known peers keyed by `PeerId`; the `String` value is presumably a human-readable
    /// peer name. TODO confirm.
    known_peers: HashMap<PeerId, String>,
    /// Handle to the peer discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Used to update peer network addresses (e.g. the configured peer address overrides).
    endpoint_manager: EndpointManager,
}
182
#[cfg(msim)]
mod simulator {
    use std::cell::RefCell;
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Simulator-only state kept alongside each node.
    pub(super) struct SimState {
        /// Simulator node handle that was current when this state was created.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag (initially `false`) recording whether safe mode is expected.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; see `sui_simulator::NodeLeakDetector`.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            // Built in declaration order so construction side effects keep their sequence.
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher: given the authority and the provider, return
    /// the JWKs to report (presumably consulted by `fetch_jwks` under msim; confirm).
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector. It ignores its arguments and returns the JWKs parsed from
    /// `DEFAULT_JWK_BYTES` as Twitch keys. Any parse failure becomes `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;

        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Per-thread injector; starts out as `default_fetch_jwks`.
        static JWK_INJECTOR: RefCell<Arc<JwkInjector>> =
            RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the `Arc` holding the current thread's injector.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with_borrow(|injector| Arc::clone(injector))
    }

    /// Replaces the injector installed on the current thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with_borrow_mut(|slot| *slot = injector);
    }
}
236
237#[cfg(msim)]
238pub use simulator::set_jwk_injector;
239#[cfg(msim)]
240use simulator::*;
241use sui_core::authority::authority_store_pruner::PrunerWatermarks;
242use sui_core::{
243    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
244};
245
/// Default gRPC connect timeout: 60 seconds.
/// NOTE(review): its users are outside this view; the meaning is inferred from the name.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::new(60, 0);
247
/// Top-level handle to a running Sui node, returned (inside an `Arc`) by
/// [`SuiNode::start`] and [`SuiNode::start_async`].
///
/// NOTE(review): struct fields are dropped in declaration order, so reordering them may
/// change shutdown behavior.
pub struct SuiNode {
    /// Configuration the node runs with.
    config: NodeConfig,
    /// Validator-side components; presumably `None` while the node is not acting as a
    /// validator. Confirm against where this is populated.
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    // NOTE(review): never read directly (hence `allow(unused)`); presumably held so the
    // servers live as long as the node. Confirm.
    #[allow(unused)]
    http_servers: HttpServers,

    /// The node's authority state.
    state: Arc<AuthorityState>,
    /// Transaction orchestrator. `start_async` only builds one for fullnodes started
    /// without a `run_with_range`; presumably `None` otherwise. Confirm.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Registry service the node's Prometheus metrics are registered with.
    registry_service: RegistryService,
    /// Node-level metrics.
    metrics: Arc<SuiNodeMetrics>,
    /// Checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,

    /// Handle to the p2p discovery service; the leading underscore suggests it is held
    /// only to keep the service alive.
    _discovery: discovery::Handle,
    /// Handle to the anemo connection monitor; presumably held only to keep it alive.
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    /// Handle to the state sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Checkpoint store shared with the rest of the node.
    checkpoint_store: Arc<CheckpointStore>,
    /// Global state hasher, behind `Mutex<Option<..>>` (presumably so it can be replaced
    /// or cleared across epochs; confirm).
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    /// Backpressure manager, built from the checkpoint store at start-up.
    backpressure_manager: Arc<BackpressureManager>,

    /// Presumably the shutdown sender returned by `start_db_checkpoint`, if DB
    /// checkpointing was started. Confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only per-node state.
    #[cfg(msim)]
    sim_state: SimState,

    /// Presumably the shutdown sender returned by `start_state_snapshot`, if snapshot
    /// uploading was started. Confirm.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Channel to allow signaling upstream to shut down sui-node.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Sender feeding checkpoints to the RPC subscription service, if there is one.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
294
295impl fmt::Debug for SuiNode {
296    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297        f.debug_struct("SuiNode")
298            .field("name", &self.state.name.concise())
299            .finish()
300    }
301}
302
/// Upper bound on how many new JWKs from a single fetch of one provider are submitted to
/// consensus. `start_jwk_updater` truncates any excess keys.
///
/// A compile-time constant: no fixed memory location is needed, so `const` is used
/// rather than `static`.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306    pub async fn start(
307        config: NodeConfig,
308        registry_service: RegistryService,
309    ) -> Result<Arc<SuiNode>> {
310        Self::start_async(
311            config,
312            registry_service,
313            ServerVersion::new("sui-node", "unknown"),
314        )
315        .await
316    }
317
318    fn start_jwk_updater(
319        config: &NodeConfig,
320        metrics: Arc<SuiNodeMetrics>,
321        authority: AuthorityName,
322        epoch_store: Arc<AuthorityPerEpochStore>,
323        consensus_adapter: Arc<ConsensusAdapter>,
324    ) {
325        let epoch = epoch_store.epoch();
326
327        let supported_providers = config
328            .zklogin_oauth_providers
329            .get(&epoch_store.get_chain_identifier().chain())
330            .unwrap_or(&BTreeSet::new())
331            .iter()
332            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
333            .collect::<Vec<_>>();
334
335        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
336
337        info!(
338            ?fetch_interval,
339            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
340        );
341
342        fn validate_jwk(
343            metrics: &Arc<SuiNodeMetrics>,
344            provider: &OIDCProvider,
345            id: &JwkId,
346            jwk: &JWK,
347        ) -> bool {
348            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
349                warn!(
350                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
351                    id.iss, provider
352                );
353                metrics
354                    .invalid_jwks
355                    .with_label_values(&[&provider.to_string()])
356                    .inc();
357                return false;
358            };
359
360            if iss_provider != *provider {
361                warn!(
362                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
363                    id.iss, provider, iss_provider
364                );
365                metrics
366                    .invalid_jwks
367                    .with_label_values(&[&provider.to_string()])
368                    .inc();
369                return false;
370            }
371
372            if !check_total_jwk_size(id, jwk) {
373                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
374                metrics
375                    .invalid_jwks
376                    .with_label_values(&[&provider.to_string()])
377                    .inc();
378                return false;
379            }
380
381            true
382        }
383
384        // metrics is:
385        //  pub struct SuiNodeMetrics {
386        //      pub jwk_requests: IntCounterVec,
387        //      pub jwk_request_errors: IntCounterVec,
388        //      pub total_jwks: IntCounterVec,
389        //      pub unique_jwks: IntCounterVec,
390        //  }
391
392        for p in supported_providers.into_iter() {
393            let provider_str = p.to_string();
394            let epoch_store = epoch_store.clone();
395            let consensus_adapter = consensus_adapter.clone();
396            let metrics = metrics.clone();
397            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
398                async move {
399                    // note: restart-safe de-duplication happens after consensus, this is
400                    // just best-effort to reduce unneeded submissions.
401                    let mut seen = HashSet::new();
402                    loop {
403                        jwk_log!("fetching JWK for provider {:?}", p);
404                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
405                        match Self::fetch_jwks(authority, &p).await {
406                            Err(e) => {
407                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
408                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
409                                // Retry in 30 seconds
410                                tokio::time::sleep(Duration::from_secs(30)).await;
411                                continue;
412                            }
413                            Ok(mut keys) => {
414                                metrics.total_jwks
415                                    .with_label_values(&[&provider_str])
416                                    .inc_by(keys.len() as u64);
417
418                                keys.retain(|(id, jwk)| {
419                                    validate_jwk(&metrics, &p, id, jwk) &&
420                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
421                                    seen.insert((id.clone(), jwk.clone()))
422                                });
423
424                                metrics.unique_jwks
425                                    .with_label_values(&[&provider_str])
426                                    .inc_by(keys.len() as u64);
427
428                                // prevent oauth providers from sending too many keys,
429                                // inadvertently or otherwise
430                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
431                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
432                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
433                                }
434
435                                for (id, jwk) in keys.into_iter() {
436                                    jwk_log!("Submitting JWK to consensus: {:?}", id);
437
438                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
439                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
440                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
441                                        .ok();
442                                }
443                            }
444                        }
445                        tokio::time::sleep(fetch_interval).await;
446                    }
447                }
448                .instrument(error_span!("jwk_updater_task", epoch)),
449            ));
450        }
451    }
452
453    pub async fn start_async(
454        config: NodeConfig,
455        registry_service: RegistryService,
456        server_version: ServerVersion,
457    ) -> Result<Arc<SuiNode>> {
458        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
459        let mut config = config.clone();
460        if config.supported_protocol_versions.is_none() {
461            info!(
462                "populating config.supported_protocol_versions with default {:?}",
463                SupportedProtocolVersions::SYSTEM_DEFAULT
464            );
465            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466        }
467
468        let run_with_range = config.run_with_range;
469        let prometheus_registry = registry_service.default_registry();
470        let node_role = config.intended_node_role();
471
472        info!(node =? config.protocol_public_key(),
473            "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
474        );
475
476        // Initialize metrics to track db usage before creating any stores
477        DBMetrics::init(registry_service.clone());
478
479        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
480        typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482        // Initialize Mysten metrics.
483        mysten_metrics::init_metrics(&prometheus_registry);
484        // Unsupported (because of the use of static variable) and unnecessary in simtests.
485        #[cfg(not(msim))]
486        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488        let genesis = config.genesis()?.clone();
489
490        let secret = Arc::pin(config.protocol_key_pair().copy());
491        let genesis_committee = genesis.committee();
492        let committee_store = Arc::new(CommitteeStore::new(
493            config.db_path().join("epochs"),
494            &genesis_committee,
495            None,
496        ));
497
498        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499        let checkpoint_store = CheckpointStore::new(
500            &config.db_path().join("checkpoints"),
501            pruner_watermarks.clone(),
502        );
503        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
504
505        if node_role.runs_consensus() {
506            Self::check_and_recover_forks(
507                &checkpoint_store,
508                &checkpoint_metrics,
509                config.fork_recovery.as_ref(),
510            )
511            .await?;
512        }
513
514        // By default, only enable write stall on nodes that run consensus.
515        let enable_write_stall = config
516            .enable_db_write_stall
517            .unwrap_or(node_role.runs_consensus());
518        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
519            enable_write_stall,
520            is_validator: node_role.is_validator(),
521        };
522        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
523            &config.db_store_path(),
524            Some(perpetual_tables_options),
525            Some(pruner_watermarks.epoch_id.clone()),
526        ));
527        let is_genesis = perpetual_tables
528            .database_is_empty()
529            .expect("Database read should not fail at init.");
530
531        let backpressure_manager =
532            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
533
534        let store =
535            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
536
537        let cur_epoch = store.get_recovery_epoch_at_restart()?;
538        let committee = committee_store
539            .get_committee(&cur_epoch)?
540            .expect("Committee of the current epoch must exist");
541        let epoch_start_configuration = store
542            .get_epoch_start_configuration()?
543            .expect("EpochStartConfiguration of the current epoch must exist");
544        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
545        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
546
547        let cache_traits = build_execution_cache(
548            &config.execution_cache,
549            &prometheus_registry,
550            &store,
551            backpressure_manager.clone(),
552        );
553
554        let auth_agg = {
555            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
556            Arc::new(ArcSwap::new(Arc::new(
557                AuthorityAggregator::new_from_epoch_start_state(
558                    epoch_start_configuration.epoch_start_state(),
559                    &committee_store,
560                    safe_client_metrics_base,
561                ),
562            )))
563        };
564
565        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
566        let chain = match config.chain_override_for_testing {
567            Some(chain) => chain,
568            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
569        };
570
571        let highest_executed_checkpoint = checkpoint_store
572            .get_highest_executed_checkpoint_seq_number()
573            .expect("checkpoint store read cannot fail")
574            .unwrap_or(0);
575
576        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
577            0
578        } else {
579            checkpoint_store
580                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
581                .expect("checkpoint store read cannot fail")
582                .unwrap_or(highest_executed_checkpoint)
583        };
584
585        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
586        let epoch_store = AuthorityPerEpochStore::new(
587            config.protocol_public_key(),
588            committee.clone(),
589            &config.db_store_path(),
590            Some(epoch_options.options),
591            EpochMetrics::new(&registry_service.default_registry()),
592            epoch_start_configuration,
593            cache_traits.backing_package_store.clone(),
594            cache_traits.object_store.clone(),
595            cache_metrics,
596            signature_verifier_metrics,
597            &config.expensive_safety_check_config,
598            (chain_id, chain),
599            highest_executed_checkpoint,
600            previous_epoch_last_checkpoint,
601            Arc::new(SubmittedTransactionCacheMetrics::new(
602                &registry_service.default_registry(),
603            )),
604            config.fullnode_sync_mode,
605        )?;
606
607        info!("created epoch store");
608
609        replay_log!(
610            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
611            epoch_store.epoch(),
612            epoch_store.protocol_config()
613        );
614
615        // the database is empty at genesis time
616        if is_genesis {
617            info!("checking SUI conservation at genesis");
618            // When we are opening the db table, the only time when it's safe to
619            // check SUI conservation is at genesis. Otherwise we may be in the middle of
620            // an epoch and the SUI conservation check will fail. This also initialize
621            // the expected_network_sui_amount table.
622            cache_traits
623                .reconfig_api
624                .expensive_check_sui_conservation(&epoch_store)
625                .expect("SUI conservation check cannot fail at genesis");
626        }
627
628        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
629        let default_buffer_stake = epoch_store
630            .protocol_config()
631            .buffer_stake_for_protocol_upgrade_bps();
632        if effective_buffer_stake != default_buffer_stake {
633            warn!(
634                ?effective_buffer_stake,
635                ?default_buffer_stake,
636                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
637            );
638        }
639
640        checkpoint_store.insert_genesis_checkpoint(
641            genesis.checkpoint(),
642            genesis.checkpoint_contents().clone(),
643            &epoch_store,
644        );
645
646        info!("creating state sync store");
647        let state_sync_store = RocksDbStore::new(
648            cache_traits.clone(),
649            committee_store.clone(),
650            checkpoint_store.clone(),
651        );
652
653        let index_store =
654            if node_role.should_enable_index_processing() && config.enable_index_processing {
655                info!("creating jsonrpc index store");
656                Some(Arc::new(IndexStore::new(
657                    config.db_path().join("indexes"),
658                    &prometheus_registry,
659                    epoch_store
660                        .protocol_config()
661                        .max_move_identifier_len_as_option(),
662                    config.remove_deprecated_tables,
663                )))
664            } else {
665                None
666            };
667
668        let rpc_index = if node_role.should_enable_index_processing()
669            && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
670        {
671            info!("creating rpc index store");
672            Some(Arc::new(
673                RpcIndexStore::new(
674                    &config.db_path(),
675                    &store,
676                    &checkpoint_store,
677                    &epoch_store,
678                    &cache_traits.backing_package_store,
679                    pruner_watermarks.checkpoint_id.clone(),
680                    config.rpc().cloned().unwrap_or_default(),
681                )
682                .await,
683            ))
684        } else {
685            None
686        };
687
688        let chain_identifier = epoch_store.get_chain_identifier();
689
690        info!("creating archive reader");
691        // Create network
692        let (randomness_tx, randomness_rx) = mpsc::channel(
693            config
694                .p2p_config
695                .randomness
696                .clone()
697                .unwrap_or_default()
698                .mailbox_capacity(),
699        );
700        let P2pComponents {
701            p2p_network,
702            known_peers,
703            discovery_handle,
704            state_sync_handle,
705            randomness_handle,
706            endpoint_manager,
707        } = Self::create_p2p_network(
708            &config,
709            state_sync_store.clone(),
710            chain_identifier,
711            randomness_tx,
712            &prometheus_registry,
713        )?;
714
715        // Inject configured peer address overrides.
716        for peer in &config.p2p_config.peer_address_overrides {
717            endpoint_manager
718                .update_endpoint(
719                    EndpointId::P2p(peer.peer_id),
720                    AddressSource::Config,
721                    peer.addresses.clone(),
722                )
723                .expect("Updating peer address overrides should not fail");
724        }
725
726        // Send initial peer addresses to the p2p network.
727        update_peer_addresses(
728            &config,
729            &endpoint_manager,
730            epoch_store.epoch_start_state(),
731            None,
732        );
733
734        info!("start snapshot upload");
735        // Start uploading state snapshot to remote store
736        let state_snapshot_handle = Self::start_state_snapshot(
737            &config,
738            &prometheus_registry,
739            checkpoint_store.clone(),
740            chain_identifier,
741        )?;
742
743        // Start uploading db checkpoints to remote store
744        info!("start db checkpoint");
745        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
746            &config,
747            &prometheus_registry,
748            state_snapshot_handle.is_some(),
749        )?;
750
751        if !epoch_store
752            .protocol_config()
753            .simplified_unwrap_then_delete()
754        {
755            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
756            config
757                .authority_store_pruning_config
758                .set_killswitch_tombstone_pruning(true);
759        }
760
761        let authority_name = config.protocol_public_key();
762
763        info!("create authority state");
764        let state = AuthorityState::new(
765            authority_name,
766            secret,
767            config.supported_protocol_versions.unwrap(),
768            store.clone(),
769            cache_traits.clone(),
770            epoch_store.clone(),
771            committee_store.clone(),
772            index_store.clone(),
773            rpc_index,
774            checkpoint_store.clone(),
775            &prometheus_registry,
776            genesis.objects(),
777            &db_checkpoint_config,
778            config.clone(),
779            chain_identifier,
780            config.policy_config.clone(),
781            config.firewall_config.clone(),
782            pruner_watermarks,
783        )
784        .await;
785        // ensure genesis txn was executed
786        if epoch_store.epoch() == 0 {
787            let txn = &genesis.transaction();
788            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
789            let transaction =
790                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
791                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
792                        genesis.transaction().data().clone(),
793                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
794                    ),
795                );
796            state
797                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
798                .instrument(span)
799                .await
800                .unwrap();
801        }
802
803        // Start the loop that receives new randomness and generates transactions for it.
804        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
805
806        let (end_of_epoch_channel, end_of_epoch_receiver) =
807            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
809        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
810            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811                auth_agg.load_full(),
812                state.clone(),
813                end_of_epoch_receiver,
814                &config.db_path(),
815                &prometheus_registry,
816                &config,
817            )))
818        } else {
819            None
820        };
821
822        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
823            state.clone(),
824            state_sync_store,
825            &transaction_orchestrator.clone(),
826            &config,
827            &prometheus_registry,
828            server_version,
829            node_role,
830        )
831        .await?;
832
833        let global_state_hasher = Arc::new(GlobalStateHasher::new(
834            cache_traits.global_state_hash_store.clone(),
835            GlobalStateHashMetrics::new(&prometheus_registry),
836        ));
837
838        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
839            "sui",
840            &registry_service.default_registry(),
841        );
842
843        let connection_monitor_handle =
844            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
845                p2p_network.downgrade(),
846                Arc::new(network_connection_metrics),
847                known_peers,
848            );
849
850        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
851
852        sui_node_metrics
853            .binary_max_protocol_version
854            .set(ProtocolVersion::MAX.as_u64() as i64);
855        sui_node_metrics
856            .configured_max_protocol_version
857            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
858
859        let node_role = epoch_store.node_role();
860        let validator_components = if node_role.runs_consensus() {
861            let mut components = Self::construct_validator_components(
862                config.clone(),
863                state.clone(),
864                committee,
865                epoch_store.clone(),
866                checkpoint_store.clone(),
867                state_sync_handle.clone(),
868                randomness_handle.clone(),
869                Arc::downgrade(&global_state_hasher),
870                backpressure_manager.clone(),
871                &registry_service,
872                sui_node_metrics.clone(),
873                checkpoint_metrics.clone(),
874                node_role,
875            )
876            .await?;
877
878            if node_role.is_validator() {
879                components
880                    .consensus_adapter
881                    .recover_end_of_publish(&epoch_store);
882
883                // Start the gRPC server
884                components.validator_server_handle = Some(
885                    components
886                        .validator_server_handle
887                        .take()
888                        .unwrap()
889                        .start()
890                        .await,
891                );
892
893                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
894                endpoint_manager
895                    .set_consensus_address_updater(components.consensus_manager.clone());
896            } else {
897                info!("Starting node as Observer — connecting to configured peers");
898            }
899
900            Some(components)
901        } else {
902            None
903        };
904
905        // setup shutdown channel
906        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
907
908        let node = Self {
909            config,
910            validator_components: Mutex::new(validator_components),
911            http_servers,
912            state,
913            transaction_orchestrator,
914            registry_service,
915            metrics: sui_node_metrics,
916            checkpoint_metrics,
917
918            _discovery: discovery_handle,
919            _connection_monitor_handle: connection_monitor_handle,
920            state_sync_handle,
921            randomness_handle,
922            checkpoint_store,
923            global_state_hasher: Mutex::new(Some(global_state_hasher)),
924            end_of_epoch_channel,
925            endpoint_manager,
926            backpressure_manager,
927
928            _db_checkpoint_handle: db_checkpoint_handle,
929
930            #[cfg(msim)]
931            sim_state: Default::default(),
932
933            _state_snapshot_uploader_handle: state_snapshot_handle,
934            shutdown_channel_tx: shutdown_channel,
935
936            auth_agg,
937            subscription_service_checkpoint_sender,
938        };
939
940        info!("SuiNode started!");
941        let node = Arc::new(node);
942        let node_copy = node.clone();
943        spawn_monitored_task!(async move {
944            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
945            if let Err(error) = result {
946                warn!("Reconfiguration finished with error {:?}", error);
947            }
948        });
949
950        Ok(node)
951    }
952
953    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
954        self.end_of_epoch_channel.subscribe()
955    }
956
957    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
958        self.shutdown_channel_tx.subscribe()
959    }
960
961    pub fn current_epoch_for_testing(&self) -> EpochId {
962        self.state.current_epoch_for_testing()
963    }
964
965    pub fn db_checkpoint_path(&self) -> PathBuf {
966        self.config.db_checkpoint_path()
967    }
968
969    // Init reconfig process by starting to reject user certs
970    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
971        info!("close_epoch (current epoch = {})", epoch_store.epoch());
972        self.validator_components
973            .lock()
974            .await
975            .as_ref()
976            .ok_or_else(|| SuiError::from("Node is not a validator"))?
977            .consensus_adapter
978            .close_epoch(epoch_store);
979        Ok(())
980    }
981
982    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
983        self.state
984            .clear_override_protocol_upgrade_buffer_stake(epoch)
985    }
986
987    pub fn set_override_protocol_upgrade_buffer_stake(
988        &self,
989        epoch: EpochId,
990        buffer_stake_bps: u64,
991    ) -> SuiResult {
992        self.state
993            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
994    }
995
996    // Testing-only API to start epoch close process.
997    // For production code, please use the non-testing version.
998    pub async fn close_epoch_for_testing(&self) -> SuiResult {
999        let epoch_store = self.state.epoch_store_for_testing();
1000        self.close_epoch(&epoch_store).await
1001    }
1002
1003    fn start_state_snapshot(
1004        config: &NodeConfig,
1005        prometheus_registry: &Registry,
1006        checkpoint_store: Arc<CheckpointStore>,
1007        chain_identifier: ChainIdentifier,
1008    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1009        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1010            let snapshot_uploader = StateSnapshotUploader::new(
1011                &config.db_checkpoint_path(),
1012                &config.snapshot_path(),
1013                remote_store_config.clone(),
1014                60,
1015                prometheus_registry,
1016                checkpoint_store,
1017                chain_identifier,
1018                config.state_snapshot_write_config.archive_interval_epochs,
1019            )?;
1020            Ok(Some(snapshot_uploader.start()))
1021        } else {
1022            Ok(None)
1023        }
1024    }
1025
1026    fn start_db_checkpoint(
1027        config: &NodeConfig,
1028        prometheus_registry: &Registry,
1029        state_snapshot_enabled: bool,
1030    ) -> Result<(
1031        DBCheckpointConfig,
1032        Option<tokio::sync::broadcast::Sender<()>>,
1033    )> {
1034        let checkpoint_path = Some(
1035            config
1036                .db_checkpoint_config
1037                .checkpoint_path
1038                .clone()
1039                .unwrap_or_else(|| config.db_checkpoint_path()),
1040        );
1041        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1042            DBCheckpointConfig {
1043                checkpoint_path,
1044                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1045                    true
1046                } else {
1047                    config
1048                        .db_checkpoint_config
1049                        .perform_db_checkpoints_at_epoch_end
1050                },
1051                ..config.db_checkpoint_config.clone()
1052            }
1053        } else {
1054            config.db_checkpoint_config.clone()
1055        };
1056
1057        match (
1058            db_checkpoint_config.object_store_config.as_ref(),
1059            state_snapshot_enabled,
1060        ) {
1061            // If db checkpoint config object store not specified but
1062            // state snapshot object store is specified, create handler
1063            // anyway for marking db checkpoints as completed so that they
1064            // can be uploaded as state snapshots.
1065            (None, false) => Ok((db_checkpoint_config, None)),
1066            (_, _) => {
1067                let handler = DBCheckpointHandler::new(
1068                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1069                    db_checkpoint_config.object_store_config.as_ref(),
1070                    60,
1071                    db_checkpoint_config
1072                        .prune_and_compact_before_upload
1073                        .unwrap_or(true),
1074                    config.authority_store_pruning_config.clone(),
1075                    prometheus_registry,
1076                    state_snapshot_enabled,
1077                )?;
1078                Ok((
1079                    db_checkpoint_config,
1080                    Some(DBCheckpointHandler::start(handler)),
1081                ))
1082            }
1083        }
1084    }
1085
1086    fn create_p2p_network(
1087        config: &NodeConfig,
1088        state_sync_store: RocksDbStore,
1089        chain_identifier: ChainIdentifier,
1090        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1091        prometheus_registry: &Registry,
1092    ) -> Result<P2pComponents> {
1093        let mut p2p_config = config.p2p_config.clone();
1094        {
1095            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1096            if disc.peer_addr_store_path.is_none() {
1097                disc.peer_addr_store_path =
1098                    Some(config.db_path().join("discovery_peer_cache.yaml"));
1099            }
1100        }
1101        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1102        if let Some(consensus_config) = &config.consensus_config {
1103            let effective_addr = consensus_config
1104                .external_address
1105                .as_ref()
1106                .or(consensus_config.listen_address.as_ref());
1107            if let Some(addr) = effective_addr {
1108                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1109            }
1110        }
1111        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1112        let discovery_sender = discovery.sender();
1113
1114        let (state_sync, state_sync_router) = state_sync::Builder::new()
1115            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1116            .store(state_sync_store)
1117            .archive_config(config.archive_reader_config())
1118            .discovery_sender(discovery_sender)
1119            .with_metrics(prometheus_registry)
1120            .build();
1121
1122        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1123        let known_peers: HashMap<PeerId, String> = discovery_config
1124            .allowlisted_peers
1125            .clone()
1126            .into_iter()
1127            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1128            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1129                peer.peer_id
1130                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
1131            }))
1132            .collect();
1133
1134        let (randomness, randomness_router) =
1135            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1136                .config(config.p2p_config.randomness.clone().unwrap_or_default())
1137                .with_metrics(prometheus_registry)
1138                .build();
1139
1140        let p2p_network = {
1141            let routes = anemo::Router::new()
1142                .add_rpc_service(discovery_server)
1143                .merge(state_sync_router);
1144            let routes = routes.merge(randomness_router);
1145
1146            let inbound_network_metrics =
1147                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1148            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1149                "sui",
1150                "outbound",
1151                prometheus_registry,
1152            );
1153
1154            let service = ServiceBuilder::new()
1155                .layer(
1156                    TraceLayer::new_for_server_errors()
1157                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1158                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1159                )
1160                .layer(CallbackLayer::new(
1161                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1162                        Arc::new(inbound_network_metrics),
1163                        config.p2p_config.excessive_message_size(),
1164                    ),
1165                ))
1166                .service(routes);
1167
1168            let outbound_layer = ServiceBuilder::new()
1169                .layer(
1170                    TraceLayer::new_for_client_and_server_errors()
1171                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1172                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1173                )
1174                .layer(CallbackLayer::new(
1175                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
1176                        Arc::new(outbound_network_metrics),
1177                        config.p2p_config.excessive_message_size(),
1178                    ),
1179                ))
1180                .into_inner();
1181
1182            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1183            // Inbound requests on this network are small (signatures, queries, summaries).
1184            // Cap request frames at 1 MiB.
1185            anemo_config.max_request_frame_size = Some(1 << 20);
1186            // Responses can be larger (checkpoint contents).
1187            // Cap response frames at 128 MiB.
1188            anemo_config.max_response_frame_size = Some(128 << 20);
1189
1190            // Set a higher default value for socket send/receive buffers if not already
1191            // configured.
1192            let mut quic_config = anemo_config.quic.unwrap_or_default();
1193            if quic_config.socket_send_buffer_size.is_none() {
1194                quic_config.socket_send_buffer_size = Some(20 << 20);
1195            }
1196            if quic_config.socket_receive_buffer_size.is_none() {
1197                quic_config.socket_receive_buffer_size = Some(20 << 20);
1198            }
1199            quic_config.allow_failed_socket_buffer_size_setting = true;
1200
1201            // Set high-performance defaults for quinn transport.
1202            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
1203            if quic_config.max_concurrent_bidi_streams.is_none() {
1204                quic_config.max_concurrent_bidi_streams = Some(500);
1205            }
1206            if quic_config.max_concurrent_uni_streams.is_none() {
1207                quic_config.max_concurrent_uni_streams = Some(500);
1208            }
1209            if quic_config.stream_receive_window.is_none() {
1210                quic_config.stream_receive_window = Some(100 << 20);
1211            }
1212            if quic_config.receive_window.is_none() {
1213                quic_config.receive_window = Some(200 << 20);
1214            }
1215            if quic_config.send_window.is_none() {
1216                quic_config.send_window = Some(200 << 20);
1217            }
1218            if quic_config.crypto_buffer_size.is_none() {
1219                quic_config.crypto_buffer_size = Some(1 << 20);
1220            }
1221            if quic_config.max_idle_timeout_ms.is_none() {
1222                quic_config.max_idle_timeout_ms = Some(10_000);
1223            }
1224            if quic_config.keep_alive_interval_ms.is_none() {
1225                quic_config.keep_alive_interval_ms = Some(5_000);
1226            }
1227            anemo_config.quic = Some(quic_config);
1228
1229            let server_name = format!("sui-{}", chain_identifier);
1230            let network = Network::bind(config.p2p_config.listen_address)
1231                .server_name(&server_name)
1232                .private_key(config.network_key_pair().copy().private().0.to_bytes())
1233                .config(anemo_config)
1234                .outbound_request_layer(outbound_layer)
1235                .start(service)?;
1236            info!(
1237                server_name = server_name,
1238                "P2p network started on {}",
1239                network.local_addr()
1240            );
1241
1242            network
1243        };
1244
1245        let discovery_handle =
1246            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1247        let state_sync_handle = state_sync.start(p2p_network.clone());
1248        let randomness_handle = randomness.start(p2p_network.clone());
1249
1250        Ok(P2pComponents {
1251            p2p_network,
1252            known_peers,
1253            discovery_handle,
1254            state_sync_handle,
1255            randomness_handle,
1256            endpoint_manager,
1257        })
1258    }
1259
    /// Builds the long-lived consensus components for a node whose role runs consensus,
    /// then delegates to `start_epoch_specific_validator_components` for the per-epoch
    /// parts.
    ///
    /// Created here:
    /// - the `UpdatableConsensusClient` and the `ConsensusAdapter` that submits through it;
    /// - the `ConsensusManager`;
    /// - the `ConsensusStorePruner`;
    /// - the `SuiTxValidatorMetrics`;
    /// - for `node_role.is_validator()` only:
    ///   - the gRPC validator service handle and admission queue (the handle is not
    ///     started here; the caller takes it and calls `.start()`);
    ///   - an overload monitor task, when `max_load_shedding_percentage > 0`.
    ///
    /// # Errors
    ///
    /// Returns an error if `config.consensus_config` is `None` ("Node is missing
    /// consensus config"). Also propagates failures from `start_grpc_validator_service`
    /// and `start_epoch_specific_validator_components`.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        // NOTE(review): `consensus_config` is borrowed mutably from a clone of `config`,
        // but nothing visible here mutates it. Confirm whether `ConsensusManager::new`
        // needs `&mut` before simplifying this to a shared borrow of `config`.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;

        // The same client is shared by the consensus adapter and the consensus manager.
        let client = Arc::new(UpdatableConsensusClient::new());
        // Shared by the consensus adapter and (for validators) the gRPC validator service.
        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            &registry_service.default_registry(),
            client.clone(),
            checkpoint_store.clone(),
            inflight_slot_freed_notify.clone(),
        ));

        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
            node_role,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        // Only validators expose the gRPC validator service (and its admission queue);
        // other consensus-running roles get neither.
        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
            let (handle, queue) = Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                epoch_store.clone(),
                &registry_service.default_registry(),
                inflight_slot_freed_notify,
            )
            .await?;
            (Some(handle), queue)
        } else {
            (None, None)
        };

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if node_role.is_validator()
            && config
                .authority_overload_config
                .max_load_shedding_percentage
                > 0
        {
            // The monitor holds only a weak reference, so it does not keep the
            // authority state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
            admission_queue,
            node_role,
        )
        .await
    }
1368
    /// Starts the components that are tied to `epoch_store`'s epoch and assembles the
    /// resulting `ValidatorComponents`.
    ///
    /// Steps, in order:
    /// 1. Build the checkpoint service (see `build_checkpoint_service`).
    /// 2. If the role runs consensus and randomness is enabled for the epoch, try to
    ///    create a `RandomnessManager` and install it on the epoch store.
    /// 3. Validators only: spawn the `ExecutionTimeObserver`.
    /// 4. Spawn `ConsensusManager::start` on a detached tokio task, so this function
    ///    does not wait for consensus to come up.
    /// 5. Spawn the checkpoint service, with the consensus replay waiter unless the
    ///    `DISABLE_REPLAY_WAITER` environment variable is set.
    /// 6. Validators with authenticator state enabled: start the JWK updater.
    /// 7. Rotate the admission queue (if any) to the new epoch.
    ///
    /// # Errors
    ///
    /// Propagates a failure from `AuthorityPerEpochStore::set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            // `try_new` may yield `None`; in that case no manager is installed and this
            // step is silently skipped.
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Spawn consensus startup asynchronously to avoid blocking other components
        // The JoinHandle is not kept, so the task is detached.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        // The waiter is taken from the manager directly; it does not depend on the
        // spawned task above having run yet.
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting DISABLE_REPLAY_WAITER to any value spawns the checkpoint service
        // without the replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1503
1504    fn build_checkpoint_service(
1505        config: &NodeConfig,
1506        consensus_adapter: Arc<ConsensusAdapter>,
1507        checkpoint_store: Arc<CheckpointStore>,
1508        epoch_store: Arc<AuthorityPerEpochStore>,
1509        state: Arc<AuthorityState>,
1510        state_sync_handle: state_sync::Handle,
1511        state_hasher: Weak<GlobalStateHasher>,
1512        checkpoint_metrics: Arc<CheckpointMetrics>,
1513        node_role: NodeRole,
1514    ) -> Arc<CheckpointService> {
1515        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1516        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1517
1518        debug!(
1519            "Starting checkpoint service with epoch start timestamp {}
1520            and epoch duration {}",
1521            epoch_start_timestamp_ms, epoch_duration_ms
1522        );
1523
1524        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1525            Box::new(SubmitCheckpointToConsensus {
1526                sender: consensus_adapter,
1527                signer: state.secret.clone(),
1528                authority: config.protocol_public_key(),
1529                next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1530                    .checked_add(epoch_duration_ms)
1531                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1532                metrics: checkpoint_metrics.clone(),
1533            })
1534        } else {
1535            LogCheckpointOutput::boxed()
1536        };
1537
1538        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1539        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1540        let max_checkpoint_size_bytes =
1541            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1542
1543        CheckpointService::build(
1544            state.clone(),
1545            checkpoint_store,
1546            epoch_store,
1547            state.get_transaction_cache_reader().clone(),
1548            state_hasher,
1549            checkpoint_output,
1550            Box::new(certified_checkpoint_output),
1551            checkpoint_metrics,
1552            max_tx_per_checkpoint,
1553            max_checkpoint_size_bytes,
1554        )
1555    }
1556
1557    fn construct_consensus_adapter(
1558        committee: &Committee,
1559        consensus_config: &ConsensusConfig,
1560        authority: AuthorityName,
1561        prometheus_registry: &Registry,
1562        consensus_client: Arc<dyn ConsensusClient>,
1563        checkpoint_store: Arc<CheckpointStore>,
1564        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1565    ) -> ConsensusAdapter {
1566        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1567        // The consensus adapter allows the authority to send user certificates through consensus.
1568
1569        ConsensusAdapter::new(
1570            consensus_client,
1571            checkpoint_store,
1572            authority,
1573            consensus_config.max_pending_transactions(),
1574            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1575            ca_metrics,
1576            inflight_slot_freed_notify,
1577        )
1578    }
1579
1580    async fn start_grpc_validator_service(
1581        config: &NodeConfig,
1582        state: Arc<AuthorityState>,
1583        consensus_adapter: Arc<ConsensusAdapter>,
1584        epoch_store: Arc<AuthorityPerEpochStore>,
1585        prometheus_registry: &Registry,
1586        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1587    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1588        let overload_config = &config.authority_overload_config;
1589        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1590            let manager = Arc::new(AdmissionQueueManager::new(
1591                consensus_adapter.clone(),
1592                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1593                overload_config.admission_queue_capacity_fraction,
1594                overload_config.admission_queue_bypass_fraction,
1595                overload_config.admission_queue_failover_timeout,
1596                inflight_slot_freed_notify,
1597            ));
1598            AdmissionQueueContext::spawn(manager, epoch_store)
1599        });
1600        let validator_service = ValidatorService::new(
1601            state.clone(),
1602            consensus_adapter,
1603            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1604            config.policy_config.clone().map(|p| p.client_id_source),
1605            admission_queue.clone(),
1606        );
1607
1608        let mut server_conf = mysten_network::config::Config::new();
1609        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1610        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1611        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1612        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1613        server_conf.load_shed = config.grpc_load_shed;
1614        let mut server_builder =
1615            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1616
1617        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1618
1619        let tls_config = sui_tls::create_rustls_server_config(
1620            config.network_key_pair().copy().private(),
1621            SUI_TLS_SERVER_NAME.to_string(),
1622        );
1623
1624        let network_address = config.network_address().clone();
1625
1626        let (ready_tx, ready_rx) = oneshot::channel();
1627
1628        let spawn_once = SpawnOnce::new(ready_rx, async move {
1629            let server = server_builder
1630                .bind(&network_address, Some(tls_config))
1631                .await
1632                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1633            let local_addr = server.local_addr();
1634            info!("Listening to traffic on {local_addr}");
1635            ready_tx.send(()).unwrap();
1636            if let Err(err) = server.serve().await {
1637                info!("Server stopped: {err}");
1638            }
1639            info!("Server stopped");
1640        });
1641        Ok((spawn_once, admission_queue))
1642    }
1643
1644    pub fn state(&self) -> Arc<AuthorityState> {
1645        self.state.clone()
1646    }
1647
    /// Returns the anemo connection monitor handle held by this node.
    ///
    /// Only compiled in test and simulator (`msim`) builds.
    #[cfg(any(test, msim))]
    pub fn connection_monitor_handle_for_testing(
        &self,
    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
        &self._connection_monitor_handle
    }
1654
1655    pub fn node_role(&self) -> NodeRole {
1656        self.state.load_epoch_store_one_call_per_task().node_role()
1657    }
1658
    /// Returns the reference gas price as reported by this node's [`AuthorityState`].
    ///
    /// Only used for testing because of how the epoch store is loaded.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1663
    /// Returns a clone of the committee store handle held by this node's [`AuthorityState`].
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1667
1668    /*
1669    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1670        self.state.db()
1671    }
1672    */
1673
1674    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1675    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1676    /// of this function will mostly likely want to call this again
1677    /// to get a fresh one.
1678    pub fn clone_authority_aggregator(
1679        &self,
1680    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1681        self.transaction_orchestrator
1682            .as_ref()
1683            .map(|to| to.clone_authority_aggregator())
1684    }
1685
1686    pub fn transaction_orchestrator(
1687        &self,
1688    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1689        self.transaction_orchestrator.clone()
1690    }
1691
    /// This function awaits the completion of checkpoint execution of the current epoch,
    /// after which it initiates reconfiguration of the entire system.
    ///
    /// Each loop iteration handles one epoch:
    /// - it builds a [`CheckpointExecutor`] for `epoch_store`;
    /// - it advertises this authority's capabilities to consensus when the current epoch store
    ///   reports it is a validator;
    /// - it runs checkpoint execution;
    /// - it reconfigures the node for the next epoch, starting, restarting, or dropping the
    ///   consensus-related [`ValidatorComponents`] according to the node's new role.
    ///
    /// # Errors
    ///
    /// Returns `Ok(())` only once the `run_with_range` stop condition is met and the node has
    /// been shut down. Errors from submitting capabilities, from (re)starting validator
    /// components, and from the simulator-only checkpoint pruning are propagated and end
    /// the loop.
    ///
    /// # Panics
    ///
    /// Panics on broken invariants, for example:
    /// - the global state hasher slot is empty at the start of an epoch;
    /// - another strong reference to the hasher survives until reconfiguration;
    /// - the Sui system state object cannot be read.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take this epoch's hasher out of the shared slot. The guard lives for the whole
            // iteration; a freshly built hasher is stored back through it during
            // reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            // FullNodes that state sync via consensus will also have validator components, but they are not supposed to submit any capabilities.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    // no need to send digests of versions less than the current version
                    .truncate_below(config.version);

                // Step the advertised maximum version down for as long as it would enable
                // accumulators while the accumulator root object does not exist yet.
                // NOTE(review): this checks `epoch_store` while the surrounding code uses
                // `cur_epoch_store`; presumably both refer to the same epoch — confirm.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Run checkpoint execution for this epoch; a `RunWithRangeCondition` stop reason
            // means the configured `run_with_range` was reached and the node should stop.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // In simulation, safe mode is tolerated only when the test explicitly expects it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed end-of-epoch notification is only reported on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an authority aggregator rebuilt from the next epoch's start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // Held for the rest of this iteration; the new components are stored through it
            // below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            // (Concretely, the branches key off whether consensus components currently exist
            // and whether `new_role.runs_consensus()`.)
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                // The node was running consensus during the previous epoch.
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    // Reuse the existing components for the new epoch.
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // The node was not running consensus during the previous epoch.
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                    )
                    .await?;

                    // Only validators start the validator server handle and register the
                    // consensus manager as the endpoint manager's address updater.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Simulator builds only: prune checkpoints for eligible epochs when a finite,
            // non-zero checkpoint retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2009
2010    async fn shutdown(&self) {
2011        if let Some(validator_components) = &*self.validator_components.lock().await {
2012            validator_components.consensus_manager.shutdown().await;
2013        }
2014    }
2015
2016    async fn reconfigure_state(
2017        &self,
2018        state: &Arc<AuthorityState>,
2019        cur_epoch_store: &AuthorityPerEpochStore,
2020        next_epoch_committee: Committee,
2021        next_epoch_start_system_state: EpochStartSystemState,
2022        global_state_hasher: Arc<GlobalStateHasher>,
2023    ) -> Arc<AuthorityPerEpochStore> {
2024        let next_epoch = next_epoch_committee.epoch();
2025
2026        let last_checkpoint = self
2027            .checkpoint_store
2028            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2029            .expect("Error loading last checkpoint for current epoch")
2030            .expect("Could not load last checkpoint for current epoch");
2031
2032        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2033
2034        assert_eq!(
2035            Some(last_checkpoint_seq),
2036            self.checkpoint_store
2037                .get_highest_executed_checkpoint_seq_number()
2038                .expect("Error loading highest executed checkpoint sequence number")
2039        );
2040
2041        let epoch_start_configuration = EpochStartConfiguration::new(
2042            next_epoch_start_system_state,
2043            *last_checkpoint.digest(),
2044            state.get_object_store().as_ref(),
2045            EpochFlag::default_flags_for_new_epoch(&state.config),
2046        )
2047        .expect("EpochStartConfiguration construction cannot fail");
2048
2049        let new_epoch_store = self
2050            .state
2051            .reconfigure(
2052                cur_epoch_store,
2053                self.config.supported_protocol_versions.unwrap(),
2054                next_epoch_committee,
2055                epoch_start_configuration,
2056                global_state_hasher,
2057                &self.config.expensive_safety_check_config,
2058                last_checkpoint_seq,
2059            )
2060            .await
2061            .expect("Reconfigure authority state cannot fail");
2062        info!(next_epoch, "Node State has been reconfigured");
2063        assert_eq!(next_epoch, new_epoch_store.epoch());
2064        self.state.get_reconfig_api().update_epoch_flags_metrics(
2065            cur_epoch_store.epoch_start_config().flags(),
2066            new_epoch_store.epoch_start_config().flags(),
2067        );
2068
2069        new_epoch_store
2070    }
2071
    /// Returns a reference to this node's [`NodeConfig`].
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2075
2076    pub fn randomness_handle(&self) -> randomness::Handle {
2077        self.randomness_handle.clone()
2078    }
2079
2080    pub fn state_sync_handle(&self) -> state_sync::Handle {
2081        self.state_sync_handle.clone()
2082    }
2083
    /// Returns a reference to this node's [`EndpointManager`].
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2087
2088    /// Get a short prefix of a digest for metric labels
2089    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2090        let digest_str = digest.to_string();
2091        if digest_str.len() >= 8 {
2092            digest_str[0..8].to_string()
2093        } else {
2094            digest_str
2095        }
2096    }
2097
2098    /// Check for previously detected forks and handle them appropriately.
2099    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2100    /// For all other cases, block node startup if a fork is detected.
2101    async fn check_and_recover_forks(
2102        checkpoint_store: &CheckpointStore,
2103        checkpoint_metrics: &CheckpointMetrics,
2104        fork_recovery: Option<&ForkRecoveryConfig>,
2105    ) -> Result<()> {
2106        // Try to recover from forks if recovery config is provided
2107        if let Some(recovery) = fork_recovery {
2108            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2109            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2110        }
2111
2112        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2113            .get_checkpoint_fork_detected()
2114            .map_err(|e| {
2115                error!("Failed to check for checkpoint fork: {:?}", e);
2116                e
2117            })?
2118        {
2119            Self::handle_checkpoint_fork(
2120                checkpoint_seq,
2121                checkpoint_digest,
2122                checkpoint_metrics,
2123                fork_recovery,
2124            )
2125            .await?;
2126        }
2127        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2128            .get_transaction_fork_detected()
2129            .map_err(|e| {
2130                error!("Failed to check for transaction fork: {:?}", e);
2131                e
2132            })?
2133        {
2134            Self::handle_transaction_fork(
2135                tx_digest,
2136                expected_effects,
2137                actual_effects,
2138                checkpoint_metrics,
2139                fork_recovery,
2140            )
2141            .await?;
2142        }
2143
2144        Ok(())
2145    }
2146
2147    fn try_recover_checkpoint_fork(
2148        checkpoint_store: &CheckpointStore,
2149        recovery: &ForkRecoveryConfig,
2150    ) -> Result<()> {
2151        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2152        // clear locally computed checkpoints from that sequence (inclusive).
2153        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2154            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2155                anyhow::bail!(
2156                    "Invalid checkpoint digest override for seq {}: {}",
2157                    seq,
2158                    expected_digest_str
2159                );
2160            };
2161
2162            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2163                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2164                if local_digest != expected_digest {
2165                    info!(
2166                        seq,
2167                        local = %Self::get_digest_prefix(local_digest),
2168                        expected = %Self::get_digest_prefix(expected_digest),
2169                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2170                        seq
2171                    );
2172                    checkpoint_store
2173                        .clear_locally_computed_checkpoints_from(*seq)
2174                        .context(
2175                            "Failed to clear locally computed checkpoints from override seq",
2176                        )?;
2177                }
2178            }
2179        }
2180
2181        if let Some((checkpoint_seq, checkpoint_digest)) =
2182            checkpoint_store.get_checkpoint_fork_detected()?
2183            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2184        {
2185            info!(
2186                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2187                checkpoint_seq, checkpoint_digest
2188            );
2189            checkpoint_store
2190                .clear_checkpoint_fork_detected()
2191                .expect("Failed to clear checkpoint fork detected marker");
2192        }
2193        Ok(())
2194    }
2195
2196    fn try_recover_transaction_fork(
2197        checkpoint_store: &CheckpointStore,
2198        recovery: &ForkRecoveryConfig,
2199    ) -> Result<()> {
2200        if recovery.transaction_overrides.is_empty() {
2201            return Ok(());
2202        }
2203
2204        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2205            && recovery
2206                .transaction_overrides
2207                .contains_key(&tx_digest.to_string())
2208        {
2209            info!(
2210                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2211                tx_digest
2212            );
2213            checkpoint_store
2214                .clear_transaction_fork_detected()
2215                .expect("Failed to clear transaction fork detected marker");
2216        }
2217        Ok(())
2218    }
2219
2220    fn get_current_timestamp() -> u64 {
2221        std::time::SystemTime::now()
2222            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2223            .unwrap()
2224            .as_secs()
2225    }
2226
2227    async fn handle_checkpoint_fork(
2228        checkpoint_seq: u64,
2229        checkpoint_digest: CheckpointDigest,
2230        checkpoint_metrics: &CheckpointMetrics,
2231        fork_recovery: Option<&ForkRecoveryConfig>,
2232    ) -> Result<()> {
2233        checkpoint_metrics
2234            .checkpoint_fork_crash_mode
2235            .with_label_values(&[
2236                &checkpoint_seq.to_string(),
2237                &Self::get_digest_prefix(checkpoint_digest),
2238                &Self::get_current_timestamp().to_string(),
2239            ])
2240            .set(1);
2241
2242        let behavior = fork_recovery
2243            .map(|fr| fr.fork_crash_behavior)
2244            .unwrap_or_default();
2245
2246        match behavior {
2247            ForkCrashBehavior::AwaitForkRecovery => {
2248                error!(
2249                    checkpoint_seq = checkpoint_seq,
2250                    checkpoint_digest = ?checkpoint_digest,
2251                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2252                );
2253                futures::future::pending::<()>().await;
2254                unreachable!("pending() should never return");
2255            }
2256            ForkCrashBehavior::ReturnError => {
2257                error!(
2258                    checkpoint_seq = checkpoint_seq,
2259                    checkpoint_digest = ?checkpoint_digest,
2260                    "Checkpoint fork detected! Returning error."
2261                );
2262                Err(anyhow::anyhow!(
2263                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2264                    checkpoint_seq,
2265                    checkpoint_digest
2266                ))
2267            }
2268        }
2269    }
2270
2271    async fn handle_transaction_fork(
2272        tx_digest: TransactionDigest,
2273        expected_effects_digest: TransactionEffectsDigest,
2274        actual_effects_digest: TransactionEffectsDigest,
2275        checkpoint_metrics: &CheckpointMetrics,
2276        fork_recovery: Option<&ForkRecoveryConfig>,
2277    ) -> Result<()> {
2278        checkpoint_metrics
2279            .transaction_fork_crash_mode
2280            .with_label_values(&[
2281                &Self::get_digest_prefix(tx_digest),
2282                &Self::get_digest_prefix(expected_effects_digest),
2283                &Self::get_digest_prefix(actual_effects_digest),
2284                &Self::get_current_timestamp().to_string(),
2285            ])
2286            .set(1);
2287
2288        let behavior = fork_recovery
2289            .map(|fr| fr.fork_crash_behavior)
2290            .unwrap_or_default();
2291
2292        match behavior {
2293            ForkCrashBehavior::AwaitForkRecovery => {
2294                error!(
2295                    tx_digest = ?tx_digest,
2296                    expected_effects_digest = ?expected_effects_digest,
2297                    actual_effects_digest = ?actual_effects_digest,
2298                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2299                );
2300                futures::future::pending::<()>().await;
2301                unreachable!("pending() should never return");
2302            }
2303            ForkCrashBehavior::ReturnError => {
2304                error!(
2305                    tx_digest = ?tx_digest,
2306                    expected_effects_digest = ?expected_effects_digest,
2307                    actual_effects_digest = ?actual_effects_digest,
2308                    "Transaction fork detected! Returning error."
2309                );
2310                Err(anyhow::anyhow!(
2311                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2312                    tx_digest,
2313                    expected_effects_digest,
2314                    actual_effects_digest
2315                ))
2316            }
2317        }
2318    }
2319}
2320
2321#[cfg(not(msim))]
2322impl SuiNode {
2323    async fn fetch_jwks(
2324        _authority: AuthorityName,
2325        provider: &OIDCProvider,
2326    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2327        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2328        use sui_types::error::SuiErrorKind;
2329        let client = reqwest::Client::new();
2330        fetch_jwks(provider, &client, true)
2331            .await
2332            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2333    }
2334}
2335
#[cfg(msim)]
impl SuiNode {
    /// Returns the simulator node id this node runs on.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether safe mode is expected, using a relaxed atomic store.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator build of JWK fetching. It delegates to the injector returned by
    /// `get_jwk_injector()` and performs no HTTP request itself.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
2357
2358enum SpawnOnce {
2359    // Mutex is only needed to make SpawnOnce Send
2360    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2361    #[allow(unused)]
2362    Started(JoinHandle<()>),
2363}
2364
2365impl SpawnOnce {
2366    pub fn new(
2367        ready_rx: oneshot::Receiver<()>,
2368        future: impl Future<Output = ()> + Send + 'static,
2369    ) -> Self {
2370        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2371    }
2372
2373    pub async fn start(self) -> Self {
2374        match self {
2375            Self::Unstarted(ready_rx, future) => {
2376                let future = future.into_inner();
2377                let handle = tokio::spawn(future);
2378                ready_rx.await.unwrap();
2379                Self::Started(handle)
2380            }
2381            Self::Started(_) => self,
2382        }
2383    }
2384}
2385
2386/// Updates trusted peer addresses in the p2p network (for nodes configured as validators).
2387/// When `prev_epoch_start_state` is provided, validators that are no longer in the committee
2388/// have their Chain addresses cleared.
2389fn update_peer_addresses(
2390    config: &NodeConfig,
2391    endpoint_manager: &EndpointManager,
2392    epoch_start_state: &EpochStartSystemState,
2393    prev_epoch_start_state: Option<&EpochStartSystemState>,
2394) {
2395    if config.consensus_config().is_none() {
2396        return;
2397    }
2398    let new_peers: HashSet<PeerId> = epoch_start_state
2399        .get_validator_as_p2p_peers(config.protocol_public_key())
2400        .into_iter()
2401        .map(|(peer_id, address)| {
2402            endpoint_manager
2403                .update_endpoint(
2404                    EndpointId::P2p(peer_id),
2405                    AddressSource::Chain,
2406                    vec![address],
2407                )
2408                .expect("Updating peer addresses should not fail");
2409            peer_id
2410        })
2411        .collect();
2412
2413    // Clear Chain addresses for validators that left the committee.
2414    if let Some(prev) = prev_epoch_start_state {
2415        for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2416            if !new_peers.contains(&peer_id) {
2417                endpoint_manager
2418                    .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2419                    .expect("Clearing peer addresses should not fail");
2420            }
2421        }
2422    }
2423}
2424
2425fn build_kv_store(
2426    state: &Arc<AuthorityState>,
2427    config: &NodeConfig,
2428    registry: &Registry,
2429) -> Result<Arc<TransactionKeyValueStore>> {
2430    let metrics = KeyValueStoreMetrics::new(registry);
2431    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2432
2433    let base_url = &config.transaction_kv_store_read_config.base_url;
2434
2435    if base_url.is_empty() {
2436        info!("no http kv store url provided, using local db only");
2437        return Ok(Arc::new(db_store));
2438    }
2439
2440    let base_url: url::Url = base_url.parse().tap_err(|e| {
2441        error!(
2442            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2443            base_url, e
2444        )
2445    })?;
2446
2447    let network_str = match state.get_chain_identifier().chain() {
2448        Chain::Mainnet => "/mainnet",
2449        _ => {
2450            info!("using local db only for kv store");
2451            return Ok(Arc::new(db_store));
2452        }
2453    };
2454
2455    let base_url = base_url.join(network_str)?.to_string();
2456    let http_store = HttpKVStore::new_kv(
2457        &base_url,
2458        config.transaction_kv_store_read_config.cache_size,
2459        metrics.clone(),
2460    )?;
2461    info!("using local key-value store with fallback to http key-value store");
2462    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2463        db_store,
2464        http_store,
2465        metrics,
2466        "json_rpc_fallback",
2467    )))
2468}
2469
/// Builds and starts the node's HTTP-facing RPC servers.
///
/// Returns empty (`Default`) `HttpServers` and `None` when `node_role.should_run_rpc_servers()`
/// is false. Otherwise it builds one axum router by merging two parts:
/// - the JSON-RPC modules. The transaction builder is registered only when
///   `config.run_with_range` is `None`; transaction execution only when a
///   `transaction_orchestrator` is supplied.
/// - the `sui_rpc_api::RpcService` router.
///
/// It then wraps the merged router in connect-info, server-timing and CORS layers. The result
/// is served over HTTP on `config.json_rpc_address`, and also over HTTPS on the RPC config's
/// `https_address()` when that config provides a TLS config.
///
/// The second tuple element is the checkpoint sender half produced by
/// `SubscriptionService::build`. Presumably the caller feeds checkpoints into it (TODO
/// confirm against the caller).
///
/// # Errors
/// Propagates failures from any of these steps:
/// - building the KV store;
/// - registering JSON-RPC modules or building the JSON-RPC router;
/// - starting the HTTP or HTTPS server.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Validators do not expose these APIs
    if !node_role.should_run_rpc_servers() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // Register every JSON-RPC module on one builder, then convert it into an axum router.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        // Shared by the read, coin and indexer APIs below; see `build_kv_store` for how the
        // local-only vs. HTTP-fallback store is chosen.
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only available when an orchestrator was provided.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Use the explicitly configured name-service IDs only when all three are set;
        // otherwise fall back to the per-chain preset.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        // The indexer API gets its own ReadApi instance alongside the shared KV store.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The checkpoint sender is returned to the caller; the handle is wired into the RPC service.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Copy sui_http's ConnectInfo into axum's ConnectInfo extension so axum-based handlers
        // can read the remote address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        // Setup a permissive CORS policy
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    // Layers are applied after merging, so they wrap both the JSON-RPC and the RPC API routes.
    router = router.merge(rpc_router).layer(layers);

    // HTTPS is served only when the RPC config provides a TLS config.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // Plain HTTP is always served on the JSON-RPC address.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2642
2643#[cfg(not(test))]
2644fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2645    protocol_config.max_transactions_per_checkpoint() as usize
2646}
2647
/// Test-build override: ignores the protocol config and always allows 2 transactions per
/// checkpoint. NOTE(review): presumably chosen so tests hit multi-checkpoint paths with tiny
/// workloads; confirm.
#[cfg(test)]
fn max_tx_per_checkpoint(_protocol_config: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2652
2653#[derive(Default)]
2654struct HttpServers {
2655    #[allow(unused)]
2656    http: Option<sui_http::ServerHandle>,
2657    #[allow(unused)]
2658    https: Option<sui_http::ServerHandle>,
2659}
2660
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// End-to-end check of `SuiNode::check_and_recover_forks` for both fork kinds.
    ///
    /// For each kind, a fork marker is recorded in the checkpoint store, and then:
    /// 1. with `ForkCrashBehavior::ReturnError` and no overrides, the check must fail with a
    ///    descriptive error;
    /// 2. with a matching override, the check must succeed and the marker must be cleared.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let store = CheckpointStore::new_for_tests();
        let metrics = CheckpointMetrics::new(&Registry::new());

        // Returns an error (rather than hanging) and recognises no overrides.
        let without_overrides = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // ---------- Checkpoint fork path ----------
        let forked_seq = 42;
        let forked_digest = CheckpointDigest::random();
        store
            .record_checkpoint_fork_detected(forked_seq, forked_digest)
            .unwrap();

        let err = SuiNode::check_and_recover_forks(&store, &metrics, Some(&without_overrides))
            .await
            .expect_err("unresolved checkpoint fork must be reported");
        assert!(err.to_string().contains("Checkpoint fork detected"));

        let checkpoint_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: BTreeMap::from([(forked_seq, forked_digest.to_string())]),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        SuiNode::check_and_recover_forks(&store, &metrics, Some(&checkpoint_recovery))
            .await
            .expect("checkpoint override must resolve the fork");
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // ---------- Transaction fork path ----------
        let forked_tx = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        store
            .record_transaction_fork_detected(forked_tx, expected_effects, actual_effects)
            .unwrap();

        let err = SuiNode::check_and_recover_forks(&store, &metrics, Some(&without_overrides))
            .await
            .expect_err("unresolved transaction fork must be reported");
        assert!(err.to_string().contains("Transaction fork detected"));

        let transaction_recovery = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                forked_tx.to_string(),
                actual_effects.to_string(),
            )]),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        SuiNode::check_and_recover_forks(&store, &metrics, Some(&transaction_recovery))
            .await
            .expect("transaction override must resolve the fork");
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
}