1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Log macro for the JWK updater.
///
/// Forwards its arguments unchanged to `debug!` when `in_test_configuration()` is true and to
/// `info!` otherwise, so that JWK fetch/submit chatter stays quiet in test runs.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        match in_test_configuration() {
            true => debug!($($arg)+),
            false => info!($($arg)+),
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::rpc_store_embed::EmbeddedRpcStore;
118use sui_core::signature_verifier::SignatureVerifierMetrics;
119use sui_core::storage::RocksDbStore;
120use sui_core::storage::RpcStoreReadStore;
121use sui_core::transaction_orchestrator::TransactionOrchestrator;
122use sui_core::{
123 authority::{AuthorityState, AuthorityStore},
124 authority_client::NetworkAuthorityClient,
125};
126use sui_json_rpc::JsonRpcServerBuilder;
127use sui_json_rpc::coin_api::CoinReadApi;
128use sui_json_rpc::governance_api::GovernanceReadApi;
129use sui_json_rpc::indexer_api::IndexerApi;
130use sui_json_rpc::move_utils::MoveUtils;
131use sui_json_rpc::read_api::ReadApi;
132use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
133use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
134use sui_macros::fail_point;
135use sui_macros::{fail_point_async, replay_log};
136use sui_network::api::ValidatorServer;
137use sui_network::discovery;
138use sui_network::endpoint_manager::EndpointManager;
139use sui_network::state_sync;
140use sui_network::validator::server::ServerBuilder;
141use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
142use sui_snapshot::uploader::StateSnapshotUploader;
143use sui_storage::{
144 http_key_value_store::HttpKVStore,
145 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
146 key_value_store_metrics::KeyValueStoreMetrics,
147};
148use sui_types::base_types::{AuthorityName, EpochId};
149use sui_types::committee::Committee;
150use sui_types::crypto::KeypairTraits;
151use sui_types::error::{SuiError, SuiResult};
152use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
153use sui_types::storage::RpcStateReader;
154use sui_types::sui_system_state::SuiSystemStateTrait;
155use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::supported_protocol_versions::SupportedProtocolVersions;
158use typed_store::DBMetrics;
159use typed_store::rocks::default_db_options;
160
161use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
162
163pub mod admin;
164pub mod db_shell;
165mod handle;
166pub mod metrics;
167
/// Components that exist only on nodes whose role runs consensus.
///
/// `SuiNode::start_async` builds these via `construct_validator_components` when
/// `node_role.runs_consensus()` is true, and stores them in `SuiNode::validator_components`.
pub struct ValidatorComponents {
    // Validator gRPC server. `start_async` takes it out, calls `.start().await`, and stores the
    // result back in, but only when the node role `is_validator()`.
    validator_server_handle: Option<SpawnOnce>,
    // Handle of the overload monitor task, if one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    // Consensus manager. On validators it is also registered with the endpoint manager as the
    // consensus address updater.
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    // Adapter used to submit transactions to consensus and to close epochs (see `close_epoch`).
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    // NOTE(review): optional admission-queue context; when it is populated is decided outside
    // this chunk — confirm in `construct_validator_components`.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Everything produced when the node's anemo p2p network is created.
///
/// Returned by `SuiNode::create_p2p_network` and immediately destructured in
/// `SuiNode::start_async`.
pub struct P2pComponents {
    // The anemo network itself; `start_async` only keeps a weak (`downgrade`d) reference to it
    // for the connection monitor.
    p2p_network: Network,
    // Peers handed to the anemo connection monitor, keyed by peer id.
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    // Receives peer address overrides from config and peer addresses from the epoch start state.
    endpoint_manager: EndpointManager,
}
186
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Per-node bookkeeping that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Simulator node handle captured when this state was created.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Starts out `false`; flipped elsewhere when safe mode is expected (TODO confirm usage).
        pub sim_safe_mode_expected: AtomicBool,
        /// Held for its lifetime only; never read.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK source used in place of real network fetches.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores both arguments and returns the bundled default key set,
    /// parsed as Twitch keys. Any parse failure becomes `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the injector installed for the current thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&slot.borrow()))
    }

    /// Installs `injector` as the current thread's JWK source, replacing the previous one.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| *slot.borrow_mut() = injector);
    }
}
240
241#[cfg(msim)]
242pub use simulator::set_jwk_injector;
243#[cfg(msim)]
244use simulator::*;
245use sui_core::authority::authority_store_pruner::PrunerWatermarks;
246use sui_core::{
247 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
248};
249
// Default gRPC connect timeout (60 seconds). Its users are outside this chunk — presumably
// gRPC client connections made by the node; confirm at the use sites.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
251
/// A running Sui node (validator, observer, or full node), as assembled by
/// `SuiNode::start_async`.
pub struct SuiNode {
    // Config the node was started with, after `start_async` filled in
    // `supported_protocol_versions` and adjusted the pruning config.
    config: NodeConfig,
    // `Some` only when the node's role runs consensus (built in `start_async`).
    validator_components: Mutex<Option<ValidatorComponents>>,

    // NOTE(review): never read in this chunk; presumably held only to keep the HTTP servers
    // alive for the node's lifetime — confirm.
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    // `Some` only for full nodes started without a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    // Underscore-prefixed handles are stored but not read here; presumably kept so the
    // background services they own stay alive (TODO confirm).
    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // Initialized to `Some` in `start_async`; the `Option` suggests it is replaced or taken
    // later (e.g. across epochs) — confirm.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    // Sender behind `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    // Handle returned by `start_db_checkpoint`, if a DB checkpoint handler was started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Handle returned by `start_state_snapshot`; `Some` only when a snapshot object store is
    // configured.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Sender behind `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,

    // Authority aggregator behind an `ArcSwap`, so it can be replaced atomically (presumably on
    // reconfiguration — confirm).
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    // Checkpoint broadcast sender returned by `build_http_servers`, if any.
    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,

