// sui_node/lib.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29    AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::RandomnessRoundReceiver;
33use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
34use sui_core::authority::backpressure::BackpressureManager;
35use sui_core::authority::epoch_start_configuration::EpochFlag;
36use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
37use sui_core::consensus_adapter::ConsensusClient;
38use sui_core::consensus_manager::UpdatableConsensusClient;
39use sui_core::epoch::randomness::RandomnessManager;
40use sui_core::execution_cache::build_execution_cache;
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44
45use sui_core::global_state_hasher::GlobalStateHashMetrics;
46use sui_core::storage::RestReadStore;
47use sui_json_rpc::bridge_api::BridgeReadApi;
48use sui_json_rpc_api::JsonRpcMetrics;
49use sui_network::randomness;
50use sui_rpc_api::RpcMetrics;
51use sui_rpc_api::ServerVersion;
52use sui_rpc_api::subscription::SubscriptionService;
53use sui_types::base_types::ConciseableName;
54use sui_types::crypto::RandomnessRound;
55use sui_types::digests::{
56    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
57};
58use sui_types::messages_consensus::AuthorityCapabilitiesV2;
59use sui_types::sui_system_state::SuiSystemState;
60use tap::tap::TapFallible;
61use tokio::sync::oneshot;
62use tokio::sync::{Mutex, broadcast, mpsc};
63use tokio::task::JoinHandle;
64use tower::ServiceBuilder;
65use tracing::{Instrument, error_span, info};
66use tracing::{debug, error, warn};
67
// Logs at `debug!` level when running in a test configuration, `info!` otherwise.
// JWK fetch/submit logs are high-volume in tests but insignificant in prod,
// so prod keeps them visible at `info` while tests demote them to `debug`.
// Accepts the same arguments as `tracing::info!` / `tracing::debug!`.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}
80
81use fastcrypto_zkp::bn254::zk_login::JWK;
82pub use handle::SuiNodeHandle;
83use mysten_metrics::{RegistryService, spawn_monitored_task};
84use mysten_service::server_timing::server_timing_middleware;
85use sui_config::node::{DBCheckpointConfig, RunWithRange};
86use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
87use sui_config::node_config_metrics::NodeConfigMetrics;
88use sui_config::{ConsensusConfig, NodeConfig};
89use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
90use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
91use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
92use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
93use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
94use sui_core::authority_aggregator::AuthorityAggregator;
95use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
96use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
97use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
98use sui_core::checkpoints::{
99    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
100    SubmitCheckpointToConsensus,
101};
102use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
103use sui_core::consensus_manager::ConsensusManager;
104use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
105use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
106use sui_core::db_checkpoint_handler::DBCheckpointHandler;
107use sui_core::epoch::committee_store::CommitteeStore;
108use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
109use sui_core::epoch::epoch_metrics::EpochMetrics;
110use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
111use sui_core::global_state_hasher::GlobalStateHasher;
112use sui_core::jsonrpc_index::IndexStore;
113use sui_core::module_cache_metrics::ResolverMetrics;
114use sui_core::overload_monitor::overload_monitor;
115use sui_core::rpc_index::RpcIndexStore;
116use sui_core::signature_verifier::SignatureVerifierMetrics;
117use sui_core::storage::RocksDbStore;
118use sui_core::transaction_orchestrator::TransactionOrchestrator;
119use sui_core::{
120    authority::{AuthorityState, AuthorityStore},
121    authority_client::NetworkAuthorityClient,
122};
123use sui_json_rpc::JsonRpcServerBuilder;
124use sui_json_rpc::coin_api::CoinReadApi;
125use sui_json_rpc::governance_api::GovernanceReadApi;
126use sui_json_rpc::indexer_api::IndexerApi;
127use sui_json_rpc::move_utils::MoveUtils;
128use sui_json_rpc::read_api::ReadApi;
129use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
130use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
131use sui_macros::fail_point;
132use sui_macros::{fail_point_async, replay_log};
133use sui_network::api::ValidatorServer;
134use sui_network::discovery;
135use sui_network::endpoint_manager::EndpointManager;
136use sui_network::state_sync;
137use sui_network::validator::server::ServerBuilder;
138use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
139use sui_snapshot::uploader::StateSnapshotUploader;
140use sui_storage::{
141    http_key_value_store::HttpKVStore,
142    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
143    key_value_store_metrics::KeyValueStoreMetrics,
144};
145use sui_types::base_types::{AuthorityName, EpochId};
146use sui_types::committee::Committee;
147use sui_types::crypto::KeypairTraits;
148use sui_types::error::{SuiError, SuiResult};
149use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
150use sui_types::sui_system_state::SuiSystemStateTrait;
151use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
153use sui_types::supported_protocol_versions::SupportedProtocolVersions;
154use typed_store::DBMetrics;
155use typed_store::rocks::default_db_options;
156
157use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
158
159pub mod admin;
160mod handle;
161pub mod metrics;
162
/// Subsystems that run only while this node acts as a validator.
/// Stored on `SuiNode` as `Mutex<Option<ValidatorComponents>>`, so the whole
/// bundle is present only while the node participates in consensus.
pub struct ValidatorComponents {
    // Handle to the validator gRPC server task. `SpawnOnce` is declared
    // elsewhere in this file — NOTE(review): confirm its start semantics.
    validator_server_handle: SpawnOnce,
    // Background overload-monitor task, when one was started.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    // Consensus node lifecycle manager (see `sui_core::consensus_manager`).
    consensus_manager: Arc<ConsensusManager>,
    // Pruner for per-epoch consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    // Adapter used to submit transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Metrics for checkpoint building/signing.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    // Metrics for consensus-side transaction validation.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    // Admission-queue context; `None` presumably means admission queueing is
    // disabled — NOTE(review): construction site not visible here, confirm.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Bundle of p2p handles produced by `Self::create_p2p_network` and
/// destructured by the caller (see `start_async`).
pub struct P2pComponents {
    // The anemo network instance underlying discovery/state-sync/randomness.
    p2p_network: Network,
    // Peers known at startup, keyed by peer id. NOTE(review): the `String`
    // value looks like a human-readable label — confirm at the producer.
    known_peers: HashMap<PeerId, String>,
    // Handle to the peer-discovery service.
    discovery_handle: discovery::Handle,
    // Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    // Handle to the randomness (DKG) network service.
    randomness_handle: randomness::Handle,
    // Used to push peer address updates into the network.
    endpoint_manager: EndpointManager,
}
181
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Simulator-only per-node state attached to `SuiNode` in `msim` builds.
    pub(super) struct SimState {
        // Handle of the simulated node this SuiNode runs on.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        // Flag for tests that expect the node to enter safe mode.
        // NOTE(review): readers are outside this chunk — confirm usage.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its Drop impl, which detects leaked nodes in simtests.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        // Captures the current simulated node and installs a leak detector;
        // safe-mode expectation starts out false.
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    // Signature of the injectable JWK-fetch function used to stub out real
    // OIDC provider requests in simtests.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    // Default injector used when a test has not installed its own.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    // Thread-local so each node can carry its own injector.
    // NOTE(review): presumably each msim node runs on its own thread — confirm.
    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    // Returns the injector currently installed for this thread/node.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    // Installs a custom JWK injector for this thread/node (test hook;
    // re-exported at the crate level below).
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
// 60-second connect timeout for gRPC client connections.
// NOTE(review): the usage site is outside this chunk — confirm where applied.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
/// The top-level node object: owns the authority state, p2p service handles,
/// RPC servers, and — while acting as a validator — the consensus components.
pub struct SuiNode {
    config: NodeConfig,
    // `Some` only while the node is running as a validator; see
    // `ValidatorComponents`.
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    // Core authority state (storage, execution, indexes).
    state: Arc<AuthorityState>,
    // Created only on fullnodes when `run_with_range` is unset (see
    // `start_async`); `None` otherwise.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    // Underscore-prefixed handles below are held only to keep their
    // background tasks alive for the lifetime of the node.
    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    // Sender kept alive so the db-checkpoint task keeps running; dropping it
    // signals shutdown — NOTE(review): receiver side not visible here, confirm.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    // Forwards checkpoints to the RPC subscription service, when enabled.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
293
294impl fmt::Debug for SuiNode {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        f.debug_struct("SuiNode")
297            .field("name", &self.state.name.concise())
298            .finish()
299    }
300}
301
/// Upper bound on the number of JWKs accepted from a single provider fetch;
/// guards against a provider (inadvertently or otherwise) returning an
/// excessive number of keys. Keys beyond this limit are truncated.
// Idiom fix: this value is never mutated, so it should be a `const` rather
// than a `static` (no fixed memory location is needed; usages are by value).
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
303
304impl SuiNode {
305    pub async fn start(
306        config: NodeConfig,
307        registry_service: RegistryService,
308    ) -> Result<Arc<SuiNode>> {
309        Self::start_async(
310            config,
311            registry_service,
312            ServerVersion::new("sui-node", "unknown"),
313        )
314        .await
315    }
316
    /// Spawns one background task per configured zkLogin OIDC provider that
    /// periodically fetches the provider's JWKs and submits previously-unseen,
    /// valid keys to consensus. Each task is scoped to the current epoch via
    /// `within_alive_epoch`, so they all terminate at reconfiguration.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; a missing entry means "no
        // providers", hence the empty-set default. Provider strings are
        // operator-supplied config, so a parse failure is a config bug.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Returns false (and bumps `invalid_jwks`, labeled by provider) when:
        //  - the key's `iss` does not map to any known provider,
        //  - the `iss` maps to a different provider than the one fetched from
        //    (a provider must not be able to impersonate another), or
        //  - the key exceeds the allowed total size (`check_total_jwk_size`).
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // Counters used below (all labeled by provider string):
        //   jwk_requests / jwk_request_errors — fetch attempts and failures;
        //   total_jwks — keys returned by the provider;
        //   unique_jwks — keys that survived validation and de-duplication.
        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds (shorter than the normal
                                // fetch interval; `continue` skips the sleep below).
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only keys that are valid, not already
                                // active this epoch, and not previously seen by
                                // this task. Note: `&&` short-circuits, so
                                // `seen` is only updated for keys passing the
                                // earlier checks.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    // Submission failures are logged and
                                    // dropped; the key will be retried on a
                                    // later fetch only if it never lands in
                                    // consensus (it stays in `seen` otherwise).
                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
451
452    pub async fn start_async(
453        config: NodeConfig,
454        registry_service: RegistryService,
455        server_version: ServerVersion,
456    ) -> Result<Arc<SuiNode>> {
457        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
458        let mut config = config.clone();
459        if config.supported_protocol_versions.is_none() {
460            info!(
461                "populating config.supported_protocol_versions with default {:?}",
462                SupportedProtocolVersions::SYSTEM_DEFAULT
463            );
464            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
465        }
466
467        let run_with_range = config.run_with_range;
468        let is_validator = config.consensus_config().is_some();
469        let is_full_node = !is_validator;
470        let prometheus_registry = registry_service.default_registry();
471
472        info!(node =? config.protocol_public_key(),
473            "Initializing sui-node listening on {}", config.network_address
474        );
475
476        // Initialize metrics to track db usage before creating any stores
477        DBMetrics::init(registry_service.clone());
478
479        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
480        typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482        // Initialize Mysten metrics.
483        mysten_metrics::init_metrics(&prometheus_registry);
484        // Unsupported (because of the use of static variable) and unnecessary in simtests.
485        #[cfg(not(msim))]
486        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488        let genesis = config.genesis()?.clone();
489
490        let secret = Arc::pin(config.protocol_key_pair().copy());
491        let genesis_committee = genesis.committee();
492        let committee_store = Arc::new(CommitteeStore::new(
493            config.db_path().join("epochs"),
494            &genesis_committee,
495            None,
496        ));
497
498        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499        let checkpoint_store = CheckpointStore::new(
500            &config.db_path().join("checkpoints"),
501            pruner_watermarks.clone(),
502        );
503        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
504
505        Self::check_and_recover_forks(
506            &checkpoint_store,
507            &checkpoint_metrics,
508            is_validator,
509            config.fork_recovery.as_ref(),
510        )
511        .await?;
512
513        // By default, only enable write stall on validators for perpetual db.
514        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
515        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
516            enable_write_stall,
517            is_validator,
518        };
519        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
520            &config.db_path().join("store"),
521            Some(perpetual_tables_options),
522            Some(pruner_watermarks.epoch_id.clone()),
523        ));
524        let is_genesis = perpetual_tables
525            .database_is_empty()
526            .expect("Database read should not fail at init.");
527
528        let backpressure_manager =
529            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
530
531        let store =
532            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
533
534        let cur_epoch = store.get_recovery_epoch_at_restart()?;
535        let committee = committee_store
536            .get_committee(&cur_epoch)?
537            .expect("Committee of the current epoch must exist");
538        let epoch_start_configuration = store
539            .get_epoch_start_configuration()?
540            .expect("EpochStartConfiguration of the current epoch must exist");
541        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
542        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
543
544        let cache_traits = build_execution_cache(
545            &config.execution_cache,
546            &prometheus_registry,
547            &store,
548            backpressure_manager.clone(),
549        );
550
551        let auth_agg = {
552            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
553            Arc::new(ArcSwap::new(Arc::new(
554                AuthorityAggregator::new_from_epoch_start_state(
555                    epoch_start_configuration.epoch_start_state(),
556                    &committee_store,
557                    safe_client_metrics_base,
558                ),
559            )))
560        };
561
562        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
563        let chain = match config.chain_override_for_testing {
564            Some(chain) => chain,
565            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
566        };
567
568        let highest_executed_checkpoint = checkpoint_store
569            .get_highest_executed_checkpoint_seq_number()
570            .expect("checkpoint store read cannot fail")
571            .unwrap_or(0);
572
573        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
574            0
575        } else {
576            checkpoint_store
577                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
578                .expect("checkpoint store read cannot fail")
579                .unwrap_or(highest_executed_checkpoint)
580        };
581
582        let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
583        let epoch_store = AuthorityPerEpochStore::new(
584            config.protocol_public_key(),
585            committee.clone(),
586            &config.db_path().join("store"),
587            Some(epoch_options.options),
588            EpochMetrics::new(&registry_service.default_registry()),
589            epoch_start_configuration,
590            cache_traits.backing_package_store.clone(),
591            cache_traits.object_store.clone(),
592            cache_metrics,
593            signature_verifier_metrics,
594            &config.expensive_safety_check_config,
595            (chain_id, chain),
596            highest_executed_checkpoint,
597            previous_epoch_last_checkpoint,
598            Arc::new(SubmittedTransactionCacheMetrics::new(
599                &registry_service.default_registry(),
600            )),
601        )?;
602
603        info!("created epoch store");
604
605        replay_log!(
606            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
607            epoch_store.epoch(),
608            epoch_store.protocol_config()
609        );
610
611        // the database is empty at genesis time
612        if is_genesis {
613            info!("checking SUI conservation at genesis");
614            // When we are opening the db table, the only time when it's safe to
615            // check SUI conservation is at genesis. Otherwise we may be in the middle of
616            // an epoch and the SUI conservation check will fail. This also initialize
617            // the expected_network_sui_amount table.
618            cache_traits
619                .reconfig_api
620                .expensive_check_sui_conservation(&epoch_store)
621                .expect("SUI conservation check cannot fail at genesis");
622        }
623
624        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
625        let default_buffer_stake = epoch_store
626            .protocol_config()
627            .buffer_stake_for_protocol_upgrade_bps();
628        if effective_buffer_stake != default_buffer_stake {
629            warn!(
630                ?effective_buffer_stake,
631                ?default_buffer_stake,
632                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
633            );
634        }
635
636        checkpoint_store.insert_genesis_checkpoint(
637            genesis.checkpoint(),
638            genesis.checkpoint_contents().clone(),
639            &epoch_store,
640        );
641
642        info!("creating state sync store");
643        let state_sync_store = RocksDbStore::new(
644            cache_traits.clone(),
645            committee_store.clone(),
646            checkpoint_store.clone(),
647        );
648
649        let index_store = if is_full_node && config.enable_index_processing {
650            info!("creating jsonrpc index store");
651            Some(Arc::new(IndexStore::new(
652                config.db_path().join("indexes"),
653                &prometheus_registry,
654                epoch_store
655                    .protocol_config()
656                    .max_move_identifier_len_as_option(),
657                config.remove_deprecated_tables,
658            )))
659        } else {
660            None
661        };
662
663        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
664            info!("creating rpc index store");
665            Some(Arc::new(
666                RpcIndexStore::new(
667                    &config.db_path(),
668                    &store,
669                    &checkpoint_store,
670                    &epoch_store,
671                    &cache_traits.backing_package_store,
672                    pruner_watermarks.checkpoint_id.clone(),
673                    config.rpc().cloned().unwrap_or_default(),
674                )
675                .await,
676            ))
677        } else {
678            None
679        };
680
681        let chain_identifier = epoch_store.get_chain_identifier();
682
683        info!("creating archive reader");
684        // Create network
685        let (randomness_tx, randomness_rx) = mpsc::channel(
686            config
687                .p2p_config
688                .randomness
689                .clone()
690                .unwrap_or_default()
691                .mailbox_capacity(),
692        );
693        let P2pComponents {
694            p2p_network,
695            known_peers,
696            discovery_handle,
697            state_sync_handle,
698            randomness_handle,
699            endpoint_manager,
700        } = Self::create_p2p_network(
701            &config,
702            state_sync_store.clone(),
703            chain_identifier,
704            randomness_tx,
705            &prometheus_registry,
706        )?;
707
708        // Inject configured peer address overrides.
709        for peer in &config.p2p_config.peer_address_overrides {
710            endpoint_manager
711                .update_endpoint(
712                    EndpointId::P2p(peer.peer_id),
713                    AddressSource::Config,
714                    peer.addresses.clone(),
715                )
716                .expect("Updating peer address overrides should not fail");
717        }
718
719        // Send initial peer addresses to the p2p network.
720        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
721
722        info!("start snapshot upload");
723        // Start uploading state snapshot to remote store
724        let state_snapshot_handle = Self::start_state_snapshot(
725            &config,
726            &prometheus_registry,
727            checkpoint_store.clone(),
728            chain_identifier,
729        )?;
730
731        // Start uploading db checkpoints to remote store
732        info!("start db checkpoint");
733        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
734            &config,
735            &prometheus_registry,
736            state_snapshot_handle.is_some(),
737        )?;
738
739        if !epoch_store
740            .protocol_config()
741            .simplified_unwrap_then_delete()
742        {
743            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
744            config
745                .authority_store_pruning_config
746                .set_killswitch_tombstone_pruning(true);
747        }
748
749        let authority_name = config.protocol_public_key();
750
751        info!("create authority state");
752        let state = AuthorityState::new(
753            authority_name,
754            secret,
755            config.supported_protocol_versions.unwrap(),
756            store.clone(),
757            cache_traits.clone(),
758            epoch_store.clone(),
759            committee_store.clone(),
760            index_store.clone(),
761            rpc_index,
762            checkpoint_store.clone(),
763            &prometheus_registry,
764            genesis.objects(),
765            &db_checkpoint_config,
766            config.clone(),
767            chain_identifier,
768            config.policy_config.clone(),
769            config.firewall_config.clone(),
770            pruner_watermarks,
771        )
772        .await;
773        // ensure genesis txn was executed
774        if epoch_store.epoch() == 0 {
775            let txn = &genesis.transaction();
776            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
777            let transaction =
778                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
779                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
780                        genesis.transaction().data().clone(),
781                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
782                    ),
783                );
784            state
785                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
786                .instrument(span)
787                .await
788                .unwrap();
789        }
790
791        // Start the loop that receives new randomness and generates transactions for it.
792        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
793
794        if config
795            .expensive_safety_check_config
796            .enable_secondary_index_checks()
797            && let Some(indexes) = state.indexes.clone()
798        {
799            sui_core::verify_indexes::verify_indexes(
800                state.get_global_state_hash_store().as_ref(),
801                indexes,
802            )
803            .expect("secondary indexes are inconsistent");
804        }
805
806        let (end_of_epoch_channel, end_of_epoch_receiver) =
807            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
809        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
810            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811                auth_agg.load_full(),
812                state.clone(),
813                end_of_epoch_receiver,
814                &config.db_path(),
815                &prometheus_registry,
816                &config,
817            )))
818        } else {
819            None
820        };
821
822        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
823            state.clone(),
824            state_sync_store,
825            &transaction_orchestrator.clone(),
826            &config,
827            &prometheus_registry,
828            server_version,
829        )
830        .await?;
831
832        let global_state_hasher = Arc::new(GlobalStateHasher::new(
833            cache_traits.global_state_hash_store.clone(),
834            GlobalStateHashMetrics::new(&prometheus_registry),
835        ));
836
837        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
838            "sui",
839            &registry_service.default_registry(),
840        );
841
842        let connection_monitor_handle =
843            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
844                p2p_network.downgrade(),
845                Arc::new(network_connection_metrics),
846                known_peers,
847            );
848
849        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
850
851        sui_node_metrics
852            .binary_max_protocol_version
853            .set(ProtocolVersion::MAX.as_u64() as i64);
854        sui_node_metrics
855            .configured_max_protocol_version
856            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
857
858        let validator_components = if state.is_validator(&epoch_store) {
859            let mut components = Self::construct_validator_components(
860                config.clone(),
861                state.clone(),
862                committee,
863                epoch_store.clone(),
864                checkpoint_store.clone(),
865                state_sync_handle.clone(),
866                randomness_handle.clone(),
867                Arc::downgrade(&global_state_hasher),
868                backpressure_manager.clone(),
869                &registry_service,
870                sui_node_metrics.clone(),
871                checkpoint_metrics.clone(),
872            )
873            .await?;
874
875            components
876                .consensus_adapter
877                .recover_end_of_publish(&epoch_store);
878
879            // Start the gRPC server
880            components.validator_server_handle = components.validator_server_handle.start().await;
881
882            // Set the consensus address updater so that we can update the consensus peer addresses when requested.
883            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
884
885            Some(components)
886        } else {
887            None
888        };
889
890        // setup shutdown channel
891        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
892
893        let node = Self {
894            config,
895            validator_components: Mutex::new(validator_components),
896            http_servers,
897            state,
898            transaction_orchestrator,
899            registry_service,
900            metrics: sui_node_metrics,
901            checkpoint_metrics,
902
903            _discovery: discovery_handle,
904            _connection_monitor_handle: connection_monitor_handle,
905            state_sync_handle,
906            randomness_handle,
907            checkpoint_store,
908            global_state_hasher: Mutex::new(Some(global_state_hasher)),
909            end_of_epoch_channel,
910            endpoint_manager,
911            backpressure_manager,
912
913            _db_checkpoint_handle: db_checkpoint_handle,
914
915            #[cfg(msim)]
916            sim_state: Default::default(),
917
918            _state_snapshot_uploader_handle: state_snapshot_handle,
919            shutdown_channel_tx: shutdown_channel,
920
921            auth_agg,
922            subscription_service_checkpoint_sender,
923        };
924
925        info!("SuiNode started!");
926        let node = Arc::new(node);
927        let node_copy = node.clone();
928        spawn_monitored_task!(async move {
929            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
930            if let Err(error) = result {
931                warn!("Reconfiguration finished with error {:?}", error);
932            }
933        });
934
935        Ok(node)
936    }
937
938    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
939        self.end_of_epoch_channel.subscribe()
940    }
941
942    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
943        self.shutdown_channel_tx.subscribe()
944    }
945
946    pub fn current_epoch_for_testing(&self) -> EpochId {
947        self.state.current_epoch_for_testing()
948    }
949
950    pub fn db_checkpoint_path(&self) -> PathBuf {
951        self.config.db_checkpoint_path()
952    }
953
954    // Init reconfig process by starting to reject user certs
955    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
956        info!("close_epoch (current epoch = {})", epoch_store.epoch());
957        self.validator_components
958            .lock()
959            .await
960            .as_ref()
961            .ok_or_else(|| SuiError::from("Node is not a validator"))?
962            .consensus_adapter
963            .close_epoch(epoch_store);
964        Ok(())
965    }
966
967    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
968        self.state
969            .clear_override_protocol_upgrade_buffer_stake(epoch)
970    }
971
972    pub fn set_override_protocol_upgrade_buffer_stake(
973        &self,
974        epoch: EpochId,
975        buffer_stake_bps: u64,
976    ) -> SuiResult {
977        self.state
978            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
979    }
980
981    // Testing-only API to start epoch close process.
982    // For production code, please use the non-testing version.
983    pub async fn close_epoch_for_testing(&self) -> SuiResult {
984        let epoch_store = self.state.epoch_store_for_testing();
985        self.close_epoch(&epoch_store).await
986    }
987
988    fn start_state_snapshot(
989        config: &NodeConfig,
990        prometheus_registry: &Registry,
991        checkpoint_store: Arc<CheckpointStore>,
992        chain_identifier: ChainIdentifier,
993    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
994        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
995            let snapshot_uploader = StateSnapshotUploader::new(
996                &config.db_checkpoint_path(),
997                &config.snapshot_path(),
998                remote_store_config.clone(),
999                60,
1000                prometheus_registry,
1001                checkpoint_store,
1002                chain_identifier,
1003                config.state_snapshot_write_config.archive_interval_epochs,
1004            )?;
1005            Ok(Some(snapshot_uploader.start()))
1006        } else {
1007            Ok(None)
1008        }
1009    }
1010
1011    fn start_db_checkpoint(
1012        config: &NodeConfig,
1013        prometheus_registry: &Registry,
1014        state_snapshot_enabled: bool,
1015    ) -> Result<(
1016        DBCheckpointConfig,
1017        Option<tokio::sync::broadcast::Sender<()>>,
1018    )> {
1019        let checkpoint_path = Some(
1020            config
1021                .db_checkpoint_config
1022                .checkpoint_path
1023                .clone()
1024                .unwrap_or_else(|| config.db_checkpoint_path()),
1025        );
1026        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1027            DBCheckpointConfig {
1028                checkpoint_path,
1029                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1030                    true
1031                } else {
1032                    config
1033                        .db_checkpoint_config
1034                        .perform_db_checkpoints_at_epoch_end
1035                },
1036                ..config.db_checkpoint_config.clone()
1037            }
1038        } else {
1039            config.db_checkpoint_config.clone()
1040        };
1041
1042        match (
1043            db_checkpoint_config.object_store_config.as_ref(),
1044            state_snapshot_enabled,
1045        ) {
1046            // If db checkpoint config object store not specified but
1047            // state snapshot object store is specified, create handler
1048            // anyway for marking db checkpoints as completed so that they
1049            // can be uploaded as state snapshots.
1050            (None, false) => Ok((db_checkpoint_config, None)),
1051            (_, _) => {
1052                let handler = DBCheckpointHandler::new(
1053                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1054                    db_checkpoint_config.object_store_config.as_ref(),
1055                    60,
1056                    db_checkpoint_config
1057                        .prune_and_compact_before_upload
1058                        .unwrap_or(true),
1059                    config.authority_store_pruning_config.clone(),
1060                    prometheus_registry,
1061                    state_snapshot_enabled,
1062                )?;
1063                Ok((
1064                    db_checkpoint_config,
1065                    Some(DBCheckpointHandler::start(handler)),
1066                ))
1067            }
1068        }
1069    }
1070
    /// Builds and starts the anemo-based p2p network and its attached
    /// services: peer discovery, state sync, and randomness networking.
    ///
    /// Returns the running network together with the service handles, the
    /// known-peer labels for connection monitoring, and the endpoint manager
    /// produced by the discovery builder.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let mut p2p_config = config.p2p_config.clone();
        {
            // Default the discovery peer cache file to live under the node's
            // db path when the config does not specify one.
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        if let Some(consensus_config) = &config.consensus_config {
            // Advertise the consensus address via discovery, preferring the
            // explicitly configured external address over the listen address.
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Label known peers for the connection monitor: allowlisted peers from
        // the discovery config plus any seed peers that declare a peer id.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three services share one anemo router / network instance.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound stack: request tracing (server errors) + size/metrics callback.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound stack mirrors the inbound one but traces client errors too.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Inbound requests on this network are small (signatures, queries, summaries).
            // Cap request frames at 1 MiB.
            anemo_config.max_request_frame_size = Some(1 << 20);
            // Responses can be larger (checkpoint contents).
            // Cap response frames at 128 MiB.
            anemo_config.max_response_frame_size = Some(128 << 20);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The TLS server name ties this p2p network to a specific chain.
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start each service against the shared network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1244
    /// Constructs the validator-only components: consensus client/adapter/
    /// manager, consensus store pruner, the gRPC validator service, and the
    /// optional overload monitor — then hands off to
    /// `start_epoch_specific_validator_components` for per-epoch startup.
    ///
    /// Returns an error if the node config has no consensus config or if the
    /// gRPC validator service fails to start.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        // The updatable client is shared by the adapter (submission side) and
        // the manager (which swaps the underlying client across epochs).
        let client = Arc::new(UpdatableConsensusClient::new());
        let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            &registry_service.default_registry(),
            client.clone(),
            checkpoint_store.clone(),
            inflight_slot_freed_notify.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let (validator_server_handle, admission_queue) = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            epoch_store.clone(),
            &registry_service.default_registry(),
            inflight_slot_freed_notify,
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // Weak reference so the monitor does not keep the state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        )
        .await
    }
1343
    /// Starts the validator components that are rebuilt every epoch:
    /// checkpoint service, randomness manager (when enabled), execution-time
    /// observer, consensus (spawned asynchronously), JWK updater (when
    /// authenticator state is enabled), and admission-queue rotation.
    ///
    /// Returns the assembled `ValidatorComponents` for this epoch.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        // Install the randomness (DKG) manager on the epoch store when the
        // protocol has randomness enabled; `try_new` may return None.
        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Spawn consensus startup asynchronously to avoid blocking other components
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // The replay waiter gates the checkpoint service on consensus replay;
        // it can be disabled via the DISABLE_REPLAY_WAITER env var.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        // Point the admission queue at the new epoch's store.
        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1474
1475    fn build_checkpoint_service(
1476        config: &NodeConfig,
1477        consensus_adapter: Arc<ConsensusAdapter>,
1478        checkpoint_store: Arc<CheckpointStore>,
1479        epoch_store: Arc<AuthorityPerEpochStore>,
1480        state: Arc<AuthorityState>,
1481        state_sync_handle: state_sync::Handle,
1482        state_hasher: Weak<GlobalStateHasher>,
1483        checkpoint_metrics: Arc<CheckpointMetrics>,
1484    ) -> Arc<CheckpointService> {
1485        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1486        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1487
1488        debug!(
1489            "Starting checkpoint service with epoch start timestamp {}
1490            and epoch duration {}",
1491            epoch_start_timestamp_ms, epoch_duration_ms
1492        );
1493
1494        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1495            sender: consensus_adapter,
1496            signer: state.secret.clone(),
1497            authority: config.protocol_public_key(),
1498            next_reconfiguration_timestamp_ms: epoch_store.next_reconfiguration_timestamp_ms(),
1499            metrics: checkpoint_metrics.clone(),
1500        });
1501
1502        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1503        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1504        let max_checkpoint_size_bytes =
1505            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1506
1507        CheckpointService::build(
1508            state.clone(),
1509            checkpoint_store,
1510            epoch_store,
1511            state.get_transaction_cache_reader().clone(),
1512            state_hasher,
1513            checkpoint_output,
1514            Box::new(certified_checkpoint_output),
1515            checkpoint_metrics,
1516            max_tx_per_checkpoint,
1517            max_checkpoint_size_bytes,
1518        )
1519    }
1520
1521    fn construct_consensus_adapter(
1522        committee: &Committee,
1523        consensus_config: &ConsensusConfig,
1524        authority: AuthorityName,
1525        prometheus_registry: &Registry,
1526        consensus_client: Arc<dyn ConsensusClient>,
1527        checkpoint_store: Arc<CheckpointStore>,
1528        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1529    ) -> ConsensusAdapter {
1530        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1531        // The consensus adapter allows the authority to send user certificates through consensus.
1532
1533        ConsensusAdapter::new(
1534            consensus_client,
1535            checkpoint_store,
1536            authority,
1537            consensus_config.max_pending_transactions(),
1538            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1539            ca_metrics,
1540            inflight_slot_freed_notify,
1541        )
1542    }
1543
    /// Construct the validator gRPC service and return it as an unstarted
    /// [`SpawnOnce`], together with the admission-queue context when the
    /// admission queue is enabled in the overload config.
    ///
    /// The returned `SpawnOnce` binds the TLS-protected gRPC server to the
    /// configured network address only when started; binding failure panics.
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        prometheus_registry: &Registry,
        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
        let overload_config = &config.authority_overload_config;
        // Admission queue is optional; when enabled it rate-limits incoming
        // transactions based on consensus adapter backpressure.
        let admission_queue = overload_config.admission_queue_enabled.then(|| {
            let manager = Arc::new(AdmissionQueueManager::new(
                consensus_adapter.clone(),
                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
                overload_config.admission_queue_capacity_fraction,
                overload_config.admission_queue_bypass_fraction,
                overload_config.admission_queue_failover_timeout,
                inflight_slot_freed_notify,
            ));
            AdmissionQueueContext::spawn(manager, epoch_store)
        });
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
            admission_queue.clone(),
        );

        // gRPC server tuning: shared timeout value is reused for connect and
        // HTTP/2 keepalive interval/timeout.
        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        // Serve with TLS using the node's network key pair.
        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        // `ready_rx` lets the caller await until the server is actually bound.
        let (ready_tx, ready_rx) = oneshot::channel();

        let spawn_once = SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            // Signal readiness once bound, before blocking on serve().
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        });
        Ok((spawn_once, admission_queue))
    }
