1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Logs a JWK-updater message at `debug` level when running in a test configuration
/// (as reported by `in_test_configuration()`) and at `info` level otherwise.
///
/// Accepts exactly the same arguments as the `tracing` logging macros.
macro_rules! jwk_log {
    ($($arg:tt)+) => {{
        // Hygienic local: cannot collide with identifiers used in `$arg`.
        let quiet = in_test_configuration();
        if quiet {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    }};
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::rpc_store_embed::EmbeddedRpcStore;
118use sui_core::signature_verifier::SignatureVerifierMetrics;
119use sui_core::storage::RocksDbStore;
120use sui_core::storage::RpcStoreReadStore;
121use sui_core::transaction_orchestrator::TransactionOrchestrator;
122use sui_core::{
123 authority::{AuthorityState, AuthorityStore},
124 authority_client::NetworkAuthorityClient,
125};
126use sui_json_rpc::JsonRpcServerBuilder;
127use sui_json_rpc::coin_api::CoinReadApi;
128use sui_json_rpc::governance_api::GovernanceReadApi;
129use sui_json_rpc::indexer_api::IndexerApi;
130use sui_json_rpc::move_utils::MoveUtils;
131use sui_json_rpc::read_api::ReadApi;
132use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
133use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
134use sui_macros::fail_point;
135use sui_macros::{fail_point_async, replay_log};
136use sui_network::api::ValidatorServer;
137use sui_network::discovery;
138use sui_network::endpoint_manager::EndpointManager;
139use sui_network::state_sync;
140use sui_network::validator::server::ServerBuilder;
141use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
142use sui_snapshot::uploader::StateSnapshotUploader;
143use sui_storage::{
144 http_key_value_store::HttpKVStore,
145 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
146 key_value_store_metrics::KeyValueStoreMetrics,
147};
148use sui_types::base_types::{AuthorityName, EpochId};
149use sui_types::committee::Committee;
150use sui_types::crypto::KeypairTraits;
151use sui_types::error::{SuiError, SuiResult};
152use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
153use sui_types::storage::RpcStateReader;
154use sui_types::sui_system_state::SuiSystemStateTrait;
155use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
156use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
157use sui_types::supported_protocol_versions::SupportedProtocolVersions;
158use typed_store::DBMetrics;
159use typed_store::rocks::default_db_options;
160
161use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
162
163pub mod admin;
164pub mod db_shell;
165mod handle;
166pub mod metrics;
167
/// Handles and services that `SuiNode::start_async` builds only when the node runs
/// consensus; stored in `SuiNode::validator_components`.
///
/// NOTE: field order determines drop order; do not reorder fields casually.
pub struct ValidatorComponents {
    /// Validator gRPC server. Wrapped in `Option` so that, for validators, `start_async`
    /// can `take()` the not-yet-started server, call `start().await` on it, and store the
    /// result back.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle of the overload-monitor task, if one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Shared consensus manager; for validators it is also registered with the endpoint
    /// manager as the consensus address updater.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit transactions to consensus and to close an epoch
    /// (see `SuiNode::close_epoch`).
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Checkpoint metrics shared with the node.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics for the consensus transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Optional admission-queue context; when it is `None` is decided outside this view.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Everything produced by `SuiNode::create_p2p_network`, destructured in
/// `SuiNode::start_async`.
///
/// NOTE: field order determines drop order; do not reorder fields casually.
pub struct P2pComponents {
    /// The anemo P2P network itself; a weak reference is handed to the connection monitor.
    p2p_network: Network,
    /// Peers known at startup, passed to the anemo connection monitor.
    known_peers: HashMap<PeerId, String>,
    /// Handle to the peer discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness P2P service.
    randomness_handle: randomness::Handle,
    /// Manages peer endpoint addresses (config overrides and on-chain addresses).
    endpoint_manager: EndpointManager,
}
186
#[cfg(msim)]
mod simulator {
    use std::cell::RefCell;
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Extra per-node state that only exists when running under the `msim` simulator.
    ///
    /// NOTE: field order determines drop order; keep it as is.
    pub(super) struct SimState {
        /// Handle of the simulator node that was current when this state was created.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag that starts out `false`; readers/writers live outside this module.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; never read.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            // Constructed in the same order as the fields are declared.
            let node_handle = sui_simulator::runtime::NodeHandle::current();
            let safe_mode_flag = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node: node_handle,
                sim_safe_mode_expected: safe_mode_flag,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher used in simulation instead of real HTTP calls.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores both arguments and parses the bundled
    /// `DEFAULT_JWK_BYTES` as Twitch keys, mapping any parse failure to
    /// `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        let bundled = sui_types::zk_login_util::DEFAULT_JWK_BYTES;
        match parse_jwks(bundled, &OIDCProvider::Twitch, true) {
            Ok(keys) => Ok(keys),
            Err(_) => Err(SuiErrorKind::JWKRetrievalError.into()),
        }
    }

    thread_local! {
        /// Per-thread JWK fetcher; starts out as `default_fetch_jwks`.
        static JWK_INJECTOR: RefCell<Arc<JwkInjector>> = RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the JWK fetcher installed on the current thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Replaces the JWK fetcher installed on the current thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(move |slot| {
            let mut current = slot.borrow_mut();
            *current = injector;
        });
    }
}
240
241#[cfg(msim)]
242pub use simulator::set_jwk_injector;
243#[cfg(msim)]
244use simulator::*;
245use sui_core::authority::authority_store_pruner::PrunerWatermarks;
246use sui_core::{
247 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
248};
249
/// Default connect timeout for gRPC connections (one minute); its use sites are outside
/// the code shown here.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_millis(60 * 1_000);
251
/// A running Sui node (validator, observer or fullnode), created by
/// `SuiNode::start` / `SuiNode::start_async`.
///
/// NOTE: field order determines drop order; many fields are handles whose drop may stop
/// background services, so do not reorder fields casually.
pub struct SuiNode {
    /// The node configuration, after defaults were filled in by `start_async`.
    config: NodeConfig,
    /// `Some` only when the node runs consensus; guarded by a tokio mutex
    /// (locked with `.lock().await`).
    validator_components: Mutex<Option<ValidatorComponents>>,

    // Never read in the code shown here (hence the allow); presumably stored so the HTTP
    // servers live as long as the node — TODO confirm.
    #[allow(unused)]
    http_servers: HttpServers,

    /// Core authority state shared with most subsystems.
    state: Arc<AuthorityState>,
    /// Present only for fullnodes started without a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    // Underscore-prefixed handles are stored but not read in the visible code;
    // presumably kept so the services they belong to stay alive.
    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Validator components only receive a `Weak` to this hasher.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Epoch-change notifications; see `subscribe_to_epoch_change`. The transaction
    /// orchestrator (when present) holds one receiver.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// Manages peer endpoint addresses for the P2P network (and consensus, for validators).
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    /// Sender returned by `start_db_checkpoint`, if a DB checkpoint handler was started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    /// Sender returned by `start_state_snapshot`, if a snapshot uploader was started.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Shutdown notifications (channel capacity 1); see `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,

    /// Authority aggregator behind an `ArcSwap`, presumably so it can be replaced on
    /// reconfiguration — TODO confirm.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Checkpoint feed for the RPC subscription service, when HTTP servers provide one.
    subscription_service_checkpoint_sender: Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,

    /// Experimental embedded RPC store, only when enabled in the RPC config.
    embedded_rpc_store: Option<EmbeddedRpcStore>,
}
307
308impl fmt::Debug for SuiNode {
309 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
310 f.debug_struct("SuiNode")
311 .field("name", &self.state.name.concise())
312 .finish()
313 }
314}
315
/// Upper bound on how many new JWKs from a single fetch of one provider are submitted to
/// consensus; any extra keys are dropped with a warning.
///
/// A `const` rather than a `static`: it is a plain compile-time value and nothing needs
/// a fixed memory address for it.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
317
318impl SuiNode {
319 pub async fn start(
320 config: NodeConfig,
321 registry_service: RegistryService,
322 ) -> Result<Arc<SuiNode>> {
323 Self::start_async(
324 config,
325 registry_service,
326 ServerVersion::new("sui-node", "unknown"),
327 )
328 .await
329 }
330
    /// Spawns one monitored background task per zkLogin OAuth provider configured for
    /// this chain. Each task loops forever: fetch the provider's JWKs, filter them, and
    /// submit the remaining ones to consensus as `new_jwk_fetched` transactions.
    ///
    /// Each task's future is passed through `epoch_store.within_alive_epoch(...)`, so it
    /// is presumably cancelled once that epoch ends — TODO confirm against
    /// `within_alive_epoch`.
    ///
    /// # Panics
    /// Panics if a configured provider string does not parse as an `OIDCProvider`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers configured for this chain; a missing entry means no tasks are spawned.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Accepts a JWK only if its `iss` maps to a known provider, that provider is the
        // one it was fetched from, and its total size passes `check_total_jwk_size`.
        // Every rejection bumps `invalid_jwks` for the fetching provider.
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (id, jwk) pairs this task has already accepted, so the same key is
                    // not re-submitted on every poll. A pair is recorded before submission
                    // and before truncation, so it is never retried by this task, even if
                    // submission fails or it is cut by the MAX_JWK_KEYS_PER_FETCH limit.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Fixed 30s back-off; `continue` skips the regular interval sleep.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Short-circuiting: a key is added to `seen` only if it is
                                // valid and not already active in the current epoch.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Cap how many keys one provider can push per fetch.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Best effort: submission errors are logged and dropped.
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
465
466 pub async fn start_async(
467 config: NodeConfig,
468 registry_service: RegistryService,
469 server_version: ServerVersion,
470 ) -> Result<Arc<SuiNode>> {
471 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
472 let mut config = config.clone();
473 if config.supported_protocol_versions.is_none() {
474 info!(
475 "populating config.supported_protocol_versions with default {:?}",
476 SupportedProtocolVersions::SYSTEM_DEFAULT
477 );
478 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
479 }
480
481 let run_with_range = config.run_with_range;
482 let prometheus_registry = registry_service.default_registry();
483 let node_role = config.intended_node_role();
484
485 info!(node =? config.protocol_public_key(),
486 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
487 );
488
489 DBMetrics::init(registry_service.clone());
491
492 typed_store::init_write_sync(config.enable_db_sync_to_disk);
494
495 mysten_metrics::init_metrics(&prometheus_registry);
497 #[cfg(not(msim))]
499 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
500
501 let genesis = config.genesis()?.clone();
502
503 let secret = Arc::pin(config.protocol_key_pair().copy());
504 let genesis_committee = genesis.committee();
505 let committee_store = Arc::new(CommitteeStore::new(
506 config.db_path().join("epochs"),
507 &genesis_committee,
508 None,
509 ));
510
511 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
512 let checkpoint_store = CheckpointStore::new(
513 &config.db_path().join("checkpoints"),
514 pruner_watermarks.clone(),
515 );
516 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
517
518 if node_role.runs_consensus() {
519 Self::check_and_recover_forks(
520 &checkpoint_store,
521 &checkpoint_metrics,
522 config.fork_recovery.as_ref(),
523 )
524 .await?;
525 }
526
527 let enable_write_stall = config
529 .enable_db_write_stall
530 .unwrap_or(node_role.runs_consensus());
531 let enable_objects_compactor = node_role.is_validator()
538 || config.authority_store_pruning_config.num_epochs_to_retain == 0;
539 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
540 enable_write_stall,
541 enable_objects_compactor,
542 };
543 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
544 &config.db_store_path(),
545 Some(perpetual_tables_options),
546 Some(pruner_watermarks.epoch_id.clone()),
547 ));
548 let is_genesis = perpetual_tables
549 .database_is_empty()
550 .expect("Database read should not fail at init.");
551
552 let backpressure_manager =
553 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
554
555 let store =
556 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
557
558 let cur_epoch = store.get_recovery_epoch_at_restart()?;
559 let committee = committee_store
560 .get_committee(&cur_epoch)?
561 .expect("Committee of the current epoch must exist");
562 let epoch_start_configuration = store
563 .get_epoch_start_configuration()?
564 .expect("EpochStartConfiguration of the current epoch must exist");
565 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
566 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
567
568 let cache_traits = build_execution_cache(
569 &config.execution_cache,
570 &prometheus_registry,
571 &store,
572 backpressure_manager.clone(),
573 );
574
575 let auth_agg = {
576 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
577 Arc::new(ArcSwap::new(Arc::new(
578 AuthorityAggregator::new_from_epoch_start_state(
579 epoch_start_configuration.epoch_start_state(),
580 &committee_store,
581 safe_client_metrics_base,
582 ),
583 )))
584 };
585
586 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
587 let chain = match config.chain_override_for_testing {
588 Some(chain) => chain,
589 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
590 };
591
592 let highest_executed_checkpoint = checkpoint_store
593 .get_highest_executed_checkpoint_seq_number()
594 .expect("checkpoint store read cannot fail")
595 .unwrap_or(0);
596
597 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
598 0
599 } else {
600 checkpoint_store
601 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
602 .expect("checkpoint store read cannot fail")
603 .unwrap_or(highest_executed_checkpoint)
604 };
605
606 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
607 let epoch_store = AuthorityPerEpochStore::new(
608 config.protocol_public_key(),
609 committee.clone(),
610 &config.db_store_path(),
611 Some(epoch_options.options),
612 EpochMetrics::new(®istry_service.default_registry()),
613 epoch_start_configuration,
614 cache_traits.backing_package_store.clone(),
615 cache_traits.object_store.clone(),
616 cache_metrics,
617 signature_verifier_metrics,
618 &config.expensive_safety_check_config,
619 (chain_id, chain),
620 highest_executed_checkpoint,
621 previous_epoch_last_checkpoint,
622 Arc::new(SubmittedTransactionCacheMetrics::new(
623 ®istry_service.default_registry(),
624 )),
625 config.fullnode_sync_mode,
626 )?;
627
628 info!("created epoch store");
629
630 replay_log!(
631 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632 epoch_store.epoch(),
633 epoch_store.protocol_config()
634 );
635
636 if is_genesis {
638 info!("checking SUI conservation at genesis");
639 cache_traits
644 .reconfig_api
645 .expensive_check_sui_conservation(&epoch_store)
646 .expect("SUI conservation check cannot fail at genesis");
647 }
648
649 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650 let default_buffer_stake = epoch_store
651 .protocol_config()
652 .buffer_stake_for_protocol_upgrade_bps();
653 if effective_buffer_stake != default_buffer_stake {
654 warn!(
655 ?effective_buffer_stake,
656 ?default_buffer_stake,
657 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658 );
659 }
660
661 checkpoint_store.insert_genesis_checkpoint(
662 genesis.checkpoint(),
663 genesis.checkpoint_contents().clone(),
664 &epoch_store,
665 );
666
667 info!("creating state sync store");
668 let state_sync_store = RocksDbStore::new(
669 cache_traits.clone(),
670 committee_store.clone(),
671 checkpoint_store.clone(),
672 );
673
674 let index_store = if node_role.is_fullnode() && config.enable_index_processing {
675 info!("creating jsonrpc index store");
676 Some(Arc::new(IndexStore::new(
677 config.db_path().join("indexes"),
678 &prometheus_registry,
679 epoch_store
680 .protocol_config()
681 .max_move_identifier_len_as_option(),
682 config.remove_deprecated_tables,
683 )))
684 } else {
685 None
686 };
687
688 let chain_identifier = epoch_store.get_chain_identifier();
689
690 let (rpc_index, mut embedded_rpc_store) =
695 if node_role.is_fullnode() && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
696 if config
697 .rpc()
698 .is_some_and(|rpc| rpc.use_experimental_rpc_store())
699 {
700 info!("creating embedded rpc-store");
701 let ingestion_source = RocksDbStore::new(
704 cache_traits.clone(),
705 committee_store.clone(),
706 checkpoint_store.clone(),
707 );
708 let embedded_rpc_store = EmbeddedRpcStore::bootstrap(
709 &config,
710 &store,
711 &checkpoint_store,
712 ingestion_source,
713 chain_identifier,
714 &prometheus_registry,
715 )
716 .await?;
717 (None, Some(embedded_rpc_store))
718 } else {
719 info!("creating rpc index store");
720 let rpc_index = Arc::new(
721 RpcIndexStore::new(
722 &config.db_path(),
723 &store,
724 &checkpoint_store,
725 &epoch_store,
726 &cache_traits.backing_package_store,
727 config.rpc().cloned().unwrap_or_default(),
728 )
729 .await,
730 );
731 (Some(rpc_index), None)
732 }
733 } else {
734 (None, None)
735 };
736
737 info!("creating archive reader");
738 let (randomness_tx, randomness_rx) = mpsc::channel(
740 config
741 .p2p_config
742 .randomness
743 .clone()
744 .unwrap_or_default()
745 .mailbox_capacity(),
746 );
747 let P2pComponents {
748 p2p_network,
749 known_peers,
750 discovery_handle,
751 state_sync_handle,
752 randomness_handle,
753 endpoint_manager,
754 } = Self::create_p2p_network(
755 &config,
756 state_sync_store.clone(),
757 chain_identifier,
758 randomness_tx,
759 &prometheus_registry,
760 )?;
761
762 for peer in &config.p2p_config.peer_address_overrides {
764 endpoint_manager
765 .update_endpoint(
766 EndpointId::P2p(peer.peer_id),
767 AddressSource::Config,
768 peer.addresses.clone(),
769 )
770 .expect("Updating peer address overrides should not fail");
771 }
772
773 update_peer_addresses(
775 &config,
776 &endpoint_manager,
777 epoch_store.epoch_start_state(),
778 None,
779 );
780
781 info!("start snapshot upload");
782 let state_snapshot_handle = Self::start_state_snapshot(
784 &config,
785 &prometheus_registry,
786 checkpoint_store.clone(),
787 chain_identifier,
788 )?;
789
790 info!("start db checkpoint");
792 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
793 &config,
794 &prometheus_registry,
795 state_snapshot_handle.is_some(),
796 )?;
797
798 if !epoch_store
799 .protocol_config()
800 .simplified_unwrap_then_delete()
801 {
802 config
804 .authority_store_pruning_config
805 .set_killswitch_tombstone_pruning(true);
806 }
807
808 let authority_name = config.protocol_public_key();
809
810 info!("create authority state");
811 let state = AuthorityState::new(
812 authority_name,
813 secret,
814 config.supported_protocol_versions.unwrap(),
815 store.clone(),
816 cache_traits.clone(),
817 epoch_store.clone(),
818 committee_store.clone(),
819 index_store.clone(),
820 rpc_index,
821 embedded_rpc_store.as_ref().map(|embedded| embedded.store()),
822 checkpoint_store.clone(),
823 &prometheus_registry,
824 genesis.objects(),
825 &db_checkpoint_config,
826 config.clone(),
827 chain_identifier,
828 config.policy_config.clone(),
829 config.firewall_config.clone(),
830 pruner_watermarks,
831 )
832 .await;
833 if epoch_store.epoch() == 0 {
835 let txn = &genesis.transaction();
836 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
837 let transaction =
838 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
839 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
840 genesis.transaction().data().clone(),
841 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
842 ),
843 );
844 let _enter = span.enter();
845 state
846 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
847 .unwrap();
848 }
849
850 let randomness_receiver_handle =
853 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
854
855 let (end_of_epoch_channel, end_of_epoch_receiver) =
856 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
857
858 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
859 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
860 auth_agg.load_full(),
861 state.clone(),
862 end_of_epoch_receiver,
863 &config.db_path(),
864 &prometheus_registry,
865 &config,
866 )))
867 } else {
868 None
869 };
870
871 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
872 state.clone(),
873 state_sync_store,
874 &transaction_orchestrator.clone(),
875 &config,
876 &prometheus_registry,
877 server_version,
878 node_role,
879 embedded_rpc_store.as_ref(),
880 )
881 .await?;
882
883 if let Some(embedded) = embedded_rpc_store.as_mut() {
890 embedded.spawn_indexer(
891 subscription_service_checkpoint_sender.clone(),
892 prometheus_registry.clone(),
893 );
894 }
895
896 let global_state_hasher = Arc::new(GlobalStateHasher::new(
897 cache_traits.global_state_hash_store.clone(),
898 GlobalStateHashMetrics::new(&prometheus_registry),
899 ));
900
901 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
902 "sui",
903 ®istry_service.default_registry(),
904 );
905
906 let connection_monitor_handle =
907 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
908 p2p_network.downgrade(),
909 Arc::new(network_connection_metrics),
910 known_peers,
911 );
912
913 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
914
915 sui_node_metrics
916 .binary_max_protocol_version
917 .set(ProtocolVersion::MAX.as_u64() as i64);
918 sui_node_metrics
919 .configured_max_protocol_version
920 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
921
922 let node_role = epoch_store.node_role();
923 let validator_components = if node_role.runs_consensus() {
924 let mut components = Self::construct_validator_components(
925 config.clone(),
926 state.clone(),
927 committee,
928 epoch_store.clone(),
929 checkpoint_store.clone(),
930 state_sync_handle.clone(),
931 randomness_handle.clone(),
932 Arc::downgrade(&global_state_hasher),
933 backpressure_manager.clone(),
934 ®istry_service,
935 sui_node_metrics.clone(),
936 checkpoint_metrics.clone(),
937 node_role,
938 randomness_receiver_handle.clone(),
939 )
940 .await?;
941
942 if node_role.is_validator() {
943 components
944 .consensus_adapter
945 .recover_end_of_publish(&epoch_store);
946
947 components.validator_server_handle = Some(
949 components
950 .validator_server_handle
951 .take()
952 .unwrap()
953 .start()
954 .await,
955 );
956
957 endpoint_manager
959 .set_consensus_address_updater(components.consensus_manager.clone());
960 } else {
961 info!("Starting node as Observer — connecting to configured peers");
962 }
963
964 Some(components)
965 } else {
966 None
967 };
968
969 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
971
972 let node = Self {
973 config,
974 validator_components: Mutex::new(validator_components),
975 http_servers,
976 state,
977 transaction_orchestrator,
978 registry_service,
979 metrics: sui_node_metrics,
980 checkpoint_metrics,
981
982 _discovery: discovery_handle,
983 _connection_monitor_handle: connection_monitor_handle,
984 state_sync_handle,
985 randomness_handle,
986 checkpoint_store,
987 global_state_hasher: Mutex::new(Some(global_state_hasher)),
988 end_of_epoch_channel,
989 endpoint_manager,
990 backpressure_manager,
991
992 _db_checkpoint_handle: db_checkpoint_handle,
993
994 #[cfg(msim)]
995 sim_state: Default::default(),
996
997 _state_snapshot_uploader_handle: state_snapshot_handle,
998 shutdown_channel_tx: shutdown_channel,
999 randomness_receiver_handle,
1000
1001 auth_agg,
1002 subscription_service_checkpoint_sender,
1003 embedded_rpc_store,
1004 };
1005
1006 info!("SuiNode started!");
1007 let node = Arc::new(node);
1008 let node_copy = node.clone();
1009 spawn_monitored_task!(async move {
1010 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
1011 if let Err(error) = result {
1012 warn!("Reconfiguration finished with error {:?}", error);
1013 }
1014 });
1015
1016 Ok(node)
1017 }
1018
1019 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
1020 self.end_of_epoch_channel.subscribe()
1021 }
1022
1023 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1024 self.shutdown_channel_tx.subscribe()
1025 }
1026
1027 pub fn current_epoch_for_testing(&self) -> EpochId {
1028 self.state.current_epoch_for_testing()
1029 }
1030
1031 pub fn db_checkpoint_path(&self) -> PathBuf {
1032 self.config.db_checkpoint_path()
1033 }
1034
1035 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
1037 info!("close_epoch (current epoch = {})", epoch_store.epoch());
1038 self.validator_components
1039 .lock()
1040 .await
1041 .as_ref()
1042 .ok_or_else(|| SuiError::from("Node is not a validator"))?
1043 .consensus_adapter
1044 .close_epoch(epoch_store);
1045 Ok(())
1046 }
1047
1048 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
1049 self.state
1050 .clear_override_protocol_upgrade_buffer_stake(epoch)
1051 }
1052
1053 pub fn set_override_protocol_upgrade_buffer_stake(
1054 &self,
1055 epoch: EpochId,
1056 buffer_stake_bps: u64,
1057 ) -> SuiResult {
1058 self.state
1059 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1060 }
1061
1062 pub async fn close_epoch_for_testing(&self) -> SuiResult {
1065 let epoch_store = self.state.epoch_store_for_testing();
1066 self.close_epoch(&epoch_store).await
1067 }
1068
1069 fn start_state_snapshot(
1070 config: &NodeConfig,
1071 prometheus_registry: &Registry,
1072 checkpoint_store: Arc<CheckpointStore>,
1073 chain_identifier: ChainIdentifier,
1074 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1075 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1076 let snapshot_uploader = StateSnapshotUploader::new(
1077 &config.db_checkpoint_path(),
1078 &config.snapshot_path(),
1079 remote_store_config.clone(),
1080 60,
1081 prometheus_registry,
1082 checkpoint_store,
1083 chain_identifier,
1084 config.state_snapshot_write_config.archive_interval_epochs,
1085 )?;
1086 Ok(Some(snapshot_uploader.start()))
1087 } else {
1088 Ok(None)
1089 }
1090 }
1091
1092 fn start_db_checkpoint(
1093 config: &NodeConfig,
1094 prometheus_registry: &Registry,
1095 state_snapshot_enabled: bool,
1096 ) -> Result<(
1097 DBCheckpointConfig,
1098 Option<tokio::sync::broadcast::Sender<()>>,
1099 )> {
1100 let checkpoint_path = Some(
1101 config
1102 .db_checkpoint_config
1103 .checkpoint_path
1104 .clone()
1105 .unwrap_or_else(|| config.db_checkpoint_path()),
1106 );
1107 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1108 DBCheckpointConfig {
1109 checkpoint_path,
1110 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1111 true
1112 } else {
1113 config
1114 .db_checkpoint_config
1115 .perform_db_checkpoints_at_epoch_end
1116 },
1117 ..config.db_checkpoint_config.clone()
1118 }
1119 } else {
1120 config.db_checkpoint_config.clone()
1121 };
1122
1123 match (
1124 db_checkpoint_config.object_store_config.as_ref(),
1125 state_snapshot_enabled,
1126 ) {
1127 (None, false) => Ok((db_checkpoint_config, None)),
1132 (_, _) => {
1133 let handler = DBCheckpointHandler::new(
1134 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1135 db_checkpoint_config.object_store_config.as_ref(),
1136 60,
1137 db_checkpoint_config
1138 .prune_and_compact_before_upload
1139 .unwrap_or(true),
1140 config.authority_store_pruning_config.clone(),
1141 prometheus_registry,
1142 state_snapshot_enabled,
1143 )?;
1144 Ok((
1145 db_checkpoint_config,
1146 Some(DBCheckpointHandler::start(handler)),
1147 ))
1148 }
1149 }
1150 }
1151
    /// Builds and starts the anemo P2P network, plus the discovery, state-sync and
    /// randomness services that run on top of it.
    ///
    /// Besides the network and the per-service handles, the returned `P2pComponents` carry
    /// the discovery `endpoint_manager` and `known_peers`, a map from peer id to a label
    /// (`"allowlisted_peer"` or `"seed_peer"`).
    ///
    /// # Errors
    ///
    /// Returns an error if the anemo network fails to start on
    /// `config.p2p_config.listen_address`.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        // Local copy of the P2P config, used only for the discovery builder below. If no
        // discovery peer-address cache path is configured, it defaults to
        // `<db_path>/discovery_peer_cache.yaml`.
        let mut p2p_config = config.p2p_config.clone();
        {
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        // Give discovery the consensus address: the configured external address if set,
        // otherwise the consensus listen address.
        if let Some(consensus_config) = &config.consensus_config {
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Peers known by id up front: allowlisted peers first, then seed peers that declare
        // a peer id. Because this collects into a `HashMap`, a seed-peer entry replaces an
        // allowlisted entry with the same id.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // One router serves discovery, state sync and randomness.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound middleware: tracing for server errors (INFO spans, failures at WARN)
            // and inbound network metrics. Layer order is significant here.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound middleware: the same tracing (client and server errors) and outbound
            // network metrics, applied to requests this node sends.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            // Frame-size limits are set unconditionally and override any configured values:
            // 1 MiB for requests, 128 MiB for responses.
            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_request_frame_size = Some(1 << 20);
            anemo_config.max_response_frame_size = Some(128 << 20);

            // QUIC tuning. Every value below is a default that applies only when the config
            // leaves it unset, except `allow_failed_socket_buffer_size_setting`, which is
            // always forced to true (presumably so that an OS-rejected buffer size is not
            // fatal; confirm).
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            // Stream and flow-control defaults: 500 bidi/uni streams, 100 MiB per-stream
            // receive window, 200 MiB receive/send windows, 1 MiB crypto buffer, 10 s idle
            // timeout, 5 s keep-alive.
            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The network's server name is `sui-<chain identifier>`.
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        // Start each service on the now-running network.
        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1325
1326 async fn construct_validator_components(
1327 config: NodeConfig,
1328 state: Arc<AuthorityState>,
1329 committee: Arc<Committee>,
1330 epoch_store: Arc<AuthorityPerEpochStore>,
1331 checkpoint_store: Arc<CheckpointStore>,
1332 state_sync_handle: state_sync::Handle,
1333 randomness_handle: randomness::Handle,
1334 global_state_hasher: Weak<GlobalStateHasher>,
1335 backpressure_manager: Arc<BackpressureManager>,
1336 registry_service: &RegistryService,
1337 sui_node_metrics: Arc<SuiNodeMetrics>,
1338 checkpoint_metrics: Arc<CheckpointMetrics>,
1339 node_role: NodeRole,
1340 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1341 ) -> Result<ValidatorComponents> {
1342 let mut config_clone = config.clone();
1343 let consensus_config = config_clone
1344 .consensus_config
1345 .as_mut()
1346 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1347
1348 let client = Arc::new(UpdatableConsensusClient::new());
1349 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1350 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1351 &committee,
1352 consensus_config,
1353 state.name,
1354 ®istry_service.default_registry(),
1355 client.clone(),
1356 checkpoint_store.clone(),
1357 inflight_slot_freed_notify.clone(),
1358 ));
1359
1360 let consensus_manager = Arc::new(ConsensusManager::new(
1361 &config,
1362 consensus_config,
1363 registry_service,
1364 client,
1365 node_role,
1366 ));
1367
1368 let consensus_store_pruner = ConsensusStorePruner::new(
1370 consensus_manager.get_storage_base_path(),
1371 consensus_config.db_retention_epochs(),
1372 consensus_config.db_pruner_period(),
1373 ®istry_service.default_registry(),
1374 );
1375
1376 let sui_tx_validator_metrics =
1377 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1378
1379 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1380 let (handle, queue) = Self::start_grpc_validator_service(
1381 &config,
1382 state.clone(),
1383 consensus_adapter.clone(),
1384 epoch_store.clone(),
1385 ®istry_service.default_registry(),
1386 inflight_slot_freed_notify,
1387 )
1388 .await?;
1389 (Some(handle), queue)
1390 } else {
1391 (None, None)
1392 };
1393
1394 let validator_overload_monitor_handle = if node_role.is_validator()
1397 && config
1398 .authority_overload_config
1399 .max_load_shedding_percentage
1400 > 0
1401 {
1402 let authority_state = Arc::downgrade(&state);
1403 let overload_config = config.authority_overload_config.clone();
1404 fail_point!("starting_overload_monitor");
1405 Some(spawn_monitored_task!(overload_monitor(
1406 authority_state,
1407 overload_config,
1408 )))
1409 } else {
1410 None
1411 };
1412
1413 Self::start_epoch_specific_validator_components(
1414 &config,
1415 state.clone(),
1416 consensus_adapter,
1417 checkpoint_store,
1418 epoch_store,
1419 state_sync_handle,
1420 randomness_handle,
1421 randomness_receiver_handle,
1422 consensus_manager,
1423 consensus_store_pruner,
1424 global_state_hasher,
1425 backpressure_manager,
1426 validator_server_handle,
1427 validator_overload_monitor_handle,
1428 checkpoint_metrics,
1429 sui_node_metrics,
1430 sui_tx_validator_metrics,
1431 admission_queue,
1432 node_role,
1433 )
1434 .await
1435 }
1436
    /// Wires up the parts of the validator components that are scoped to `epoch_store`'s
    /// epoch. It reuses the long-lived pieces passed in (consensus adapter and manager, store
    /// pruner, gRPC server handle, overload monitor, metrics, admission queue) and returns
    /// them bundled as `ValidatorComponents`.
    ///
    /// Consensus itself is started on a spawned task that this function does not await.
    ///
    /// # Errors
    ///
    /// Returns an error if installing the randomness manager on `epoch_store` fails.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        // Built here, but only spawned further below (after consensus start is kicked off).
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        // Clear whatever public key the randomness receiver currently holds before a
        // randomness manager is (possibly) set up for this epoch.
        randomness_receiver_handle.clear_public_key();

        // Randomness runs only for consensus-running roles and only if this epoch enables
        // randomness state. Only validators supply their protocol key pair. `try_new` may
        // yield no manager, in which case none is installed.
        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            let authority_key_pair = if node_role.is_validator() {
                Some(config.protocol_key_pair())
            } else {
                None
            };
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                authority_key_pair,
                randomness_receiver_handle.clone(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        // Execution-time observation is validator-only.
        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // The returned JoinHandle is dropped, so consensus start runs detached and is not
        // awaited here. `randomness_receiver_handle` is moved into the task.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                        Some(randomness_receiver_handle),
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // If the DISABLE_REPLAY_WAITER env var is present (with a valid-Unicode value), the
        // checkpoint service is spawned without the consensus replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        // The JWK updater runs only on validators whose epoch has authenticator state enabled.
        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        // Point the admission queue (if any) at the new epoch.
        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1583
1584 fn build_checkpoint_service(
1585 config: &NodeConfig,
1586 consensus_adapter: Arc<ConsensusAdapter>,
1587 checkpoint_store: Arc<CheckpointStore>,
1588 epoch_store: Arc<AuthorityPerEpochStore>,
1589 state: Arc<AuthorityState>,
1590 state_sync_handle: state_sync::Handle,
1591 state_hasher: Weak<GlobalStateHasher>,
1592 checkpoint_metrics: Arc<CheckpointMetrics>,
1593 node_role: NodeRole,
1594 ) -> Arc<CheckpointService> {
1595 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1596 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1597
1598 debug!(
1599 "Starting checkpoint service with epoch start timestamp {}
1600 and epoch duration {}",
1601 epoch_start_timestamp_ms, epoch_duration_ms
1602 );
1603
1604 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1605 Box::new(SubmitCheckpointToConsensus::new(
1606 consensus_adapter,
1607 state.secret.clone(),
1608 config.protocol_public_key(),
1609 epoch_start_timestamp_ms
1610 .checked_add(epoch_duration_ms)
1611 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1612 checkpoint_metrics.clone(),
1613 ))
1614 } else {
1615 Box::new(LogCheckpointOutput::new(checkpoint_metrics.clone()))
1616 };
1617
1618 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1619
1620 CheckpointService::build(
1621 state.clone(),
1622 checkpoint_store,
1623 epoch_store,
1624 state.get_transaction_cache_reader().clone(),
1625 state_hasher,
1626 checkpoint_output,
1627 Box::new(certified_checkpoint_output),
1628 checkpoint_metrics,
1629 )
1630 }
1631
1632 fn construct_consensus_adapter(
1633 committee: &Committee,
1634 consensus_config: &ConsensusConfig,
1635 authority: AuthorityName,
1636 prometheus_registry: &Registry,
1637 consensus_client: Arc<dyn ConsensusClient>,
1638 checkpoint_store: Arc<CheckpointStore>,
1639 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1640 ) -> ConsensusAdapter {
1641 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1642 ConsensusAdapter::new(
1645 consensus_client,
1646 checkpoint_store,
1647 authority,
1648 consensus_config.max_pending_transactions(),
1649 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1650 ca_metrics,
1651 inflight_slot_freed_notify,
1652 )
1653 }
1654
1655 async fn start_grpc_validator_service(
1656 config: &NodeConfig,
1657 state: Arc<AuthorityState>,
1658 consensus_adapter: Arc<ConsensusAdapter>,
1659 epoch_store: Arc<AuthorityPerEpochStore>,
1660 prometheus_registry: &Registry,
1661 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1662 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1663 let overload_config = &config.authority_overload_config;
1664 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1665 let manager = Arc::new(AdmissionQueueManager::new(
1666 consensus_adapter.clone(),
1667 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1668 overload_config.admission_queue_capacity_fraction,
1669 overload_config.admission_queue_bypass_fraction,
1670 overload_config.admission_queue_failover_timeout,
1671 inflight_slot_freed_notify,
1672 ));
1673 AdmissionQueueContext::spawn(manager, epoch_store)
1674 });
1675 let validator_service = ValidatorService::new(
1676 state.clone(),
1677 consensus_adapter,
1678 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1679 config.policy_config.clone().map(|p| p.client_id_source),
1680 admission_queue.clone(),
1681 );
1682
1683 let mut server_conf = mysten_network::config::Config::new();
1684 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1685 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1686 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1687 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1688 server_conf.load_shed = config.grpc_load_shed;
1689 let mut server_builder =
1690 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1691
1692 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1693
1694 let tls_config = sui_tls::create_rustls_server_config(
1695 config.network_key_pair().copy().private(),
1696 SUI_TLS_SERVER_NAME.to_string(),
1697 );
1698
1699 let network_address = config.network_address().clone();
1700
1701 let (ready_tx, ready_rx) = oneshot::channel();
1702
1703 let spawn_once = SpawnOnce::new(ready_rx, async move {
1704 let server = server_builder
1705 .bind(&network_address, Some(tls_config))
1706 .await
1707 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1708 let local_addr = server.local_addr();
1709 info!("Listening to traffic on {local_addr}");
1710 ready_tx.send(()).unwrap();
1711 if let Err(err) = server.serve().await {
1712 info!("Server stopped: {err}");
1713 }
1714 info!("Server stopped");
1715 });
1716 Ok((spawn_once, admission_queue))
1717 }
1718
1719 pub fn state(&self) -> Arc<AuthorityState> {
1720 self.state.clone()
1721 }
1722
1723 pub fn embedded_rpc_store(&self) -> Option<&EmbeddedRpcStore> {
1729 self.embedded_rpc_store.as_ref()
1730 }
1731
1732 #[cfg(any(test, msim))]
1733 pub fn connection_monitor_handle_for_testing(
1734 &self,
1735 ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
1736 &self._connection_monitor_handle
1737 }
1738
1739 pub fn node_role(&self) -> NodeRole {
1740 self.state.load_epoch_store_one_call_per_task().node_role()
1741 }
1742
1743 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1745 self.state.reference_gas_price_for_testing()
1746 }
1747
1748 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1749 self.state.committee_store().clone()
1750 }
1751
1752 pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
1753 self.checkpoint_store.clone()
1754 }
1755
1756 pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
1757 self.state.authority_store()
1758 }
1759
1760 pub fn clone_consensus_store(
1761 &self,
1762 ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
1763 self.validator_components
1764 .try_lock()
1765 .ok()?
1766 .as_ref()?
1767 .consensus_manager
1768 .consensus_store()
1769 }
1770
1771 pub fn clone_authority_aggregator(
1776 &self,
1777 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1778 self.transaction_orchestrator
1779 .as_ref()
1780 .map(|to| to.clone_authority_aggregator())
1781 }
1782
1783 pub fn transaction_orchestrator(
1784 &self,
1785 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1786 self.transaction_orchestrator.clone()
1787 }
1788
    /// Drives the node through epochs in an endless loop. Each iteration executes one
    /// epoch's checkpoints and then reconfigures the node for the next epoch.
    ///
    /// Each iteration:
    /// (1) builds a `CheckpointExecutor` around the current global state hasher;
    /// (2) for validators, submits an `AuthorityCapabilitiesV2` notification to consensus;
    /// (3) runs the epoch;
    /// (4) publishes the final system state on `end_of_epoch_channel`;
    /// (5) reconfigures the authority state, replaces the hasher, and restarts, drops,
    ///     or newly constructs the consensus components according to the node's role in
    ///     the new epoch.
    ///
    /// Returns `Ok(())` only when the `run_with_range` stop condition is reached. By then
    /// consensus has been shut down and `run_with_range` has been sent on
    /// `shutdown_channel_tx`.
    ///
    /// # Errors
    ///
    /// Propagates errors from submitting capabilities to consensus, from (re)starting the
    /// validator components, and (msim only) from test checkpoint pruning.
    ///
    /// # Panics
    ///
    /// Panics, among other `expect`s below, in these cases:
    /// - no hasher is stored at the start of an iteration;
    /// - the hasher still has other strong references at reconfiguration;
    /// - the next epoch is not the current epoch + 1.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // This lock guard lives for the whole iteration (the entire epoch, including
            // `run_epoch`). The slot is refilled with a fresh hasher during reconfiguration
            // below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Export the current epoch's protocol version as a metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Validators (with components installed) advertise their capabilities to
            // consensus. The validator-components lock, taken in the condition, stays held
            // for the whole block.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // NOTE(review): the purpose of this 1 ms sleep is not evident from this code
                // (presumably a brief yield before submitting); confirm.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                // Step the advertised max version down while that version would enable
                // accumulators but the accumulator root does not exist.
                // NOTE(review): this checks `epoch_store` (the loop variable), not
                // `cur_epoch_store`; presumably the same epoch, confirm.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // Configured stop point reached: shut consensus down, notify subscribers, exit.
            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // Safe mode is not expected, except in msim runs that explicitly expect it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed broadcast (e.g. no subscribers) is only reported on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an authority aggregator rebuilt from the new epoch start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // Held across state reconfiguration and the component swap below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                // The node ran consensus last epoch: shut it down, replace the hasher, prune
                // the consensus store, then restart if the new role still runs consensus.
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // The old hasher must be uniquely owned by now; its metrics carry over.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            self.randomness_receiver_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // The node did not run consensus last epoch: replace the hasher, then build
                // the components from scratch if the new role runs consensus.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                        self.randomness_receiver_handle.clone(),
                    )
                    .await?;

                    // Newly promoted validators start their gRPC server and register the
                    // consensus manager as the endpoint manager's consensus-address updater.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Release the finished epoch store's DB handles.
            cur_epoch_store.release_db_handles();

            // msim only: prune checkpoints for eligible epochs when a finite, non-zero
            // checkpoint retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2108