    // `Some` only when RPC indexing with the experimental embedded RPC store is enabled.
    embedded_rpc_store: Option<EmbeddedRpcStore>,
}
307
308impl fmt::Debug for SuiNode {
309 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310 f.debug_struct("SuiNode")
311 .field("name", &self.state.name.concise())
312 .finish()
313 }
314}
315
/// Upper bound on how many new JWKs a single provider fetch may submit to consensus; any
/// excess is dropped with a warning in `start_jwk_updater`.
///
/// A `const` rather than a `static`: this is a plain compile-time value whose address is never
/// needed.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
317
318impl SuiNode {
319 pub async fn start(
320 config: NodeConfig,
321 registry_service: RegistryService,
322 ) -> Result<Arc<SuiNode>> {
323 Self::start_async(
324 config,
325 registry_service,
326 ServerVersion::new("sui-node", "unknown"),
327 )
328 .await
329 }
330
331 fn start_jwk_updater(
332 config: &NodeConfig,
333 metrics: Arc<SuiNodeMetrics>,
334 authority: AuthorityName,
335 epoch_store: Arc<AuthorityPerEpochStore>,
336 consensus_adapter: Arc<ConsensusAdapter>,
337 ) {
338 let epoch = epoch_store.epoch();
339
340 let supported_providers = config
341 .zklogin_oauth_providers
342 .get(&epoch_store.get_chain_identifier().chain())
343 .unwrap_or(&BTreeSet::new())
344 .iter()
345 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
346 .collect::<Vec<_>>();
347
348 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
349
350 info!(
351 ?fetch_interval,
352 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
353 );
354
355 fn validate_jwk(
356 metrics: &Arc<SuiNodeMetrics>,
357 provider: &OIDCProvider,
358 id: &JwkId,
359 jwk: &JWK,
360 ) -> bool {
361 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
362 warn!(
363 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
364 id.iss, provider
365 );
366 metrics
367 .invalid_jwks
368 .with_label_values(&[&provider.to_string()])
369 .inc();
370 return false;
371 };
372
373 if iss_provider != *provider {
374 warn!(
375 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
376 id.iss, provider, iss_provider
377 );
378 metrics
379 .invalid_jwks
380 .with_label_values(&[&provider.to_string()])
381 .inc();
382 return false;
383 }
384
385 if !check_total_jwk_size(id, jwk) {
386 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
387 metrics
388 .invalid_jwks
389 .with_label_values(&[&provider.to_string()])
390 .inc();
391 return false;
392 }
393
394 true
395 }
396
397 for p in supported_providers.into_iter() {
406 let provider_str = p.to_string();
407 let epoch_store = epoch_store.clone();
408 let consensus_adapter = consensus_adapter.clone();
409 let metrics = metrics.clone();
410 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
411 async move {
412 let mut seen = HashSet::new();
415 loop {
416 jwk_log!("fetching JWK for provider {:?}", p);
417 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
418 match Self::fetch_jwks(authority, &p).await {
419 Err(e) => {
420 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
421 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
422 tokio::time::sleep(Duration::from_secs(30)).await;
424 continue;
425 }
426 Ok(mut keys) => {
427 metrics.total_jwks
428 .with_label_values(&[&provider_str])
429 .inc_by(keys.len() as u64);
430
431 keys.retain(|(id, jwk)| {
432 validate_jwk(&metrics, &p, id, jwk) &&
433 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
434 seen.insert((id.clone(), jwk.clone()))
435 });
436
437 metrics.unique_jwks
438 .with_label_values(&[&provider_str])
439 .inc_by(keys.len() as u64);
440
441 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
444 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
445 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
446 }
447
448 for (id, jwk) in keys.into_iter() {
449 jwk_log!("Submitting JWK to consensus: {:?}", id);
450
451 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
452 consensus_adapter.submit(txn, None, &epoch_store, None, None)
453 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
454 .ok();
455 }
456 }
457 }
458 tokio::time::sleep(fetch_interval).await;
459 }
460 }
461 .instrument(error_span!("jwk_updater_task", epoch)),
462 ));
463 }
464 }
465
466 pub async fn start_async(
467 config: NodeConfig,
468 registry_service: RegistryService,
469 server_version: ServerVersion,
470 ) -> Result<Arc<SuiNode>> {
471 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
472 let mut config = config.clone();
473 if config.supported_protocol_versions.is_none() {
474 info!(
475 "populating config.supported_protocol_versions with default {:?}",
476 SupportedProtocolVersions::SYSTEM_DEFAULT
477 );
478 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
479 }
480
481 let run_with_range = config.run_with_range;
482 let prometheus_registry = registry_service.default_registry();
483 let node_role = config.intended_node_role();
484
485 info!(node =? config.protocol_public_key(),
486 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
487 );
488
489 DBMetrics::init(registry_service.clone());
491
492 typed_store::init_write_sync(config.enable_db_sync_to_disk);
494
495 mysten_metrics::init_metrics(&prometheus_registry);
497 #[cfg(not(msim))]
499 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
500
501 let genesis = config.genesis()?.clone();
502
503 let secret = Arc::pin(config.protocol_key_pair().copy());
504 let genesis_committee = genesis.committee();
505 let committee_store = Arc::new(CommitteeStore::new(
506 config.db_path().join("epochs"),
507 &genesis_committee,
508 None,
509 ));
510
511 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
512 let checkpoint_store = CheckpointStore::new(
513 &config.db_path().join("checkpoints"),
514 pruner_watermarks.clone(),
515 );
516 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
517
518 if node_role.runs_consensus() {
519 Self::check_and_recover_forks(
520 &checkpoint_store,
521 &checkpoint_metrics,
522 config.fork_recovery.as_ref(),
523 )
524 .await?;
525 }
526
527 let enable_write_stall = config
529 .enable_db_write_stall
530 .unwrap_or(node_role.runs_consensus());
531 let enable_objects_compactor = node_role.is_validator()
538 || config.authority_store_pruning_config.num_epochs_to_retain == 0;
539 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
540 enable_write_stall,
541 enable_objects_compactor,
542 };
543 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
544 &config.db_store_path(),
545 Some(perpetual_tables_options),
546 Some(pruner_watermarks.epoch_id.clone()),
547 ));
548 let is_genesis = perpetual_tables
549 .database_is_empty()
550 .expect("Database read should not fail at init.");
551
552 let backpressure_manager =
553 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
554
555 let store =
556 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
557
558 let cur_epoch = store.get_recovery_epoch_at_restart()?;
559 let committee = committee_store
560 .get_committee(&cur_epoch)?
561 .expect("Committee of the current epoch must exist");
562 let epoch_start_configuration = store
563 .get_epoch_start_configuration()?
564 .expect("EpochStartConfiguration of the current epoch must exist");
565 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
566 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
567
568 let cache_traits = build_execution_cache(
569 &config.execution_cache,
570 &prometheus_registry,
571 &store,
572 backpressure_manager.clone(),
573 );
574
575 let auth_agg = {
576 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
577 Arc::new(ArcSwap::new(Arc::new(
578 AuthorityAggregator::new_from_epoch_start_state(
579 epoch_start_configuration.epoch_start_state(),
580 &committee_store,
581 safe_client_metrics_base,
582 ),
583 )))
584 };
585
586 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
587 let chain = match config.chain_override_for_testing {
588 Some(chain) => chain,
589 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
590 };
591
592 let highest_executed_checkpoint = checkpoint_store
593 .get_highest_executed_checkpoint_seq_number()
594 .expect("checkpoint store read cannot fail")
595 .unwrap_or(0);
596
597 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
598 0
599 } else {
600 checkpoint_store
601 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
602 .expect("checkpoint store read cannot fail")
603 .unwrap_or(highest_executed_checkpoint)
604 };
605
606 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
607 let epoch_store = AuthorityPerEpochStore::new(
608 config.protocol_public_key(),
609 committee.clone(),
610 &config.db_store_path(),
611 Some(epoch_options.options),
612 EpochMetrics::new(®istry_service.default_registry()),
613 epoch_start_configuration,
614 cache_traits.backing_package_store.clone(),
615 cache_traits.object_store.clone(),
616 cache_metrics,
617 signature_verifier_metrics,
618 &config.expensive_safety_check_config,
619 (chain_id, chain),
620 highest_executed_checkpoint,
621 previous_epoch_last_checkpoint,
622 Arc::new(SubmittedTransactionCacheMetrics::new(
623 ®istry_service.default_registry(),
624 )),
625 config.fullnode_sync_mode,
626 )?;
627
628 info!("created epoch store");
629
630 replay_log!(
631 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632 epoch_store.epoch(),
633 epoch_store.protocol_config()
634 );
635
636 if is_genesis {
638 info!("checking SUI conservation at genesis");
639 cache_traits
644 .reconfig_api
645 .expensive_check_sui_conservation(&epoch_store)
646 .expect("SUI conservation check cannot fail at genesis");
647 }
648
649 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650 let default_buffer_stake = epoch_store
651 .protocol_config()
652 .buffer_stake_for_protocol_upgrade_bps();
653 if effective_buffer_stake != default_buffer_stake {
654 warn!(
655 ?effective_buffer_stake,
656 ?default_buffer_stake,
657 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658 );
659 }
660
661 checkpoint_store.insert_genesis_checkpoint(
662 genesis.checkpoint(),
663 genesis.checkpoint_contents().clone(),
664 &epoch_store,
665 );
666
667 info!("creating state sync store");
668 let state_sync_store = RocksDbStore::new(
669 cache_traits.clone(),
670 committee_store.clone(),
671 checkpoint_store.clone(),
672 );
673
674 let index_store =
675 if node_role.should_enable_index_processing() && config.enable_index_processing {
676 info!("creating jsonrpc index store");
677 Some(Arc::new(IndexStore::new(
678 config.db_path().join("indexes"),
679 &prometheus_registry,
680 epoch_store
681 .protocol_config()
682 .max_move_identifier_len_as_option(),
683 config.remove_deprecated_tables,
684 )))
685 } else {
686 None
687 };
688
689 let chain_identifier = epoch_store.get_chain_identifier();
690
691 let (rpc_index, mut embedded_rpc_store) = if node_role.should_enable_index_processing()
696 && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
697 {
698 if config
699 .rpc()
700 .is_some_and(|rpc| rpc.use_experimental_rpc_store())
701 {
702 info!("creating embedded rpc-store");
703 let ingestion_source = RocksDbStore::new(
706 cache_traits.clone(),
707 committee_store.clone(),
708 checkpoint_store.clone(),
709 );
710 let embedded_rpc_store = EmbeddedRpcStore::bootstrap(
711 &config,
712 &store,
713 &checkpoint_store,
714 ingestion_source,
715 chain_identifier,
716 &prometheus_registry,
717 )
718 .await?;
719 (None, Some(embedded_rpc_store))
720 } else {
721 info!("creating rpc index store");
722 let rpc_index = Arc::new(
723 RpcIndexStore::new(
724 &config.db_path(),
725 &store,
726 &checkpoint_store,
727 &epoch_store,
728 &cache_traits.backing_package_store,
729 config.rpc().cloned().unwrap_or_default(),
730 )
731 .await,
732 );
733 (Some(rpc_index), None)
734 }
735 } else {
736 (None, None)
737 };
738
739 info!("creating archive reader");
740 let (randomness_tx, randomness_rx) = mpsc::channel(
742 config
743 .p2p_config
744 .randomness
745 .clone()
746 .unwrap_or_default()
747 .mailbox_capacity(),
748 );
749 let P2pComponents {
750 p2p_network,
751 known_peers,
752 discovery_handle,
753 state_sync_handle,
754 randomness_handle,
755 endpoint_manager,
756 } = Self::create_p2p_network(
757 &config,
758 state_sync_store.clone(),
759 chain_identifier,
760 randomness_tx,
761 &prometheus_registry,
762 )?;
763
764 for peer in &config.p2p_config.peer_address_overrides {
766 endpoint_manager
767 .update_endpoint(
768 EndpointId::P2p(peer.peer_id),
769 AddressSource::Config,
770 peer.addresses.clone(),
771 )
772 .expect("Updating peer address overrides should not fail");
773 }
774
775 update_peer_addresses(
777 &config,
778 &endpoint_manager,
779 epoch_store.epoch_start_state(),
780 None,
781 );
782
783 info!("start snapshot upload");
784 let state_snapshot_handle = Self::start_state_snapshot(
786 &config,
787 &prometheus_registry,
788 checkpoint_store.clone(),
789 chain_identifier,
790 )?;
791
792 info!("start db checkpoint");
794 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
795 &config,
796 &prometheus_registry,
797 state_snapshot_handle.is_some(),
798 )?;
799
800 if !epoch_store
801 .protocol_config()
802 .simplified_unwrap_then_delete()
803 {
804 config
806 .authority_store_pruning_config
807 .set_killswitch_tombstone_pruning(true);
808 }
809
810 let authority_name = config.protocol_public_key();
811
812 info!("create authority state");
813 let state = AuthorityState::new(
814 authority_name,
815 secret,
816 config.supported_protocol_versions.unwrap(),
817 store.clone(),
818 cache_traits.clone(),
819 epoch_store.clone(),
820 committee_store.clone(),
821 index_store.clone(),
822 rpc_index,
823 embedded_rpc_store.as_ref().map(|embedded| embedded.store()),
824 checkpoint_store.clone(),
825 &prometheus_registry,
826 genesis.objects(),
827 &db_checkpoint_config,
828 config.clone(),
829 chain_identifier,
830 config.policy_config.clone(),
831 config.firewall_config.clone(),
832 pruner_watermarks,
833 )
834 .await;
835 if epoch_store.epoch() == 0 {
837 let txn = &genesis.transaction();
838 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
839 let transaction =
840 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
841 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
842 genesis.transaction().data().clone(),
843 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
844 ),
845 );
846 let _enter = span.enter();
847 state
848 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
849 .unwrap();
850 }
851
852 let randomness_receiver_handle =
855 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
856
857 let (end_of_epoch_channel, end_of_epoch_receiver) =
858 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
859
860 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
861 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
862 auth_agg.load_full(),
863 state.clone(),
864 end_of_epoch_receiver,
865 &config.db_path(),
866 &prometheus_registry,
867 &config,
868 )))
869 } else {
870 None
871 };
872
873 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
874 state.clone(),
875 state_sync_store,
876 &transaction_orchestrator.clone(),
877 &config,
878 &prometheus_registry,
879 server_version,
880 node_role,
881 embedded_rpc_store.as_ref(),
882 )
883 .await?;
884
885 if let Some(embedded) = embedded_rpc_store.as_mut() {
892 embedded.spawn_indexer(
893 subscription_service_checkpoint_sender.clone(),
894 prometheus_registry.clone(),
895 );
896 }
897
898 let global_state_hasher = Arc::new(GlobalStateHasher::new(
899 cache_traits.global_state_hash_store.clone(),
900 GlobalStateHashMetrics::new(&prometheus_registry),
901 ));
902
903 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
904 "sui",
905 ®istry_service.default_registry(),
906 );
907
908 let connection_monitor_handle =
909 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
910 p2p_network.downgrade(),
911 Arc::new(network_connection_metrics),
912 known_peers,
913 );
914
915 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
916
917 sui_node_metrics
918 .binary_max_protocol_version
919 .set(ProtocolVersion::MAX.as_u64() as i64);
920 sui_node_metrics
921 .configured_max_protocol_version
922 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
923
924 let node_role = epoch_store.node_role();
925 let validator_components = if node_role.runs_consensus() {
926 let mut components = Self::construct_validator_components(
927 config.clone(),
928 state.clone(),
929 committee,
930 epoch_store.clone(),
931 checkpoint_store.clone(),
932 state_sync_handle.clone(),
933 randomness_handle.clone(),
934 Arc::downgrade(&global_state_hasher),
935 backpressure_manager.clone(),
936 ®istry_service,
937 sui_node_metrics.clone(),
938 checkpoint_metrics.clone(),
939 node_role,
940 randomness_receiver_handle.clone(),
941 )
942 .await?;
943
944 if node_role.is_validator() {
945 components
946 .consensus_adapter
947 .recover_end_of_publish(&epoch_store);
948
949 components.validator_server_handle = Some(
951 components
952 .validator_server_handle
953 .take()
954 .unwrap()
955 .start()
956 .await,
957 );
958
959 endpoint_manager
961 .set_consensus_address_updater(components.consensus_manager.clone());
962 } else {
963 info!("Starting node as Observer — connecting to configured peers");
964 }
965
966 Some(components)
967 } else {
968 None
969 };
970
971 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
973
974 let node = Self {
975 config,
976 validator_components: Mutex::new(validator_components),
977 http_servers,
978 state,
979 transaction_orchestrator,
980 registry_service,
981 metrics: sui_node_metrics,
982 checkpoint_metrics,
983
984 _discovery: discovery_handle,
985 _connection_monitor_handle: connection_monitor_handle,
986 state_sync_handle,
987 randomness_handle,
988 checkpoint_store,
989 global_state_hasher: Mutex::new(Some(global_state_hasher)),
990 end_of_epoch_channel,
991 endpoint_manager,
992 backpressure_manager,
993
994 _db_checkpoint_handle: db_checkpoint_handle,
995
996 #[cfg(msim)]
997 sim_state: Default::default(),
998
999 _state_snapshot_uploader_handle: state_snapshot_handle,
1000 shutdown_channel_tx: shutdown_channel,
1001 randomness_receiver_handle,
1002
1003 auth_agg,
1004 subscription_service_checkpoint_sender,
1005 embedded_rpc_store,
1006 };
1007
1008 info!("SuiNode started!");
1009 let node = Arc::new(node);
1010 let node_copy = node.clone();
1011 spawn_monitored_task!(async move {
1012 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
1013 if let Err(error) = result {
1014 warn!("Reconfiguration finished with error {:?}", error);
1015 }
1016 });
1017
1018 Ok(node)
1019 }
1020
1021 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
1022 self.end_of_epoch_channel.subscribe()
1023 }
1024
1025 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1026 self.shutdown_channel_tx.subscribe()
1027 }
1028
1029 pub fn current_epoch_for_testing(&self) -> EpochId {
1030 self.state.current_epoch_for_testing()
1031 }
1032
1033 pub fn db_checkpoint_path(&self) -> PathBuf {
1034 self.config.db_checkpoint_path()
1035 }
1036
1037 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
1039 info!("close_epoch (current epoch = {})", epoch_store.epoch());
1040 self.validator_components
1041 .lock()
1042 .await
1043 .as_ref()
1044 .ok_or_else(|| SuiError::from("Node is not a validator"))?
1045 .consensus_adapter
1046 .close_epoch(epoch_store);
1047 Ok(())
1048 }
1049
1050 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
1051 self.state
1052 .clear_override_protocol_upgrade_buffer_stake(epoch)
1053 }
1054
1055 pub fn set_override_protocol_upgrade_buffer_stake(
1056 &self,
1057 epoch: EpochId,
1058 buffer_stake_bps: u64,
1059 ) -> SuiResult {
1060 self.state
1061 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1062 }
1063
1064 pub async fn close_epoch_for_testing(&self) -> SuiResult {
1067 let epoch_store = self.state.epoch_store_for_testing();
1068 self.close_epoch(&epoch_store).await
1069 }
1070
1071 fn start_state_snapshot(
1072 config: &NodeConfig,
1073 prometheus_registry: &Registry,
1074 checkpoint_store: Arc<CheckpointStore>,
1075 chain_identifier: ChainIdentifier,
1076 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1077 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1078 let snapshot_uploader = StateSnapshotUploader::new(
1079 &config.db_checkpoint_path(),
1080 &config.snapshot_path(),
1081 remote_store_config.clone(),
1082 60,
1083 prometheus_registry,
1084 checkpoint_store,
1085 chain_identifier,
1086 config.state_snapshot_write_config.archive_interval_epochs,
1087 )?;
1088 Ok(Some(snapshot_uploader.start()))
1089 } else {
1090 Ok(None)
1091 }
1092 }
1093
1094 fn start_db_checkpoint(
1095 config: &NodeConfig,
1096 prometheus_registry: &Registry,
1097 state_snapshot_enabled: bool,
1098 ) -> Result<(
1099 DBCheckpointConfig,
1100 Option<tokio::sync::broadcast::Sender<()>>,
1101 )> {
1102 let checkpoint_path = Some(
1103 config
1104 .db_checkpoint_config
1105 .checkpoint_path
1106 .clone()
1107 .unwrap_or_else(|| config.db_checkpoint_path()),
1108 );
1109 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1110 DBCheckpointConfig {
1111 checkpoint_path,
1112 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1113 true
1114 } else {
1115 config
1116 .db_checkpoint_config
1117 .perform_db_checkpoints_at_epoch_end
1118 },
1119 ..config.db_checkpoint_config.clone()
1120 }
1121 } else {
1122 config.db_checkpoint_config.clone()
1123 };
1124
1125 match (
1126 db_checkpoint_config.object_store_config.as_ref(),
1127 state_snapshot_enabled,
1128 ) {
1129 (None, false) => Ok((db_checkpoint_config, None)),
1134 (_, _) => {
1135 let handler = DBCheckpointHandler::new(
1136 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1137 db_checkpoint_config.object_store_config.as_ref(),
1138 60,
1139 db_checkpoint_config
1140 .prune_and_compact_before_upload
1141 .unwrap_or(true),
1142 config.authority_store_pruning_config.clone(),
1143 prometheus_registry,
1144 state_snapshot_enabled,
1145 )?;
1146 Ok((
1147 db_checkpoint_config,
1148 Some(DBCheckpointHandler::start(handler)),
1149 ))
1150 }
1151 }
1152 }
1153
    /// Builds and starts the anemo P2P network together with its discovery, state-sync
    /// and randomness services.
    ///
    /// Returns the running network, a map of statically known peers (allowlisted peers
    /// and seed peers that declare a peer id) labelled by origin, the discovery,
    /// state-sync and randomness handles, and the discovery endpoint manager.
    ///
    /// # Errors
    /// Fails if the anemo network cannot be bound or started.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        // Local copy of the P2P config with a default discovery peer-address cache file
        // under the node's DB path, if none was configured.
        let mut p2p_config = config.p2p_config.clone();
        {
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        // Advertise the consensus address via discovery: prefer the external address and
        // fall back to the listen address.
        if let Some(consensus_config) = &config.consensus_config {
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Static peer labels. This reads the original (unpatched) discovery config; only
        // `allowlisted_peers` is used, which the patch above does not touch. Seed peers
        // without a peer id are skipped.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // One router serving discovery, state sync and randomness RPCs.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound stack: tracing on server errors plus per-request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound stack: tracing on client and server errors plus per-request metrics.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            // Frame-size limits are always overridden: 1 MiB requests, 128 MiB responses.
            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_request_frame_size = Some(1 << 20);
            anemo_config.max_response_frame_size = Some(128 << 20);

            // QUIC tuning: each value below is only a default, applied when the operator
            // left it unset. Socket buffers default to 20 MiB, and failing to apply them is
            // tolerated.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Stream/flow-control defaults (windows in bytes, timeouts in milliseconds).
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name embeds the chain identifier ("sui-<chain id>").
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // The services only begin running once they hold the started network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1327
1328 async fn construct_validator_components(
1329 config: NodeConfig,
1330 state: Arc<AuthorityState>,
1331 committee: Arc<Committee>,
1332 epoch_store: Arc<AuthorityPerEpochStore>,
1333 checkpoint_store: Arc<CheckpointStore>,
1334 state_sync_handle: state_sync::Handle,
1335 randomness_handle: randomness::Handle,
1336 global_state_hasher: Weak<GlobalStateHasher>,
1337 backpressure_manager: Arc<BackpressureManager>,
1338 registry_service: &RegistryService,
1339 sui_node_metrics: Arc<SuiNodeMetrics>,
1340 checkpoint_metrics: Arc<CheckpointMetrics>,
1341 node_role: NodeRole,
1342 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1343 ) -> Result<ValidatorComponents> {
1344 let mut config_clone = config.clone();
1345 let consensus_config = config_clone
1346 .consensus_config
1347 .as_mut()
1348 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1349
1350 let client = Arc::new(UpdatableConsensusClient::new());
1351 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1352 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1353 &committee,
1354 consensus_config,
1355 state.name,
1356 ®istry_service.default_registry(),
1357 client.clone(),
1358 checkpoint_store.clone(),
1359 inflight_slot_freed_notify.clone(),
1360 ));
1361
1362 let consensus_manager = Arc::new(ConsensusManager::new(
1363 &config,
1364 consensus_config,
1365 registry_service,
1366 client,
1367 node_role,
1368 ));
1369
1370 let consensus_store_pruner = ConsensusStorePruner::new(
1372 consensus_manager.get_storage_base_path(),
1373 consensus_config.db_retention_epochs(),
1374 consensus_config.db_pruner_period(),
1375 ®istry_service.default_registry(),
1376 );
1377
1378 let sui_tx_validator_metrics =
1379 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1380
1381 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1382 let (handle, queue) = Self::start_grpc_validator_service(
1383 &config,
1384 state.clone(),
1385 consensus_adapter.clone(),
1386 epoch_store.clone(),
1387 ®istry_service.default_registry(),
1388 inflight_slot_freed_notify,
1389 )
1390 .await?;
1391 (Some(handle), queue)
1392 } else {
1393 (None, None)
1394 };
1395
1396 let validator_overload_monitor_handle = if node_role.is_validator()
1399 && config
1400 .authority_overload_config
1401 .max_load_shedding_percentage
1402 > 0
1403 {
1404 let authority_state = Arc::downgrade(&state);
1405 let overload_config = config.authority_overload_config.clone();
1406 fail_point!("starting_overload_monitor");
1407 Some(spawn_monitored_task!(overload_monitor(
1408 authority_state,
1409 overload_config,
1410 )))
1411 } else {
1412 None
1413 };
1414
1415 Self::start_epoch_specific_validator_components(
1416 &config,
1417 state.clone(),
1418 consensus_adapter,
1419 checkpoint_store,
1420 epoch_store,
1421 state_sync_handle,
1422 randomness_handle,
1423 randomness_receiver_handle,
1424 consensus_manager,
1425 consensus_store_pruner,
1426 global_state_hasher,
1427 backpressure_manager,
1428 validator_server_handle,
1429 validator_overload_monitor_handle,
1430 checkpoint_metrics,
1431 sui_node_metrics,
1432 sui_tx_validator_metrics,
1433 admission_queue,
1434 node_role,
1435 )
1436 .await
1437 }
1438
    /// Starts the consensus-side components that must be recreated every epoch, and
    /// packages them together with the long-lived ones into `ValidatorComponents`.
    ///
    /// In order, this:
    /// 1. builds the checkpoint service;
    /// 2. clears the randomness receiver's public key;
    /// 3. installs a randomness manager when the role runs consensus and randomness is
    ///    enabled for the epoch;
    /// 4. spawns the execution-time observer (validators only);
    /// 5. starts the consensus manager on a separate tokio task;
    /// 6. spawns the checkpoint service;
    /// 7. starts the JWK updater (validators with authenticator state enabled);
    /// 8. rotates the admission queue to the new epoch.
    ///
    /// # Errors
    /// Propagates errors from `set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        // Drop the previous epoch's randomness public key before (possibly) installing a
        // new randomness manager for this epoch.
        randomness_receiver_handle.clear_public_key();

        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            // Only validators participate with their protocol key; other consensus-running
            // roles pass `None`.
            let authority_key_pair = if node_role.is_validator() {
                Some(config.protocol_key_pair())
            } else {
                None
            };
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                authority_key_pair,
                randomness_receiver_handle.clone(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Start consensus on its own task so this function does not wait for it. The
        // spawned task's JoinHandle is not kept.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                        Some(randomness_receiver_handle),
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting the DISABLE_REPLAY_WAITER env var (to any value) makes the checkpoint
        // service start without the consensus replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1585
1586 fn build_checkpoint_service(
1587 config: &NodeConfig,
1588 consensus_adapter: Arc<ConsensusAdapter>,
1589 checkpoint_store: Arc<CheckpointStore>,
1590 epoch_store: Arc<AuthorityPerEpochStore>,
1591 state: Arc<AuthorityState>,
1592 state_sync_handle: state_sync::Handle,
1593 state_hasher: Weak<GlobalStateHasher>,
1594 checkpoint_metrics: Arc<CheckpointMetrics>,
1595 node_role: NodeRole,
1596 ) -> Arc<CheckpointService> {
1597 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1598 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1599
1600 debug!(
1601 "Starting checkpoint service with epoch start timestamp {}
1602 and epoch duration {}",
1603 epoch_start_timestamp_ms, epoch_duration_ms
1604 );
1605
1606 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1607 Box::new(SubmitCheckpointToConsensus {
1608 sender: consensus_adapter,
1609 signer: state.secret.clone(),
1610 authority: config.protocol_public_key(),
1611 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1612 .checked_add(epoch_duration_ms)
1613 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1614 metrics: checkpoint_metrics.clone(),
1615 })
1616 } else {
1617 LogCheckpointOutput::boxed()
1618 };
1619
1620 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1621
1622 CheckpointService::build(
1623 state.clone(),
1624 checkpoint_store,
1625 epoch_store,
1626 state.get_transaction_cache_reader().clone(),
1627 state_hasher,
1628 checkpoint_output,
1629 Box::new(certified_checkpoint_output),
1630 checkpoint_metrics,
1631 )
1632 }
1633
1634 fn construct_consensus_adapter(
1635 committee: &Committee,
1636 consensus_config: &ConsensusConfig,
1637 authority: AuthorityName,
1638 prometheus_registry: &Registry,
1639 consensus_client: Arc<dyn ConsensusClient>,
1640 checkpoint_store: Arc<CheckpointStore>,
1641 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1642 ) -> ConsensusAdapter {
1643 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1644 ConsensusAdapter::new(
1647 consensus_client,
1648 checkpoint_store,
1649 authority,
1650 consensus_config.max_pending_transactions(),
1651 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1652 ca_metrics,
1653 inflight_slot_freed_notify,
1654 )
1655 }
1656
1657 async fn start_grpc_validator_service(
1658 config: &NodeConfig,
1659 state: Arc<AuthorityState>,
1660 consensus_adapter: Arc<ConsensusAdapter>,
1661 epoch_store: Arc<AuthorityPerEpochStore>,
1662 prometheus_registry: &Registry,
1663 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1664 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1665 let overload_config = &config.authority_overload_config;
1666 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1667 let manager = Arc::new(AdmissionQueueManager::new(
1668 consensus_adapter.clone(),
1669 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1670 overload_config.admission_queue_capacity_fraction,
1671 overload_config.admission_queue_bypass_fraction,
1672 overload_config.admission_queue_failover_timeout,
1673 inflight_slot_freed_notify,
1674 ));
1675 AdmissionQueueContext::spawn(manager, epoch_store)
1676 });
1677 let validator_service = ValidatorService::new(
1678 state.clone(),
1679 consensus_adapter,
1680 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1681 config.policy_config.clone().map(|p| p.client_id_source),
1682 admission_queue.clone(),
1683 );
1684
1685 let mut server_conf = mysten_network::config::Config::new();
1686 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1687 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1688 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1689 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1690 server_conf.load_shed = config.grpc_load_shed;
1691 let mut server_builder =
1692 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1693
1694 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1695
1696 let tls_config = sui_tls::create_rustls_server_config(
1697 config.network_key_pair().copy().private(),
1698 SUI_TLS_SERVER_NAME.to_string(),
1699 );
1700
1701 let network_address = config.network_address().clone();
1702
1703 let (ready_tx, ready_rx) = oneshot::channel();
1704
1705 let spawn_once = SpawnOnce::new(ready_rx, async move {
1706 let server = server_builder
1707 .bind(&network_address, Some(tls_config))
1708 .await
1709 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1710 let local_addr = server.local_addr();
1711 info!("Listening to traffic on {local_addr}");
1712 ready_tx.send(()).unwrap();
1713 if let Err(err) = server.serve().await {
1714 info!("Server stopped: {err}");
1715 }
1716 info!("Server stopped");
1717 });
1718 Ok((spawn_once, admission_queue))
1719 }
1720
1721 pub fn state(&self) -> Arc<AuthorityState> {
1722 self.state.clone()
1723 }
1724
1725 pub fn embedded_rpc_store(&self) -> Option<&EmbeddedRpcStore> {
1731 self.embedded_rpc_store.as_ref()
1732 }
1733
1734 #[cfg(any(test, msim))]
1735 pub fn connection_monitor_handle_for_testing(
1736 &self,
1737 ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
1738 &self._connection_monitor_handle
1739 }
1740
1741 pub fn node_role(&self) -> NodeRole {
1742 self.state.load_epoch_store_one_call_per_task().node_role()
1743 }
1744
1745 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1747 self.state.reference_gas_price_for_testing()
1748 }
1749
1750 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1751 self.state.committee_store().clone()
1752 }
1753
1754 pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
1755 self.checkpoint_store.clone()
1756 }
1757
1758 pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1759 self.state.authority_store()
1760 }
1761
1762 pub fn clone_consensus_store(
1763 &self,
1764 ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
1765 self.validator_components
1766 .try_lock()
1767 .ok()?
1768 .as_ref()?
1769 .consensus_manager
1770 .consensus_store()
1771 }
1772
1773 pub fn clone_authority_aggregator(
1778 &self,
1779 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1780 self.transaction_orchestrator
1781 .as_ref()
1782 .map(|to| to.clone_authority_aggregator())
1783 }
1784
1785 pub fn transaction_orchestrator(
1786 &self,
1787 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1788 self.transaction_orchestrator.clone()
1789 }
1790
    /// Runs the node's epoch loop: execute the current epoch's checkpoints, then
    /// reconfigure into the next epoch, repeatedly.
    ///
    /// Each iteration:
    /// 1. Takes the global state hasher and builds a `CheckpointExecutor` for
    ///    `epoch_store`.
    /// 2. If validator components exist and the node is a validator, submits an
    ///    `AuthorityCapabilitiesV2` message to consensus.
    /// 3. Runs the epoch. On `StopReason::RunWithRangeCondition` it shuts consensus down,
    ///    sends `run_with_range` on `shutdown_channel_tx`, and returns `Ok(())`.
    /// 4. Otherwise it:
    ///    - reads the new system state and broadcasts it on `end_of_epoch_channel`;
    ///    - refreshes the authority aggregator and peer addresses;
    ///    - reconfigures `AuthorityState`, installing a fresh global state hasher;
    ///    - stops, restarts or constructs consensus components according to the node's
    ///      role in the next epoch.
    ///
    /// The `global_state_hasher` lock is held for the whole iteration, including epoch
    /// execution, because the new hasher is written back through the same guard during
    /// reconfiguration.
    ///
    /// # Errors
    /// Returns an error if any of these fail:
    /// - submitting capabilities to consensus;
    /// - restarting or constructing consensus components;
    /// - (under msim) the test checkpoint pruning.
    ///
    /// # Panics
    /// Panics if:
    /// - the hasher slot is empty;
    /// - `supported_protocol_versions` is unset (validator capability path);
    /// - the shutdown message cannot be sent;
    /// - the system state object cannot be read;
    /// - the next epoch is not `current + 1`;
    /// - the old hasher is still referenced elsewhere when it is replaced.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Held for the entire iteration; refilled with a new hasher during reconfig.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Validators advertise their supported protocol versions and system packages.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // NOTE(review): the purpose of this 1ms sleep is not evident from this
                // function; confirm before removing.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                // Step the advertised max version down while the candidate enables
                // accumulators but the accumulator root object does not exist yet.
                // NOTE(review): this checks `epoch_store` while the rest of the block uses
                // `cur_epoch_store`; presumably both refer to the same epoch here.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // Configured execution range reached: stop consensus, notify subscribers, exit.
            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // Safe mode is only tolerated in simulations that explicitly expect it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed broadcast (e.g. no subscribers) is only logged on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // Held until the new components (or `None`) are stored below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                // The node ran consensus last epoch: stop it, swap in a fresh hasher, prune
                // old consensus data, then restart it if the new role still runs consensus.
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    info!("Restarting consensus as {new_role}");
                    // NOTE(review): the previous epoch's `validator_server_handle` is reused
                    // as-is. If that epoch's role ran consensus without being a validator,
                    // it is `None`. A promotion to validator along this path would then not
                    // start a gRPC server or call `set_consensus_address_updater`, unlike
                    // the branch below. Confirm whether that transition can occur.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            self.randomness_receiver_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // The node did not run consensus last epoch: swap in a fresh hasher and
                // construct consensus components from scratch if the new role needs them.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                        self.randomness_receiver_handle.clone(),
                    )
                    .await?;

                    // Newly promoted validators start their gRPC server here and register
                    // the consensus manager as the consensus address updater.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // Simulation-only: prune old-epoch checkpoints when a finite, non-zero
            // retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2110