1607
1608    pub fn state(&self) -> Arc<AuthorityState> {
1609        self.state.clone()
1610    }
1611
    // Only used for testing because of how epoch store is loaded.
    /// Returns the reference gas price from the authority state (test-only).
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1616
1617    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1618        self.state.committee_store().clone()
1619    }
1620
1621    /*
1622    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1623        self.state.db()
1624    }
1625    */
1626
1627    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1628    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1629    /// of this function will mostly likely want to call this again
1630    /// to get a fresh one.
1631    pub fn clone_authority_aggregator(
1632        &self,
1633    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1634        self.transaction_orchestrator
1635            .as_ref()
1636            .map(|to| to.clone_authority_aggregator())
1637    }
1638
1639    pub fn transaction_orchestrator(
1640        &self,
1641    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1642        self.transaction_orchestrator.clone()
1643    }
1644
    /// This function awaits the completion of checkpoint execution of the current epoch,
    /// after which it initiates reconfiguration of the entire system.
    /// Runs forever (one loop iteration per epoch) unless `run_with_range` causes an
    /// early shutdown, in which case it returns `Ok(())`.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take exclusive ownership of the global state hasher for this epoch;
            // it is swapped for a fresh one during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    // no need to send digests of versions less than the current version
                    .truncate_below(config.version);

                // Walk the advertised max version down until we find one whose
                // prerequisites (here: accumulator root object) are satisfied.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Blocks until all checkpoints of the current epoch have been
            // executed (or the run-with-range condition is hit).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // In simulation, safe mode may be deliberately induced; only assert
            // against it when the test did not opt in to expecting it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Only warn on a failed end-of-epoch broadcast for fullnodes;
            // validators may legitimately have no subscribers.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the new epoch's committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            // Case split: was-validator (Some components) vs was-fullnode (None).
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    // Set the consensus address updater as the full node got promoted now to a validator.
                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // Test-only pruning pass when checkpoint retention is bounded in msim.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1941
