// sui_node/lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::authority::ExecutionEnv;
29use sui_core::authority::RandomnessRoundReceiver;
30use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
31use sui_core::authority::backpressure::BackpressureManager;
32use sui_core::authority::epoch_start_configuration::EpochFlag;
33use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
34use sui_core::consensus_adapter::ConsensusClient;
35use sui_core::consensus_manager::UpdatableConsensusClient;
36use sui_core::epoch::randomness::RandomnessManager;
37use sui_core::execution_cache::build_execution_cache;
38use sui_network::endpoint_manager::{AddressSource, EndpointId};
39use sui_network::validator::server::SUI_TLS_SERVER_NAME;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use sui_core::global_state_hasher::GlobalStateHashMetrics;
43use sui_core::storage::RestReadStore;
44use sui_json_rpc::bridge_api::BridgeReadApi;
45use sui_json_rpc_api::JsonRpcMetrics;
46use sui_network::randomness;
47use sui_rpc_api::RpcMetrics;
48use sui_rpc_api::ServerVersion;
49use sui_rpc_api::subscription::SubscriptionService;
50use sui_types::base_types::ConciseableName;
51use sui_types::crypto::RandomnessRound;
52use sui_types::digests::{
53    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
54};
55use sui_types::messages_consensus::AuthorityCapabilitiesV2;
56use sui_types::sui_system_state::SuiSystemState;
57use tap::tap::TapFallible;
58use tokio::sync::oneshot;
59use tokio::sync::{Mutex, broadcast, mpsc};
60use tokio::task::JoinHandle;
61use tower::ServiceBuilder;
62use tracing::{Instrument, error_span, info};
63use tracing::{debug, error, warn};
64
// Logs at debug level when running under a test configuration and at info
// level otherwise. JWK fetch logging is chatty in tests but produces an
// insignificant volume in production, so info is the production default.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if !in_test_configuration() {
            info!($($arg)+);
        } else {
            debug!($($arg)+);
        }
    };
}
77
78use fastcrypto_zkp::bn254::zk_login::JWK;
79pub use handle::SuiNodeHandle;
80use mysten_metrics::{RegistryService, spawn_monitored_task};
81use mysten_service::server_timing::server_timing_middleware;
82use sui_config::node::{DBCheckpointConfig, RunWithRange};
83use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
84use sui_config::node_config_metrics::NodeConfigMetrics;
85use sui_config::{ConsensusConfig, NodeConfig};
86use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
87use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
88use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
89use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
90use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
91use sui_core::authority_aggregator::AuthorityAggregator;
92use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
93use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
94use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
95use sui_core::checkpoints::{
96    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
97    SubmitCheckpointToConsensus,
98};
99use sui_core::consensus_adapter::{
100    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
101};
102use sui_core::consensus_manager::ConsensusManager;
103use sui_core::consensus_throughput_calculator::{
104    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
105};
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121    authority::{AuthorityState, AuthorityStore},
122    authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142    http_key_value_store::HttpKVStore,
143    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144    key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
/// Admin interface for a running node.
pub mod admin;
/// Node handle; re-exported above as [`SuiNodeHandle`].
mod handle;
/// Node metrics definitions (`SuiNodeMetrics`, `GrpcMetrics`).
pub mod metrics;
163
/// Subsystems that exist only while the node is acting as a validator,
/// bundled so they can be stored (and torn down) together in
/// `SuiNode::validator_components`.
pub struct ValidatorComponents {
    // Validator gRPC server task (`SpawnOnce` is defined elsewhere in this file).
    validator_server_handle: SpawnOnce,
    // Present only when the overload monitor is enabled — TODO confirm at construction site.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    // Prunes old consensus storage across epochs.
    consensus_store_pruner: ConsensusStorePruner,
    // Used to submit transactions (e.g. JWKs in `start_jwk_updater`) to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
/// Handles produced by `SuiNode::create_p2p_network`: the anemo network plus
/// the services running on top of it.
pub struct P2pComponents {
    p2p_network: Network,
    // Statically known peers; String values are presumably display names —
    // confirm against the construction site in `create_p2p_network`.
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    // Pushes peer address updates into the network (see the
    // peer-address-override loop in `start_async`).
    endpoint_manager: EndpointManager,
}
181
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    /// Extra per-node state kept only when compiled for the `msim` simulator.
    pub(super) struct SimState {
        // Handle to the simulator node this SuiNode was created on.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        // Flag a test can set when entering safe mode is the expected outcome
        // (read atomically elsewhere — usage not visible in this chunk).
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its Drop behavior: detects leaked nodes in simtests.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Signature of a test-injectable JWK fetcher: given the authority name and
    /// an OIDC provider, returns that provider's keys (or a `SuiError`).
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector used when a test has not called [`set_jwk_injector`]:
    /// parses the bundled default JWK bytes as Twitch keys.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        // Just load a default Twitch jwk for testing.
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Thread-local so concurrently running simtest nodes can install
        // different injectors without interfering with each other.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK injector currently installed for this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Installs a custom JWK fetcher for this thread (re-exported for simtests).
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
245const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
/// A running Sui node (validator or fullnode) together with handles to all of
/// its long-lived subsystems. Fields prefixed with `_` are never read; they
/// are held only to keep their background tasks/channels alive.
pub struct SuiNode {
    config: NodeConfig,
    /// Present only while the node is running as a validator; `None` on
    /// fullnodes. Mutex-guarded so the components can be swapped out
    /// (presumably across reconfiguration — construction not visible here).
    validator_components: Mutex<Option<ValidatorComponents>>,

    /// The http servers responsible for serving RPC traffic (gRPC and JSON-RPC)
    #[allow(unused)]
    http_servers: HttpServers,