2111 async fn shutdown(&self) {
2112 if let Some(validator_components) = &*self.validator_components.lock().await {
2113 validator_components.consensus_manager.shutdown().await;
2114 }
2115 }
2116
2117 async fn reconfigure_state(
2118 &self,
2119 state: &Arc<AuthorityState>,
2120 cur_epoch_store: &AuthorityPerEpochStore,
2121 next_epoch_committee: Committee,
2122 next_epoch_start_system_state: EpochStartSystemState,
2123 global_state_hasher: Arc<GlobalStateHasher>,
2124 ) -> Arc<AuthorityPerEpochStore> {
2125 let next_epoch = next_epoch_committee.epoch();
2126
2127 let last_checkpoint = self
2128 .checkpoint_store
2129 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2130 .expect("Error loading last checkpoint for current epoch")
2131 .expect("Could not load last checkpoint for current epoch");
2132
2133 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2134
2135 assert_eq!(
2136 Some(last_checkpoint_seq),
2137 self.checkpoint_store
2138 .get_highest_executed_checkpoint_seq_number()
2139 .expect("Error loading highest executed checkpoint sequence number")
2140 );
2141
2142 let epoch_start_configuration = EpochStartConfiguration::new(
2143 next_epoch_start_system_state,
2144 *last_checkpoint.digest(),
2145 state.get_object_store().as_ref(),
2146 EpochFlag::default_flags_for_new_epoch(&state.config),
2147 )
2148 .expect("EpochStartConfiguration construction cannot fail");
2149
2150 let new_epoch_store = self
2151 .state
2152 .reconfigure(
2153 cur_epoch_store,
2154 self.config.supported_protocol_versions.unwrap(),
2155 next_epoch_committee,
2156 epoch_start_configuration,
2157 global_state_hasher,
2158 &self.config.expensive_safety_check_config,
2159 last_checkpoint_seq,
2160 )
2161 .await
2162 .expect("Reconfigure authority state cannot fail");
2163 info!(next_epoch, "Node State has been reconfigured");
2164 assert_eq!(next_epoch, new_epoch_store.epoch());
2165 self.state.get_reconfig_api().update_epoch_flags_metrics(
2166 cur_epoch_store.epoch_start_config().flags(),
2167 new_epoch_store.epoch_start_config().flags(),
2168 );
2169
2170 new_epoch_store
2171 }
2172
    /// Returns a reference to this node's `NodeConfig`.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2176