1942    async fn shutdown(&self) {
1943        if let Some(validator_components) = &*self.validator_components.lock().await {
1944            validator_components.consensus_manager.shutdown().await;
1945        }
1946    }
1947
    /// Reconfigure the authority state into the next epoch and return the new
    /// per-epoch store.
    ///
    /// Preconditions (asserted/expected below): the last checkpoint of the
    /// current epoch exists and has been fully executed. Panics on any storage
    /// failure, since reconfiguration cannot proceed in a partial state.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // The epoch's final checkpoint must also be the highest executed one —
        // reconfiguration only happens after all checkpoints are executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Refresh epoch-flag metrics to reflect the old -> new flag transition.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }
2003
    /// Returns a reference to this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2007
    /// Returns a handle to the randomness network component.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2011
    /// Returns a handle to the state-sync network component.
    pub fn state_sync_handle(&self) -> state_sync::Handle {
        self.state_sync_handle.clone()
    }
2015
    /// Returns a reference to this node's endpoint manager.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2019
2020    /// Get a short prefix of a digest for metric labels
2021    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2022        let digest_str = digest.to_string();
2023        if digest_str.len() >= 8 {
2024            digest_str[0..8].to_string()
2025        } else {
2026            digest_str
2027        }
2028    }
2029
    /// Check for previously detected forks and handle them appropriately.
    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
    /// For all other cases, block node startup if a fork is detected.
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Fork detection and recovery is only relevant for validators
        // Fullnodes should sync from validators and don't need fork checking
        if !is_validator {
            return Ok(());
        }

        // Try to recover from forks if recovery config is provided
        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        // Any fork marker still present after the recovery attempt above is
        // handled per the configured crash behavior (halt or error).
        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }
2085
2086    fn try_recover_checkpoint_fork(
2087        checkpoint_store: &CheckpointStore,
2088        recovery: &ForkRecoveryConfig,
2089    ) -> Result<()> {
2090        // If configured overrides include a checkpoint whose locally computed digest mismatches,
2091        // clear locally computed checkpoints from that sequence (inclusive).
2092        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2093            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2094                anyhow::bail!(
2095                    "Invalid checkpoint digest override for seq {}: {}",
2096                    seq,
2097                    expected_digest_str
2098                );
2099            };
2100
2101            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2102                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2103                if local_digest != expected_digest {
2104                    info!(
2105                        seq,
2106                        local = %Self::get_digest_prefix(local_digest),
2107                        expected = %Self::get_digest_prefix(expected_digest),
2108                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2109                        seq
2110                    );
2111                    checkpoint_store
2112                        .clear_locally_computed_checkpoints_from(*seq)
2113                        .context(
2114                            "Failed to clear locally computed checkpoints from override seq",
2115                        )?;
2116                }
2117            }
2118        }
2119
2120        if let Some((checkpoint_seq, checkpoint_digest)) =
2121            checkpoint_store.get_checkpoint_fork_detected()?
2122            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2123        {
2124            info!(
2125                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2126                checkpoint_seq, checkpoint_digest
2127            );
2128            checkpoint_store
2129                .clear_checkpoint_fork_detected()
2130                .expect("Failed to clear checkpoint fork detected marker");
2131        }
2132        Ok(())
2133    }
2134
2135    fn try_recover_transaction_fork(
2136        checkpoint_store: &CheckpointStore,
2137        recovery: &ForkRecoveryConfig,
2138    ) -> Result<()> {
2139        if recovery.transaction_overrides.is_empty() {
2140            return Ok(());
2141        }
2142
2143        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2144            && recovery
2145                .transaction_overrides
2146                .contains_key(&tx_digest.to_string())
2147        {
2148            info!(
2149                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2150                tx_digest
2151            );
2152            checkpoint_store
2153                .clear_transaction_fork_detected()
2154                .expect("Failed to clear transaction fork detected marker");
2155        }
2156        Ok(())
2157    }
2158
2159    fn get_current_timestamp() -> u64 {
2160        std::time::SystemTime::now()
2161            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2162            .unwrap()
2163            .as_secs()
2164    }
2165
    /// Report a detected checkpoint fork and apply the configured crash behavior.
    ///
    /// Sets a labeled metric recording the fork, then either sleeps forever
    /// awaiting operator intervention (`AwaitForkRecovery`, the default) or
    /// returns an error (`ReturnError`). Only returns `Err`; never `Ok`.
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Record the fork in metrics so operators can alert on it.
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        // No recovery config means the default crash behavior.
        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                // Keep the process alive (metrics stay visible) but never proceed.
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }
2209
    /// Report a detected transaction effects fork and apply the configured
    /// crash behavior (mirror of `handle_checkpoint_fork` for transactions).
    ///
    /// Sets a labeled metric recording the fork, then either sleeps forever
    /// awaiting operator intervention (`AwaitForkRecovery`, the default) or
    /// returns an error (`ReturnError`). Only returns `Err`; never `Ok`.
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Record the fork in metrics so operators can alert on it.
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        // No recovery config means the default crash behavior.
        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                // Keep the process alive (metrics stay visible) but never proceed.
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
2258}
2259
2260#[cfg(not(msim))]
2261impl SuiNode {
2262    async fn fetch_jwks(
2263        _authority: AuthorityName,
2264        provider: &OIDCProvider,
2265    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2266        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2267        use sui_types::error::SuiErrorKind;
2268        let client = reqwest::Client::new();
2269        fetch_jwks(provider, &client, true)
2270            .await
2271            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2272    }
2273}
2274
#[cfg(msim)]
impl SuiNode {
    /// Returns the simulator node id this node is running on (msim builds only).
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Toggles whether the simulation expects this node to be in safe mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected = &self.sim_state.sim_safe_mode_expected;
        expected.store(new_value, Ordering::Relaxed);
    }

    /// In simulation, JWKs come from the injectable test hook instead of the
    /// network, so tests can control the returned keys deterministically.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