    /// Core authority state shared with most subsystems.
    state: Arc<AuthorityState>,
    /// Created only for fullnodes running without a `run_with_range` limit
    /// (see `start_async`).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Hasher for the global state; `None` until installed — TODO confirm lifecycle.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcast channel to send the starting system state for the next epoch.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// EndpointManager for updating peer network addresses.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    /// Dropping this sender signals the DB checkpoint handler to stop.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only bookkeeping (see `mod simulator`).
    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Channel to allow signaling upstream to shutdown sui-node
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// AuthorityAggregator of the network, created at start and beginning of each epoch.
    /// Use ArcSwap so that we could mutate it without taking mut reference.
    // TODO: Eventually we can make this auth aggregator a shared reference so that this
    // update will automatically propagate to other uses.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Feeds executed checkpoints to the RPC subscription service, when enabled.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
294
295impl fmt::Debug for SuiNode {
296    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297        f.debug_struct("SuiNode")
298            .field("name", &self.state.name.concise())
299            .finish()
300    }
301}
302
303static MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306    pub async fn start(
307        config: NodeConfig,
308        registry_service: RegistryService,
309    ) -> Result<Arc<SuiNode>> {
310        Self::start_async(
311            config,
312            registry_service,
313            ServerVersion::new("sui-node", "unknown"),
314        )
315        .await
316    }
317
    /// Spawns one background task per zkLogin OIDC provider configured for the
    /// current chain.
    ///
    /// Each task lives at most as long as the current epoch (it is wrapped in
    /// `within_alive_epoch`) and repeatedly:
    ///   1. fetches the provider's JWKs via `Self::fetch_jwks`,
    ///   2. drops keys that are invalid, already active in the current epoch,
    ///      or already submitted by this task (best-effort de-duplication),
    ///   3. submits the surviving keys to consensus as
    ///      `ConsensusTransaction::new_jwk_fetched` transactions.
    ///
    /// Fetch failures are retried after 30 seconds; successful iterations
    /// sleep `config.jwk_fetch_interval_seconds` between fetches.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; a malformed provider string in
        // the config is a programming error, hence the `expect`.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        /// Returns true when `jwk` (fetched from `provider` under key id `id`)
        /// is acceptable: its `iss` maps back to the same provider and its
        /// total encoded size is within bounds. Bumps the `invalid_jwks`
        /// counter (labelled by provider) on every rejection.
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        // Counters used below (all labelled by the provider string):
        //   jwk_requests       – fetch attempts
        //   jwk_request_errors – failed fetch attempts
        //   total_jwks         – keys returned by the provider
        //   unique_jwks        – keys that survived validation + de-dup

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // note: restart-safe de-duplication happens after consensus, this is
                    // just best-effort to reduce unneeded submissions.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry in 30 seconds
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only keys that are valid, not already active
                                // in this epoch, and not previously seen by this task.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // prevent oauth providers from sending too many keys,
                                // inadvertently or otherwise
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Submission failures are logged and dropped; the
                                    // next fetch cycle retries any keys still missing.
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
452
453    pub async fn start_async(
454        config: NodeConfig,
455        registry_service: RegistryService,
456        server_version: ServerVersion,
457    ) -> Result<Arc<SuiNode>> {
458        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
459        let mut config = config.clone();
460        if config.supported_protocol_versions.is_none() {
461            info!(
462                "populating config.supported_protocol_versions with default {:?}",
463                SupportedProtocolVersions::SYSTEM_DEFAULT
464            );
465            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466        }
467
468        let run_with_range = config.run_with_range;
469        let is_validator = config.consensus_config().is_some();
470        let is_full_node = !is_validator;
471        let prometheus_registry = registry_service.default_registry();
472
473        info!(node =? config.protocol_public_key(),
474            "Initializing sui-node listening on {}", config.network_address
475        );
476
477        // Initialize metrics to track db usage before creating any stores
478        DBMetrics::init(registry_service.clone());
479
480        // Initialize db sync-to-disk setting from config (falls back to env var if not set)
481        typed_store::init_write_sync(config.enable_db_sync_to_disk);
482
483        // Initialize Mysten metrics.
484        mysten_metrics::init_metrics(&prometheus_registry);
485        // Unsupported (because of the use of static variable) and unnecessary in simtests.
486        #[cfg(not(msim))]
487        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
488
489        let genesis = config.genesis()?.clone();
490
491        let secret = Arc::pin(config.protocol_key_pair().copy());
492        let genesis_committee = genesis.committee();
493        let committee_store = Arc::new(CommitteeStore::new(
494            config.db_path().join("epochs"),
495            &genesis_committee,
496            None,
497        ));
498
499        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
500        let checkpoint_store = CheckpointStore::new(
501            &config.db_path().join("checkpoints"),
502            pruner_watermarks.clone(),
503        );
504        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
505
506        Self::check_and_recover_forks(
507            &checkpoint_store,
508            &checkpoint_metrics,
509            is_validator,
510            config.fork_recovery.as_ref(),
511        )
512        .await?;
513
514        // By default, only enable write stall on validators for perpetual db.
515        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
516        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
517            enable_write_stall,
518            is_validator,
519        };
520        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
521            &config.db_path().join("store"),
522            Some(perpetual_tables_options),
523            Some(pruner_watermarks.epoch_id.clone()),
524        ));
525        let is_genesis = perpetual_tables
526            .database_is_empty()
527            .expect("Database read should not fail at init.");
528
529        let backpressure_manager =
530            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
531
532        let store =
533            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
534
535        let cur_epoch = store.get_recovery_epoch_at_restart()?;
536        let committee = committee_store
537            .get_committee(&cur_epoch)?
538            .expect("Committee of the current epoch must exist");
539        let epoch_start_configuration = store
540            .get_epoch_start_configuration()?
541            .expect("EpochStartConfiguration of the current epoch must exist");
542        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
543        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
544
545        let cache_traits = build_execution_cache(
546            &config.execution_cache,
547            &prometheus_registry,
548            &store,
549            backpressure_manager.clone(),
550        );
551
552        let auth_agg = {
553            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
554            Arc::new(ArcSwap::new(Arc::new(
555                AuthorityAggregator::new_from_epoch_start_state(
556                    epoch_start_configuration.epoch_start_state(),
557                    &committee_store,
558                    safe_client_metrics_base,
559                ),
560            )))
561        };
562
563        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
564        let chain = match config.chain_override_for_testing {
565            Some(chain) => chain,
566            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
567        };
568
569        let highest_executed_checkpoint = checkpoint_store
570            .get_highest_executed_checkpoint_seq_number()
571            .expect("checkpoint store read cannot fail")
572            .unwrap_or(0);
573
574        let previous_epoch_last_checkpoint = if cur_epoch == 0 {
575            0
576        } else {
577            checkpoint_store
578                .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
579                .expect("checkpoint store read cannot fail")
580                .unwrap_or(highest_executed_checkpoint)
581        };
582
583        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
584        let epoch_store = AuthorityPerEpochStore::new(
585            config.protocol_public_key(),
586            committee.clone(),
587            &config.db_path().join("store"),
588            Some(epoch_options.options),
589            EpochMetrics::new(&registry_service.default_registry()),
590            epoch_start_configuration,
591            cache_traits.backing_package_store.clone(),
592            cache_traits.object_store.clone(),
593            cache_metrics,
594            signature_verifier_metrics,
595            &config.expensive_safety_check_config,
596            (chain_id, chain),
597            highest_executed_checkpoint,
598            previous_epoch_last_checkpoint,
599            Arc::new(SubmittedTransactionCacheMetrics::new(
600                &registry_service.default_registry(),
601            )),
602        )?;
603
604        info!("created epoch store");
605
606        replay_log!(
607            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
608            epoch_store.epoch(),
609            epoch_store.protocol_config()
610        );
611
612        // the database is empty at genesis time
613        if is_genesis {
614            info!("checking SUI conservation at genesis");
615            // When we are opening the db table, the only time when it's safe to
616            // check SUI conservation is at genesis. Otherwise we may be in the middle of
617            // an epoch and the SUI conservation check will fail. This also initialize
618            // the expected_network_sui_amount table.
619            cache_traits
620                .reconfig_api
621                .expensive_check_sui_conservation(&epoch_store)
622                .expect("SUI conservation check cannot fail at genesis");
623        }
624
625        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
626        let default_buffer_stake = epoch_store
627            .protocol_config()
628            .buffer_stake_for_protocol_upgrade_bps();
629        if effective_buffer_stake != default_buffer_stake {
630            warn!(
631                ?effective_buffer_stake,
632                ?default_buffer_stake,
633                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
634            );
635        }
636
637        checkpoint_store.insert_genesis_checkpoint(
638            genesis.checkpoint(),
639            genesis.checkpoint_contents().clone(),
640            &epoch_store,
641        );
642
643        info!("creating state sync store");
644        let state_sync_store = RocksDbStore::new(
645            cache_traits.clone(),
646            committee_store.clone(),
647            checkpoint_store.clone(),
648        );
649
650        let index_store = if is_full_node && config.enable_index_processing {
651            info!("creating jsonrpc index store");
652            Some(Arc::new(IndexStore::new(
653                config.db_path().join("indexes"),
654                &prometheus_registry,
655                epoch_store
656                    .protocol_config()
657                    .max_move_identifier_len_as_option(),
658                config.remove_deprecated_tables,
659            )))
660        } else {
661            None
662        };
663
664        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
665            info!("creating rpc index store");
666            Some(Arc::new(
667                RpcIndexStore::new(
668                    &config.db_path(),
669                    &store,
670                    &checkpoint_store,
671                    &epoch_store,
672                    &cache_traits.backing_package_store,
673                    pruner_watermarks.checkpoint_id.clone(),
674                    config.rpc().cloned().unwrap_or_default(),
675                )
676                .await,
677            ))
678        } else {
679            None
680        };
681
682        let chain_identifier = epoch_store.get_chain_identifier();
683
684        info!("creating archive reader");
685        // Create network
686        let (randomness_tx, randomness_rx) = mpsc::channel(
687            config
688                .p2p_config
689                .randomness
690                .clone()
691                .unwrap_or_default()
692                .mailbox_capacity(),
693        );
694        let P2pComponents {
695            p2p_network,
696            known_peers,
697            discovery_handle,
698            state_sync_handle,
699            randomness_handle,
700            endpoint_manager,
701        } = Self::create_p2p_network(
702            &config,
703            state_sync_store.clone(),
704            chain_identifier,
705            randomness_tx,
706            &prometheus_registry,
707        )?;
708
709        // Inject configured peer address overrides.
710        for peer in &config.p2p_config.peer_address_overrides {
711            endpoint_manager
712                .update_endpoint(
713                    EndpointId::P2p(peer.peer_id),
714                    AddressSource::Config,
715                    peer.addresses.clone(),
716                )
717                .expect("Updating peer address overrides should not fail");
718        }
719
720        // Send initial peer addresses to the p2p network.
721        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
722
723        info!("start snapshot upload");
724        // Start uploading state snapshot to remote store
725        let state_snapshot_handle = Self::start_state_snapshot(
726            &config,
727            &prometheus_registry,
728            checkpoint_store.clone(),
729            chain_identifier,
730        )?;
731
732        // Start uploading db checkpoints to remote store
733        info!("start db checkpoint");
734        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
735            &config,
736            &prometheus_registry,
737            state_snapshot_handle.is_some(),
738        )?;
739
740        if !epoch_store
741            .protocol_config()
742            .simplified_unwrap_then_delete()
743        {
744            // We cannot prune tombstones if simplified_unwrap_then_delete is not enabled.
745            config
746                .authority_store_pruning_config
747                .set_killswitch_tombstone_pruning(true);
748        }
749
750        let authority_name = config.protocol_public_key();
751
752        info!("create authority state");
753        let state = AuthorityState::new(
754            authority_name,
755            secret,
756            config.supported_protocol_versions.unwrap(),
757            store.clone(),
758            cache_traits.clone(),
759            epoch_store.clone(),
760            committee_store.clone(),
761            index_store.clone(),
762            rpc_index,
763            checkpoint_store.clone(),
764            &prometheus_registry,
765            genesis.objects(),
766            &db_checkpoint_config,
767            config.clone(),
768            chain_identifier,
769            config.policy_config.clone(),
770            config.firewall_config.clone(),
771            pruner_watermarks,
772        )
773        .await;
774        // ensure genesis txn was executed
775        if epoch_store.epoch() == 0 {
776            let txn = &genesis.transaction();
777            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
778            let transaction =
779                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
780                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
781                        genesis.transaction().data().clone(),
782                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
783                    ),
784                );
785            state
786                .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
787                .instrument(span)
788                .await
789                .unwrap();
790        }
791
792        // Start the loop that receives new randomness and generates transactions for it.
793        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
794
795        if config
796            .expensive_safety_check_config
797            .enable_secondary_index_checks()
798            && let Some(indexes) = state.indexes.clone()
799        {
800            sui_core::verify_indexes::verify_indexes(
801                state.get_global_state_hash_store().as_ref(),
802                indexes,
803            )
804            .expect("secondary indexes are inconsistent");
805        }
806
807        let (end_of_epoch_channel, end_of_epoch_receiver) =
808            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
809
810        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
811            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
812                auth_agg.load_full(),
813                state.clone(),
814                end_of_epoch_receiver,
815                &config.db_path(),
816                &prometheus_registry,
817                &config,
818            )))
819        } else {
820            None
821        };
822
823        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
824            state.clone(),
825            state_sync_store,
826            &transaction_orchestrator.clone(),
827            &config,
828            &prometheus_registry,
829            server_version,
830        )
831        .await?;
832
833        let global_state_hasher = Arc::new(GlobalStateHasher::new(
834            cache_traits.global_state_hash_store.clone(),
835            GlobalStateHashMetrics::new(&prometheus_registry),
836        ));
837
838        let authority_names_to_peer_ids = epoch_store
839            .epoch_start_state()
840            .get_authority_names_to_peer_ids();
841
842        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
843            "sui",
844            &registry_service.default_registry(),
845        );
846
847        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
848
849        let connection_monitor_handle =
850            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
851                p2p_network.downgrade(),
852                Arc::new(network_connection_metrics),
853                known_peers,
854            );
855
856        let connection_monitor_status = ConnectionMonitorStatus {
857            connection_statuses: connection_monitor_handle.connection_statuses(),
858            authority_names_to_peer_ids,
859        };
860
861        let connection_monitor_status = Arc::new(connection_monitor_status);
862        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
863
864        sui_node_metrics
865            .binary_max_protocol_version
866            .set(ProtocolVersion::MAX.as_u64() as i64);
867        sui_node_metrics
868            .configured_max_protocol_version
869            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
870
871        let validator_components = if state.is_validator(&epoch_store) {
872            let mut components = Self::construct_validator_components(
873                config.clone(),
874                state.clone(),
875                committee,
876                epoch_store.clone(),
877                checkpoint_store.clone(),
878                state_sync_handle.clone(),
879                randomness_handle.clone(),
880                Arc::downgrade(&global_state_hasher),
881                backpressure_manager.clone(),
882                connection_monitor_status.clone(),
883                &registry_service,
884                sui_node_metrics.clone(),
885                checkpoint_metrics.clone(),
886            )
887            .await?;
888
889            components.consensus_adapter.submit_recovered(&epoch_store);
890
891            // Start the gRPC server
892            components.validator_server_handle = components.validator_server_handle.start().await;
893
894            // Set the consensus address updater so that we can update the consensus peer addresses when requested.
895            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
896
897            Some(components)
898        } else {
899            None
900        };
901
902        // setup shutdown channel
903        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
904
905        let node = Self {
906            config,
907            validator_components: Mutex::new(validator_components),
908            http_servers,
909            state,
910            transaction_orchestrator,
911            registry_service,
912            metrics: sui_node_metrics,
913            checkpoint_metrics,
914
915            _discovery: discovery_handle,
916            _connection_monitor_handle: connection_monitor_handle,
917            state_sync_handle,
918            randomness_handle,
919            checkpoint_store,
920            global_state_hasher: Mutex::new(Some(global_state_hasher)),
921            end_of_epoch_channel,
922            connection_monitor_status,
923            endpoint_manager,
924            backpressure_manager,
925
926            _db_checkpoint_handle: db_checkpoint_handle,
927
928            #[cfg(msim)]
929            sim_state: Default::default(),
930
931            _state_snapshot_uploader_handle: state_snapshot_handle,
932            shutdown_channel_tx: shutdown_channel,
933
934            auth_agg,
935            subscription_service_checkpoint_sender,
936        };
937
938        info!("SuiNode started!");
939        let node = Arc::new(node);
940        let node_copy = node.clone();
941        spawn_monitored_task!(async move {
942            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
943            if let Err(error) = result {
944                warn!("Reconfiguration finished with error {:?}", error);
945            }
946        });
947
948        Ok(node)
949    }
950
951    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
952        self.end_of_epoch_channel.subscribe()
953    }
954
955    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
956        self.shutdown_channel_tx.subscribe()
957    }
958
959    pub fn current_epoch_for_testing(&self) -> EpochId {
960        self.state.current_epoch_for_testing()
961    }
962
963    pub fn db_checkpoint_path(&self) -> PathBuf {
964        self.config.db_checkpoint_path()
965    }
966
967    // Init reconfig process by starting to reject user certs
968    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
969        info!("close_epoch (current epoch = {})", epoch_store.epoch());
970        self.validator_components
971            .lock()
972            .await
973            .as_ref()
974            .ok_or_else(|| SuiError::from("Node is not a validator"))?
975            .consensus_adapter
976            .close_epoch(epoch_store);
977        Ok(())
978    }
979
980    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
981        self.state
982            .clear_override_protocol_upgrade_buffer_stake(epoch)
983    }
984
985    pub fn set_override_protocol_upgrade_buffer_stake(
986        &self,
987        epoch: EpochId,
988        buffer_stake_bps: u64,
989    ) -> SuiResult {
990        self.state
991            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
992    }
993
994    // Testing-only API to start epoch close process.
995    // For production code, please use the non-testing version.
996    pub async fn close_epoch_for_testing(&self) -> SuiResult {
997        let epoch_store = self.state.epoch_store_for_testing();
998        self.close_epoch(&epoch_store).await
999    }
1000
1001    fn start_state_snapshot(
1002        config: &NodeConfig,
1003        prometheus_registry: &Registry,
1004        checkpoint_store: Arc<CheckpointStore>,
1005        chain_identifier: ChainIdentifier,
1006    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1007        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1008            let snapshot_uploader = StateSnapshotUploader::new(
1009                &config.db_checkpoint_path(),
1010                &config.snapshot_path(),
1011                remote_store_config.clone(),
1012                60,
1013                prometheus_registry,
1014                checkpoint_store,
1015                chain_identifier,
1016                config.state_snapshot_write_config.archive_interval_epochs,
1017            )?;
1018            Ok(Some(snapshot_uploader.start()))
1019        } else {
1020            Ok(None)
1021        }
1022    }
1023
1024    fn start_db_checkpoint(
1025        config: &NodeConfig,
1026        prometheus_registry: &Registry,
1027        state_snapshot_enabled: bool,
1028    ) -> Result<(
1029        DBCheckpointConfig,
1030        Option<tokio::sync::broadcast::Sender<()>>,
1031    )> {
1032        let checkpoint_path = Some(
1033            config
1034                .db_checkpoint_config
1035                .checkpoint_path
1036                .clone()
1037                .unwrap_or_else(|| config.db_checkpoint_path()),
1038        );
1039        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1040            DBCheckpointConfig {
1041                checkpoint_path,
1042                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1043                    true
1044                } else {
1045                    config
1046                        .db_checkpoint_config
1047                        .perform_db_checkpoints_at_epoch_end
1048                },
1049                ..config.db_checkpoint_config.clone()
1050            }
1051        } else {
1052            config.db_checkpoint_config.clone()
1053        };
1054
1055        match (
1056            db_checkpoint_config.object_store_config.as_ref(),
1057            state_snapshot_enabled,
1058        ) {
1059            // If db checkpoint config object store not specified but
1060            // state snapshot object store is specified, create handler
1061            // anyway for marking db checkpoints as completed so that they
1062            // can be uploaded as state snapshots.
1063            (None, false) => Ok((db_checkpoint_config, None)),
1064            (_, _) => {
1065                let handler = DBCheckpointHandler::new(
1066                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1067                    db_checkpoint_config.object_store_config.as_ref(),
1068                    60,
1069                    db_checkpoint_config
1070                        .prune_and_compact_before_upload
1071                        .unwrap_or(true),
1072                    config.authority_store_pruning_config.clone(),
1073                    prometheus_registry,
1074                    state_snapshot_enabled,
1075                )?;
1076                Ok((
1077                    db_checkpoint_config,
1078                    Some(DBCheckpointHandler::start(handler)),
1079                ))
1080            }
1081        }
1082    }
1083
    /// Builds the anemo-based p2p network and its attached services: peer
    /// discovery, state sync, and randomness.
    ///
    /// Returns the running network plus the handles/endpoint manager needed by
    /// the rest of node startup. `randomness_tx` is where received randomness
    /// partial outputs are forwarded (epoch, round, bytes).
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let mut p2p_config = config.p2p_config.clone();
        {
            // Default the discovery peer-address cache to a file under the db
            // path when the operator did not configure one.
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        if let Some(consensus_config) = &config.consensus_config {
            // Advertise the consensus external address via discovery; fall
            // back to the listen address when no external address is set.
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Label known peers for metrics/monitoring: allowlisted peers from the
        // discovery config plus seed peers that carry an explicit peer id.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three services share one anemo router / one QUIC endpoint.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound stack: trace server errors and record per-request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound stack mirrors the inbound one with its own metrics.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Set the max_frame_size to be 1 GB to work around the issue of there being too many
            // staking events in the epoch change txn.
            anemo_config.max_frame_size = Some(1 << 30);

            // Set a higher default value for socket send/receive buffers if not already
            // configured.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Set high-performance defaults for quinn transport.
            // With 200MiB buffer size and ~500ms RTT, max throughput ~400MiB/s.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // Chain-specific server name isolates networks (mainnet/testnet/...)
            // from each other at the TLS layer.
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start the background tasks for each service on the shared network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1254
    /// Constructs everything a validator needs beyond the base full-node
    /// components: consensus adapter and manager, consensus store pruner, the
    /// gRPC validator service, and (optionally) the overload monitor; then
    /// delegates to `start_epoch_specific_validator_components` for the
    /// per-epoch pieces.
    ///
    /// Errors if the node config lacks a consensus config, or if any
    /// sub-component fails to start.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        // Clone the config so consensus_config can be borrowed mutably without
        // touching the caller's copy.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        // The updatable client lets the consensus manager swap the underlying
        // consensus implementation while adapter callers keep a stable handle.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        // This only gets started up once, not on every epoch. (Make call to remove every epoch.)
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        // NOTE(review): this appears to only prepare the gRPC service; the
        // caller invokes `.start()` on the returned handle later — confirm.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        // Starts an overload monitor that monitors the execution of the authority.
        // Don't start the overload monitor when max_load_shedding_percentage is 0.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // Weak reference so the monitor does not keep the state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }
1351
    /// Starts the validator components that are rebuilt every epoch: the
    /// checkpoint service, randomness manager (when enabled), execution-time
    /// observer, throughput calculator/profiler, consensus handler, and the
    /// JWK updater (when authenticator state is enabled).
    ///
    /// Consensus startup is spawned asynchronously so it does not block the
    /// rest of the epoch bring-up; the checkpoint service optionally waits on
    /// a consensus replay waiter before spawning.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        // create a new map that gets injected into both the consensus handler and the consensus adapter
        // the consensus handler will write values forwarded from consensus, and the consensus adapter
        // will read the values to make decisions about which validator submits a transaction to consensus
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            // RandomnessManager holds only a weak ref to the epoch store so it
            // cannot extend the epoch store's lifetime across reconfiguration.
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Spawn consensus startup asynchronously to avoid blocking other components
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // DISABLE_REPLAY_WAITER is an escape hatch: when set, the checkpoint
        // service does not wait for consensus replay before starting.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }
1494
1495    fn build_checkpoint_service(
1496        config: &NodeConfig,
1497        consensus_adapter: Arc<ConsensusAdapter>,
1498        checkpoint_store: Arc<CheckpointStore>,
1499        epoch_store: Arc<AuthorityPerEpochStore>,
1500        state: Arc<AuthorityState>,
1501        state_sync_handle: state_sync::Handle,
1502        state_hasher: Weak<GlobalStateHasher>,
1503        checkpoint_metrics: Arc<CheckpointMetrics>,
1504    ) -> Arc<CheckpointService> {
1505        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1506        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1507
1508        debug!(
1509            "Starting checkpoint service with epoch start timestamp {}
1510            and epoch duration {}",
1511            epoch_start_timestamp_ms, epoch_duration_ms
1512        );
1513
1514        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1515            sender: consensus_adapter,
1516            signer: state.secret.clone(),
1517            authority: config.protocol_public_key(),
1518            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1519                .checked_add(epoch_duration_ms)
1520                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1521            metrics: checkpoint_metrics.clone(),
1522        });
1523
1524        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1525        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1526        let max_checkpoint_size_bytes =
1527            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1528
1529        CheckpointService::build(
1530            state.clone(),
1531            checkpoint_store,
1532            epoch_store,
1533            state.get_transaction_cache_reader().clone(),
1534            state_hasher,
1535            checkpoint_output,
1536            Box::new(certified_checkpoint_output),
1537            checkpoint_metrics,
1538            max_tx_per_checkpoint,
1539            max_checkpoint_size_bytes,
1540        )
1541    }
1542
1543    fn construct_consensus_adapter(
1544        committee: &Committee,
1545        consensus_config: &ConsensusConfig,
1546        authority: AuthorityName,
1547        connection_monitor_status: Arc<ConnectionMonitorStatus>,
1548        prometheus_registry: &Registry,
1549        protocol_config: ProtocolConfig,
1550        consensus_client: Arc<dyn ConsensusClient>,
1551        checkpoint_store: Arc<CheckpointStore>,
1552    ) -> ConsensusAdapter {
1553        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1554        // The consensus adapter allows the authority to send user certificates through consensus.
1555
1556        ConsensusAdapter::new(
1557            consensus_client,
1558            checkpoint_store,
1559            authority,
1560            connection_monitor_status,
1561            consensus_config.max_pending_transactions(),
1562            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1563            consensus_config.max_submit_position,
1564            consensus_config.submit_delay_step_override(),
1565            ca_metrics,
1566            protocol_config,
1567        )
1568    }
1569
    /// Creates the validator gRPC service (with TLS) and returns a `SpawnOnce`
    /// that, when started, binds the listener and begins serving.
    ///
    /// The returned future signals readiness through a oneshot channel only
    /// after the bind succeeds, so callers can await a usable endpoint.
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        // Note: the connect timeout value is reused for both HTTP/2 keepalive
        // settings below.
        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        // The server authenticates with the node's network key over TLS.
        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            // A failed bind is unrecoverable for a validator, hence the panic.
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            // Signal readiness only after the listener is bound.
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }
1617
1618    pub fn state(&self) -> Arc<AuthorityState> {
1619        self.state.clone()
1620    }
1621
    /// Returns the reference gas price for the current epoch.
    ///
    /// Only used for testing because of how the epoch store is loaded.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1626
