sui_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::authority::shared_object_version_manager::Schedulable;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc, watch};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::discovery::TrustedPeerChangeEvent;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{
    AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size,
};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

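/// Sub-components that only exist while the node is running as a validator
/// (see `construct_validator_components`).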
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
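
/// Handles produced by `create_p2p_network`: the anemo network plus the
/// discovery, state-sync and randomness services running on top of it.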
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
    validator_tx_finalizer::ValidatorTxFinalizer,
};

const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

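/// A running sui-node: the authority state plus all long-lived services
/// (p2p, state sync, RPC servers, and, for validators, consensus).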
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// Broadcast channel to notify state-sync for new validator peers.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shut down sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at startup and at the beginning of each epoch.
    /// Uses ArcSwap so that we can mutate it without taking a mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

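/// Cap on the number of JWKs accepted from a single provider fetch; anything
/// beyond this is truncated in `start_jwk_updater`.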
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

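    /// Spawns one background task per supported zkLogin OIDC provider that
    /// periodically fetches JWKs, filters out invalid or already-known ones,
    /// and submits the rest to consensus.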
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // metrics is:
        //  pub struct SuiNodeMetrics {
        //      pub jwk_requests: IntCounterVec,
        //      pub jwk_request_errors: IntCounterVec,
        //      pub total_jwks: IntCounterVec,
        //      pub unique_jwks: IntCounterVec,
        //  }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

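    /// Main startup path for both validators and fullnodes: opens the stores,
    /// builds the execution cache and epoch store, starts the p2p stack and the
    /// HTTP/gRPC servers, constructs validator components if this node is in the
    /// committee, and spawns the reconfiguration monitor.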
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        // Initialize metrics to track db usage before creating any stores
        DBMetrics::init(registry_service.clone());

        // Initialize Mysten metrics.
        mysten_metrics::init_metrics(&prometheus_registry);
        // Unsupported (because of the use of a static variable) and unnecessary in simtests.
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // By default, only enable write stall on validators for perpetual db.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // the database is empty at genesis time
        if is_genesis {
            info!("checking SUI conservation at genesis");
            // When opening the db tables, the only time it's safe to check SUI
            // conservation is at genesis. Otherwise we may be in the middle of
            // an epoch and the SUI conservation check will fail. This also initializes
            // the expected_network_sui_amount table.
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        // Create network
        // TODO only configure validators as seed/preferred peers for validators and not for
        // fullnodes once we've had a chance to re-work fullnode configuration generation.
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            trusted_peer_change_rx,
            randomness_tx,
            &prometheus_registry,
        )?;

        // We must explicitly send this instead of relying on the initial value to trigger
        // a watch value change, so that state-sync is able to process it.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        )
        .expect("Initial trusted peers must be set");

        info!("start snapshot upload");
        // Start uploading state snapshot to remote store
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        // Start uploading db checkpoints to remote store
        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
        // ensure genesis txn was executed
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        // Start the loop that receives new randomness and generates transactions for it.
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    sui_node_metrics.clone(),
                    checkpoint_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the gRPC server
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // setup shutdown channel
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

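    /// Returns a receiver for the broadcast channel that carries the starting
    /// system state for the next epoch.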
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    // Init reconfig process by starting to reject user certs
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    // Testing-only API to start epoch close process.
    // For production code, please use the non-testing version.
    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

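    /// Starts uploading state snapshots to the configured remote object store,
    /// if any, returning the uploader's handle.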
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

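    /// Resolves the db checkpoint configuration (forcing epoch-end db
    /// checkpoints when state snapshots are enabled) and starts the
    /// `DBCheckpointHandler` whenever an object store is configured or state
    /// snapshots need db checkpoints marked as completed.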
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // If the db checkpoint object store is not specified but the state
            // snapshot object store is, still create the handler so that db
            // checkpoints are marked as completed and can be uploaded as state
            // snapshots.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

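    /// Builds the anemo p2p network and starts the discovery, state-sync and
    /// randomness services on top of it.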
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
            // staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

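    /// Creates the validator-only components: the consensus adapter and
    /// manager, the gRPC validator service, the consensus store pruner and
    /// (optionally) the overload monitor, then hands off to
    /// `start_epoch_specific_validator_components`.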
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

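    /// Starts the components that are recreated at every epoch boundary:
    /// the checkpoint service, randomness manager, execution time observer,
    /// throughput profiling and the consensus handler.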
1339    async fn start_epoch_specific_validator_components(
1340        config: &NodeConfig,
1341        state: Arc<AuthorityState>,
1342        consensus_adapter: Arc<ConsensusAdapter>,
1343        checkpoint_store: Arc<CheckpointStore>,
1344        epoch_store: Arc<AuthorityPerEpochStore>,
1345        state_sync_handle: state_sync::Handle,
1346        randomness_handle: randomness::Handle,
1347        consensus_manager: Arc<ConsensusManager>,
1348        consensus_store_pruner: ConsensusStorePruner,
1349        state_hasher: Weak<GlobalStateHasher>,
1350        backpressure_manager: Arc<BackpressureManager>,
1351        validator_server_handle: SpawnOnce,
1352        validator_overload_monitor_handle: Option<JoinHandle<()>>,
1353        checkpoint_metrics: Arc<CheckpointMetrics>,
1354        sui_node_metrics: Arc<SuiNodeMetrics>,
1355        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1356    ) -> Result<ValidatorComponents> {
1357        let checkpoint_service = Self::build_checkpoint_service(
1358            config,
1359            consensus_adapter.clone(),
1360            checkpoint_store.clone(),
1361            epoch_store.clone(),
1362            state.clone(),
1363            state_sync_handle,
1364            state_hasher,
1365            checkpoint_metrics.clone(),
1366        );
1367
1368        // Create a new map that gets injected into both the consensus handler and the consensus adapter.
1369        // The consensus handler will write values forwarded from consensus, and the consensus adapter
1370        // will read the values to decide which validator submits a transaction to consensus.
1371        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1372
1373        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1374
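        // When on-chain randomness is enabled for this epoch, set up the RandomnessManager,
        // which runs the distributed key generation (DKG) protocol and coordinates randomness
        // generation, submitting its messages through the consensus adapter.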
1375        if epoch_store.randomness_state_enabled() {
1376            let randomness_manager = RandomnessManager::try_new(
1377                Arc::downgrade(&epoch_store),
1378                Box::new(consensus_adapter.clone()),
1379                randomness_handle,
1380                config.protocol_key_pair(),
1381            )
1382            .await;
1383            if let Some(randomness_manager) = randomness_manager {
1384                epoch_store
1385                    .set_randomness_manager(randomness_manager)
1386                    .await?;
1387            }
1388        }
1389
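        // Spawn the execution time observer, which records local transaction execution times
        // and shares its estimates with other validators through consensus.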
1390        ExecutionTimeObserver::spawn(
1391            epoch_store.clone(),
1392            Box::new(consensus_adapter.clone()),
1393            config
1394                .execution_time_observer_config
1395                .clone()
1396                .unwrap_or_default(),
1397        );
1398
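        // Track consensus throughput; the profiler built on top of it is handed to the
        // consensus adapter so submission can take the current throughput profile into account.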
1399        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1400            None,
1401            state.metrics.clone(),
1402        ));
1403
1404        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1405            throughput_calculator.clone(),
1406            None,
1407            None,
1408            state.metrics.clone(),
1409            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1410        ));
1411
1412        consensus_adapter.swap_throughput_profiler(throughput_profiler);
1413
1414        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1415            state.clone(),
1416            checkpoint_service.clone(),
1417            epoch_store.clone(),
1418            consensus_adapter.clone(),
1419            low_scoring_authorities,
1420            throughput_calculator,
1421            backpressure_manager,
1422        );
1423
1424        info!("Starting consensus manager asynchronously");
1425
1426        // Spawn consensus startup asynchronously to avoid blocking other components
1427        tokio::spawn({
1428            let config = config.clone();
1429            let epoch_store = epoch_store.clone();
1430            let sui_tx_validator = SuiTxValidator::new(
1431                state.clone(),
1432                consensus_adapter.clone(),
1433                checkpoint_service.clone(),
1434                sui_tx_validator_metrics.clone(),
1435            );
1436            let consensus_manager = consensus_manager.clone();
1437            async move {
1438                consensus_manager
1439                    .start(
1440                        &config,
1441                        epoch_store,
1442                        consensus_handler_initializer,
1443                        sui_tx_validator,
1444                    )
1445                    .await;
1446            }
1447        });
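        // Checkpoint building waits for consensus to finish replaying previously committed
        // transactions (via the replay waiter); setting the DISABLE_REPLAY_WAITER env var
        // skips this wait.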
1448        let replay_waiter = consensus_manager.replay_waiter();
1449
1450        info!("Spawning checkpoint service");
1451        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1452            None
1453        } else {
1454            Some(replay_waiter)
1455        };
1456        checkpoint_service
1457            .spawn(epoch_store.clone(), replay_waiter)
1458            .await;
1459
1460        if epoch_store.authenticator_state_enabled() {
1461            Self::start_jwk_updater(
1462                config,
1463                sui_node_metrics,
1464                state.name,
1465                epoch_store.clone(),
1466                consensus_adapter.clone(),
1467            );
1468        }
1469
1470        Ok(ValidatorComponents {
1471            validator_server_handle,
1472            validator_overload_monitor_handle,
1473            consensus_manager,
1474            consensus_store_pruner,
1475            consensus_adapter,
1476            checkpoint_metrics,
1477            sui_tx_validator_metrics,
1478        })
1479    }
1480
1481    fn build_checkpoint_service(
1482        config: &NodeConfig,
1483        consensus_adapter: Arc<ConsensusAdapter>,
1484        checkpoint_store: Arc<CheckpointStore>,
1485        epoch_store: Arc<AuthorityPerEpochStore>,
1486        state: Arc<AuthorityState>,
1487        state_sync_handle: state_sync::Handle,
1488        state_hasher: Weak<GlobalStateHasher>,
1489        checkpoint_metrics: Arc<CheckpointMetrics>,
1490    ) -> Arc<CheckpointService> {
1491        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1492        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1493
1494        debug!(
1495            "Starting checkpoint service with epoch start timestamp {}
1496            and epoch duration {}",
1497            epoch_start_timestamp_ms, epoch_duration_ms
1498        );
1499
1500        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1501            sender: consensus_adapter,
1502            signer: state.secret.clone(),
1503            authority: config.protocol_public_key(),
1504            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1505                .checked_add(epoch_duration_ms)
1506                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1507            metrics: checkpoint_metrics.clone(),
1508        });
1509
1510        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1511        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1512        let max_checkpoint_size_bytes =
1513            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1514
1515        CheckpointService::build(
1516            state.clone(),
1517            checkpoint_store,
1518            epoch_store,
1519            state.get_transaction_cache_reader().clone(),
1520            state_hasher,
1521            checkpoint_output,
1522            Box::new(certified_checkpoint_output),
1523            checkpoint_metrics,
1524            max_tx_per_checkpoint,
1525            max_checkpoint_size_bytes,
1526        )
1527    }
1528
1529    fn construct_consensus_adapter(
1530        committee: &Committee,
1531        consensus_config: &ConsensusConfig,
1532        authority: AuthorityName,
1533        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1534        prometheus_registry: &Registry,
1535        protocol_config: ProtocolConfig,
1536        consensus_client: Arc<dyn ConsensusClient>,
1537        checkpoint_store: Arc<CheckpointStore>,
1538    ) -> ConsensusAdapter {
1539        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1540        // The consensus adapter allows the authority to send user certificates through consensus.
1541
1542        ConsensusAdapter::new(
1543            consensus_client,
1544            checkpoint_store,
1545            authority,
1546            connection_monitor_status,
1547            consensus_config.max_pending_transactions(),
1548            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1549            consensus_config.max_submit_position,
1550            consensus_config.submit_delay_step_override(),
1551            ca_metrics,
1552            protocol_config,
1553        )
1554    }
1555
1556    async fn start_grpc_validator_service(
1557        config: &NodeConfig,
1558        state: Arc<AuthorityState>,
1559        consensus_adapter: Arc<ConsensusAdapter>,
1560        prometheus_registry: &Registry,
1561    ) -> Result<SpawnOnce> {
1562        let validator_service = ValidatorService::new(
1563            state.clone(),
1564            consensus_adapter,
1565            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1566            config.policy_config.clone().map(|p| p.client_id_source),
1567        );
1568
1569        let mut server_conf = mysten_network::config::Config::new();
1570        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1571        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1572        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1573        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1574        server_conf.load_shed = config.grpc_load_shed;
1575        let mut server_builder =
1576            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1577
1578        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1579
1580        let tls_config = sui_tls::create_rustls_server_config(
1581            config.network_key_pair().copy().private(),
1582            SUI_TLS_SERVER_NAME.to_string(),
1583        );
1584
1585        let network_address = config.network_address().clone();
1586
1587        let (ready_tx, ready_rx) = oneshot::channel();
1588
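        // Binding and serving are deferred inside SpawnOnce; the ready channel fires once the
        // server has bound its address, so start() only returns after the listener is up.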
1589        Ok(SpawnOnce::new(ready_rx, async move {
1590            let server = server_builder
1591                .bind(&network_address, Some(tls_config))
1592                .await
1593                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1594            let local_addr = server.local_addr();
1595            info!("Listening to traffic on {local_addr}");
1596            ready_tx.send(()).unwrap();
1597            if let Err(err) = server.serve().await {
1598                info!("Server stopped: {err}");
1599            }
1600            info!("Server stopped");
1601        }))
1602    }
1603
1604    /// Re-executes pending consensus certificates, which may not have been committed to disk
1605    /// before the node restarted. This is necessary for the following reasons:
1606    ///
1607    /// 1. For any transaction for which we returned signed effects to a client, we must ensure
1608    ///    that we have re-executed the transaction before we begin accepting grpc requests.
1609    ///    Otherwise we would appear to have forgotten about the transaction.
1610    /// 2. While this is running, we are concurrently waiting for all previously built checkpoints
1611    ///    to be rebuilt. Since there may be dependencies in either direction (from checkpointed
1612    ///    consensus transactions to pending consensus transactions, or vice versa), we must
1613    ///    re-execute pending consensus transactions to ensure that both processes can complete.
1614    /// 3. Also note that for any pending consensus transactions for which we wrote a signed effects
1615    ///    digest to disk, we must re-execute using that digest as the expected effects digest,
1616    ///    to ensure that we cannot arrive at different effects than what we previously signed.
1617    async fn reexecute_pending_consensus_certs(
1618        epoch_store: &Arc<AuthorityPerEpochStore>,
1619        state: &Arc<AuthorityState>,
1620    ) {
1621        let mut pending_consensus_certificates = Vec::new();
1622        let mut additional_certs = Vec::new();
1623
1624        for tx in epoch_store.get_all_pending_consensus_transactions() {
1625            match tx.kind {
1626                // Shared object txns cannot be re-executed at this point, because we must wait for
1627                // consensus replay to assign shared object versions.
1628                ConsensusTransactionKind::CertifiedTransaction(tx) if !tx.is_consensus_tx() => {
1629                    let tx = *tx;
1630                    // new_unchecked is safe because we never submit a transaction to consensus
1631                    // without verifying it
1632                    let tx = VerifiedExecutableTransaction::new_from_certificate(
1633                        VerifiedCertificate::new_unchecked(tx),
1634                    );
1635                    // we only need to re-execute if we previously signed the effects (which indicates we
1636                    // returned the effects to a client).
1637                    if let Some(fx_digest) = epoch_store
1638                        .get_signed_effects_digest(tx.digest())
1639                        .expect("db error")
1640                    {
1641                        pending_consensus_certificates.push((
1642                            Schedulable::Transaction(tx),
1643                            ExecutionEnv::new().with_expected_effects_digest(fx_digest),
1644                        ));
1645                    } else {
1646                        additional_certs.push((
1647                            Schedulable::Transaction(tx),
1648                            ExecutionEnv::new()
1649                                .with_scheduling_source(SchedulingSource::NonFastPath),
1650                        ));
1651                    }
1652                }
1653                _ => (),
1654            }
1655        }
1656
1657        let digests = pending_consensus_certificates
1658            .iter()
1659            // unwrap_digest okay because only user certs are in pending_consensus_certificates
1660            .map(|(tx, _)| *tx.key().unwrap_digest())
1661            .collect::<Vec<_>>();
1662
1663        info!(
1664            "reexecuting {} pending consensus certificates: {:?}",
1665            digests.len(),
1666            digests
1667        );
1668
1669        state
1670            .execution_scheduler()
1671            .enqueue(pending_consensus_certificates, epoch_store);
1672        state
1673            .execution_scheduler()
1674            .enqueue(additional_certs, epoch_store);
1675
1676        // If this times out, the validator will still almost certainly start up fine. But, it is
1677        // possible that it may temporarily "forget" about transactions that it had previously
1678        // executed. This could confuse clients in some circumstances. However, the transactions
1679        // are still in pending_consensus_certificates, so we cannot lose any finality guarantees.
1680        let timeout = if cfg!(msim) { 120 } else { 60 };
1681        if tokio::time::timeout(
1682            std::time::Duration::from_secs(timeout),
1683            state
1684                .get_transaction_cache_reader()
1685                .notify_read_executed_effects_digests(
1686                    "SuiNode::notify_read_executed_effects_digests",
1687                    &digests,
1688                ),
1689        )
1690        .await
1691        .is_err()
1692        {
1693            // Log all the digests that were not executed to help debugging.
1694            let executed_effects_digests = state
1695                .get_transaction_cache_reader()
1696                .multi_get_executed_effects_digests(&digests);
1697            let pending_digests = digests
1698                .iter()
1699                .zip(executed_effects_digests.iter())
1700                .filter_map(|(digest, executed_effects_digest)| {
1701                    if executed_effects_digest.is_none() {
1702                        Some(digest)
1703                    } else {
1704                        None
1705                    }
1706                })
1707                .collect::<Vec<_>>();
1708            debug_fatal!(
1709                "Timed out waiting for effects digests to be executed: {:?}",
1710                pending_digests
1711            );
1712        }
1713    }
1714
1715    pub fn state(&self) -> Arc<AuthorityState> {
1716        self.state.clone()
1717    }
1718
1719    // Only used for testing because of how epoch store is loaded.
1720    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1721        self.state.reference_gas_price_for_testing()
1722    }
1723
1724    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1725        self.state.committee_store().clone()
1726    }
1727
1728    /*
1729    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1730        self.state.db()
1731    }
1732    */
1733
1734    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1735    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1736        /// of this function will most likely want to call this again
1737    /// to get a fresh one.
1738    pub fn clone_authority_aggregator(
1739        &self,
1740    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1741        self.transaction_orchestrator
1742            .as_ref()
1743            .map(|to| to.clone_authority_aggregator())
1744    }
1745
1746    pub fn transaction_orchestrator(
1747        &self,
1748    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1749        self.transaction_orchestrator.clone()
1750    }
1751
1752    /// This function awaits the completion of checkpoint execution of the current epoch,
1753    /// after which it initiates reconfiguration of the entire system.
1754    pub async fn monitor_reconfiguration(
1755        self: Arc<Self>,
1756        mut epoch_store: Arc<AuthorityPerEpochStore>,
1757    ) -> Result<()> {
1758        let checkpoint_executor_metrics =
1759            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1760
1761        loop {
1762            let mut hasher_guard = self.global_state_hasher.lock().await;
1763            let hasher = hasher_guard.take().unwrap();
1764            info!(
1765                "Creating checkpoint executor for epoch {}",
1766                epoch_store.epoch()
1767            );
1768            let checkpoint_executor = CheckpointExecutor::new(
1769                epoch_store.clone(),
1770                self.checkpoint_store.clone(),
1771                self.state.clone(),
1772                hasher.clone(),
1773                self.backpressure_manager.clone(),
1774                self.config.checkpoint_executor_config.clone(),
1775                checkpoint_executor_metrics.clone(),
1776                self.subscription_service_checkpoint_sender.clone(),
1777            );
1778
1779            let run_with_range = self.config.run_with_range;
1780
1781            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1782
1783            // Update the current protocol version metric.
1784            self.metrics
1785                .current_protocol_version
1786                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1787
1788            // Advertise capabilities to committee, if we are a validator.
1789            if let Some(components) = &*self.validator_components.lock().await {
1790                // TODO: without this sleep, the consensus message is not delivered reliably.
1791                tokio::time::sleep(Duration::from_millis(1)).await;
1792
1793                let config = cur_epoch_store.protocol_config();
1794                let mut supported_protocol_versions = self
1795                    .config
1796                    .supported_protocol_versions
1797                    .expect("Supported versions should be populated")
1798                    // no need to send digests of versions less than the current version
1799                    .truncate_below(config.version);
1800
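                // If the highest supported version enables accumulators but the accumulator
                // root object does not yet exist, step the advertised max version down until
                // it is safe to advertise.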
1801                while supported_protocol_versions.max > config.version {
1802                    let proposed_protocol_config = ProtocolConfig::get_for_version(
1803                        supported_protocol_versions.max,
1804                        cur_epoch_store.get_chain(),
1805                    );
1806
1807                    if proposed_protocol_config.enable_accumulators()
1808                        && !epoch_store.accumulator_root_exists()
1809                    {
1810                        error!(
1811                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1812                            supported_protocol_versions.max
1813                        );
1814                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
1815                    } else {
1816                        break;
1817                    }
1818                }
1819
1820                let binary_config = config.binary_config(None);
1821                let transaction = if config.authority_capabilities_v2() {
1822                    ConsensusTransaction::new_capability_notification_v2(
1823                        AuthorityCapabilitiesV2::new(
1824                            self.state.name,
1825                            cur_epoch_store.get_chain_identifier().chain(),
1826                            supported_protocol_versions,
1827                            self.state
1828                                .get_available_system_packages(&binary_config)
1829                                .await,
1830                        ),
1831                    )
1832                } else {
1833                    ConsensusTransaction::new_capability_notification(AuthorityCapabilitiesV1::new(
1834                        self.state.name,
1835                        self.config
1836                            .supported_protocol_versions
1837                            .expect("Supported versions should be populated"),
1838                        self.state
1839                            .get_available_system_packages(&binary_config)
1840                            .await,
1841                    ))
1842                };
1843                info!(?transaction, "submitting capabilities to consensus");
1844                components.consensus_adapter.submit(
1845                    transaction,
1846                    None,
1847                    &cur_epoch_store,
1848                    None,
1849                    None,
1850                )?;
1851            }
1852
1853            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1854
1855            if stop_condition == StopReason::RunWithRangeCondition {
1856                SuiNode::shutdown(&self).await;
1857                self.shutdown_channel_tx
1858                    .send(run_with_range)
1859                    .expect("RunWithRangeCondition met but failed to send shutdown message");
1860                return Ok(());
1861            }
1862
1863            // Safe to call because we are in the middle of reconfiguration.
1864            let latest_system_state = self
1865                .state
1866                .get_object_cache_reader()
1867                .get_sui_system_state_object_unsafe()
1868                .expect("Read Sui System State object cannot fail");
1869
1870            #[cfg(msim)]
1871            if !self
1872                .sim_state
1873                .sim_safe_mode_expected
1874                .load(Ordering::Relaxed)
1875            {
1876                debug_assert!(!latest_system_state.safe_mode());
1877            }
1878
1879            #[cfg(not(msim))]
1880            debug_assert!(!latest_system_state.safe_mode());
1881
1882            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1883                && self.state.is_fullnode(&cur_epoch_store)
1884            {
1885                warn!(
1886                    "Failed to send end of epoch notification to subscriber: {:?}",
1887                    err
1888                );
1889            }
1890
1891            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1892            let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1893
1894            self.auth_agg.store(Arc::new(
1895                self.auth_agg
1896                    .load()
1897                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1898            ));
1899
1900            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1901            let next_epoch = next_epoch_committee.epoch();
1902            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1903
1904            info!(
1905                next_epoch,
1906                "Finished executing all checkpoints in epoch. About to reconfigure the system."
1907            );
1908
1909            fail_point_async!("reconfig_delay");
1910
1911            // We save the connection monitor status map regardless of validator / fullnode status
1912            // so that we don't need to restart the connection monitor every epoch.
1913            // Update the mappings that will be used by the consensus adapter if it exists or is
1914            // about to be created.
1915            let authority_names_to_peer_ids =
1916                new_epoch_start_state.get_authority_names_to_peer_ids();
1917            self.connection_monitor_status
1918                .update_mapping_for_epoch(authority_names_to_peer_ids);
1919
1920            cur_epoch_store.record_epoch_reconfig_start_time_metric();
1921
1922            let _ = send_trusted_peer_change(
1923                &self.config,
1924                &self.trusted_peer_change_tx,
1925                &new_epoch_start_state,
1926            );
1927
1928            let mut validator_components_lock_guard = self.validator_components.lock().await;
1929
1930            // The following code handles 4 different cases, depending on whether the node
1931            // was a validator in the previous epoch, and whether the node is a validator
1932            // in the new epoch.
1933            let new_epoch_store = self
1934                .reconfigure_state(
1935                    &self.state,
1936                    &cur_epoch_store,
1937                    next_epoch_committee.clone(),
1938                    new_epoch_start_state,
1939                    hasher.clone(),
1940                )
1941                .await;
1942
1943            let new_validator_components = if let Some(ValidatorComponents {
1944                validator_server_handle,
1945                validator_overload_monitor_handle,
1946                consensus_manager,
1947                consensus_store_pruner,
1948                consensus_adapter,
1949                checkpoint_metrics,
1950                sui_tx_validator_metrics,
1951            }) = validator_components_lock_guard.take()
1952            {
1953                info!("Reconfiguring the validator.");
1954
1955                consensus_manager.shutdown().await;
1956                info!("Consensus has shut down.");
1957
1958                info!("Epoch store finished reconfiguration.");
1959
1960                // No other components should be holding a strong reference to state hasher
1961                // at this point. Confirm here before we swap in the new hasher.
1962                let global_state_hasher_metrics = Arc::into_inner(hasher)
1963                    .expect("Object state hasher should have no other references at this point")
1964                    .metrics();
1965                let new_hasher = Arc::new(GlobalStateHasher::new(
1966                    self.state.get_global_state_hash_store().clone(),
1967                    global_state_hasher_metrics,
1968                ));
1969                let weak_hasher = Arc::downgrade(&new_hasher);
1970                *hasher_guard = Some(new_hasher);
1971
1972                consensus_store_pruner.prune(next_epoch).await;
1973
1974                if self.state.is_validator(&new_epoch_store) {
1975                    // Only restart consensus if this node is still a validator in the new epoch.
1976                    Some(
1977                        Self::start_epoch_specific_validator_components(
1978                            &self.config,
1979                            self.state.clone(),
1980                            consensus_adapter,
1981                            self.checkpoint_store.clone(),
1982                            new_epoch_store.clone(),
1983                            self.state_sync_handle.clone(),
1984                            self.randomness_handle.clone(),
1985                            consensus_manager,
1986                            consensus_store_pruner,
1987                            weak_hasher,
1988                            self.backpressure_manager.clone(),
1989                            validator_server_handle,
1990                            validator_overload_monitor_handle,
1991                            checkpoint_metrics,
1992                            self.metrics.clone(),
1993                            sui_tx_validator_metrics,
1994                        )
1995                        .await?,
1996                    )
1997                } else {
1998                    info!("This node is no longer a validator after reconfiguration");
1999                    None
2000                }
2001            } else {
2002                // No other components should be holding a strong reference to state hasher
2003                // at this point. Confirm here before we swap in the new hasher.
2004                let global_state_hasher_metrics = Arc::into_inner(hasher)
2005                    .expect("Object state hasher should have no other references at this point")
2006                    .metrics();
2007                let new_hasher = Arc::new(GlobalStateHasher::new(
2008                    self.state.get_global_state_hash_store().clone(),
2009                    global_state_hasher_metrics,
2010                ));
2011                let weak_hasher = Arc::downgrade(&new_hasher);
2012                *hasher_guard = Some(new_hasher);
2013
2014                if self.state.is_validator(&new_epoch_store) {
2015                    info!("Promoting the node from fullnode to validator, starting grpc server");
2016
2017                    let mut components = Self::construct_validator_components(
2018                        self.config.clone(),
2019                        self.state.clone(),
2020                        Arc::new(next_epoch_committee.clone()),
2021                        new_epoch_store.clone(),
2022                        self.checkpoint_store.clone(),
2023                        self.state_sync_handle.clone(),
2024                        self.randomness_handle.clone(),
2025                        weak_hasher,
2026                        self.backpressure_manager.clone(),
2027                        self.connection_monitor_status.clone(),
2028                        &self.registry_service,
2029                        self.metrics.clone(),
2030                        self.checkpoint_metrics.clone(),
2031                    )
2032                    .await?;
2033
2034                    components.validator_server_handle =
2035                        components.validator_server_handle.start().await;
2036
2037                    Some(components)
2038                } else {
2039                    None
2040                }
2041            };
2042            *validator_components_lock_guard = new_validator_components;
2043
2044            // Force releasing current epoch store DB handle, because the
2045            // Arc<AuthorityPerEpochStore> may linger.
2046            cur_epoch_store.release_db_handles();
2047
2048            if cfg!(msim)
2049                && !matches!(
2050                    self.config
2051                        .authority_store_pruning_config
2052                        .num_epochs_to_retain_for_checkpoints(),
2053                    None | Some(u64::MAX) | Some(0)
2054                )
2055            {
2056                self.state
2057                    .prune_checkpoints_for_eligible_epochs_for_testing(
2058                        self.config.clone(),
2059                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
2060                    )
2061                    .await?;
2062            }
2063
2064            epoch_store = new_epoch_store;
2065            info!("Reconfiguration finished");
2066        }
2067    }
2068
2069    async fn shutdown(&self) {
2070        if let Some(validator_components) = &*self.validator_components.lock().await {
2071            validator_components.consensus_manager.shutdown().await;
2072        }
2073    }
2074
2075    async fn reconfigure_state(
2076        &self,
2077        state: &Arc<AuthorityState>,
2078        cur_epoch_store: &AuthorityPerEpochStore,
2079        next_epoch_committee: Committee,
2080        next_epoch_start_system_state: EpochStartSystemState,
2081        global_state_hasher: Arc<GlobalStateHasher>,
2082    ) -> Arc<AuthorityPerEpochStore> {
2083        let next_epoch = next_epoch_committee.epoch();
2084
2085        let last_checkpoint = self
2086            .checkpoint_store
2087            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2088            .expect("Error loading last checkpoint for current epoch")
2089            .expect("Could not load last checkpoint for current epoch");
2090
2091        let last_checkpoint_seq = *last_checkpoint.sequence_number();
2092
2093        assert_eq!(
2094            Some(last_checkpoint_seq),
2095            self.checkpoint_store
2096                .get_highest_executed_checkpoint_seq_number()
2097                .expect("Error loading highest executed checkpoint sequence number")
2098        );
2099
2100        let epoch_start_configuration = EpochStartConfiguration::new(
2101            next_epoch_start_system_state,
2102            *last_checkpoint.digest(),
2103            state.get_object_store().as_ref(),
2104            EpochFlag::default_flags_for_new_epoch(&state.config),
2105        )
2106        .expect("EpochStartConfiguration construction cannot fail");
2107
2108        let new_epoch_store = self
2109            .state
2110            .reconfigure(
2111                cur_epoch_store,
2112                self.config.supported_protocol_versions.unwrap(),
2113                next_epoch_committee,
2114                epoch_start_configuration,
2115                global_state_hasher,
2116                &self.config.expensive_safety_check_config,
2117                last_checkpoint_seq,
2118            )
2119            .await
2120            .expect("Reconfigure authority state cannot fail");
2121        info!(next_epoch, "Node State has been reconfigured");
2122        assert_eq!(next_epoch, new_epoch_store.epoch());
2123        self.state.get_reconfig_api().update_epoch_flags_metrics(
2124            cur_epoch_store.epoch_start_config().flags(),
2125            new_epoch_store.epoch_start_config().flags(),
2126        );
2127
2128        new_epoch_store
2129    }
2130
2131    pub fn get_config(&self) -> &NodeConfig {
2132        &self.config
2133    }
2134
2135    pub fn randomness_handle(&self) -> randomness::Handle {
2136        self.randomness_handle.clone()
2137    }
2138
2139    /// Get a short prefix of a digest for metric labels
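    /// (a digest string longer than 8 characters is truncated to its first 8 characters).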
2140    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2141        let digest_str = digest.to_string();
2142        if digest_str.len() >= 8 {
2143            digest_str[0..8].to_string()
2144        } else {
2145            digest_str
2146        }
2147    }
2148
2149    /// Check for previously detected forks and handle them appropriately.
2150    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
2151    /// For all other cases, block node startup if a fork is detected.
2152    async fn check_and_recover_forks(
2153        checkpoint_store: &CheckpointStore,
2154        checkpoint_metrics: &CheckpointMetrics,
2155        is_validator: bool,
2156        fork_recovery: Option<&ForkRecoveryConfig>,
2157    ) -> Result<()> {
2158        // Fork detection and recovery is only relevant for validators.
2159        // Fullnodes sync from validators and don't need fork checking.
2160        if !is_validator {
2161            return Ok(());
2162        }
2163
2164        // Try to recover from forks if recovery config is provided
2165        if let Some(recovery) = fork_recovery {
2166            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2167            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2168        }
2169
2170        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2171            .get_checkpoint_fork_detected()
2172            .map_err(|e| {
2173                error!("Failed to check for checkpoint fork: {:?}", e);
2174                e
2175            })?
2176        {
2177            Self::handle_checkpoint_fork(
2178                checkpoint_seq,
2179                checkpoint_digest,
2180                checkpoint_metrics,
2181                fork_recovery,
2182            )
2183            .await?;
2184        }
2185        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2186            .get_transaction_fork_detected()
2187            .map_err(|e| {
2188                error!("Failed to check for transaction fork: {:?}", e);
2189                e
2190            })?
2191        {
2192            Self::handle_transaction_fork(
2193                tx_digest,
2194                expected_effects,
2195                actual_effects,
2196                checkpoint_metrics,
2197                fork_recovery,
2198            )
2199            .await?;
2200        }
2201
2202        Ok(())
2203    }
2204
2205    fn try_recover_checkpoint_fork(
2206        checkpoint_store: &CheckpointStore,
2207        recovery: &ForkRecoveryConfig,
2208    ) -> Result<()> {
2209        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2210        // clear locally computed checkpoints from that sequence (inclusive).
2211        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2212            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2213                anyhow::bail!(
2214                    "Invalid checkpoint digest override for seq {}: {}",
2215                    seq,
2216                    expected_digest_str
2217                );
2218            };
2219
2220            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2221                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2222                if local_digest != expected_digest {
2223                    info!(
2224                        seq,
2225                        local = %Self::get_digest_prefix(local_digest),
2226                        expected = %Self::get_digest_prefix(expected_digest),
2227                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2228                        seq
2229                    );
2230                    checkpoint_store
2231                        .clear_locally_computed_checkpoints_from(*seq)
2232                        .context(
2233                            "Failed to clear locally computed checkpoints from override seq",
2234                        )?;
2235                }
2236            }
2237        }
2238
2239        if let Some((checkpoint_seq, checkpoint_digest)) =
2240            checkpoint_store.get_checkpoint_fork_detected()?
2241            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2242        {
2243            info!(
2244                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2245                checkpoint_seq, checkpoint_digest
2246            );
2247            checkpoint_store
2248                .clear_checkpoint_fork_detected()
2249                .expect("Failed to clear checkpoint fork detected marker");
2250        }
2251        Ok(())
2252    }
2253
2254    fn try_recover_transaction_fork(
2255        checkpoint_store: &CheckpointStore,
2256        recovery: &ForkRecoveryConfig,
2257    ) -> Result<()> {
2258        if recovery.transaction_overrides.is_empty() {
2259            return Ok(());
2260        }
2261
2262        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2263            && recovery
2264                .transaction_overrides
2265                .contains_key(&tx_digest.to_string())
2266        {
2267            info!(
2268                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2269                tx_digest
2270            );
2271            checkpoint_store
2272                .clear_transaction_fork_detected()
2273                .expect("Failed to clear transaction fork detected marker");
2274        }
2275        Ok(())
2276    }
2277
2278    fn get_current_timestamp() -> u64 {
2279        std::time::SystemTime::now()
2280            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2281            .unwrap()
2282            .as_secs()
2283    }
2284
2285    async fn handle_checkpoint_fork(
2286        checkpoint_seq: u64,
2287        checkpoint_digest: CheckpointDigest,
2288        checkpoint_metrics: &CheckpointMetrics,
2289        fork_recovery: Option<&ForkRecoveryConfig>,
2290    ) -> Result<()> {
2291        checkpoint_metrics
2292            .checkpoint_fork_crash_mode
2293            .with_label_values(&[
2294                &checkpoint_seq.to_string(),
2295                &Self::get_digest_prefix(checkpoint_digest),
2296                &Self::get_current_timestamp().to_string(),
2297            ])
2298            .set(1);
2299
2300        let behavior = fork_recovery
2301            .map(|fr| fr.fork_crash_behavior)
2302            .unwrap_or_default();
2303
2304        match behavior {
2305            ForkCrashBehavior::AwaitForkRecovery => {
2306                error!(
2307                    checkpoint_seq = checkpoint_seq,
2308                    checkpoint_digest = ?checkpoint_digest,
2309                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2310                );
2311                futures::future::pending::<()>().await;
2312                unreachable!("pending() should never return");
2313            }
2314            ForkCrashBehavior::ReturnError => {
2315                error!(
2316                    checkpoint_seq = checkpoint_seq,
2317                    checkpoint_digest = ?checkpoint_digest,
2318                    "Checkpoint fork detected! Returning error."
2319                );
2320                Err(anyhow::anyhow!(
2321                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2322                    checkpoint_seq,
2323                    checkpoint_digest
2324                ))
2325            }
2326        }
2327    }
2328
2329    async fn handle_transaction_fork(
2330        tx_digest: TransactionDigest,
2331        expected_effects_digest: TransactionEffectsDigest,
2332        actual_effects_digest: TransactionEffectsDigest,
2333        checkpoint_metrics: &CheckpointMetrics,
2334        fork_recovery: Option<&ForkRecoveryConfig>,
2335    ) -> Result<()> {
2336        checkpoint_metrics
2337            .transaction_fork_crash_mode
2338            .with_label_values(&[
2339                &Self::get_digest_prefix(tx_digest),
2340                &Self::get_digest_prefix(expected_effects_digest),
2341                &Self::get_digest_prefix(actual_effects_digest),
2342                &Self::get_current_timestamp().to_string(),
2343            ])
2344            .set(1);
2345
2346        let behavior = fork_recovery
2347            .map(|fr| fr.fork_crash_behavior)
2348            .unwrap_or_default();
2349
2350        match behavior {
2351            ForkCrashBehavior::AwaitForkRecovery => {
2352                error!(
2353                    tx_digest = ?tx_digest,
2354                    expected_effects_digest = ?expected_effects_digest,
2355                    actual_effects_digest = ?actual_effects_digest,
2356                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2357                );
2358                futures::future::pending::<()>().await;
2359                unreachable!("pending() should never return");
2360            }
2361            ForkCrashBehavior::ReturnError => {
2362                error!(
2363                    tx_digest = ?tx_digest,
2364                    expected_effects_digest = ?expected_effects_digest,
2365                    actual_effects_digest = ?actual_effects_digest,
2366                    "Transaction fork detected! Returning error."
2367                );
2368                Err(anyhow::anyhow!(
2369                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2370                    tx_digest,
2371                    expected_effects_digest,
2372                    actual_effects_digest
2373                ))
2374            }
2375        }
2376    }
2377}
2378
2379#[cfg(not(msim))]
2380impl SuiNode {
2381    async fn fetch_jwks(
2382        _authority: AuthorityName,
2383        provider: &OIDCProvider,
2384    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2385        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2386        use sui_types::error::SuiErrorKind;
2387        let client = reqwest::Client::new();
2388        fetch_jwks(provider, &client, true)
2389            .await
2390            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2391    }
2392}
2393
2394#[cfg(msim)]
2395impl SuiNode {
2396    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
2397        self.sim_state.sim_node.id()
2398    }
2399
2400    pub fn set_safe_mode_expected(&self, new_value: bool) {
2401        info!("Setting safe mode expected to {}", new_value);
2402        self.sim_state
2403            .sim_safe_mode_expected
2404            .store(new_value, Ordering::Relaxed);
2405    }
2406
2407    #[allow(unused_variables)]
2408    async fn fetch_jwks(
2409        authority: AuthorityName,
2410        provider: &OIDCProvider,
2411    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2412        get_jwk_injector()(authority, provider)
2413    }
2414}
2415
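/// Wraps a future that should be spawned at most once. `start` spawns the wrapped future and
/// waits on the readiness channel before returning, so callers know the task is up and running.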
2416enum SpawnOnce {
2417    // Mutex is only needed to make SpawnOnce Send
2418    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2419    #[allow(unused)]
2420    Started(JoinHandle<()>),
2421}
2422
2423impl SpawnOnce {
2424    pub fn new(
2425        ready_rx: oneshot::Receiver<()>,
2426        future: impl Future<Output = ()> + Send + 'static,
2427    ) -> Self {
2428        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2429    }
2430
2431    pub async fn start(self) -> Self {
2432        match self {
2433            Self::Unstarted(ready_rx, future) => {
2434                let future = future.into_inner();
2435                let handle = tokio::spawn(future);
2436                ready_rx.await.unwrap();
2437                Self::Started(handle)
2438            }
2439            Self::Started(_) => self,
2440        }
2441    }
2442}
2443
2444/// Notify state-sync that a new list of trusted peers is now available.
2445fn send_trusted_peer_change(
2446    config: &NodeConfig,
2447    sender: &watch::Sender<TrustedPeerChangeEvent>,
2448    epoch_start_state: &EpochStartSystemState,
2449) -> Result<(), watch::error::SendError<TrustedPeerChangeEvent>> {
2450    sender
2451        .send(TrustedPeerChangeEvent {
2452            new_peers: epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key()),
2453        })
2454        .tap_err(|err| {
2455            warn!(
2456                "Failed to send validator peer information to state sync: {:?}",
2457                err
2458            );
2459        })
2460}
2461
2462fn build_kv_store(
2463    state: &Arc<AuthorityState>,
2464    config: &NodeConfig,
2465    registry: &Registry,
2466) -> Result<Arc<TransactionKeyValueStore>> {
2467    let metrics = KeyValueStoreMetrics::new(registry);
2468    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2469
2470    let base_url = &config.transaction_kv_store_read_config.base_url;
2471
2472    if base_url.is_empty() {
2473        info!("no http kv store url provided, using local db only");
2474        return Ok(Arc::new(db_store));
2475    }
2476
2477    let base_url: url::Url = base_url.parse().tap_err(|e| {
2478        error!(
2479            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2480            base_url, e
2481        )
2482    })?;
2483
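    // The http kv store fallback is only used on mainnet; on other chains the local rocksdb
    // store is used on its own.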
2484    let network_str = match state.get_chain_identifier().chain() {
2485        Chain::Mainnet => "/mainnet",
2486        _ => {
2487            info!("using local db only for kv store");
2488            return Ok(Arc::new(db_store));
2489        }
2490    };
2491
2492    let base_url = base_url.join(network_str)?.to_string();
2493    let http_store = HttpKVStore::new_kv(
2494        &base_url,
2495        config.transaction_kv_store_read_config.cache_size,
2496        metrics.clone(),
2497    )?;
2498    info!("using local key-value store with fallback to http key-value store");
2499    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2500        db_store,
2501        http_store,
2502        metrics,
2503        "json_rpc_fallback",
2504    )))
2505}
2506
2507async fn build_http_servers(
2508    state: Arc<AuthorityState>,
2509    store: RocksDbStore,
2510    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2511    config: &NodeConfig,
2512    prometheus_registry: &Registry,
2513    server_version: ServerVersion,
2514) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
2515    // Validators do not expose these APIs
2516    if config.consensus_config().is_some() {
2517        return Ok((HttpServers::default(), None));
2518    }
2519
2520    let mut router = axum::Router::new();
2521
2522    let json_rpc_router = {
2523        let traffic_controller = state.traffic_controller.clone();
2524        let mut server = JsonRpcServerBuilder::new(
2525            env!("CARGO_PKG_VERSION"),
2526            prometheus_registry,
2527            traffic_controller,
2528            config.policy_config.clone(),
2529        );
2530
2531        let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2532
2533        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2534        server.register_module(ReadApi::new(
2535            state.clone(),
2536            kv_store.clone(),
2537            metrics.clone(),
2538        ))?;
2539        server.register_module(CoinReadApi::new(
2540            state.clone(),
2541            kv_store.clone(),
2542            metrics.clone(),
2543        ))?;
2544
2545        // If run_with_range is set, we want to prevent any transactions from being submitted;
2546        // run_with_range = None is the normal operating condition.
2547        if config.run_with_range.is_none() {
2548            server.register_module(TransactionBuilderApi::new(state.clone()))?;
2549        }
2550        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2551        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2552
2553        if let Some(transaction_orchestrator) = transaction_orchestrator {
2554            server.register_module(TransactionExecutionApi::new(
2555                state.clone(),
2556                transaction_orchestrator.clone(),
2557                metrics.clone(),
2558            ))?;
2559        }
2560
2561        let name_service_config =
2562            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
2563                config.name_service_package_address,
2564                config.name_service_registry_id,
2565                config.name_service_reverse_registry_id,
2566            ) {
2567                sui_name_service::NameServiceConfig::new(
2568                    package_address,
2569                    registry_id,
2570                    reverse_registry_id,
2571                )
2572            } else {
2573                match state.get_chain_identifier().chain() {
2574                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2575                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2576                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2577                }
2578            };
2579
2580        server.register_module(IndexerApi::new(
2581            state.clone(),
2582            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2583            kv_store,
2584            name_service_config,
2585            metrics,
2586            config.indexer_max_subscriptions,
2587        ))?;
2588        server.register_module(MoveUtils::new(state.clone()))?;
2589
2590        let server_type = config.jsonrpc_server_type();
2591
2592        server.to_router(server_type).await?
2593    };
2594
2595    router = router.merge(json_rpc_router);
2596
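    // The subscription service streams executed checkpoints to RPC subscribers; the sender half
    // is returned to the caller so the checkpoint executor can forward checkpoints into it.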
2597    let (subscription_service_checkpoint_sender, subscription_service_handle) =
2598        SubscriptionService::build(prometheus_registry);
2599    let rpc_router = {
2600        let mut rpc_service =
2601            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
2602        rpc_service.with_server_version(server_version);
2603
2604        if let Some(config) = config.rpc.clone() {
2605            rpc_service.with_config(config);
2606        }
2607
2608        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2609        rpc_service.with_subscription_service(subscription_service_handle);
2610
2611        if let Some(transaction_orchestrator) = transaction_orchestrator {
2612            rpc_service.with_executor(transaction_orchestrator.clone())
2613        }
2614
2615        rpc_service.into_router().await
2616    };
2617
2618    let layers = ServiceBuilder::new()
2619        .map_request(|mut request: axum::http::Request<_>| {
2620            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2621                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2622                request.extensions_mut().insert(axum_connect_info);
2623            }
2624            request
2625        })
2626        .layer(axum::middleware::from_fn(server_timing_middleware))
2627        // Set up a permissive CORS policy
2628        .layer(
2629            tower_http::cors::CorsLayer::new()
2630                .allow_methods([http::Method::GET, http::Method::POST])
2631                .allow_origin(tower_http::cors::Any)
2632                .allow_headers(tower_http::cors::Any),
2633        );
2634
2635    router = router.merge(rpc_router).layer(layers);
2636
2637    let https = if let Some((tls_config, https_address)) = config
2638        .rpc()
2639        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2640    {
2641        let https = sui_http::Builder::new()
2642            .tls_single_cert(tls_config.cert(), tls_config.key())
2643            .and_then(|builder| builder.serve(https_address, router.clone()))
2644            .map_err(|e| anyhow::anyhow!(e))?;
2645
2646        info!(
2647            https_address =? https.local_addr(),
2648            "HTTPS rpc server listening on {}",
2649            https.local_addr()
2650        );
2651
2652        Some(https)
2653    } else {
2654        None
2655    };
2656
2657    let http = sui_http::Builder::new()
2658        .serve(&config.json_rpc_address, router)
2659        .map_err(|e| anyhow::anyhow!(e))?;
2660
2661    info!(
2662        http_address =? http.local_addr(),
2663        "HTTP rpc server listening on {}",
2664        http.local_addr()
2665    );
2666
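    // Hand back the live server handles along with the subscription checkpoint sender.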
2667    Ok((
2668        HttpServers {
2669            http: Some(http),
2670            https,
2671        },
2672        Some(subscription_service_checkpoint_sender),
2673    ))
2674}
2675
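// In production the checkpoint builder respects the protocol-configured limit on
// transactions per checkpoint; under cfg(test) the limit is fixed at 2, presumably
// so that even small test workloads produce multiple checkpoints.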
2676#[cfg(not(test))]
2677fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2678    protocol_config.max_transactions_per_checkpoint() as usize
2679}
2680
2681#[cfg(test)]
2682fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2683    2
2684}
2685
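// Handles for the HTTP and HTTPS RPC servers. The fields are never read; the struct
// simply retains the handles for the lifetime of the node.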
2686#[derive(Default)]
2687struct HttpServers {
2688    #[allow(unused)]
2689    http: Option<sui_http::ServerHandle>,
2690    #[allow(unused)]
2691    https: Option<sui_http::ServerHandle>,
2692}
2693
2694#[cfg(test)]
2695mod tests {
2696    use super::*;
2697    use prometheus::Registry;
2698    use std::collections::BTreeMap;
2699    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
2700    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
2701    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};
2702
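    /// Exercises fork recovery in both directions: a recorded checkpoint fork and a
    /// recorded transaction fork should each surface an error under
    /// `ForkCrashBehavior::ReturnError`, and each should be cleared once a matching
    /// override is supplied.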
2703    #[tokio::test]
2704    async fn test_fork_error_and_recovery_both_paths() {
2705        let checkpoint_store = CheckpointStore::new_for_tests();
2706        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
2707
2708        // ---------- Checkpoint fork path ----------
2709        let seq_num = 42;
2710        let digest = CheckpointDigest::random();
2711        checkpoint_store
2712            .record_checkpoint_fork_detected(seq_num, digest)
2713            .unwrap();
2714
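        // With ReturnError and no overrides, fork checking must report the checkpoint fork.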
2715        let fork_recovery = ForkRecoveryConfig {
2716            transaction_overrides: Default::default(),
2717            checkpoint_overrides: Default::default(),
2718            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2719        };
2720
2721        let r = SuiNode::check_and_recover_forks(
2722            &checkpoint_store,
2723            &checkpoint_metrics,
2724            true,
2725            Some(&fork_recovery),
2726        )
2727        .await;
2728        assert!(r.is_err());
2729        assert!(
2730            r.unwrap_err()
2731                .to_string()
2732                .contains("Checkpoint fork detected")
2733        );
2734
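        // An override matching the forked sequence number and digest clears the fork marker.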
2735        let mut checkpoint_overrides = BTreeMap::new();
2736        checkpoint_overrides.insert(seq_num, digest.to_string());
2737        let fork_recovery_with_override = ForkRecoveryConfig {
2738            transaction_overrides: Default::default(),
2739            checkpoint_overrides,
2740            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2741        };
2742        let r = SuiNode::check_and_recover_forks(
2743            &checkpoint_store,
2744            &checkpoint_metrics,
2745            true,
2746            Some(&fork_recovery_with_override),
2747        )
2748        .await;
2749        assert!(r.is_ok());
2750        assert!(
2751            checkpoint_store
2752                .get_checkpoint_fork_detected()
2753                .unwrap()
2754                .is_none()
2755        );
2756
2757        // ---------- Transaction fork path ----------
2758        let tx_digest = TransactionDigest::random();
2759        let expected_effects = TransactionEffectsDigest::random();
2760        let actual_effects = TransactionEffectsDigest::random();
2761        checkpoint_store
2762            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
2763            .unwrap();
2764
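        // Again with no overrides, the recorded transaction fork must be reported as an error.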
2765        let fork_recovery = ForkRecoveryConfig {
2766            transaction_overrides: Default::default(),
2767            checkpoint_overrides: Default::default(),
2768            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2769        };
2770        let r = SuiNode::check_and_recover_forks(
2771            &checkpoint_store,
2772            &checkpoint_metrics,
2773            true,
2774            Some(&fork_recovery),
2775        )
2776        .await;
2777        assert!(r.is_err());
2778        assert!(
2779            r.unwrap_err()
2780                .to_string()
2781                .contains("Transaction fork detected")
2782        );
2783
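        // Overriding the transaction digest with the recorded effects digest clears the fork marker.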
2784        let mut transaction_overrides = BTreeMap::new();
2785        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
2786        let fork_recovery_with_override = ForkRecoveryConfig {
2787            transaction_overrides,
2788            checkpoint_overrides: Default::default(),
2789            fork_crash_behavior: ForkCrashBehavior::ReturnError,
2790        };
2791        let r = SuiNode::check_and_recover_forks(
2792            &checkpoint_store,
2793            &checkpoint_metrics,
2794            true,
2795            Some(&fork_recovery_with_override),
2796        )
2797        .await;
2798        assert!(r.is_ok());
2799        assert!(
2800            checkpoint_store
2801                .get_transaction_fork_detected()
2802                .unwrap()
2803                .is_none()
2804        );
2805    }
2806}