2296
/// A future paired with a readiness signal that can be spawned at most once;
/// used to defer starting long-running node tasks until explicitly started.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Send
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // Holds the spawned task's handle after `start`; kept alive but never joined.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2303
2304impl SpawnOnce {
2305    pub fn new(
2306        ready_rx: oneshot::Receiver<()>,
2307        future: impl Future<Output = ()> + Send + 'static,
2308    ) -> Self {
2309        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2310    }
2311
2312    pub async fn start(self) -> Self {
2313        match self {
2314            Self::Unstarted(ready_rx, future) => {
2315                let future = future.into_inner();
2316                let handle = tokio::spawn(future);
2317                ready_rx.await.unwrap();
2318                Self::Started(handle)
2319            }
2320            Self::Started(_) => self,
2321        }
2322    }
2323}
2324
2325/// Updates trusted peer addresses in the p2p network.
2326fn update_peer_addresses(
2327    config: &NodeConfig,
2328    endpoint_manager: &EndpointManager,
2329    epoch_start_state: &EpochStartSystemState,
2330) {
2331    for (peer_id, address) in
2332        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2333    {
2334        endpoint_manager
2335            .update_endpoint(
2336                EndpointId::P2p(peer_id),
2337                AddressSource::Chain,
2338                vec![address],
2339            )
2340            .expect("Updating peer addresses should not fail");
2341    }
2342}
2343
2344fn build_kv_store(
2345    state: &Arc<AuthorityState>,
2346    config: &NodeConfig,
2347    registry: &Registry,
2348) -> Result<Arc<TransactionKeyValueStore>> {
2349    let metrics = KeyValueStoreMetrics::new(registry);
2350    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2351
2352    let base_url = &config.transaction_kv_store_read_config.base_url;
2353
2354    if base_url.is_empty() {
2355        info!("no http kv store url provided, using local db only");
2356        return Ok(Arc::new(db_store));
2357    }
2358
2359    let base_url: url::Url = base_url.parse().tap_err(|e| {
2360        error!(
2361            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2362            base_url, e
2363        )
2364    })?;
2365
2366    let network_str = match state.get_chain_identifier().chain() {
2367        Chain::Mainnet => "/mainnet",
2368        _ => {
2369            info!("using local db only for kv store");
2370            return Ok(Arc::new(db_store));
2371        }
2372    };
2373
2374    let base_url = base_url.join(network_str)?.to_string();
2375    let http_store = HttpKVStore::new_kv(
2376        &base_url,
2377        config.transaction_kv_store_read_config.cache_size,
2378        metrics.clone(),
2379    )?;
2380    info!("using local key-value store with fallback to http key-value store");
2381    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2382        db_store,
2383        http_store,
2384        metrics,
2385        "json_rpc_fallback",
2386    )))
2387}
2388
/// Builds the node's HTTP/HTTPS RPC servers (JSON-RPC plus the newer rpc service).
///
/// On validators (consensus config present) no servers are started and `None`
/// is returned for the checkpoint sender. Otherwise returns the running server
/// handles plus a sender used to feed executed checkpoints into the
/// subscription service.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // Assemble the JSON-RPC server; each API family is registered as a module.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // The execution API is only available when a transaction orchestrator
        // exists, i.e. the node can drive transactions to finality.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Name-service config: explicit node-config values win; otherwise fall
        // back to the per-chain defaults.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // Subscription service: the returned sender feeds executed checkpoints to
    // subscribers; the handle is wired into the rpc service below.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    // Shared middleware: propagate connect info into axum's extractor format,
    // add server-timing headers, and apply a permissive CORS policy.
    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        // Setup a permissive CORS policy
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // Optionally serve the same router over HTTPS when TLS is configured.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // The plain HTTP listener is always started.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2560
2561#[cfg(not(test))]
2562fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2563    protocol_config.max_transactions_per_checkpoint() as usize
2564}
2565
/// Test override: keep checkpoints tiny (2 transactions) so multi-checkpoint
/// behavior is easy to exercise in unit tests.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}
2570
/// Handles for the node's RPC listeners; held only to keep the servers alive
/// for the lifetime of the node.
#[derive(Default)]
struct HttpServers {
    // Plain HTTP listener (always present on fullnodes).
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // HTTPS listener; only present when TLS is configured.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2578
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Covers both fork kinds end to end: a recorded fork with `ReturnError`
    /// must surface as an error, and supplying a matching recovery override
    /// must clear the recorded fork and succeed.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // ---------- Checkpoint fork path ----------
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // No overrides configured: the recorded checkpoint fork must error out.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // An override matching the forked checkpoint digest should recover:
        // the call succeeds and the fork marker is cleared from the store.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // ---------- Transaction fork path ----------
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        // No overrides configured: the recorded transaction fork must error out.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // An override mapping the tx digest to the actually-observed effects
        // digest should recover: the call succeeds and the marker is cleared.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}
2691}