1627    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1628        self.state.committee_store().clone()
1629    }
1630
1631    /*
1632    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1633        self.state.db()
1634    }
1635    */
1636
1637    /// Clone an AuthorityAggregator currently used in this node, if the node is a fullnode.
1638    /// After reconfig, Transaction Driver builds a new AuthorityAggregator. The caller
1639    /// of this function will mostly likely want to call this again
1640    /// to get a fresh one.
1641    pub fn clone_authority_aggregator(
1642        &self,
1643    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1644        self.transaction_orchestrator
1645            .as_ref()
1646            .map(|to| to.clone_authority_aggregator())
1647    }
1648
    /// Returns a shared handle to the transaction orchestrator, if this node
    /// has one configured.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1654
    /// This function awaits the completion of checkpoint execution of the current epoch,
    /// after which it initiates reconfiguration of the entire system.
    ///
    /// Loops once per epoch and only returns when a `run_with_range` stop
    /// condition is met (in which case the node is shut down first).
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take exclusive ownership of the global state hasher for this epoch;
            // a fresh one is swapped in during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Update the current protocol version metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities to committee, if we are a validator.
            if let Some(components) = &*self.validator_components.lock().await {
                // TODO: without this sleep, the consensus message is not delivered reliably.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    // no need to send digests of versions less than the current version
                    .truncate_below(config.version);

                // Walk the advertised max version downwards until its
                // prerequisites (here: the accumulator root object) are met.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Execute the remainder of the epoch's checkpoints; blocks until the
            // end of the epoch (or the run_with_range limit) is reached.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Safe to call because we are in the middle of reconfiguration.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Only warn about a failed end-of-epoch notification on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the new committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // We save the connection monitor status map regardless of validator / fullnode status
            // so that we don't need to restart the connection monitor every epoch.
            // Update the mappings that will be used by the consensus adapter if it exists or is
            // about to be created.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            // The following code handles 4 different cases, depending on whether the node
            // was a validator in the previous epoch, and whether the node is a validator
            // in the new epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                // Cases 1 & 2: the node was a validator in the previous epoch.
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    // Only restart consensus if this node is still a validator in the new epoch.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                // Cases 3 & 4: the node was a fullnode in the previous epoch.
                // No other components should be holding a strong reference to state hasher
                // at this point. Confirm here before we swap in the new hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    // Set the consensus address updater as the full node got promoted now to a validator.
                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Force releasing current epoch store DB handle, because the
            // Arc<AuthorityPerEpochStore> may linger.
            cur_epoch_store.release_db_handles();

            // In simulation builds, optionally exercise checkpoint pruning
            // each epoch when a retention limit is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1959
1960    async fn shutdown(&self) {
1961        if let Some(validator_components) = &*self.validator_components.lock().await {
1962            validator_components.consensus_manager.shutdown().await;
1963        }
1964    }
1965
    /// Reconfigures `AuthorityState` for the next epoch and returns the new
    /// per-epoch store.
    ///
    /// Preconditions (asserted below): the last checkpoint of the current
    /// epoch exists and has already been fully executed.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must not start before every checkpoint of the
        // current epoch has been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Surface old-vs-new epoch flags in metrics.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }
2021
    /// Returns a reference to the node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2025
    /// Returns a clone of the randomness subsystem handle.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2029
    /// Returns a clone of the state-sync subsystem handle.
    pub fn state_sync_handle(&self) -> state_sync::Handle {
        self.state_sync_handle.clone()
    }
2033
    /// Returns a reference to the endpoint manager.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2037
2038    /// Get a short prefix of a digest for metric labels
2039    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2040        let digest_str = digest.to_string();
2041        if digest_str.len() >= 8 {
2042            digest_str[0..8].to_string()
2043        } else {
2044            digest_str
2045        }
2046    }
2047
    /// Check for previously detected forks and handle them appropriately.
    /// For validators with fork recovery config, clear the fork if it matches the recovery config.
    /// For all other cases, block node startup if a fork is detected.
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Fork detection and recovery is only relevant for validators
        // Fullnodes should sync from validators and don't need fork checking
        if !is_validator {
            return Ok(());
        }

        // Try to recover from forks if recovery config is provided
        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        // Any fork marker still present after the recovery attempts above is
        // handled (halt or error) according to the configured crash behavior.
        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }
2103
    /// Apply operator-supplied checkpoint digest overrides and clear any
    /// recorded checkpoint-fork marker that the overrides cover.
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        // If configured overrides include a checkpoint whose locally computed digest mismatches,
        // clear locally computed checkpoints from that sequence (inclusive).
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            // A malformed override is a configuration error: abort rather
            // than silently skipping it.
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        // Clear the fork marker only when the forked sequence is explicitly
        // listed in the overrides.
        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }
2152
2153    fn try_recover_transaction_fork(
2154        checkpoint_store: &CheckpointStore,
2155        recovery: &ForkRecoveryConfig,
2156    ) -> Result<()> {
2157        if recovery.transaction_overrides.is_empty() {
2158            return Ok(());
2159        }
2160
2161        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2162            && recovery
2163                .transaction_overrides
2164                .contains_key(&tx_digest.to_string())
2165        {
2166            info!(
2167                "Fork recovery enabled: clearing transaction fork for tx {:?}",
2168                tx_digest
2169            );
2170            checkpoint_store
2171                .clear_transaction_fork_detected()
2172                .expect("Failed to clear transaction fork detected marker");
2173        }
2174        Ok(())
2175    }
2176
2177    fn get_current_timestamp() -> u64 {
2178        std::time::SystemTime::now()
2179            .duration_since(std::time::SystemTime::UNIX_EPOCH)
2180            .unwrap()
2181            .as_secs()
2182    }
2183
2184    async fn handle_checkpoint_fork(
2185        checkpoint_seq: u64,
2186        checkpoint_digest: CheckpointDigest,
2187        checkpoint_metrics: &CheckpointMetrics,
2188        fork_recovery: Option<&ForkRecoveryConfig>,
2189    ) -> Result<()> {
2190        checkpoint_metrics
2191            .checkpoint_fork_crash_mode
2192            .with_label_values(&[
2193                &checkpoint_seq.to_string(),
2194                &Self::get_digest_prefix(checkpoint_digest),
2195                &Self::get_current_timestamp().to_string(),
2196            ])
2197            .set(1);
2198
2199        let behavior = fork_recovery
2200            .map(|fr| fr.fork_crash_behavior)
2201            .unwrap_or_default();
2202
2203        match behavior {
2204            ForkCrashBehavior::AwaitForkRecovery => {
2205                error!(
2206                    checkpoint_seq = checkpoint_seq,
2207                    checkpoint_digest = ?checkpoint_digest,
2208                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2209                );
2210                futures::future::pending::<()>().await;
2211                unreachable!("pending() should never return");
2212            }
2213            ForkCrashBehavior::ReturnError => {
2214                error!(
2215                    checkpoint_seq = checkpoint_seq,
2216                    checkpoint_digest = ?checkpoint_digest,
2217                    "Checkpoint fork detected! Returning error."
2218                );
2219                Err(anyhow::anyhow!(
2220                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2221                    checkpoint_seq,
2222                    checkpoint_digest
2223                ))
2224            }
2225        }
2226    }
2227
    /// React to a previously detected transaction fork (locally computed
    /// effects differ from the expected effects) according to the configured
    /// crash behavior: halt forever or return an error.
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Record the fork in metrics so operators can alert on it.
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        // Without a fork-recovery config the default behavior applies.
        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                // Park forever: operator intervention plus a restart are
                // required to make progress.
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
2276}
2277
2278#[cfg(not(msim))]
2279impl SuiNode {
2280    async fn fetch_jwks(
2281        _authority: AuthorityName,
2282        provider: &OIDCProvider,
2283    ) -> SuiResult<Vec<(JwkId, JWK)>> {
2284        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2285        use sui_types::error::SuiErrorKind;
2286        let client = reqwest::Client::new();
2287        fetch_jwks(provider, &client, true)
2288            .await
2289            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2290    }
2291}
2292
#[cfg(msim)]
impl SuiNode {
    /// Returns the simulator node id assigned to this node instance.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects this node to be in safe mode.
    pub fn set_safe_mode_expected(&self, expected: bool) {
        info!("Setting safe mode expected to {}", expected);
        let flag = &self.sim_state.sim_safe_mode_expected;
        flag.store(expected, Ordering::Relaxed);
    }