2109 async fn shutdown(&self) {
2110 if let Some(validator_components) = &*self.validator_components.lock().await {
2111 validator_components.consensus_manager.shutdown().await;
2112 }
2113 }
2114
2115 async fn reconfigure_state(
2116 &self,
2117 state: &Arc<AuthorityState>,
2118 cur_epoch_store: &AuthorityPerEpochStore,
2119 next_epoch_committee: Committee,
2120 next_epoch_start_system_state: EpochStartSystemState,
2121 global_state_hasher: Arc<GlobalStateHasher>,
2122 ) -> Arc<AuthorityPerEpochStore> {
2123 let next_epoch = next_epoch_committee.epoch();
2124
2125 let last_checkpoint = self
2126 .checkpoint_store
2127 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2128 .expect("Error loading last checkpoint for current epoch")
2129 .expect("Could not load last checkpoint for current epoch");
2130
2131 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2132
2133 assert_eq!(
2134 Some(last_checkpoint_seq),
2135 self.checkpoint_store
2136 .get_highest_executed_checkpoint_seq_number()
2137 .expect("Error loading highest executed checkpoint sequence number")
2138 );
2139
2140 let epoch_start_configuration = EpochStartConfiguration::new(
2141 next_epoch_start_system_state,
2142 *last_checkpoint.digest(),
2143 state.get_object_store().as_ref(),
2144 EpochFlag::default_flags_for_new_epoch(&state.config),
2145 )
2146 .expect("EpochStartConfiguration construction cannot fail");
2147
2148 let new_epoch_store = self
2149 .state
2150 .reconfigure(
2151 cur_epoch_store,
2152 self.config.supported_protocol_versions.unwrap(),
2153 next_epoch_committee,
2154 epoch_start_configuration,
2155 global_state_hasher,
2156 &self.config.expensive_safety_check_config,
2157 last_checkpoint_seq,
2158 )
2159 .await
2160 .expect("Reconfigure authority state cannot fail");
2161 info!(next_epoch, "Node State has been reconfigured");
2162 assert_eq!(next_epoch, new_epoch_store.epoch());
2163 self.state.get_reconfig_api().update_epoch_flags_metrics(
2164 cur_epoch_store.epoch_start_config().flags(),
2165 new_epoch_store.epoch_start_config().flags(),
2166 );
2167
2168 new_epoch_store
2169 }
2170
2171 pub fn get_config(&self) -> &NodeConfig {
2172 &self.config
2173 }
2174
2175 pub fn randomness_handle(&self) -> randomness::Handle {
2176 self.randomness_handle.clone()
2177 }
2178
2179 pub fn state_sync_handle(&self) -> state_sync::Handle {
2180 self.state_sync_handle.clone()
2181 }
2182
2183 pub fn endpoint_manager(&self) -> &EndpointManager {
2184 &self.endpoint_manager
2185 }
2186
2187 fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2189 let digest_str = digest.to_string();
2190 if digest_str.len() >= 8 {
2191 digest_str[0..8].to_string()
2192 } else {
2193 digest_str
2194 }
2195 }
2196
2197 async fn check_and_recover_forks(
2201 checkpoint_store: &CheckpointStore,
2202 checkpoint_metrics: &CheckpointMetrics,
2203 fork_recovery: Option<&ForkRecoveryConfig>,
2204 ) -> Result<()> {
2205 if let Some(recovery) = fork_recovery {
2207 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2208 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2209 }
2210
2211 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2212 .get_checkpoint_fork_detected()
2213 .map_err(|e| {
2214 error!("Failed to check for checkpoint fork: {:?}", e);
2215 e
2216 })?
2217 {
2218 Self::handle_checkpoint_fork(
2219 checkpoint_seq,
2220 checkpoint_digest,
2221 checkpoint_metrics,
2222 fork_recovery,
2223 )
2224 .await?;
2225 }
2226 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2227 .get_transaction_fork_detected()
2228 .map_err(|e| {
2229 error!("Failed to check for transaction fork: {:?}", e);
2230 e
2231 })?
2232 {
2233 Self::handle_transaction_fork(
2234 tx_digest,
2235 expected_effects,
2236 actual_effects,
2237 checkpoint_metrics,
2238 fork_recovery,
2239 )
2240 .await?;
2241 }
2242
2243 Ok(())
2244 }
2245
2246 fn try_recover_checkpoint_fork(
2247 checkpoint_store: &CheckpointStore,
2248 recovery: &ForkRecoveryConfig,
2249 ) -> Result<()> {
2250 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2253 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2254 anyhow::bail!(
2255 "Invalid checkpoint digest override for seq {}: {}",
2256 seq,
2257 expected_digest_str
2258 );
2259 };
2260
2261 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2262 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2263 if local_digest != expected_digest {
2264 info!(
2265 seq,
2266 local = %Self::get_digest_prefix(local_digest),
2267 expected = %Self::get_digest_prefix(expected_digest),
2268 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2269 seq
2270 );
2271 checkpoint_store
2272 .clear_locally_computed_checkpoints_from(*seq)
2273 .context(
2274 "Failed to clear locally computed checkpoints from override seq",
2275 )?;
2276 }
2277 }
2278 }
2279
2280 if let Some((checkpoint_seq, checkpoint_digest)) =
2281 checkpoint_store.get_checkpoint_fork_detected()?
2282 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2283 {
2284 info!(
2285 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2286 checkpoint_seq, checkpoint_digest
2287 );
2288 checkpoint_store
2289 .clear_checkpoint_fork_detected()
2290 .expect("Failed to clear checkpoint fork detected marker");
2291 }
2292 Ok(())
2293 }
2294
2295 fn try_recover_transaction_fork(
2296 checkpoint_store: &CheckpointStore,
2297 recovery: &ForkRecoveryConfig,
2298 ) -> Result<()> {
2299 if recovery.transaction_overrides.is_empty() {
2300 return Ok(());
2301 }
2302
2303 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2304 && recovery
2305 .transaction_overrides
2306 .contains_key(&tx_digest.to_string())
2307 {
2308 info!(
2309 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2310 tx_digest
2311 );
2312 checkpoint_store
2313 .clear_transaction_fork_detected()
2314 .expect("Failed to clear transaction fork detected marker");
2315 }
2316 Ok(())
2317 }
2318
2319 fn get_current_timestamp() -> u64 {
2320 std::time::SystemTime::now()
2321 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2322 .unwrap()
2323 .as_secs()
2324 }
2325
2326 async fn handle_checkpoint_fork(
2327 checkpoint_seq: u64,
2328 checkpoint_digest: CheckpointDigest,
2329 checkpoint_metrics: &CheckpointMetrics,
2330 fork_recovery: Option<&ForkRecoveryConfig>,
2331 ) -> Result<()> {
2332 checkpoint_metrics
2333 .checkpoint_fork_crash_mode
2334 .with_label_values(&[
2335 &checkpoint_seq.to_string(),
2336 &Self::get_digest_prefix(checkpoint_digest),
2337 &Self::get_current_timestamp().to_string(),
2338 ])
2339 .set(1);
2340
2341 let behavior = fork_recovery
2342 .map(|fr| fr.fork_crash_behavior)
2343 .unwrap_or_default();
2344
2345 match behavior {
2346 ForkCrashBehavior::AwaitForkRecovery => {
2347 error!(
2348 checkpoint_seq = checkpoint_seq,
2349 checkpoint_digest = ?checkpoint_digest,
2350 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2351 );
2352 futures::future::pending::<()>().await;
2353 unreachable!("pending() should never return");
2354 }
2355 ForkCrashBehavior::ReturnError => {
2356 error!(
2357 checkpoint_seq = checkpoint_seq,
2358 checkpoint_digest = ?checkpoint_digest,
2359 "Checkpoint fork detected! Returning error."
2360 );
2361 Err(anyhow::anyhow!(
2362 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2363 checkpoint_seq,
2364 checkpoint_digest
2365 ))
2366 }
2367 }
2368 }
2369
2370 async fn handle_transaction_fork(
2371 tx_digest: TransactionDigest,
2372 expected_effects_digest: TransactionEffectsDigest,
2373 actual_effects_digest: TransactionEffectsDigest,
2374 checkpoint_metrics: &CheckpointMetrics,
2375 fork_recovery: Option<&ForkRecoveryConfig>,
2376 ) -> Result<()> {
2377 checkpoint_metrics
2378 .transaction_fork_crash_mode
2379 .with_label_values(&[
2380 &Self::get_digest_prefix(tx_digest),
2381 &Self::get_digest_prefix(expected_effects_digest),
2382 &Self::get_digest_prefix(actual_effects_digest),
2383 &Self::get_current_timestamp().to_string(),
2384 ])
2385 .set(1);
2386
2387 let behavior = fork_recovery
2388 .map(|fr| fr.fork_crash_behavior)
2389 .unwrap_or_default();
2390
2391 match behavior {
2392 ForkCrashBehavior::AwaitForkRecovery => {
2393 error!(
2394 tx_digest = ?tx_digest,
2395 expected_effects_digest = ?expected_effects_digest,
2396 actual_effects_digest = ?actual_effects_digest,
2397 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2398 );
2399 futures::future::pending::<()>().await;
2400 unreachable!("pending() should never return");
2401 }
2402 ForkCrashBehavior::ReturnError => {
2403 error!(
2404 tx_digest = ?tx_digest,
2405 expected_effects_digest = ?expected_effects_digest,
2406 actual_effects_digest = ?actual_effects_digest,
2407 "Transaction fork detected! Returning error."
2408 );
2409 Err(anyhow::anyhow!(
2410 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2411 tx_digest,
2412 expected_effects_digest,
2413 actual_effects_digest
2414 ))
2415 }
2416 }
2417 }
2418}
2419
2420#[cfg(not(msim))]
2421impl SuiNode {
2422 async fn fetch_jwks(
2423 _authority: AuthorityName,
2424 provider: &OIDCProvider,
2425 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2426 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2427 use sui_types::error::SuiErrorKind;
2428 let client = reqwest::Client::new();
2429 fetch_jwks(provider, &client, true)
2430 .await
2431 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2432 }
2433}
2434
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node this `SuiNode` runs on.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Logs and stores whether safe mode is expected in this simulation.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator stand-in for JWK fetching: delegates to the injector returned by
    /// `get_jwk_injector()`, passing through the authority and provider.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2456
