sui_node/
lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::in_test_configuration;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::admission_queue::{
    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
};
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::{AddressSource, EndpointId};
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;
use sui_types::node_role::NodeRole;

use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

// Logs at debug level in test configuration, info level otherwise.
// JWK logs cause significant volume in tests but are insignificant in prod,
// so they are demoted to debug in tests and kept at info otherwise.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}
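
// Example call site (same shape as the uses in start_jwk_updater below):
// jwk_log!("fetching JWK for provider {:?}", provider);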

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
    SendCheckpointToStateSync, SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

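/// Components that are only present while the node runs consensus
/// (see construct_validator_components / start_epoch_specific_validator_components).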
pub struct ValidatorComponents {
    validator_server_handle: Option<SpawnOnce>,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    admission_queue: Option<AdmissionQueueContext>,
}
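
/// Handles to the p2p stack produced by create_p2p_network.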
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    endpoint_manager: EndpointManager,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::PrunerWatermarks;
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at startup and at the beginning of each epoch.
    /// Uses ArcSwap so that we can mutate it without taking a mutable reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

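/// Cap on JWKs accepted from a single provider fetch; the JWK updater truncates
/// anything beyond this to guard against providers sending too many keys.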
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
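    /// Starts a node with the default server version ("sui-node"/"unknown");
    /// see start_async for the full startup sequence.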
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

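    /// Spawns one background task per supported zklogin OIDC provider. Each
    /// task periodically fetches JWKs, validates and de-duplicates them, and
    /// submits any new keys to consensus.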
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // The JWK-related counters on `metrics` (SuiNodeMetrics) used below:
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus; this is
                    // just a best-effort filter to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

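    /// Full startup sequence: opens the stores, builds the execution cache and
    /// per-epoch store, starts p2p networking and the HTTP/RPC servers,
    /// constructs validator components when the node runs consensus, and
    /// finally spawns the reconfiguration monitor.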
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(registry_service.clone());

        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        // Initialize Mysten metrics.
        mysten_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary in simtests.
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee();
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let node_role = config.intended_node_role();

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        if node_role.runs_consensus() {
            Self::check_and_recover_forks(
                &checkpoint_store,
                &checkpoint_metrics,
                config.fork_recovery.as_ref(),
            )
            .await?;
        }

        // By default, only enable write stall on nodes that run consensus.
        let enable_write_stall = config
            .enable_db_write_stall
            .unwrap_or(node_role.runs_consensus());
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            is_validator: node_role.is_validator(),
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_store_path(),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let highest_executed_checkpoint = checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .expect("checkpoint store read cannot fail")
            .unwrap_or(0);

        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
            0
        } else {
            checkpoint_store
                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
                .expect("checkpoint store read cannot fail")
                .unwrap_or(highest_executed_checkpoint)
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_store_path(),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            highest_executed_checkpoint,
            previous_epoch_last_checkpoint,
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
            config.fullnode_sync_mode,
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // The database is empty only at genesis time.
        if is_genesis {
            info!("checking SUI conservation at genesis");
            // When opening the db tables, the only time it's safe to check SUI
            // conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the SUI conservation check would fail. This also
            // initializes the expected_network_sui_amount table.
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store =
            if node_role.should_enable_index_processing() && config.enable_index_processing {
                info!("creating jsonrpc index store");
                Some(Arc::new(IndexStore::new(
                    config.db_path().join("indexes"),
                    &prometheus_registry,
                    epoch_store
                        .protocol_config()
                        .max_move_identifier_len_as_option(),
                    config.remove_deprecated_tables,
                )))
            } else {
                None
            };

        let rpc_index = if node_role.should_enable_index_processing()
            && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
        {
            info!("creating rpc index store");
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        // Create network
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        // Inject configured peer address overrides.
        for peer in &config.p2p_config.peer_address_overrides {
            endpoint_manager
                .update_endpoint(
                    EndpointId::P2p(peer.peer_id),
                    AddressSource::Config,
                    peer.addresses.clone(),
                )
                .expect("Updating peer address overrides should not fail");
        }

        // Send initial peer addresses to the p2p network.
        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
        // ensure genesis txn was executed
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
                .instrument(span)
                .await
                .unwrap();
        }

        // Start the loop that receives new randomness and generates transactions for it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
            node_role,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let node_role = epoch_store.node_role();
        let validator_components = if node_role.runs_consensus() {
            let mut components = Self::construct_validator_components(
                config.clone(),
                state.clone(),
                committee,
                epoch_store.clone(),
                checkpoint_store.clone(),
                state_sync_handle.clone(),
                randomness_handle.clone(),
                Arc::downgrade(&global_state_hasher),
                backpressure_manager.clone(),
                &registry_service,
                sui_node_metrics.clone(),
                checkpoint_metrics.clone(),
                node_role,
            )
            .await?;

            if node_role.is_validator() {
                components
                    .consensus_adapter
                    .recover_end_of_publish(&epoch_store);

                // Start the gRPC server
                components.validator_server_handle = Some(
                    components
                        .validator_server_handle
                        .take()
                        .unwrap()
                        .start()
                        .await,
                );

                // Set the consensus address updater so that we can update the consensus peer addresses when requested.
                endpoint_manager
                    .set_consensus_address_updater(components.consensus_manager.clone());
            } else {
                info!("Starting node as Observer, connecting to configured peers");
            }

            Some(components)
        } else {
            None
        };

        // set up shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

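    /// Starts the state snapshot uploader when a remote object store is
    /// configured; returns its broadcast handle (None otherwise).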
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

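    /// Resolves the effective DBCheckpointConfig (forcing epoch-end db
    /// checkpoints when state snapshots are enabled) and starts the
    /// DBCheckpointHandler when needed.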
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If no db checkpoint object store is specified but a state
            // snapshot object store is, create the handler anyway so that db
            // checkpoints are marked as completed and can be uploaded as
            // state snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

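    /// Builds the anemo p2p network plus the discovery, state-sync, and
    /// randomness services that share it, wiring metrics and tracing layers
    /// into the inbound and outbound request paths.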
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let mut p2p_config = config.p2p_config.clone();
        {
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        if let Some(consensus_config) = &config.consensus_config {
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Inbound requests on this network are small (signatures, queries, summaries).
            // Cap request frames at 1 MiB.
            anemo_config.max_request_frame_size = Some(1 << 20);
            // Responses can be larger (checkpoint contents).
            // Cap response frames at 128 MiB.
            anemo_config.max_response_frame_size = Some(128 << 20);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
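            // (Bandwidth-delay product bound: 200 MiB window / 0.5 s RTT = 400 MiB/s.)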
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }

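    /// Creates the per-node validator components (consensus adapter and
    /// manager, store pruner, gRPC service, overload monitor), then delegates
    /// to start_epoch_specific_validator_components.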
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Node is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            &registry_service.default_registry(),
            client.clone(),
            checkpoint_store.clone(),
            inflight_slot_freed_notify.clone(),
        ));

        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
            node_role,
        ));

        // This only gets started once, not on every epoch. (Pruning itself is invoked every epoch.)
1310        let consensus_store_pruner = ConsensusStorePruner::new(
1311            consensus_manager.get_storage_base_path(),
1312            consensus_config.db_retention_epochs(),
1313            consensus_config.db_pruner_period(),
1314            &registry_service.default_registry(),
1315        );
1316
1317        let sui_tx_validator_metrics =
1318            SuiTxValidatorMetrics::new(&registry_service.default_registry());
1319
1320        let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1321            let (handle, queue) = Self::start_grpc_validator_service(
1322                &config,
1323                state.clone(),
1324                consensus_adapter.clone(),
1325                epoch_store.clone(),
1326                &registry_service.default_registry(),
1327                inflight_slot_freed_notify,
1328            )
1329            .await?;
1330            (Some(handle), queue)
1331        } else {
1332            (None, None)
1333        };
1334
1335        // Starts an overload monitor that monitors the execution of the authority.
1336        // Don't start the overload monitor when max_load_shedding_percentage is 0.
1337        let validator_overload_monitor_handle = if node_role.is_validator()
1338            && config
1339                .authority_overload_config
1340                .max_load_shedding_percentage
1341                > 0
1342        {
1343            let authority_state = Arc::downgrade(&state);
1344            let overload_config = config.authority_overload_config.clone();
1345            fail_point!("starting_overload_monitor");
1346            Some(spawn_monitored_task!(overload_monitor(
1347                authority_state,
1348                overload_config,
1349            )))
1350        } else {
1351            None
1352        };
1353
1354        Self::start_epoch_specific_validator_components(
1355            &config,
1356            state.clone(),
1357            consensus_adapter,
1358            checkpoint_store,
1359            epoch_store,
1360            state_sync_handle,
1361            randomness_handle,
1362            consensus_manager,
1363            consensus_store_pruner,
1364            global_state_hasher,
1365            backpressure_manager,
1366            validator_server_handle,
1367            validator_overload_monitor_handle,
1368            checkpoint_metrics,
1369            sui_node_metrics,
1370            sui_tx_validator_metrics,
1371            admission_queue,
1372            node_role,
1373        )
1374        .await
1375    }
1376
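    /// Starts the components that are torn down and recreated on every epoch:
    /// checkpoint service, randomness manager (when enabled), execution time
    /// observer, consensus handler, and JWK updater. Called both from
    /// `construct_validator_components` and at each reconfiguration.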
1377    async fn start_epoch_specific_validator_components(
1378        config: &NodeConfig,
1379        state: Arc<AuthorityState>,
1380        consensus_adapter: Arc<ConsensusAdapter>,
1381        checkpoint_store: Arc<CheckpointStore>,
1382        epoch_store: Arc<AuthorityPerEpochStore>,
1383        state_sync_handle: state_sync::Handle,
1384        randomness_handle: randomness::Handle,
1385        consensus_manager: Arc<ConsensusManager>,
1386        consensus_store_pruner: ConsensusStorePruner,
1387        state_hasher: Weak<GlobalStateHasher>,
1388        backpressure_manager: Arc<BackpressureManager>,
1389        validator_server_handle: Option<SpawnOnce>,
1390        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1391        checkpoint_metrics: Arc<CheckpointMetrics>,
1392        sui_node_metrics: Arc<SuiNodeMetrics>,
1393        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1394        admission_queue: Option<AdmissionQueueContext>,
1395        node_role: NodeRole,
1396    ) -> Result<ValidatorComponents> {
1397        let checkpoint_service = Self::build_checkpoint_service(
1398            config,
1399            consensus_adapter.clone(),
1400            checkpoint_store.clone(),
1401            epoch_store.clone(),
1402            state.clone(),
1403            state_sync_handle,
1404            state_hasher,
1405            checkpoint_metrics.clone(),
1406            node_role,
1407        );
1408
1409        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
1410            let randomness_manager = RandomnessManager::try_new(
1411                Arc::downgrade(&epoch_store),
1412                Box::new(consensus_adapter.clone()),
1413                randomness_handle,
1414                config.protocol_key_pair(),
1415            )
1416            .await;
1417            if let Some(randomness_manager) = randomness_manager {
1418                epoch_store
1419                    .set_randomness_manager(randomness_manager)
1420                    .await?;
1421            }
1422        }
1423
1424        if node_role.is_validator() {
1425            ExecutionTimeObserver::spawn(
1426                epoch_store.clone(),
1427                Box::new(consensus_adapter.clone()),
1428                config
1429                    .execution_time_observer_config
1430                    .clone()
1431                    .unwrap_or_default(),
1432            );
1433        }
1434
1435        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1436            None,
1437            state.metrics.clone(),
1438        ));
1439
1440        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1441            state.clone(),
1442            checkpoint_service.clone(),
1443            epoch_store.clone(),
1444            consensus_adapter.clone(),
1445            throughput_calculator,
1446            backpressure_manager,
1447            config.congestion_log.clone(),
1448        );
1449
1450        info!("Starting consensus manager asynchronously");
1451
1452        // Spawn consensus startup asynchronously to avoid blocking other components
1453        tokio::spawn({
1454            let config = config.clone();
1455            let epoch_store = epoch_store.clone();
1456            let sui_tx_validator = SuiTxValidator::new(
1457                state.clone(),
1458                epoch_store.clone(),
1459                checkpoint_service.clone(),
1460                sui_tx_validator_metrics.clone(),
1461            );
1462            let consensus_manager = consensus_manager.clone();
1463            async move {
1464                consensus_manager
1465                    .start(
1466                        &config,
1467                        epoch_store,
1468                        consensus_handler_initializer,
1469                        sui_tx_validator,
1470                    )
1471                    .await;
1472            }
1473        });
1474        let replay_waiter = consensus_manager.replay_waiter();
1475
1476        info!("Spawning checkpoint service");
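        // Allow disabling the replay waiter via the DISABLE_REPLAY_WAITER env var.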
1477        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1478            None
1479        } else {
1480            Some(replay_waiter)
1481        };
1482        checkpoint_service
1483            .spawn(epoch_store.clone(), replay_waiter)
1484            .await;
1485
1486        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
1487            Self::start_jwk_updater(
1488                config,
1489                sui_node_metrics,
1490                state.name,
1491                epoch_store.clone(),
1492                consensus_adapter.clone(),
1493            );
1494        }
1495
1496        if let Some(ctx) = &admission_queue {
1497            ctx.rotate_for_epoch(epoch_store);
1498        }
1499
1500        Ok(ValidatorComponents {
1501            validator_server_handle,
1502            validator_overload_monitor_handle,
1503            consensus_manager,
1504            consensus_store_pruner,
1505            consensus_adapter,
1506            checkpoint_metrics,
1507            sui_tx_validator_metrics,
1508            admission_queue,
1509        })
1510    }
1511
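    /// Assembles the `CheckpointService`. Validators submit checkpoint signatures to
    /// consensus via `SubmitCheckpointToConsensus`; all other roles only log built
    /// checkpoints via `LogCheckpointOutput`. Certified checkpoints are always
    /// forwarded to state sync.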
1512    fn build_checkpoint_service(
1513        config: &NodeConfig,
1514        consensus_adapter: Arc<ConsensusAdapter>,
1515        checkpoint_store: Arc<CheckpointStore>,
1516        epoch_store: Arc<AuthorityPerEpochStore>,
1517        state: Arc<AuthorityState>,
1518        state_sync_handle: state_sync::Handle,
1519        state_hasher: Weak<GlobalStateHasher>,
1520        checkpoint_metrics: Arc<CheckpointMetrics>,
1521        node_role: NodeRole,
1522    ) -> Arc<CheckpointService> {
1523        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1524        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1525
1526        debug!(
1527            "Starting checkpoint service with epoch start timestamp {} \
1528             and epoch duration {}",
1529            epoch_start_timestamp_ms, epoch_duration_ms
1530        );
1531
1532        let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1533            Box::new(SubmitCheckpointToConsensus {
1534                sender: consensus_adapter,
1535                signer: state.secret.clone(),
1536                authority: config.protocol_public_key(),
1537                next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1538                    .checked_add(epoch_duration_ms)
1539                    .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1540                metrics: checkpoint_metrics.clone(),
1541            })
1542        } else {
1543            LogCheckpointOutput::boxed()
1544        };
1545
1546        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1547        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1548        let max_checkpoint_size_bytes =
1549            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1550
1551        CheckpointService::build(
1552            state.clone(),
1553            checkpoint_store,
1554            epoch_store,
1555            state.get_transaction_cache_reader().clone(),
1556            state_hasher,
1557            checkpoint_output,
1558            Box::new(certified_checkpoint_output),
1559            checkpoint_metrics,
1560            max_tx_per_checkpoint,
1561            max_checkpoint_size_bytes,
1562        )
1563    }
1564
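    /// Constructs the adapter that submits transactions to consensus. The
    /// per-validator pending limit is derived from the configured maximum as
    /// `max_pending_transactions() * 2 / committee.num_members()`; for example,
    /// with an (illustrative, not default) `max_pending_transactions()` of
    /// 20_000 and a committee of 100, each validator is limited to
    /// `20_000 * 2 / 100 == 400` pending transactions.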
1565    fn construct_consensus_adapter(
1566        committee: &Committee,
1567        consensus_config: &ConsensusConfig,
1568        authority: AuthorityName,
1569        prometheus_registry: &Registry,
1570        consensus_client: Arc<dyn ConsensusClient>,
1571        checkpoint_store: Arc<CheckpointStore>,
1572        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1573    ) -> ConsensusAdapter {
1574        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1575
1576        // The consensus adapter allows the authority to send user certificates through consensus.
1577        ConsensusAdapter::new(
1578            consensus_client,
1579            checkpoint_store,
1580            authority,
1581            consensus_config.max_pending_transactions(),
1582            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1583            ca_metrics,
1584            inflight_slot_freed_notify,
1585        )
1586    }
1587
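    /// Builds the validator gRPC service and returns it as an unstarted `SpawnOnce`,
    /// together with an `AdmissionQueueContext` when `admission_queue_enabled` is set.
    /// The server is only bound and started when `SpawnOnce::start` is awaited.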
1588    async fn start_grpc_validator_service(
1589        config: &NodeConfig,
1590        state: Arc<AuthorityState>,
1591        consensus_adapter: Arc<ConsensusAdapter>,
1592        epoch_store: Arc<AuthorityPerEpochStore>,
1593        prometheus_registry: &Registry,
1594        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1595    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1596        let overload_config = &config.authority_overload_config;
1597        let admission_queue = overload_config.admission_queue_enabled.then(|| {
1598            let manager = Arc::new(AdmissionQueueManager::new(
1599                consensus_adapter.clone(),
1600                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1601                overload_config.admission_queue_capacity_fraction,
1602                overload_config.admission_queue_bypass_fraction,
1603                overload_config.admission_queue_failover_timeout,
1604                inflight_slot_freed_notify,
1605            ));
1606            AdmissionQueueContext::spawn(manager, epoch_store)
1607        });
1608        let validator_service = ValidatorService::new(
1609            state.clone(),
1610            consensus_adapter,
1611            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1612            config.policy_config.clone().map(|p| p.client_id_source),
1613            admission_queue.clone(),
1614        );
1615
1616        let mut server_conf = mysten_network::config::Config::new();
1617        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1618        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1619        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1620        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1621        server_conf.load_shed = config.grpc_load_shed;
1622        let mut server_builder =
1623            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1624
1625        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1626
1627        let tls_config = sui_tls::create_rustls_server_config(
1628            config.network_key_pair().copy().private(),
1629            SUI_TLS_SERVER_NAME.to_string(),
1630        );
1631
1632        let network_address = config.network_address().clone();
1633
1634        let (ready_tx, ready_rx) = oneshot::channel();
1635
1636        let spawn_once = SpawnOnce::new(ready_rx, async move {
1637            let server = server_builder
1638                .bind(&network_address, Some(tls_config))
1639                .await
1640                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1641            let local_addr = server.local_addr();
1642            info!("Listening to traffic on {local_addr}");
1643            ready_tx.send(()).unwrap();
1644            if let Err(err) = server.serve().await {
1645                info!("Server stopped: {err}");
1646            }
1647            info!("Server stopped");
1648        });
1649        Ok((spawn_once, admission_queue))
1650    }
1651
1652    pub fn state(&self) -> Arc<AuthorityState> {
1653        self.state.clone()
1654    }
1655
1656    pub fn node_role(&self) -> NodeRole {
1657        self.state.load_epoch_store_one_call_per_task().node_role()
1658    }
1659
1660    // Only used for testing because of how epoch store is loaded.
1661    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1662        self.state.reference_gas_price_for_testing()
1663    }
1664
1665    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1666        self.state.committee_store().clone()
1667    }
1668
1669    /*
1670    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1671        self.state.db()
1672    }
1673    */
1674
1675    /// Clone the AuthorityAggregator currently used by this node, if the node is a fullnode.
1676    /// After reconfig, Transaction Driver builds a new AuthorityAggregator, so the caller
1677    /// of this function will most likely want to call it again
1678    /// to get a fresh one.
1679    pub fn clone_authority_aggregator(
1680        &self,
1681    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1682        self.transaction_orchestrator
1683            .as_ref()
1684            .map(|to| to.clone_authority_aggregator())
1685    }
1686
1687    pub fn transaction_orchestrator(
1688        &self,
1689    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1690        self.transaction_orchestrator.clone()
1691    }
1692
1693    /// This function awaits the completion of checkpoint execution of the current epoch,
1694    /// after which it initiates reconfiguration of the entire system.
1695    pub async fn monitor_reconfiguration(
1696        self: Arc<Self>,
1697        mut epoch_store: Arc<AuthorityPerEpochStore>,
1698    ) -> Result<()> {
1699        let checkpoint_executor_metrics =
1700            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1701
1702        loop {
1703            let mut hasher_guard = self.global_state_hasher.lock().await;
1704            let hasher = hasher_guard.take().unwrap();
1705            info!(
1706                "Creating checkpoint executor for epoch {}",
1707                epoch_store.epoch()
1708            );
1709            let checkpoint_executor = CheckpointExecutor::new(
1710                epoch_store.clone(),
1711                self.checkpoint_store.clone(),
1712                self.state.clone(),
1713                hasher.clone(),
1714                self.backpressure_manager.clone(),
1715                self.config.checkpoint_executor_config.clone(),
1716                checkpoint_executor_metrics.clone(),
1717                self.subscription_service_checkpoint_sender.clone(),
1718            );
1719
1720            let run_with_range = self.config.run_with_range;
1721
1722            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1723
1724            // Update the current protocol version metric.
1725            self.metrics
1726                .current_protocol_version
1727                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1728
1729            // Advertise capabilities to committee, if we are a validator.
1730            // FullNodes that state sync via consensus will also have validator components, but they are not supposed to submit any capabilities.
1731            if let Some(components) = &*self.validator_components.lock().await
1732                && cur_epoch_store.is_validator()
1733            {
1734                // TODO: without this sleep, the consensus message is not delivered reliably.
1735                tokio::time::sleep(Duration::from_millis(1)).await;
1736
1737                let config = cur_epoch_store.protocol_config();
1738                let mut supported_protocol_versions = self
1739                    .config
1740                    .supported_protocol_versions
1741                    .expect("Supported versions should be populated")
1742                    // no need to send digests of versions less than the current version
1743                    .truncate_below(config.version);
1744
1745                while supported_protocol_versions.max > config.version {
1746                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1747                        supported_protocol_versions.max,
1748                        cur_epoch_store.get_chain(),
1749                    );
1750
1751                    if proposed_protocol_config.enable_accumulators()
1752                        && !epoch_store.accumulator_root_exists()
1753                    {
1754                        error!(
1755                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1756                            supported_protocol_versions.max
1757                        );
1758                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1759                    } else {
1760                        break;
1761                    }
1762                }
1763
1764                let binary_config = config.binary_config(None);
1765                let transaction = ConsensusTransaction::new_capability_notification_v2(
1766                    AuthorityCapabilitiesV2::new(
1767                        self.state.name,
1768                        cur_epoch_store.get_chain_identifier().chain(),
1769                        supported_protocol_versions,
1770                        self.state
1771                            .get_available_system_packages(&binary_config)
1772                            .await,
1773                    ),
1774                );
1775                info!(?transaction, "submitting capabilities to consensus");
1776                components.consensus_adapter.submit(
1777                    transaction,
1778                    None,
1779                    &cur_epoch_store,
1780                    None,
1781                    None,
1782                )?;
1783            }
1784
1785            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1786
1787            if stop_condition == StopReason::RunWithRangeCondition {
1788                SuiNode::shutdown(&self).await;
1789                self.shutdown_channel_tx
1790                    .send(run_with_range)
1791                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1792                return Ok(());
1793            }
1794
1795            // Safe to call because we are in the middle of reconfiguration.
1796            let latest_system_state = self
1797                .state
1798                .get_object_cache_reader()
1799                .get_sui_system_state_object_unsafe()
1800                .expect("Read Sui System State object cannot fail");
1801
1802            #[cfg(msim)]
1803            if !self
1804                .sim_state
1805                .sim_safe_mode_expected
1806                .load(Ordering::Relaxed)
1807            {
1808                debug_assert!(!latest_system_state.safe_mode());
1809            }
1810
1811            #[cfg(not(msim))]
1812            debug_assert!(!latest_system_state.safe_mode());
1813
1814            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1815                && self.state.is_fullnode(&cur_epoch_store)
1816            {
1817                warn!(
1818                    "Failed to send end of epoch notification to subscriber: {:?}",
1819                    err
1820                );
1821            }
1822
1823            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1824            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1825
1826            self.auth_agg.store(Arc::new(
1827                self.auth_agg
1828                    .load()
1829                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1830            ));
1831
1832            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1833            let next_epoch = next_epoch_committee.epoch();
1834            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1835
1836            info!(
1837                next_epoch,
1838                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1839            );
1840
1841            fail_point_async!("reconfig_delay");
1842
1843            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1844
1845            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1846
1847            let mut validator_components_lock_guard = self.validator_components.lock().await;
1848
1849            // The following code handles 4 different cases, depending on whether the node
1850            // was a validator in the previous epoch, and whether the node is a validator
1851            // in the new epoch.
1852            let new_epoch_store = self
1853                .reconfigure_state(
1854                    &self.state,
1855                    &cur_epoch_store,
1856                    next_epoch_committee.clone(),
1857                    new_epoch_start_state,
1858                    hasher.clone(),
1859                )
1860                .await;
1861
1862            let new_role = new_epoch_store.node_role();
1863
1864            let new_validator_components = if let Some(ValidatorComponents {
1865                validator_server_handle,
1866                validator_overload_monitor_handle,
1867                consensus_manager,
1868                consensus_store_pruner,
1869                consensus_adapter,
1870                checkpoint_metrics,
1871                sui_tx_validator_metrics,
1872                admission_queue,
1873            }) = validator_components_lock_guard.take()
1874            {
1875                info!("Reconfiguring node (was running consensus).");
1876
1877                consensus_manager.shutdown().await;
1878                info!("Consensus has shut down.");
1879
1880                info!("Epoch store finished reconfiguration.");
1881
1882                // No other components should be holding a strong reference to state hasher
1883                // at this point. Confirm here before we swap in the new hasher.
1884                let global_state_hasher_metrics = Arc::into_inner(hasher)
1885                    .expect("Object state hasher should have no other references at this point")
1886                    .metrics();
1887                let new_hasher = Arc::new(GlobalStateHasher::new(
1888                    self.state.get_global_state_hash_store().clone(),
1889                    global_state_hasher_metrics,
1890                ));
1891                let weak_hasher = Arc::downgrade(&new_hasher);
1892                *hasher_guard = Some(new_hasher);
1893
1894                consensus_store_pruner.prune(next_epoch).await;
1895
1896                if new_role.runs_consensus() {
1897                    info!("Restarting consensus as {new_role}");
1898                    Some(
1899                        Self::start_epoch_specific_validator_components(
1900                            &self.config,
1901                            self.state.clone(),
1902                            consensus_adapter,
1903                            self.checkpoint_store.clone(),
1904                            new_epoch_store.clone(),
1905                            self.state_sync_handle.clone(),
1906                            self.randomness_handle.clone(),
1907                            consensus_manager,
1908                            consensus_store_pruner,
1909                            weak_hasher,
1910                            self.backpressure_manager.clone(),
1911                            validator_server_handle,
1912                            validator_overload_monitor_handle,
1913                            checkpoint_metrics,
1914                            self.metrics.clone(),
1915                            sui_tx_validator_metrics,
1916                            admission_queue,
1917                            new_role,
1918                        )
1919                        .await?,
1920                    )
1921                } else {
1922                    info!(
1923                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
1924                    );
1925                    None
1926                }
1927            } else {
1928                // No other components should be holding a strong reference to state hasher
1929                // at this point. Confirm here before we swap in the new hasher.
1930                let global_state_hasher_metrics = Arc::into_inner(hasher)
1931                    .expect("Object state hasher should have no other references at this point")
1932                    .metrics();
1933                let new_hasher = Arc::new(GlobalStateHasher::new(
1934                    self.state.get_global_state_hash_store().clone(),
1935                    global_state_hasher_metrics,
1936                ));
1937                let weak_hasher = Arc::downgrade(&new_hasher);
1938                *hasher_guard = Some(new_hasher);
1939
1940                if new_role.runs_consensus() {
1941                    info!("Promoting node to {new_role}, starting consensus components");
1942
1943                    let mut components = Self::construct_validator_components(
1944                        self.config.clone(),
1945                        self.state.clone(),
1946                        Arc::new(next_epoch_committee.clone()),
1947                        new_epoch_store.clone(),
1948                        self.checkpoint_store.clone(),
1949                        self.state_sync_handle.clone(),
1950                        self.randomness_handle.clone(),
1951                        weak_hasher,
1952                        self.backpressure_manager.clone(),
1953                        &self.registry_service,
1954                        self.metrics.clone(),
1955                        self.checkpoint_metrics.clone(),
1956                        new_role,
1957                    )
1958                    .await?;
1959
1960                    if new_role.is_validator() {
1961                        components.validator_server_handle = Some(
1962                            components
1963                                .validator_server_handle
1964                                .take()
1965                                .unwrap()
1966                                .start()
1967                                .await,
1968                        );
1969
1970                        self.endpoint_manager
1971                            .set_consensus_address_updater(components.consensus_manager.clone());
1972                    }
1973
1974                    Some(components)
1975                } else {
1976                    None
1977                }
1978            };
1979            *validator_components_lock_guard = new_validator_components;
1980
1981            // Force releasing current epoch store DB handle, because the
1982            // Arc<AuthorityPerEpochStore> may linger.
1983            cur_epoch_store.release_db_handles();
1984
1985            if cfg!(msim)
1986                && !matches!(
1987                    self.config
1988                        .authority_store_pruning_config
1989                        .num_epochs_to_retain_for_checkpoints(),
1990                    None | Some(u64::MAX) | Some(0)
1991                )
1992            {
1993                self.state
1994                    .prune_checkpoints_for_eligible_epochs_for_testing(
1995                        self.config.clone(),
1996                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
1997                    )
1998                    .await?;
1999            }
2000
2001            epoch_store = new_epoch_store;
2002            info!("Reconfiguration finished");
2003        }
2004    }
2005
2006    async fn shutdown(&self) {
2007        if let Some(validator_components) = &*self.validator_components.lock().await {
2008            validator_components.consensus_manager.shutdown().await;
2009        }
2010    }
2011
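    /// Swaps the node into the next epoch: asserts that the last checkpoint of the
    /// current epoch has been executed, builds the next `EpochStartConfiguration`,
    /// and reconfigures `AuthorityState`, returning the new epoch store.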
2012    async fn reconfigure_state(
2013        &self,
2014        state: &Arc<AuthorityState>,
2015        cur_epoch_store: &AuthorityPerEpochStore,
2016        next_epoch_committee: Committee,
2017        next_epoch_start_system_state: EpochStartSystemState,
2018        global_state_hasher: Arc<GlobalStateHasher>,
2019    ) -> Arc<AuthorityPerEpochStore> {
2020        let next_epoch = next_epoch_committee.epoch();
2021
2022        let last_checkpoint = self
2023            .checkpoint_store
2024            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2025            .expect("Error loading last checkpoint for current epoch")
2026            .expect("Could not load last checkpoint for current epoch");
2027
2028        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2029
2030        assert_eq!(
2031            Some(last_checkpoint_seq),
2032            self.checkpoint_store
2033                .get_highest_executed_checkpoint_seq_number()
2034                .expect("Error loading highest executed checkpoint sequence number")
2035        );
2036
2037        let epoch_start_configuration = EpochStartConfiguration::new(
2038            next_epoch_start_system_state,
2039            *last_checkpoint.digest(),
2040            state.get_object_store().as_ref(),
2041            EpochFlag::default_flags_for_new_epoch(&state.config),
2042        )
2043        .expect("EpochStartConfiguration construction cannot fail");
2044
2045        let new_epoch_store = self
2046            .state
2047            .reconfigure(
2048                cur_epoch_store,
2049                self.config.supported_protocol_versions.unwrap(),
2050                next_epoch_committee,
2051                epoch_start_configuration,
2052                global_state_hasher,
2053                &self.config.expensive_safety_check_config,
2054                last_checkpoint_seq,
2055            )
2056            .await
2057            .expect("Reconfigure authority state cannot fail");
2058        info!(next_epoch, "Node State has been reconfigured");
2059        assert_eq!(next_epoch, new_epoch_store.epoch());
2060        self.state.get_reconfig_api().update_epoch_flags_metrics(
2061            cur_epoch_store.epoch_start_config().flags(),
2062            new_epoch_store.epoch_start_config().flags(),
2063        );
2064
2065        new_epoch_store
2066    }
2067
2068    pub fn get_config(&self) -> &NodeConfig {
2069        &self.config
2070    }
2071
2072    pub fn randomness_handle(&self) -> randomness::Handle {
2073        self.randomness_handle.clone()
2074    }
2075
2076    pub fn state_sync_handle(&self) -> state_sync::Handle {
2077        self.state_sync_handle.clone()
2078    }
2079
2080    pub fn endpoint_manager(&self) -> &EndpointManager {
2081        &self.endpoint_manager
2082    }
2083
2084    /// Get a short prefix of a digest for metric labels
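    /// (e.g. a full digest string is shortened to its first 8 characters; inputs
    /// shorter than 8 characters are returned unchanged).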
2085    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2086        let digest_str = digest.to_string();
2087        if digest_str.len() >= 8 {
2088            digest_str[0..8].to_string()
2089        } else {
2090            digest_str
2091        }
2092    }
2093
2094    /// Check for previously detected forks and handle them appropriately.
2095    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2096    /// For all other cases, block node startup if a fork is detected.
2097    async fn check_and_recover_forks(
2098        checkpoint_store: &CheckpointStore,
2099        checkpoint_metrics: &CheckpointMetrics,
2100        fork_recovery: Option<&ForkRecoveryConfig>,
2101    ) -> Result<()> {
2102        // Try to recover from forks if recovery config is provided
2103        if let Some(recovery) = fork_recovery {
2104            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2105            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2106        }
2107
2108        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2109            .get_checkpoint_fork_detected()
2110            .map_err(|e| {
2111                error!("Failed to check for checkpoint fork: {:?}", e);
2112                e
2113            })?
2114        {
2115            Self::handle_checkpoint_fork(
2116                checkpoint_seq,
2117                checkpoint_digest,
2118                checkpoint_metrics,
2119                fork_recovery,
2120            )
2121            .await?;
2122        }
2123        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2124            .get_transaction_fork_detected()
2125            .map_err(|e| {
2126                error!("Failed to check for transaction fork: {:?}", e);
2127                e
2128            })?
2129        {
2130            Self::handle_transaction_fork(
2131                tx_digest,
2132                expected_effects,
2133                actual_effects,
2134                checkpoint_metrics,
2135                fork_recovery,
2136            )
2137            .await?;
2138        }
2139
2140        Ok(())
2141    }
2142
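    /// A minimal sketch of a recovery override, mirroring the shape used in the
    /// tests at the bottom of this file (the digest value is illustrative):
    ///
    /// ```ignore
    /// let mut checkpoint_overrides = BTreeMap::new();
    /// checkpoint_overrides.insert(42, expected_digest.to_string());
    /// let recovery = ForkRecoveryConfig {
    ///     transaction_overrides: Default::default(),
    ///     checkpoint_overrides,
    ///     fork_crash_behavior: ForkCrashBehavior::ReturnError,
    /// };
    /// ```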
2143    fn try_recover_checkpoint_fork(
2144        checkpoint_store: &CheckpointStore,
2145        recovery: &ForkRecoveryConfig,
2146    ) -> Result<()> {
2147        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2148        // clear locally computed checkpoints from that sequence (inclusive).
2149        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2150            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2151                anyhow::bail!(
2152                    "Invalid checkpoint digest override for seq {}: {}",
2153                    seq,
2154                    expected_digest_str
2155                );
2156            };
2157
2158            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2159                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2160                if local_digest != expected_digest {
2161                    info!(
2162                        seq,
2163                        local = %Self::get_digest_prefix(local_digest),
2164                        expected = %Self::get_digest_prefix(expected_digest),
2165                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2166                        seq
2167                    );
2168                    checkpoint_store
2169                        .clear_locally_computed_checkpoints_from(*seq)
2170                        .context(
2171                            "Failed to clear locally computed checkpoints from override seq",
2172                        )?;
2173                }
2174            }
2175        }
2176
2177        if let Some((checkpoint_seq, checkpoint_digest)) =
2178            checkpoint_store.get_checkpoint_fork_detected()?
2179            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2180        {
2181            info!(
2182                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2183                checkpoint_seq, checkpoint_digest
2184            );
2185            checkpoint_store
2186                .clear_checkpoint_fork_detected()
2187                .expect("Failed to clear checkpoint fork detected marker");
2188        }
2189        Ok(())
2190    }
2191
2192    fn try_recover_transaction_fork(
2193        checkpoint_store: &CheckpointStore,
2194        recovery: &ForkRecoveryConfig,
2195    ) -> Result<()> {
2196        if recovery.transaction_overrides.is_empty() {
2197            return Ok(());
2198        }
2199
2200        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2201            && recovery
2202                .transaction_overrides
2203                .contains_key(&tx_digest.to_string())
2204        {
2205            info!(
2206                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2207                tx_digest
2208            );
2209            checkpoint_store
2210                .clear_transaction_fork_detected()
2211                .expect("Failed to clear transaction fork detected marker");
2212        }
2213        Ok(())
2214    }
2215
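    /// Seconds since the Unix epoch; used below as a timestamp label on the
    /// fork-detection metrics.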
2216    fn get_current_timestamp() -> u64 {
2217        std::time::SystemTime::now()
2218            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2219            .unwrap()
2220            .as_secs()
2221    }
2222
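    /// Records the fork in metrics, then either parks the node forever
    /// (`AwaitForkRecovery`) or returns an error (`ReturnError`), depending on the
    /// configured `ForkCrashBehavior`.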
2223    async fn handle_checkpoint_fork(
2224        checkpoint_seq: u64,
2225        checkpoint_digest: CheckpointDigest,
2226        checkpoint_metrics: &CheckpointMetrics,
2227        fork_recovery: Option<&ForkRecoveryConfig>,
2228    ) -> Result<()> {
2229        checkpoint_metrics
2230            .checkpoint_fork_crash_mode
2231            .with_label_values(&[
2232                &checkpoint_seq.to_string(),
2233                &Self::get_digest_prefix(checkpoint_digest),
2234                &Self::get_current_timestamp().to_string(),
2235            ])
2236            .set(1);
2237
2238        let behavior = fork_recovery
2239            .map(|fr| fr.fork_crash_behavior)
2240            .unwrap_or_default();
2241
2242        match behavior {
2243            ForkCrashBehavior::AwaitForkRecovery => {
2244                error!(
2245                    checkpoint_seq = checkpoint_seq,
2246                    checkpoint_digest = ?checkpoint_digest,
2247                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2248                );
2249                futures::future::pending::<()>().await;
2250                unreachable!("pending() should never return");
2251            }
2252            ForkCrashBehavior::ReturnError => {
2253                error!(
2254                    checkpoint_seq = checkpoint_seq,
2255                    checkpoint_digest = ?checkpoint_digest,
2256                    "Checkpoint fork detected! Returning error."
2257                );
2258                Err(anyhow::anyhow!(
2259                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2260                    checkpoint_seq,
2261                    checkpoint_digest
2262                ))
2263            }
2264        }
2265    }
2266
2267    async fn handle_transaction_fork(
2268        tx_digest: TransactionDigest,
2269        expected_effects_digest: TransactionEffectsDigest,
2270        actual_effects_digest: TransactionEffectsDigest,
2271        checkpoint_metrics: &CheckpointMetrics,
2272        fork_recovery: Option<&ForkRecoveryConfig>,
2273    ) -> Result<()> {
2274        checkpoint_metrics
2275            .transaction_fork_crash_mode
2276            .with_label_values(&[
2277                &Self::get_digest_prefix(tx_digest),
2278                &Self::get_digest_prefix(expected_effects_digest),
2279                &Self::get_digest_prefix(actual_effects_digest),
2280                &Self::get_current_timestamp().to_string(),
2281            ])
2282            .set(1);
2283
2284        let behavior = fork_recovery
2285            .map(|fr| fr.fork_crash_behavior)
2286            .unwrap_or_default();
2287
2288        match behavior {
2289            ForkCrashBehavior::AwaitForkRecovery => {
2290                error!(
2291                    tx_digest = ?tx_digest,
2292                    expected_effects_digest = ?expected_effects_digest,
2293                    actual_effects_digest = ?actual_effects_digest,
2294                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2295                );
2296                futures::future::pending::<()>().await;
2297                unreachable!("pending() should never return");
2298            }
2299            ForkCrashBehavior::ReturnError => {
2300                error!(
2301                    tx_digest = ?tx_digest,
2302                    expected_effects_digest = ?expected_effects_digest,
2303                    actual_effects_digest = ?actual_effects_digest,
2304                    "Transaction fork detected! Returning error."
2305                );
2306                Err(anyhow::anyhow!(
2307                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2308                    tx_digest,
2309                    expected_effects_digest,
2310                    actual_effects_digest
2311                ))
2312            }
2313        }
2314    }
2315}
2316
2317#[cfg(not(msim))]
2318impl SuiNode {
2319    async fn fetch_jwks(
2320        _authority: AuthorityName,
2321        provider: &OIDCProvider,
2322    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2323        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2324        use sui_types::error::SuiErrorKind;
2325        let client = reqwest::Client::new();
2326        fetch_jwks(provider, &client, true)
2327            .await
2328            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2329    }
2330}
2331
2332#[cfg(msim)]
2333impl SuiNode {
2334    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2335        self.sim_state.sim_node.id()
2336    }
2337
2338    pub fn set_safe_mode_expected(&self, new_value: bool) {
2339        info!("Setting safe mode expected to {}", new_value);
2340        self.sim_state
2341            .sim_safe_mode_expected
2342            .store(new_value, Ordering::Relaxed);
2343    }
2344
2345    #[allow(unused_variables)]
2346    async fn fetch_jwks(
2347        authority: AuthorityName,
2348        provider: &OIDCProvider,
2349    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2350        get_jwk_injector()(authority, provider)
2351    }
2352}
2353
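/// Two-phase server startup: the future is created eagerly but only spawned when
/// `start` is awaited, which then blocks until the server signals readiness on the
/// oneshot channel. A minimal usage sketch (hypothetical body, mirroring
/// `start_grpc_validator_service` above):
///
/// ```ignore
/// let (ready_tx, ready_rx) = oneshot::channel();
/// let once = SpawnOnce::new(ready_rx, async move {
///     // ... bind the server ...
///     ready_tx.send(()).unwrap();
///     // ... serve forever ...
/// });
/// let running = once.start().await; // resolves once the server is ready
/// ```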
2354enum SpawnOnce {
2355    // Mutex is only needed to make SpawnOnce Sync (BoxFuture is Send but not Sync)
2356    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2357    #[allow(unused)]
2358    Started(JoinHandle<()>),
2359}
2360
2361impl SpawnOnce {
2362    pub fn new(
2363        ready_rx: oneshot::Receiver<()>,
2364        future: impl Future<Output = ()> + Send + 'static,
2365    ) -> Self {
2366        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2367    }
2368
2369    pub async fn start(self) -> Self {
2370        match self {
2371            Self::Unstarted(ready_rx, future) => {
2372                let future = future.into_inner();
2373                let handle = tokio::spawn(future);
2374                ready_rx.await.unwrap();
2375                Self::Started(handle)
2376            }
2377            Self::Started(_) => self,
2378        }
2379    }
2380}
2381
2382/// Updates trusted peer addresses in the p2p network.
2383fn update_peer_addresses(
2384    config: &NodeConfig,
2385    endpoint_manager: &EndpointManager,
2386    epoch_start_state: &EpochStartSystemState,
2387) {
2388    for (peer_id, address) in
2389        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2390    {
2391        endpoint_manager
2392            .update_endpoint(
2393                EndpointId::P2p(peer_id),
2394                AddressSource::Chain,
2395                vec![address],
2396            )
2397            .expect("Updating peer addresses should not fail");
2398    }
2399}
2400
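/// Builds the transaction KV store used by JSON-RPC reads: always a local RocksDB
/// store, with an HTTP store layered in as a fallback when a base URL is
/// configured and the chain is mainnet.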
2401fn build_kv_store(
2402    state: &Arc<AuthorityState>,
2403    config: &NodeConfig,
2404    registry: &Registry,
2405) -> Result<Arc<TransactionKeyValueStore>> {
2406    let metrics = KeyValueStoreMetrics::new(registry);
2407    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2408
2409    let base_url = &config.transaction_kv_store_read_config.base_url;
2410
2411    if base_url.is_empty() {
2412        info!("no http kv store url provided, using local db only");
2413        return Ok(Arc::new(db_store));
2414    }
2415
2416    let base_url: url::Url = base_url.parse().tap_err(|e| {
2417        error!(
2418            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2419            base_url, e
2420        )
2421    })?;
2422
2423    let network_str = match state.get_chain_identifier().chain() {
2424        Chain::Mainnet => "/mainnet",
2425        _ => {
2426            info!("using local db only for kv store");
2427            return Ok(Arc::new(db_store));
2428        }
2429    };
2430
2431    let base_url = base_url.join(network_str)?.to_string();
2432    let http_store = HttpKVStore::new_kv(
2433        &base_url,
2434        config.transaction_kv_store_read_config.cache_size,
2435        metrics.clone(),
2436    )?;
2437    info!("using local key-value store with fallback to http key-value store");
2438    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2439        db_store,
2440        http_store,
2441        metrics,
2442        "json_rpc_fallback",
2443    )))
2444}
2445
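/// Starts the JSON-RPC and sui_rpc_api HTTP servers for nodes whose role exposes
/// them, plus an optional HTTPS listener when TLS is configured. Returns the
/// server handles and, when the servers are running, the checkpoint sender for
/// the subscription service.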
2446async fn build_http_servers(
2447    state: Arc<AuthorityState>,
2448    store: RocksDbStore,
2449    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2450    config: &NodeConfig,
2451    prometheus_registry: &Registry,
2452    server_version: ServerVersion,
2453    node_role: NodeRole,
2454) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2455    // Validators do not expose these APIs
2456    if !node_role.should_run_rpc_servers() {
2457        return Ok((HttpServers::default(), None));
2458    }
2459
2460    info!("starting rpc service with config: {:?}", config.rpc);
2461
2462    let mut router = axum::Router::new();
2463
2464    let json_rpc_router = {
2465        let traffic_controller = state.traffic_controller.clone();
2466        let mut server = JsonRpcServerBuilder::new(
2467            env!("CARGO_PKG_VERSION"),
2468            prometheus_registry,
2469            traffic_controller,
2470            config.policy_config.clone(),
2471        );
2472
2473        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2474
2475        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2476        server.register_module(ReadApi::new(
2477            state.clone(),
2478            kv_store.clone(),
2479            metrics.clone(),
2480        ))?;
2481        server.register_module(CoinReadApi::new(
2482            state.clone(),
2483            kv_store.clone(),
2484            metrics.clone(),
2485        ))?;
2486
2487        // If run_with_range is set we want to prevent any transaction submission;
2488        // run_with_range == None is the normal operating condition.
2489        if config.run_with_range.is_none() {
2490            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2491        }
2492        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2493        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2494
2495        if let Some(transaction_orchestrator) = transaction_orchestrator {
2496            server.register_module(TransactionExecutionApi::new(
2497                state.clone(),
2498                transaction_orchestrator.clone(),
2499                metrics.clone(),
2500            ))?;
2501        }
2502
2503        let name_service_config =
2504            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2505                config.name_service_package_address,
2506                config.name_service_registry_id,
2507                config.name_service_reverse_registry_id,
2508            ) {
2509                sui_name_service::NameServiceConfig::new(
2510                    package_address,
2511                    registry_id,
2512                    reverse_registry_id,
2513                )
2514            } else {
2515                match state.get_chain_identifier().chain() {
2516                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2517                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2518                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2519                }
2520            };
2521
2522        server.register_module(IndexerApi::new(
2523            state.clone(),
2524            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2525            kv_store,
2526            name_service_config,
2527            metrics,
2528            config.indexer_max_subscriptions,
2529        ))?;
2530        server.register_module(MoveUtils::new(state.clone()))?;
2531
2532        let server_type = config.jsonrpc_server_type();
2533
2534        server.to_router(server_type).await?
2535    };
2536
2537    router = router.merge(json_rpc_router);
2538
2539    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2540        SubscriptionService::build(prometheus_registry);
2541    let rpc_router = {
2542        let mut rpc_service =
2543            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2544        rpc_service.with_server_version(server_version);
2545
2546        if let Some(config) = config.rpc.clone() {
2547            rpc_service.with_config(config);
2548        }
2549
2550        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2551        rpc_service.with_subscription_service(subscription_service_handle);
2552
2553        if let Some(transaction_orchestrator) = transaction_orchestrator {
2554            rpc_service.with_executor(transaction_orchestrator.clone())
2555        }
2556
2557        rpc_service.into_router().await
2558    };
2559
2560    let layers = ServiceBuilder::new()
2561        .map_request(|mut request: axum::http::Request<_>| {
2562            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2563                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2564                request.extensions_mut().insert(axum_connect_info);
2565            }
2566            request
2567        })
2568        .layer(axum::middleware::from_fn(server_timing_middleware))
2569        // Set up a permissive CORS policy
2570        .layer(
2571            tower_http::cors::CorsLayer::new()
2572                .allow_methods([http::Method::GET, http::Method::POST])
2573                .allow_origin(tower_http::cors::Any)
2574                .allow_headers(tower_http::cors::Any)
2575                .expose_headers(tower_http::cors::Any),
2576        );
2577
2578    router = router.merge(rpc_router).layer(layers);
2579
2580    let https = if let Some((tls_config, https_address)) = config
2581        .rpc()
2582        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2583    {
2584        let https = sui_http::Builder::new()
2585            .tls_single_cert(tls_config.cert(), tls_config.key())
2586            .and_then(|builder| builder.serve(https_address, router.clone()))
2587            .map_err(|e| anyhow::anyhow!(e))?;
2588
2589        info!(
2590            https_address =? https.local_addr(),
2591            "HTTPS rpc server listening on {}",
2592            https.local_addr()
2593        );
2594
2595        Some(https)
2596    } else {
2597        None
2598    };
2599
2600    let http = sui_http::Builder::new()
2601        .serve(&config.json_rpc_address, router)
2602        .map_err(|e| anyhow::anyhow!(e))?;
2603
2604    info!(
2605        http_address =? http.local_addr(),
2606        "HTTP rpc server listening on {}",
2607        http.local_addr()
2608    );
2609
2610    Ok((
2611        HttpServers {
2612            http: Some(http),
2613            https,
2614        },
2615        Some(subscription_service_checkpoint_sender),
2616    ))
2617}
2618
2619#[cfg(not(test))]
2620fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2621    protocol_config.max_transactions_per_checkpoint() as usize
2622}
2623
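// In tests, cap checkpoints at two transactions, presumably so that
// checkpoint-splitting logic is exercised frequently.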
2624#[cfg(test)]
2625fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2626    2
2627}
2628
2629#[derive(Default)]
2630struct HttpServers {
2631    #[allow(unused)]
2632    http: Option<sui_http::ServerHandle>,
2633    #[allow(unused)]
2634    https: Option<sui_http::ServerHandle>,
2635}
2636
2637#[cfg(test)]
2638mod tests {
2639    use super::*;
2640    use prometheus::Registry;
2641    use std::collections::BTreeMap;
2642    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2643    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2644    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2645
2646    #[tokio::test]
2647    async fn test_fork_error_and_recovery_both_paths() {
2648        let checkpoint_store = CheckpointStore::new_for_tests();
2649        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2650
2651        // ---------- Checkpoint fork path ----------
2652        let seq_num = 42;
2653        let digest = CheckpointDigest::random();
2654        checkpoint_store
2655            .record_checkpoint_fork_detected(seq_num, digest)
2656            .unwrap();
2657
2658        let fork_recovery = ForkRecoveryConfig {
2659            transaction_overrides: Default::default(),
2660            checkpoint_overrides: Default::default(),
2661            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2662        };
2663
2664        let r = SuiNode::check_and_recover_forks(
2665            &checkpoint_store,
2666            &checkpoint_metrics,
2667            Some(&fork_recovery),
2668        )
2669        .await;
2670        assert!(r.is_err());
2671        assert!(
2672            r.unwrap_err()
2673                .to_string()
2674                .contains("Checkpoint fork detected")
2675        );
2676
2677        let mut checkpoint_overrides = BTreeMap::new();
2678        checkpoint_overrides.insert(seq_num, digest.to_string());
2679        let fork_recovery_with_override = ForkRecoveryConfig {
2680            transaction_overrides: Default::default(),
2681            checkpoint_overrides,
2682            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2683        };
2684        let r = SuiNode::check_and_recover_forks(
2685            &checkpoint_store,
2686            &checkpoint_metrics,
2687            Some(&fork_recovery_with_override),
2688        )
2689        .await;
2690        assert!(r.is_ok());
2691        assert!(
2692            checkpoint_store
2693                .get_checkpoint_fork_detected()
2694                .unwrap()
2695                .is_none()
2696        );
2697
2698        // ---------- Transaction fork path ----------
2699        let tx_digest = TransactionDigest::random();
2700        let expected_effects = TransactionEffectsDigest::random();
2701        let actual_effects = TransactionEffectsDigest::random();
2702        checkpoint_store
2703            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2704            .unwrap();
2705
2706        let fork_recovery = ForkRecoveryConfig {
2707            transaction_overrides: Default::default(),
2708            checkpoint_overrides: Default::default(),
2709            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2710        };
2711        let r = SuiNode::check_and_recover_forks(
2712            &checkpoint_store,
2713            &checkpoint_metrics,
2714            Some(&fork_recovery),
2715        )
2716        .await;
2717        assert!(r.is_err());
2718        assert!(
2719            r.unwrap_err()
2720                .to_string()
2721                .contains("Transaction fork detected")
2722        );
2723
2724        let mut transaction_overrides = BTreeMap::new();
2725        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2726        let fork_recovery_with_override = ForkRecoveryConfig {
2727            transaction_overrides,
2728            checkpoint_overrides: Default::default(),
2729            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2730        };
2731        let r = SuiNode::check_and_recover_forks(
2732            &checkpoint_store,
2733            &checkpoint_metrics,
2734            Some(&fork_recovery_with_override),
2735        )
2736        .await;
2737        assert!(r.is_ok());
2738        assert!(
2739            checkpoint_store
2740                .get_transaction_fork_detected()
2741                .unwrap()
2742                .is_none()
2743        );
2744    }
2745}