    /// In simulation builds, JWKs come from the test-injected provider
    /// rather than being fetched over the network.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2314
/// A deferred background task: holds the future (plus a readiness channel)
/// until `start` is called, after which it holds the spawned task's handle.
enum SpawnOnce {
    // Mutex is only needed to make SpawnOnce Send
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // Handle to the task once it has been spawned by `start`.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2321
2322impl SpawnOnce {
2323    pub fn new(
2324        ready_rx: oneshot::Receiver<()>,
2325        future: impl Future<Output = ()> + Send + 'static,
2326    ) -> Self {
2327        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2328    }
2329
2330    pub async fn start(self) -> Self {
2331        match self {
2332            Self::Unstarted(ready_rx, future) => {
2333                let future = future.into_inner();
2334                let handle = tokio::spawn(future);
2335                ready_rx.await.unwrap();
2336                Self::Started(handle)
2337            }
2338            Self::Started(_) => self,
2339        }
2340    }
2341}
2342
2343/// Updates trusted peer addresses in the p2p network.
2344fn update_peer_addresses(
2345    config: &NodeConfig,
2346    endpoint_manager: &EndpointManager,
2347    epoch_start_state: &EpochStartSystemState,
2348) {
2349    for (peer_id, address) in
2350        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2351    {
2352        endpoint_manager
2353            .update_endpoint(
2354                EndpointId::P2p(peer_id),
2355                AddressSource::Chain,
2356                vec![address],
2357            )
2358            .expect("Updating peer addresses should not fail");
2359    }
2360}
2361
2362fn build_kv_store(
2363    state: &Arc<AuthorityState>,
2364    config: &NodeConfig,
2365    registry: &Registry,
2366) -> Result<Arc<TransactionKeyValueStore>> {
2367    let metrics = KeyValueStoreMetrics::new(registry);
2368    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2369
2370    let base_url = &config.transaction_kv_store_read_config.base_url;
2371
2372    if base_url.is_empty() {
2373        info!("no http kv store url provided, using local db only");
2374        return Ok(Arc::new(db_store));
2375    }
2376
2377    let base_url: url::Url = base_url.parse().tap_err(|e| {
2378        error!(
2379            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2380            base_url, e
2381        )
2382    })?;
2383
2384    let network_str = match state.get_chain_identifier().chain() {
2385        Chain::Mainnet => "/mainnet",
2386        _ => {
2387            info!("using local db only for kv store");
2388            return Ok(Arc::new(db_store));
2389        }
2390    };
2391
2392    let base_url = base_url.join(network_str)?.to_string();
2393    let http_store = HttpKVStore::new_kv(
2394        &base_url,
2395        config.transaction_kv_store_read_config.cache_size,
2396        metrics.clone(),
2397    )?;
2398    info!("using local key-value store with fallback to http key-value store");
2399    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2400        db_store,
2401        http_store,
2402        metrics,
2403        "json_rpc_fallback",
2404    )))
2405}
2406
/// Builds the fullnode's public HTTP interfaces: the JSON-RPC API and the
/// newer rpc service, merged into one axum router and served over HTTP
/// (and additionally HTTPS when a TLS config is present).
///
/// Returns the server handles together with a sender used to feed
/// checkpoints into the subscription service. Validators (nodes with a
/// consensus config) expose none of these APIs and get defaults back.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Validators do not expose these APIs
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // Register every JSON-RPC module, then turn the server into an axum router.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        // Local rocksdb kv store, optionally backed by an http fallback.
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // if run_with_range is enabled we want to prevent any transactions
        // run_with_range = None is normal operating conditions
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator exists.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Name service config: use explicit overrides from the node config
        // when all three ids are present, otherwise chain-specific defaults.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The returned sender is how checkpoints reach rpc subscribers; it is
    // handed back to the caller at the end of this function.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    // Assemble the newer rpc service and convert it into an axum router.
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Bridge sui_http's connect info into the axum ConnectInfo extension
        // so handlers can observe the remote peer address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        // Setup a permissive CORS policy
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // Serve the same router over HTTPS when a TLS config is provided.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // Plain HTTP is always served on the configured json rpc address.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2578
/// Maximum number of transactions allowed in a single checkpoint, taken
/// from the protocol config.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2583
/// Test builds use a tiny fixed limit instead of the protocol value —
/// presumably so small workloads span multiple checkpoints; confirm before
/// relying on the exact value.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}
2588
/// Server handles produced by `build_http_servers`. Both are `None` on
/// validators; `https` is only set when the rpc config includes TLS.
#[derive(Default)]
struct HttpServers {
    // Plain HTTP server bound to `config.json_rpc_address`.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // HTTPS server, present only when a TLS config was provided.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2596
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Exercises `SuiNode::check_and_recover_forks` for both fork kinds:
    /// with no override configured the call must return an error (because
    /// `ForkCrashBehavior::ReturnError` is set), and with a matching
    /// override the call must succeed and clear the persisted fork marker.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // ---------- Checkpoint fork path ----------
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // No overrides: the recorded checkpoint fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // An override for the forked sequence number allows recovery...
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        // ...and clears the persisted checkpoint-fork marker.
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // ---------- Transaction fork path ----------
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        // No overrides: the recorded transaction fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // An override mapping the tx digest to the observed effects digest
        // allows recovery...
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        // ...and clears the persisted transaction-fork marker.
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}