2457enum SpawnOnce {
2458 Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
2460 #[allow(unused)]
2461 Started(JoinHandle<()>),
2462}
2463
2464impl SpawnOnce {
2465 pub fn new(
2466 ready_rx: oneshot::Receiver<()>,
2467 future: impl Future<Output = ()> + Send + 'static,
2468 ) -> Self {
2469 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2470 }
2471
2472 pub async fn start(self) -> Self {
2473 match self {
2474 Self::Unstarted(ready_rx, future) => {
2475 let future = future.into_inner();
2476 let handle = tokio::spawn(future);
2477 ready_rx.await.unwrap();
2478 Self::Started(handle)
2479 }
2480 Self::Started(_) => self,
2481 }
2482 }
2483}
2484
2485fn update_peer_addresses(
2489 config: &NodeConfig,
2490 endpoint_manager: &EndpointManager,
2491 epoch_start_state: &EpochStartSystemState,
2492 prev_epoch_start_state: Option<&EpochStartSystemState>,
2493) {
2494 if config.consensus_config().is_none() {
2495 return;
2496 }
2497 let new_peers: HashSet<PeerId> = epoch_start_state
2498 .get_validator_as_p2p_peers(config.protocol_public_key())
2499 .into_iter()
2500 .map(|(peer_id, address)| {
2501 endpoint_manager
2502 .update_endpoint(
2503 EndpointId::P2p(peer_id),
2504 AddressSource::Chain,
2505 vec![address],
2506 )
2507 .expect("Updating peer addresses should not fail");
2508 peer_id
2509 })
2510 .collect();
2511
2512 if let Some(prev) = prev_epoch_start_state {
2514 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2515 if !new_peers.contains(&peer_id) {
2516 endpoint_manager
2517 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2518 .expect("Clearing peer addresses should not fail");
2519 }
2520 }
2521 }
2522}
2523
2524fn build_kv_store(
2525 state: &Arc<AuthorityState>,
2526 config: &NodeConfig,
2527 registry: &Registry,
2528) -> Result<Arc<TransactionKeyValueStore>> {
2529 let metrics = KeyValueStoreMetrics::new(registry);
2530 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2531
2532 let base_url = &config.transaction_kv_store_read_config.base_url;
2533
2534 if base_url.is_empty() {
2535 info!("no http kv store url provided, using local db only");
2536 return Ok(Arc::new(db_store));
2537 }
2538
2539 let base_url: url::Url = base_url.parse().tap_err(|e| {
2540 error!(
2541 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2542 base_url, e
2543 )
2544 })?;
2545
2546 let network_str = match state.get_chain_identifier().chain() {
2547 Chain::Mainnet => "/mainnet",
2548 _ => {
2549 info!("using local db only for kv store");
2550 return Ok(Arc::new(db_store));
2551 }
2552 };
2553
2554 let base_url = base_url.join(network_str)?.to_string();
2555 let http_store = HttpKVStore::new_kv(
2556 &base_url,
2557 config.transaction_kv_store_read_config.cache_size,
2558 metrics.clone(),
2559 )?;
2560 info!("using local key-value store with fallback to http key-value store");
2561 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2562 db_store,
2563 http_store,
2564 metrics,
2565 "json_rpc_fallback",
2566 )))
2567}
2568
2569async fn build_json_rpc_router(
2570 state: &Arc<AuthorityState>,
2571 transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2572 config: &NodeConfig,
2573 prometheus_registry: &Registry,
2574) -> Result<axum::Router> {
2575 let traffic_controller = state.traffic_controller.clone();
2576 let mut server = JsonRpcServerBuilder::new(
2577 env!("CARGO_PKG_VERSION"),
2578 prometheus_registry,
2579 traffic_controller,
2580 config.policy_config.clone(),
2581 );
2582
2583 let kv_store = build_kv_store(state, config, prometheus_registry)?;
2584
2585 let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2586 server.register_module(ReadApi::new(
2587 state.clone(),
2588 kv_store.clone(),
2589 metrics.clone(),
2590 ))?;
2591 server.register_module(CoinReadApi::new(
2592 state.clone(),
2593 kv_store.clone(),
2594 metrics.clone(),
2595 ))?;
2596
2597 if config.run_with_range.is_none() {
2600 server.register_module(TransactionBuilderApi::new(state.clone()))?;
2601 }
2602 server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2603 server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2604
2605 if let Some(transaction_orchestrator) = transaction_orchestrator {
2606 server.register_module(TransactionExecutionApi::new(
2607 state.clone(),
2608 transaction_orchestrator.clone(),
2609 metrics.clone(),
2610 ))?;
2611 }
2612
2613 let name_service_config = if let (
2614 Some(package_address),
2615 Some(registry_id),
2616 Some(reverse_registry_id),
2617 ) = (
2618 config.name_service_package_address,
2619 config.name_service_registry_id,
2620 config.name_service_reverse_registry_id,
2621 ) {
2622 sui_name_service::NameServiceConfig::new(package_address, registry_id, reverse_registry_id)
2623 } else {
2624 match state.get_chain_identifier().chain() {
2625 Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
2626 Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
2627 Chain::Unknown => sui_name_service::NameServiceConfig::default(),
2628 }
2629 };
2630
2631 server.register_module(IndexerApi::new(
2632 state.clone(),
2633 ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2634 kv_store,
2635 name_service_config,
2636 metrics,
2637 config.indexer_max_subscriptions,
2638 ))?;
2639 server.register_module(MoveUtils::new(state.clone()))?;
2640
2641 let server_type = config.jsonrpc_server_type();
2642
2643 Ok(server.to_router(server_type).await?)
2644}
2645
2646async fn build_http_servers(
2647 state: Arc<AuthorityState>,
2648 store: RocksDbStore,
2649 transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2650 config: &NodeConfig,
2651 prometheus_registry: &Registry,
2652 server_version: ServerVersion,
2653 node_role: NodeRole,
2654 embedded_rpc_store: Option<&EmbeddedRpcStore>,
2655) -> Result<(
2656 HttpServers,
2657 Option<tokio::sync::broadcast::Sender<Arc<Checkpoint>>>,
2658)> {
2659 if !node_role.is_fullnode() {
2661 return Ok((HttpServers::default(), None));
2662 }
2663
2664 info!("starting rpc service with config: {:?}", config.rpc);
2665
2666 let mut router = axum::Router::new();
2667
2668 if config.json_rpc_enabled() {
2672 router = router.merge(
2673 build_json_rpc_router(
2674 &state,
2675 transaction_orchestrator,
2676 config,
2677 prometheus_registry,
2678 )
2679 .await?,
2680 );
2681 } else {
2682 info!("json-rpc service is disabled");
2683 }
2684
2685 let indexed_checkpoint = embedded_rpc_store.map(|embedded| embedded.indexed_checkpoint_fn());
2689 let (subscription_service_checkpoint_sender, subscription_service_handle) =
2690 SubscriptionService::build(prometheus_registry, indexed_checkpoint);
2691 let rpc_router = {
2692 let reader: Arc<dyn RpcStateReader> = match embedded_rpc_store {
2696 Some(embedded) => Arc::new(RpcStoreReadStore::new(
2697 state.clone(),
2698 store,
2699 embedded.reader(),
2700 )),
2701 None => Arc::new(RestReadStore::new(state.clone(), store)),
2702 };
2703 let mut rpc_service = sui_rpc_api::RpcService::new(reader);
2704 rpc_service.with_server_version(server_version);
2705
2706 if let Some(config) = config.rpc.clone() {
2707 config.validate()?;
2708 rpc_service.with_config(config);
2709 }
2710
2711 rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
2712 rpc_service.with_subscription_service(subscription_service_handle);
2713
2714 if let Some(transaction_orchestrator) = transaction_orchestrator {
2715 rpc_service.with_executor(transaction_orchestrator.clone())
2716 }
2717
2718 rpc_service.into_router().await
2719 };
2720
2721 let layers = ServiceBuilder::new()
2722 .map_request(|mut request: axum::http::Request<_>| {
2723 if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
2724 let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2725 request.extensions_mut().insert(axum_connect_info);
2726 }
2727 request
2728 })
2729 .layer(axum::middleware::from_fn(server_timing_middleware))
2730 .layer(
2732 tower_http::cors::CorsLayer::new()
2733 .allow_methods([http::Method::GET, http::Method::POST])
2734 .allow_origin(tower_http::cors::Any)
2735 .allow_headers(tower_http::cors::Any)
2736 .expose_headers(tower_http::cors::Any),
2737 );
2738
2739 router = router.merge(rpc_router).layer(layers);
2740
2741 let https = if let Some((tls_config, https_address)) = config
2742 .rpc()
2743 .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
2744 {
2745 let https = sui_http::Builder::new()
2746 .tls_single_cert(tls_config.cert(), tls_config.key())
2747 .and_then(|builder| builder.serve(https_address, router.clone()))
2748 .map_err(|e| anyhow::anyhow!(e))?;
2749
2750 info!(
2751 https_address =? https.local_addr(),
2752 "HTTPS rpc server listening on {}",
2753 https.local_addr()
2754 );
2755
2756 Some(https)
2757 } else {
2758 None
2759 };
2760
2761 let http = sui_http::Builder::new()
2762 .serve(&config.json_rpc_address, router)
2763 .map_err(|e| anyhow::anyhow!(e))?;
2764
2765 info!(
2766 http_address =? http.local_addr(),
2767 "HTTP rpc server listening on {}",
2768 http.local_addr()
2769 );
2770
2771 Ok((
2772 HttpServers {
2773 http: Some(http),
2774 https,
2775 },
2776 Some(subscription_service_checkpoint_sender),
2777 ))
2778}
2779
2780#[derive(Default)]
2781struct HttpServers {
2782 #[allow(unused)]
2783 http: Option<sui_http::ServerHandle>,
2784 #[allow(unused)]
2785 https: Option<sui_http::ServerHandle>,
2786}
2787
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Runs `SuiNode::check_and_recover_forks` with `recovery` as the recovery config.
    async fn run_fork_check(
        store: &CheckpointStore,
        metrics: &CheckpointMetrics,
        recovery: &ForkRecoveryConfig,
    ) -> anyhow::Result<()> {
        SuiNode::check_and_recover_forks(store, metrics, Some(recovery)).await
    }

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // Baseline config: no overrides, and unresolved forks surface as errors.
        let no_overrides = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // Checkpoint fork: an error without an override...
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let err = run_fork_check(&checkpoint_store, &checkpoint_metrics, &no_overrides)
            .await
            .expect_err("an unresolved checkpoint fork must be reported");
        assert!(err.to_string().contains("Checkpoint fork detected"));

        // ...and cleared once an override exists for that sequence number.
        let with_checkpoint_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: BTreeMap::from([(seq_num, digest.to_string())]),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        run_fork_check(
            &checkpoint_store,
            &checkpoint_metrics,
            &with_checkpoint_override,
        )
        .await
        .expect("a checkpoint override must resolve the fork");
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // Transaction fork: an error without an override...
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let err = run_fork_check(&checkpoint_store, &checkpoint_metrics, &no_overrides)
            .await
            .expect_err("an unresolved transaction fork must be reported");
        assert!(err.to_string().contains("Transaction fork detected"));

        // ...and cleared once an override exists for that transaction digest.
        let with_transaction_override = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                tx_digest.to_string(),
                actual_effects.to_string(),
            )]),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        run_fork_check(
            &checkpoint_store,
            &checkpoint_metrics,
            &with_transaction_override,
        )
        .await
        .expect("a transaction override must resolve the fork");
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}