2177 pub fn randomness_handle(&self) -> randomness::Handle {
2178 self.randomness_handle.clone()
2179 }
2180
2181 pub fn state_sync_handle(&self) -> state_sync::Handle {
2182 self.state_sync_handle.clone()
2183 }
2184
    /// Returns a reference to this node's `EndpointManager`, the store that peer addresses
    /// are pushed into.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2188
2189 fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2191 let digest_str = digest.to_string();
2192 if digest_str.len() >= 8 {
2193 digest_str[0..8].to_string()
2194 } else {
2195 digest_str
2196 }
2197 }
2198
2199 async fn check_and_recover_forks(
2203 checkpoint_store: &CheckpointStore,
2204 checkpoint_metrics: &CheckpointMetrics,
2205 fork_recovery: Option<&ForkRecoveryConfig>,
2206 ) -> Result<()> {
2207 if let Some(recovery) = fork_recovery {
2209 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2210 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2211 }
2212
2213 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2214 .get_checkpoint_fork_detected()
2215 .map_err(|e| {
2216 error!("Failed to check for checkpoint fork: {:?}", e);
2217 e
2218 })?
2219 {
2220 Self::handle_checkpoint_fork(
2221 checkpoint_seq,
2222 checkpoint_digest,
2223 checkpoint_metrics,
2224 fork_recovery,
2225 )
2226 .await?;
2227 }
2228 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2229 .get_transaction_fork_detected()
2230 .map_err(|e| {
2231 error!("Failed to check for transaction fork: {:?}", e);
2232 e
2233 })?
2234 {
2235 Self::handle_transaction_fork(
2236 tx_digest,
2237 expected_effects,
2238 actual_effects,
2239 checkpoint_metrics,
2240 fork_recovery,
2241 )
2242 .await?;
2243 }
2244
2245 Ok(())
2246 }
2247
2248 fn try_recover_checkpoint_fork(
2249 checkpoint_store: &CheckpointStore,
2250 recovery: &ForkRecoveryConfig,
2251 ) -> Result<()> {
2252 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2255 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2256 anyhow::bail!(
2257 "Invalid checkpoint digest override for seq {}: {}",
2258 seq,
2259 expected_digest_str
2260 );
2261 };
2262
2263 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2264 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2265 if local_digest != expected_digest {
2266 info!(
2267 seq,
2268 local = %Self::get_digest_prefix(local_digest),
2269 expected = %Self::get_digest_prefix(expected_digest),
2270 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2271 seq
2272 );
2273 checkpoint_store
2274 .clear_locally_computed_checkpoints_from(*seq)
2275 .context(
2276 "Failed to clear locally computed checkpoints from override seq",
2277 )?;
2278 }
2279 }
2280 }
2281
2282 if let Some((checkpoint_seq, checkpoint_digest)) =
2283 checkpoint_store.get_checkpoint_fork_detected()?
2284 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2285 {
2286 info!(
2287 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2288 checkpoint_seq, checkpoint_digest
2289 );
2290 checkpoint_store
2291 .clear_checkpoint_fork_detected()
2292 .expect("Failed to clear checkpoint fork detected marker");
2293 }
2294 Ok(())
2295 }
2296
2297 fn try_recover_transaction_fork(
2298 checkpoint_store: &CheckpointStore,
2299 recovery: &ForkRecoveryConfig,
2300 ) -> Result<()> {
2301 if recovery.transaction_overrides.is_empty() {
2302 return Ok(());
2303 }
2304
2305 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2306 && recovery
2307 .transaction_overrides
2308 .contains_key(&tx_digest.to_string())
2309 {
2310 info!(
2311 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2312 tx_digest
2313 );
2314 checkpoint_store
2315 .clear_transaction_fork_detected()
2316 .expect("Failed to clear transaction fork detected marker");
2317 }
2318 Ok(())
2319 }
2320
2321 fn get_current_timestamp() -> u64 {
2322 std::time::SystemTime::now()
2323 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2324 .unwrap()
2325 .as_secs()
2326 }
2327
2328 async fn handle_checkpoint_fork(
2329 checkpoint_seq: u64,
2330 checkpoint_digest: CheckpointDigest,
2331 checkpoint_metrics: &CheckpointMetrics,
2332 fork_recovery: Option<&ForkRecoveryConfig>,
2333 ) -> Result<()> {
2334 checkpoint_metrics
2335 .checkpoint_fork_crash_mode
2336 .with_label_values(&[
2337 &checkpoint_seq.to_string(),
2338 &Self::get_digest_prefix(checkpoint_digest),
2339 &Self::get_current_timestamp().to_string(),
2340 ])
2341 .set(1);
2342
2343 let behavior = fork_recovery
2344 .map(|fr| fr.fork_crash_behavior)
2345 .unwrap_or_default();
2346
2347 match behavior {
2348 ForkCrashBehavior::AwaitForkRecovery => {
2349 error!(
2350 checkpoint_seq = checkpoint_seq,
2351 checkpoint_digest = ?checkpoint_digest,
2352 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2353 );
2354 futures::future::pending::<()>().await;
2355 unreachable!("pending() should never return");
2356 }
2357 ForkCrashBehavior::ReturnError => {
2358 error!(
2359 checkpoint_seq = checkpoint_seq,
2360 checkpoint_digest = ?checkpoint_digest,
2361 "Checkpoint fork detected! Returning error."
2362 );
2363 Err(anyhow::anyhow!(
2364 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2365 checkpoint_seq,
2366 checkpoint_digest
2367 ))
2368 }
2369 }
2370 }
2371
2372 async fn handle_transaction_fork(
2373 tx_digest: TransactionDigest,
2374 expected_effects_digest: TransactionEffectsDigest,
2375 actual_effects_digest: TransactionEffectsDigest,
2376 checkpoint_metrics: &CheckpointMetrics,
2377 fork_recovery: Option<&ForkRecoveryConfig>,
2378 ) -> Result<()> {
2379 checkpoint_metrics
2380 .transaction_fork_crash_mode
2381 .with_label_values(&[
2382 &Self::get_digest_prefix(tx_digest),
2383 &Self::get_digest_prefix(expected_effects_digest),
2384 &Self::get_digest_prefix(actual_effects_digest),
2385 &Self::get_current_timestamp().to_string(),
2386 ])
2387 .set(1);
2388
2389 let behavior = fork_recovery
2390 .map(|fr| fr.fork_crash_behavior)
2391 .unwrap_or_default();
2392
2393 match behavior {
2394 ForkCrashBehavior::AwaitForkRecovery => {
2395 error!(
2396 tx_digest = ?tx_digest,
2397 expected_effects_digest = ?expected_effects_digest,
2398 actual_effects_digest = ?actual_effects_digest,
2399 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2400 );
2401 futures::future::pending::<()>().await;
2402 unreachable!("pending() should never return");
2403 }
2404 ForkCrashBehavior::ReturnError => {
2405 error!(
2406 tx_digest = ?tx_digest,
2407 expected_effects_digest = ?expected_effects_digest,
2408 actual_effects_digest = ?actual_effects_digest,
2409 "Transaction fork detected! Returning error."
2410 );
2411 Err(anyhow::anyhow!(
2412 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2413 tx_digest,
2414 expected_effects_digest,
2415 actual_effects_digest
2416 ))
2417 }
2418 }
2419 }
2420}
2421
2422#[cfg(not(msim))]
2423impl SuiNode {
2424 async fn fetch_jwks(
2425 _authority: AuthorityName,
2426 provider: &OIDCProvider,
2427 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2428 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2429 use sui_types::error::SuiErrorKind;
2430 let client = reqwest::Client::new();
2431 fetch_jwks(provider, &client, true)
2432 .await
2433 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2434 }
2435}
2436
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node this `SuiNode` runs on.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Logs `new_value` and stores it in the simulator state's `sim_safe_mode_expected` flag.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulation-only JWK fetch: calls the function returned by `get_jwk_injector()` instead
    /// of making a network request.
    ///
    /// Both parameters are forwarded to that function, so no `unused_variables` allowance is
    /// needed.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2458
/// A future that is spawned lazily, the first time `SpawnOnce::start` is called.
///
/// `Unstarted` holds a readiness receiver and the not-yet-spawned future. `Started` holds
/// the `JoinHandle` of the spawned task.
enum SpawnOnce {
    // NOTE(review): the `Mutex` presumably exists only to make this type `Sync` (a boxed
    // `Send` future is not `Sync`). `start` unwraps it with `into_inner` before spawning.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // The handle is never read (hence the allowance); it is retained rather than discarded.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2465
2466impl SpawnOnce {
2467 pub fn new(
2468 ready_rx: oneshot::Receiver<()>,
2469 future: impl Future<Output = ()> + Send + 'static,
2470 ) -> Self {
2471 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2472 }
2473
2474 pub async fn start(self) -> Self {
2475 match self {
2476 Self::Unstarted(ready_rx, future) => {
2477 let future = future.into_inner();
2478 let handle = tokio::spawn(future);
2479 ready_rx.await.unwrap();
2480 Self::Started(handle)
2481 }
2482 Self::Started(_) => self,
2483 }
2484 }
2485}
2486
2487fn update_peer_addresses(
2491 config: &NodeConfig,
2492 endpoint_manager: &EndpointManager,
2493 epoch_start_state: &EpochStartSystemState,
2494 prev_epoch_start_state: Option<&EpochStartSystemState>,
2495) {
2496 if config.consensus_config().is_none() {
2497 return;
2498 }
2499 let new_peers: HashSet<PeerId> = epoch_start_state
2500 .get_validator_as_p2p_peers(config.protocol_public_key())
2501 .into_iter()
2502 .map(|(peer_id, address)| {
2503 endpoint_manager
2504 .update_endpoint(
2505 EndpointId::P2p(peer_id),
2506 AddressSource::Chain,
2507 vec![address],
2508 )
2509 .expect("Updating peer addresses should not fail");
2510 peer_id
2511 })
2512 .collect();
2513
2514 if let Some(prev) = prev_epoch_start_state {
2516 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2517 if !new_peers.contains(&peer_id) {
2518 endpoint_manager
2519 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2520 .expect("Clearing peer addresses should not fail");
2521 }
2522 }
2523 }
2524}
2525
2526fn build_kv_store(
2527 state: &Arc<AuthorityState>,
2528 config: &NodeConfig,
2529 registry: &Registry,
2530) -> Result<Arc<TransactionKeyValueStore>> {
2531 let metrics = KeyValueStoreMetrics::new(registry);
2532 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2533
2534 let base_url = &config.transaction_kv_store_read_config.base_url;
2535
2536 if base_url.is_empty() {
2537 info!("no http kv store url provided, using local db only");
2538 return Ok(Arc::new(db_store));
2539 }
2540
2541 let base_url: url::Url = base_url.parse().tap_err(|e| {
2542 error!(
2543 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2544 base_url, e
2545 )
2546 })?;
2547
2548 let network_str = match state.get_chain_identifier().chain() {
2549 Chain::Mainnet => "/mainnet",
2550 _ => {
2551 info!("using local db only for kv store");
2552 return Ok(Arc::new(db_store));
2553 }
2554 };
2555
2556 let base_url = base_url.join(network_str)?.to_string();
2557 let http_store = HttpKVStore::new_kv(
2558 &base_url,
2559 config.transaction_kv_store_read_config.cache_size,
2560 metrics.clone(),
2561 )?;
2562 info!("using local key-value store with fallback to http key-value store");
2563 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2564 db_store,
2565 http_store,
2566 metrics,
2567 "json_rpc_fallback",
2568 )))
2569}
2570
2571async fn build_json_rpc_router(
2572 state: &Arc<AuthorityState>,
2573 transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2574 config: &NodeConfig,
2575 prometheus_registry: &Registry,
2576) -> Result<axum::Router> {
2577 let traffic_controller = state.traffic_controller.clone();
2578 let mut server = JsonRpcServerBuilder::new(
2579 env!("CARGO_PKG_VERSION"),
2580 prometheus_registry,
2581 traffic_controller,
2582 config.policy_config.clone(),
2583 );
2584
2585 let kv_store = build_kv_store(state, config, prometheus_registry)?;
2586
2587 let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2588 server.register_module(ReadApi::new(
2589 state.clone(),
2590 kv_store.clone(),
2591 metrics.clone(),
2592 ))?;
2593 server.register_module(CoinReadApi::new(
2594 state.clone(),
2595 kv_store.clone(),
2596 metrics.clone(),
2597 ))?;
2598
2599 if config.run_with_range.is_none() {
2602 server.register_module(TransactionBuilderApi::new(state.clone()))?;
2603 }
2604 server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2605 server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2606
2607 if let Some(transaction_orchestrator) = transaction_orchestrator {
2608 server.register_module(TransactionExecutionApi::new(
2609 state.clone(),
2610 transaction_orchestrator.clone(),
2611 metrics.clone(),
2612 ))?;
2613 }
2614
2615 let name_service_config = if let (
2616 Some(package_address),
2617 Some(registry_id),
2618 Some(reverse_registry_id),
2619 ) = (
2620 config.name_service_package_address,
2621 config.name_service_registry_id,
2622 config.name_service_reverse_registry_id,
2623 ) {
2624 sui_name_service::NameServiceConfig::new(package_address, registry_id, reverse_registry_id)
2625 } else {
2626 match state.get_chain_identifier().chain() {
2627 Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2628 Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2629 Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2630 }
2631 };
2632
2633 server.register_module(IndexerApi::new(
2634 state.clone(),
2635 ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2636 kv_store,
2637 name_service_config,
2638 metrics,
2639 config.indexer_max_subscriptions,
2640 ))?;
2641 server.register_module(MoveUtils::new(state.clone()))?;
2642
2643 let server_type = config.jsonrpc_server_type();
2644
2645 Ok(server.to_router(server_type).await?)
2646}
2647
/// Starts the node's RPC HTTP server and, when TLS is configured, an HTTPS server.
///
/// Both servers serve the same router: the JSON-RPC router (when `config.json_rpc_enabled()`)
/// merged with the `sui_rpc_api` router, wrapped in a connect-info mapping, server-timing
/// middleware and a permissive CORS layer.
///
/// Returns the server handles and the sender half of the subscription service's checkpoint
/// channel. When `node_role` should not run RPC servers, it returns default (empty) handles
/// and `None` without starting anything.
///
/// # Errors
/// Returns an error if building the JSON-RPC router fails, if the RPC config fails
/// validation, or if setting up TLS or binding either server fails.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
    embedded_rpc_store: Option<&EmbeddedRpcStore>,
) -> Result<(
    HttpServers,
    Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
)> {
    // Roles that do not serve RPC get empty handles and no subscription sender.
    if !node_role.should_run_rpc_servers() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    if config.json_rpc_enabled() {
        router = router.merge(
            build_json_rpc_router(
                &state,
                transaction_orchestrator,
                config,
                prometheus_registry,
            )
            .await?,
        );
    } else {
        info!("json-rpc service is disabled");
    }

    // With an embedded RPC store, the subscription service is given its indexed-checkpoint
    // function; otherwise it is built with `None`.
    let indexed_checkpoint = embedded_rpc_store.map(|embedded| embedded.indexed_checkpoint_fn());
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry, indexed_checkpoint);
    let rpc_router = {
        // Reader selection: the embedded store's reader when present, else the plain REST
        // read store over `state` and `store`.
        let reader: Arc<dyn RpcStateReader> = match embedded_rpc_store {
            Some(embedded) => Arc::new(RpcStoreReadStore::new(
                state.clone(),
                store,
                embedded.reader(),
            )),
            None => Arc::new(RestReadStore::new(state.clone(), store)),
        };
        let mut rpc_service = sui_rpc_api::RpcService::new(reader);
        rpc_service.with_server_version(server_version);

        // Note: this `config` shadows the outer `&NodeConfig` and is the optional RPC config.
        if let Some(config) = config.rpc.clone() {
            config.validate()?;
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        // Transaction execution is exposed only when an orchestrator is available.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Copy the peer address from sui_http's `ConnectInfo` extension into axum's
        // `ConnectInfo` extension, presumably so axum-based handlers can read it.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            // CORS: GET and POST from any origin, with any request/exposed headers.
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // HTTPS is started only when the RPC config carries a TLS config; it serves a clone of
    // the same router on the configured HTTPS address.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // The plain HTTP server is always started on `json_rpc_address`.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2781
/// Handles for the RPC servers built by `build_http_servers`.
///
/// The fields are never read (hence the `allow` attributes). NOTE(review): they appear to be
/// held only to keep the server handles alive as long as this value — confirm
/// `sui_http::ServerHandle`'s drop behavior.
#[derive(Default)]
struct HttpServers {
    /// Plain HTTP server; `None` in the default value used when RPC servers are not run.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    /// HTTPS server; present only when the RPC config provides a TLS config.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2789
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// A recovery config with no overrides that makes fork handling return an error.
    fn no_overrides() -> ForkRecoveryConfig {
        ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        }
    }

    /// Covers both fork kinds. Each recorded fork is first reported as an error, then cleared
    /// once a matching override is configured.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let store = CheckpointStore::new_for_tests();
        let metrics = CheckpointMetrics::new(&Registry::new());

        // Checkpoint fork.
        let seq_num = 42;
        let checkpoint_digest = CheckpointDigest::random();
        store
            .record_checkpoint_fork_detected(seq_num, checkpoint_digest)
            .unwrap();

        let rejecting = no_overrides();
        let outcome = SuiNode::check_and_recover_forks(&store, &metrics, Some(&rejecting)).await;
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        let checkpoint_recovery = ForkRecoveryConfig {
            checkpoint_overrides: BTreeMap::from([(seq_num, checkpoint_digest.to_string())]),
            ..no_overrides()
        };
        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&checkpoint_recovery)).await;
        assert!(outcome.is_ok());
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // Transaction fork.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let rejecting = no_overrides();
        let outcome = SuiNode::check_and_recover_forks(&store, &metrics, Some(&rejecting)).await;
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let transaction_recovery = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                tx_digest.to_string(),
                actual_effects.to_string(),
            )]),
            ..no_overrides()
        };
        let outcome =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&transaction_recovery)).await;
        assert!(outcome.is_ok());
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
}