1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Logs a JWK-related message at a level chosen at runtime: `debug!` when
/// `in_test_configuration()` returns true, `info!` otherwise.
///
/// Takes the same arguments as the `tracing` logging macros it forwards to.
macro_rules! jwk_log {
    ($($log_args:tt)+) => {
        if !in_test_configuration() {
            info!($($log_args)+);
        } else {
            debug!($($log_args)+);
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161pub mod db_shell;
162mod handle;
163pub mod metrics;
164
165pub struct ValidatorComponents {
166 validator_server_handle: Option<SpawnOnce>,
167 validator_overload_monitor_handle: Option<JoinHandle<()>>,
168 consensus_manager: Arc<ConsensusManager>,
169 consensus_store_pruner: ConsensusStorePruner,
170 consensus_adapter: Arc<ConsensusAdapter>,
171 checkpoint_metrics: Arc<CheckpointMetrics>,
172 sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
173 admission_queue: Option<AdmissionQueueContext>,
174}
175pub struct P2pComponents {
176 p2p_network: Network,
177 known_peers: HashMap<PeerId, String>,
178 discovery_handle: discovery::Handle,
179 state_sync_handle: state_sync::Handle,
180 randomness_handle: randomness::Handle,
181 endpoint_manager: EndpointManager,
182}
183
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Extra per-node state carried only in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node that was current at construction time.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag that starts out `false`.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; never read.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node and a fresh leak detector.
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher used in simulator builds.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores both arguments and parses the bundled
    /// `DEFAULT_JWK_BYTES` as Twitch keys, mapping any parse failure to
    /// `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        match parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        ) {
            Ok(parsed) => Ok(parsed),
            Err(_) => Err(SuiErrorKind::JWKRetrievalError.into()),
        }
    }

    thread_local! {
        // Thread-local slot holding the active injector; starts as `default_fetch_jwks`.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the injector installed on the calling thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| slot.borrow().clone())
    }

    /// Replaces the injector installed on the calling thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            *slot.borrow_mut() = injector;
        });
    }
}
237
238#[cfg(msim)]
239pub use simulator::set_jwk_injector;
240#[cfg(msim)]
241use simulator::*;
242use sui_core::authority::authority_store_pruner::PrunerWatermarks;
243use sui_core::{
244 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
245};
246
/// Default gRPC connect timeout: 60 seconds.
// NOTE(review): no use site is visible in this part of the file — confirm where it is applied.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
248
249pub struct SuiNode {
250 config: NodeConfig,
251 validator_components: Mutex<Option<ValidatorComponents>>,
252
253 #[allow(unused)]
255 http_servers: HttpServers,
256
257 state: Arc<AuthorityState>,
258 transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
259 registry_service: RegistryService,
260 metrics: Arc<SuiNodeMetrics>,
261 checkpoint_metrics: Arc<CheckpointMetrics>,
262
263 _discovery: discovery::Handle,
264 _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
265 state_sync_handle: state_sync::Handle,
266 randomness_handle: randomness::Handle,
267 checkpoint_store: Arc<CheckpointStore>,
268 global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
269
270 end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
272
273 endpoint_manager: EndpointManager,
275
276 backpressure_manager: Arc<BackpressureManager>,
277
278 _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
279
280 #[cfg(msim)]
281 sim_state: SimState,
282
283 _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
284 shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
286
287 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
289
290 auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
295
296 subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
297}
298
299impl fmt::Debug for SuiNode {
300 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
301 f.debug_struct("SuiNode")
302 .field("name", &self.state.name.concise())
303 .finish()
304 }
305}
306
/// Upper bound on the number of new JWKs accepted from one provider per fetch;
/// `start_jwk_updater` truncates anything beyond this before submitting.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
308
309impl SuiNode {
310 pub async fn start(
311 config: NodeConfig,
312 registry_service: RegistryService,
313 ) -> Result<Arc<SuiNode>> {
314 Self::start_async(
315 config,
316 registry_service,
317 ServerVersion::new("sui-node", "unknown"),
318 )
319 .await
320 }
321
322 fn start_jwk_updater(
323 config: &NodeConfig,
324 metrics: Arc<SuiNodeMetrics>,
325 authority: AuthorityName,
326 epoch_store: Arc<AuthorityPerEpochStore>,
327 consensus_adapter: Arc<ConsensusAdapter>,
328 ) {
329 let epoch = epoch_store.epoch();
330
331 let supported_providers = config
332 .zklogin_oauth_providers
333 .get(&epoch_store.get_chain_identifier().chain())
334 .unwrap_or(&BTreeSet::new())
335 .iter()
336 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
337 .collect::<Vec<_>>();
338
339 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
340
341 info!(
342 ?fetch_interval,
343 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
344 );
345
346 fn validate_jwk(
347 metrics: &Arc<SuiNodeMetrics>,
348 provider: &OIDCProvider,
349 id: &JwkId,
350 jwk: &JWK,
351 ) -> bool {
352 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
353 warn!(
354 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
355 id.iss, provider
356 );
357 metrics
358 .invalid_jwks
359 .with_label_values(&[&provider.to_string()])
360 .inc();
361 return false;
362 };
363
364 if iss_provider != *provider {
365 warn!(
366 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
367 id.iss, provider, iss_provider
368 );
369 metrics
370 .invalid_jwks
371 .with_label_values(&[&provider.to_string()])
372 .inc();
373 return false;
374 }
375
376 if !check_total_jwk_size(id, jwk) {
377 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
378 metrics
379 .invalid_jwks
380 .with_label_values(&[&provider.to_string()])
381 .inc();
382 return false;
383 }
384
385 true
386 }
387
388 for p in supported_providers.into_iter() {
397 let provider_str = p.to_string();
398 let epoch_store = epoch_store.clone();
399 let consensus_adapter = consensus_adapter.clone();
400 let metrics = metrics.clone();
401 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
402 async move {
403 let mut seen = HashSet::new();
406 loop {
407 jwk_log!("fetching JWK for provider {:?}", p);
408 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
409 match Self::fetch_jwks(authority, &p).await {
410 Err(e) => {
411 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
412 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
413 tokio::time::sleep(Duration::from_secs(30)).await;
415 continue;
416 }
417 Ok(mut keys) => {
418 metrics.total_jwks
419 .with_label_values(&[&provider_str])
420 .inc_by(keys.len() as u64);
421
422 keys.retain(|(id, jwk)| {
423 validate_jwk(&metrics, &p, id, jwk) &&
424 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
425 seen.insert((id.clone(), jwk.clone()))
426 });
427
428 metrics.unique_jwks
429 .with_label_values(&[&provider_str])
430 .inc_by(keys.len() as u64);
431
432 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
435 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
436 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
437 }
438
439 for (id, jwk) in keys.into_iter() {
440 jwk_log!("Submitting JWK to consensus: {:?}", id);
441
442 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
443 consensus_adapter.submit(txn, None, &epoch_store, None, None)
444 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
445 .ok();
446 }
447 }
448 }
449 tokio::time::sleep(fetch_interval).await;
450 }
451 }
452 .instrument(error_span!("jwk_updater_task", epoch)),
453 ));
454 }
455 }
456
457 pub async fn start_async(
458 config: NodeConfig,
459 registry_service: RegistryService,
460 server_version: ServerVersion,
461 ) -> Result<Arc<SuiNode>> {
462 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
463 let mut config = config.clone();
464 if config.supported_protocol_versions.is_none() {
465 info!(
466 "populating config.supported_protocol_versions with default {:?}",
467 SupportedProtocolVersions::SYSTEM_DEFAULT
468 );
469 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
470 }
471
472 let run_with_range = config.run_with_range;
473 let prometheus_registry = registry_service.default_registry();
474 let node_role = config.intended_node_role();
475
476 info!(node =? config.protocol_public_key(),
477 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
478 );
479
480 DBMetrics::init(registry_service.clone());
482
483 typed_store::init_write_sync(config.enable_db_sync_to_disk);
485
486 mysten_metrics::init_metrics(&prometheus_registry);
488 #[cfg(not(msim))]
490 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
491
492 let genesis = config.genesis()?.clone();
493
494 let secret = Arc::pin(config.protocol_key_pair().copy());
495 let genesis_committee = genesis.committee();
496 let committee_store = Arc::new(CommitteeStore::new(
497 config.db_path().join("epochs"),
498 &genesis_committee,
499 None,
500 ));
501
502 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
503 let checkpoint_store = CheckpointStore::new(
504 &config.db_path().join("checkpoints"),
505 pruner_watermarks.clone(),
506 );
507 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
508
509 if node_role.runs_consensus() {
510 Self::check_and_recover_forks(
511 &checkpoint_store,
512 &checkpoint_metrics,
513 config.fork_recovery.as_ref(),
514 )
515 .await?;
516 }
517
518 let enable_write_stall = config
520 .enable_db_write_stall
521 .unwrap_or(node_role.runs_consensus());
522 let enable_objects_compactor = node_role.is_validator()
529 || config.authority_store_pruning_config.num_epochs_to_retain == 0;
530 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
531 enable_write_stall,
532 enable_objects_compactor,
533 };
534 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
535 &config.db_store_path(),
536 Some(perpetual_tables_options),
537 Some(pruner_watermarks.epoch_id.clone()),
538 ));
539 let is_genesis = perpetual_tables
540 .database_is_empty()
541 .expect("Database read should not fail at init.");
542
543 let backpressure_manager =
544 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
545
546 let store =
547 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
548
549 let cur_epoch = store.get_recovery_epoch_at_restart()?;
550 let committee = committee_store
551 .get_committee(&cur_epoch)?
552 .expect("Committee of the current epoch must exist");
553 let epoch_start_configuration = store
554 .get_epoch_start_configuration()?
555 .expect("EpochStartConfiguration of the current epoch must exist");
556 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
557 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
558
559 let cache_traits = build_execution_cache(
560 &config.execution_cache,
561 &prometheus_registry,
562 &store,
563 backpressure_manager.clone(),
564 );
565
566 let auth_agg = {
567 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
568 Arc::new(ArcSwap::new(Arc::new(
569 AuthorityAggregator::new_from_epoch_start_state(
570 epoch_start_configuration.epoch_start_state(),
571 &committee_store,
572 safe_client_metrics_base,
573 ),
574 )))
575 };
576
577 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
578 let chain = match config.chain_override_for_testing {
579 Some(chain) => chain,
580 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
581 };
582
583 let highest_executed_checkpoint = checkpoint_store
584 .get_highest_executed_checkpoint_seq_number()
585 .expect("checkpoint store read cannot fail")
586 .unwrap_or(0);
587
588 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
589 0
590 } else {
591 checkpoint_store
592 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
593 .expect("checkpoint store read cannot fail")
594 .unwrap_or(highest_executed_checkpoint)
595 };
596
597 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
598 let epoch_store = AuthorityPerEpochStore::new(
599 config.protocol_public_key(),
600 committee.clone(),
601 &config.db_store_path(),
602 Some(epoch_options.options),
603 EpochMetrics::new(®istry_service.default_registry()),
604 epoch_start_configuration,
605 cache_traits.backing_package_store.clone(),
606 cache_traits.object_store.clone(),
607 cache_metrics,
608 signature_verifier_metrics,
609 &config.expensive_safety_check_config,
610 (chain_id, chain),
611 highest_executed_checkpoint,
612 previous_epoch_last_checkpoint,
613 Arc::new(SubmittedTransactionCacheMetrics::new(
614 ®istry_service.default_registry(),
615 )),
616 config.fullnode_sync_mode,
617 )?;
618
619 info!("created epoch store");
620
621 replay_log!(
622 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
623 epoch_store.epoch(),
624 epoch_store.protocol_config()
625 );
626
627 if is_genesis {
629 info!("checking SUI conservation at genesis");
630 cache_traits
635 .reconfig_api
636 .expensive_check_sui_conservation(&epoch_store)
637 .expect("SUI conservation check cannot fail at genesis");
638 }
639
640 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
641 let default_buffer_stake = epoch_store
642 .protocol_config()
643 .buffer_stake_for_protocol_upgrade_bps();
644 if effective_buffer_stake != default_buffer_stake {
645 warn!(
646 ?effective_buffer_stake,
647 ?default_buffer_stake,
648 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
649 );
650 }
651
652 checkpoint_store.insert_genesis_checkpoint(
653 genesis.checkpoint(),
654 genesis.checkpoint_contents().clone(),
655 &epoch_store,
656 );
657
658 info!("creating state sync store");
659 let state_sync_store = RocksDbStore::new(
660 cache_traits.clone(),
661 committee_store.clone(),
662 checkpoint_store.clone(),
663 );
664
665 let index_store =
666 if node_role.should_enable_index_processing() && config.enable_index_processing {
667 info!("creating jsonrpc index store");
668 Some(Arc::new(IndexStore::new(
669 config.db_path().join("indexes"),
670 &prometheus_registry,
671 epoch_store
672 .protocol_config()
673 .max_move_identifier_len_as_option(),
674 config.remove_deprecated_tables,
675 )))
676 } else {
677 None
678 };
679
680 let rpc_index = if node_role.should_enable_index_processing()
681 && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
682 {
683 info!("creating rpc index store");
684 Some(Arc::new(
685 RpcIndexStore::new(
686 &config.db_path(),
687 &store,
688 &checkpoint_store,
689 &epoch_store,
690 &cache_traits.backing_package_store,
691 config.rpc().cloned().unwrap_or_default(),
692 )
693 .await,
694 ))
695 } else {
696 None
697 };
698
699 let chain_identifier = epoch_store.get_chain_identifier();
700
701 info!("creating archive reader");
702 let (randomness_tx, randomness_rx) = mpsc::channel(
704 config
705 .p2p_config
706 .randomness
707 .clone()
708 .unwrap_or_default()
709 .mailbox_capacity(),
710 );
711 let P2pComponents {
712 p2p_network,
713 known_peers,
714 discovery_handle,
715 state_sync_handle,
716 randomness_handle,
717 endpoint_manager,
718 } = Self::create_p2p_network(
719 &config,
720 state_sync_store.clone(),
721 chain_identifier,
722 randomness_tx,
723 &prometheus_registry,
724 )?;
725
726 for peer in &config.p2p_config.peer_address_overrides {
728 endpoint_manager
729 .update_endpoint(
730 EndpointId::P2p(peer.peer_id),
731 AddressSource::Config,
732 peer.addresses.clone(),
733 )
734 .expect("Updating peer address overrides should not fail");
735 }
736
737 update_peer_addresses(
739 &config,
740 &endpoint_manager,
741 epoch_store.epoch_start_state(),
742 None,
743 );
744
745 info!("start snapshot upload");
746 let state_snapshot_handle = Self::start_state_snapshot(
748 &config,
749 &prometheus_registry,
750 checkpoint_store.clone(),
751 chain_identifier,
752 )?;
753
754 info!("start db checkpoint");
756 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
757 &config,
758 &prometheus_registry,
759 state_snapshot_handle.is_some(),
760 )?;
761
762 if !epoch_store
763 .protocol_config()
764 .simplified_unwrap_then_delete()
765 {
766 config
768 .authority_store_pruning_config
769 .set_killswitch_tombstone_pruning(true);
770 }
771
772 let authority_name = config.protocol_public_key();
773
774 info!("create authority state");
775 let state = AuthorityState::new(
776 authority_name,
777 secret,
778 config.supported_protocol_versions.unwrap(),
779 store.clone(),
780 cache_traits.clone(),
781 epoch_store.clone(),
782 committee_store.clone(),
783 index_store.clone(),
784 rpc_index,
785 checkpoint_store.clone(),
786 &prometheus_registry,
787 genesis.objects(),
788 &db_checkpoint_config,
789 config.clone(),
790 chain_identifier,
791 config.policy_config.clone(),
792 config.firewall_config.clone(),
793 pruner_watermarks,
794 )
795 .await;
796 if epoch_store.epoch() == 0 {
798 let txn = &genesis.transaction();
799 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
800 let transaction =
801 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
802 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
803 genesis.transaction().data().clone(),
804 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
805 ),
806 );
807 let _enter = span.enter();
808 state
809 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
810 .unwrap();
811 }
812
813 let randomness_receiver_handle =
816 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
817
818 let (end_of_epoch_channel, end_of_epoch_receiver) =
819 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
820
821 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
822 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
823 auth_agg.load_full(),
824 state.clone(),
825 end_of_epoch_receiver,
826 &config.db_path(),
827 &prometheus_registry,
828 &config,
829 )))
830 } else {
831 None
832 };
833
834 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
835 state.clone(),
836 state_sync_store,
837 &transaction_orchestrator.clone(),
838 &config,
839 &prometheus_registry,
840 server_version,
841 node_role,
842 )
843 .await?;
844
845 let global_state_hasher = Arc::new(GlobalStateHasher::new(
846 cache_traits.global_state_hash_store.clone(),
847 GlobalStateHashMetrics::new(&prometheus_registry),
848 ));
849
850 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
851 "sui",
852 ®istry_service.default_registry(),
853 );
854
855 let connection_monitor_handle =
856 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
857 p2p_network.downgrade(),
858 Arc::new(network_connection_metrics),
859 known_peers,
860 );
861
862 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
863
864 sui_node_metrics
865 .binary_max_protocol_version
866 .set(ProtocolVersion::MAX.as_u64() as i64);
867 sui_node_metrics
868 .configured_max_protocol_version
869 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
870
871 let node_role = epoch_store.node_role();
872 let validator_components = if node_role.runs_consensus() {
873 let mut components = Self::construct_validator_components(
874 config.clone(),
875 state.clone(),
876 committee,
877 epoch_store.clone(),
878 checkpoint_store.clone(),
879 state_sync_handle.clone(),
880 randomness_handle.clone(),
881 Arc::downgrade(&global_state_hasher),
882 backpressure_manager.clone(),
883 ®istry_service,
884 sui_node_metrics.clone(),
885 checkpoint_metrics.clone(),
886 node_role,
887 randomness_receiver_handle.clone(),
888 )
889 .await?;
890
891 if node_role.is_validator() {
892 components
893 .consensus_adapter
894 .recover_end_of_publish(&epoch_store);
895
896 components.validator_server_handle = Some(
898 components
899 .validator_server_handle
900 .take()
901 .unwrap()
902 .start()
903 .await,
904 );
905
906 endpoint_manager
908 .set_consensus_address_updater(components.consensus_manager.clone());
909 } else {
910 info!("Starting node as Observer — connecting to configured peers");
911 }
912
913 Some(components)
914 } else {
915 None
916 };
917
918 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
920
921 let node = Self {
922 config,
923 validator_components: Mutex::new(validator_components),
924 http_servers,
925 state,
926 transaction_orchestrator,
927 registry_service,
928 metrics: sui_node_metrics,
929 checkpoint_metrics,
930
931 _discovery: discovery_handle,
932 _connection_monitor_handle: connection_monitor_handle,
933 state_sync_handle,
934 randomness_handle,
935 checkpoint_store,
936 global_state_hasher: Mutex::new(Some(global_state_hasher)),
937 end_of_epoch_channel,
938 endpoint_manager,
939 backpressure_manager,
940
941 _db_checkpoint_handle: db_checkpoint_handle,
942
943 #[cfg(msim)]
944 sim_state: Default::default(),
945
946 _state_snapshot_uploader_handle: state_snapshot_handle,
947 shutdown_channel_tx: shutdown_channel,
948 randomness_receiver_handle,
949
950 auth_agg,
951 subscription_service_checkpoint_sender,
952 };
953
954 info!("SuiNode started!");
955 let node = Arc::new(node);
956 let node_copy = node.clone();
957 spawn_monitored_task!(async move {
958 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
959 if let Err(error) = result {
960 warn!("Reconfiguration finished with error {:?}", error);
961 }
962 });
963
964 Ok(node)
965 }
966
967 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
968 self.end_of_epoch_channel.subscribe()
969 }
970
971 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
972 self.shutdown_channel_tx.subscribe()
973 }
974
975 pub fn current_epoch_for_testing(&self) -> EpochId {
976 self.state.current_epoch_for_testing()
977 }
978
979 pub fn db_checkpoint_path(&self) -> PathBuf {
980 self.config.db_checkpoint_path()
981 }
982
983 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
985 info!("close_epoch (current epoch = {})", epoch_store.epoch());
986 self.validator_components
987 .lock()
988 .await
989 .as_ref()
990 .ok_or_else(|| SuiError::from("Node is not a validator"))?
991 .consensus_adapter
992 .close_epoch(epoch_store);
993 Ok(())
994 }
995
996 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
997 self.state
998 .clear_override_protocol_upgrade_buffer_stake(epoch)
999 }
1000
1001 pub fn set_override_protocol_upgrade_buffer_stake(
1002 &self,
1003 epoch: EpochId,
1004 buffer_stake_bps: u64,
1005 ) -> SuiResult {
1006 self.state
1007 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1008 }
1009
1010 pub async fn close_epoch_for_testing(&self) -> SuiResult {
1013 let epoch_store = self.state.epoch_store_for_testing();
1014 self.close_epoch(&epoch_store).await
1015 }
1016
1017 fn start_state_snapshot(
1018 config: &NodeConfig,
1019 prometheus_registry: &Registry,
1020 checkpoint_store: Arc<CheckpointStore>,
1021 chain_identifier: ChainIdentifier,
1022 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1023 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1024 let snapshot_uploader = StateSnapshotUploader::new(
1025 &config.db_checkpoint_path(),
1026 &config.snapshot_path(),
1027 remote_store_config.clone(),
1028 60,
1029 prometheus_registry,
1030 checkpoint_store,
1031 chain_identifier,
1032 config.state_snapshot_write_config.archive_interval_epochs,
1033 )?;
1034 Ok(Some(snapshot_uploader.start()))
1035 } else {
1036 Ok(None)
1037 }
1038 }
1039
1040 fn start_db_checkpoint(
1041 config: &NodeConfig,
1042 prometheus_registry: &Registry,
1043 state_snapshot_enabled: bool,
1044 ) -> Result<(
1045 DBCheckpointConfig,
1046 Option<tokio::sync::broadcast::Sender<()>>,
1047 )> {
1048 let checkpoint_path = Some(
1049 config
1050 .db_checkpoint_config
1051 .checkpoint_path
1052 .clone()
1053 .unwrap_or_else(|| config.db_checkpoint_path()),
1054 );
1055 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1056 DBCheckpointConfig {
1057 checkpoint_path,
1058 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1059 true
1060 } else {
1061 config
1062 .db_checkpoint_config
1063 .perform_db_checkpoints_at_epoch_end
1064 },
1065 ..config.db_checkpoint_config.clone()
1066 }
1067 } else {
1068 config.db_checkpoint_config.clone()
1069 };
1070
1071 match (
1072 db_checkpoint_config.object_store_config.as_ref(),
1073 state_snapshot_enabled,
1074 ) {
1075 (None, false) => Ok((db_checkpoint_config, None)),
1080 (_, _) => {
1081 let handler = DBCheckpointHandler::new(
1082 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1083 db_checkpoint_config.object_store_config.as_ref(),
1084 60,
1085 db_checkpoint_config
1086 .prune_and_compact_before_upload
1087 .unwrap_or(true),
1088 config.authority_store_pruning_config.clone(),
1089 prometheus_registry,
1090 state_snapshot_enabled,
1091 )?;
1092 Ok((
1093 db_checkpoint_config,
1094 Some(DBCheckpointHandler::start(handler)),
1095 ))
1096 }
1097 }
1098 }
1099
1100 fn create_p2p_network(
1101 config: &NodeConfig,
1102 state_sync_store: RocksDbStore,
1103 chain_identifier: ChainIdentifier,
1104 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1105 prometheus_registry: &Registry,
1106 ) -> Result<P2pComponents> {
1107 let mut p2p_config = config.p2p_config.clone();
1108 {
1109 let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1110 if disc.peer_addr_store_path.is_none() {
1111 disc.peer_addr_store_path =
1112 Some(config.db_path().join("discovery_peer_cache.yaml"));
1113 }
1114 }
1115 let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1116 if let Some(consensus_config) = &config.consensus_config {
1117 let effective_addr = consensus_config
1118 .external_address
1119 .as_ref()
1120 .or(consensus_config.listen_address.as_ref());
1121 if let Some(addr) = effective_addr {
1122 discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1123 }
1124 }
1125 let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1126 let discovery_sender = discovery.sender();
1127
1128 let (state_sync, state_sync_router) = state_sync::Builder::new()
1129 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1130 .store(state_sync_store)
1131 .archive_config(config.archive_reader_config())
1132 .discovery_sender(discovery_sender)
1133 .with_metrics(prometheus_registry)
1134 .build();
1135
1136 let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1137 let known_peers: HashMap<PeerId, String> = discovery_config
1138 .allowlisted_peers
1139 .clone()
1140 .into_iter()
1141 .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1142 .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1143 peer.peer_id
1144 .map(|peer_id| (peer_id, "seed_peer".to_string()))
1145 }))
1146 .collect();
1147
1148 let (randomness, randomness_router) =
1149 randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1150 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1151 .with_metrics(prometheus_registry)
1152 .build();
1153
1154 let p2p_network = {
1155 let routes = anemo::Router::new()
1156 .add_rpc_service(discovery_server)
1157 .merge(state_sync_router);
1158 let routes = routes.merge(randomness_router);
1159
1160 let inbound_network_metrics =
1161 mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1162 let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1163 "sui",
1164 "outbound",
1165 prometheus_registry,
1166 );
1167
1168 let service = ServiceBuilder::new()
1169 .layer(
1170 TraceLayer::new_for_server_errors()
1171 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1172 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1173 )
1174 .layer(CallbackLayer::new(
1175 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1176 Arc::new(inbound_network_metrics),
1177 config.p2p_config.excessive_message_size(),
1178 ),
1179 ))
1180 .service(routes);
1181
1182 let outbound_layer = ServiceBuilder::new()
1183 .layer(
1184 TraceLayer::new_for_client_and_server_errors()
1185 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1186 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1187 )
1188 .layer(CallbackLayer::new(
1189 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1190 Arc::new(outbound_network_metrics),
1191 config.p2p_config.excessive_message_size(),
1192 ),
1193 ))
1194 .into_inner();
1195
1196 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1197 anemo_config.max_request_frame_size = Some(1 << 20);
1200 anemo_config.max_response_frame_size = Some(128 << 20);
1203
1204 let mut quic_config = anemo_config.quic.unwrap_or_default();
1207 if quic_config.socket_send_buffer_size.is_none() {
1208 quic_config.socket_send_buffer_size = Some(20 << 20);
1209 }
1210 if quic_config.socket_receive_buffer_size.is_none() {
1211 quic_config.socket_receive_buffer_size = Some(20 << 20);
1212 }
1213 quic_config.allow_failed_socket_buffer_size_setting = true;
1214
1215 if quic_config.max_concurrent_bidi_streams.is_none() {
1218 quic_config.max_concurrent_bidi_streams = Some(500);
1219 }
1220 if quic_config.max_concurrent_uni_streams.is_none() {
1221 quic_config.max_concurrent_uni_streams = Some(500);
1222 }
1223 if quic_config.stream_receive_window.is_none() {
1224 quic_config.stream_receive_window = Some(100 << 20);
1225 }
1226 if quic_config.receive_window.is_none() {
1227 quic_config.receive_window = Some(200 << 20);
1228 }
1229 if quic_config.send_window.is_none() {
1230 quic_config.send_window = Some(200 << 20);
1231 }
1232 if quic_config.crypto_buffer_size.is_none() {
1233 quic_config.crypto_buffer_size = Some(1 << 20);
1234 }
1235 if quic_config.max_idle_timeout_ms.is_none() {
1236 quic_config.max_idle_timeout_ms = Some(10_000);
1237 }
1238 if quic_config.keep_alive_interval_ms.is_none() {
1239 quic_config.keep_alive_interval_ms = Some(5_000);
1240 }
1241 anemo_config.quic = Some(quic_config);
1242
1243 let server_name = format!("sui-{}", chain_identifier);
1244 let network = Network::bind(config.p2p_config.listen_address)
1245 .server_name(&server_name)
1246 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1247 .config(anemo_config)
1248 .outbound_request_layer(outbound_layer)
1249 .start(service)?;
1250 info!(
1251 server_name = server_name,
1252 "P2p network started on {}",
1253 network.local_addr()
1254 );
1255
1256 network
1257 };
1258
1259 let discovery_handle =
1260 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1261 let state_sync_handle = state_sync.start(p2p_network.clone());
1262 let randomness_handle = randomness.start(p2p_network.clone());
1263
1264 Ok(P2pComponents {
1265 p2p_network,
1266 known_peers,
1267 discovery_handle,
1268 state_sync_handle,
1269 randomness_handle,
1270 endpoint_manager,
1271 })
1272 }
1273
1274 async fn construct_validator_components(
1275 config: NodeConfig,
1276 state: Arc<AuthorityState>,
1277 committee: Arc<Committee>,
1278 epoch_store: Arc<AuthorityPerEpochStore>,
1279 checkpoint_store: Arc<CheckpointStore>,
1280 state_sync_handle: state_sync::Handle,
1281 randomness_handle: randomness::Handle,
1282 global_state_hasher: Weak<GlobalStateHasher>,
1283 backpressure_manager: Arc<BackpressureManager>,
1284 registry_service: &RegistryService,
1285 sui_node_metrics: Arc<SuiNodeMetrics>,
1286 checkpoint_metrics: Arc<CheckpointMetrics>,
1287 node_role: NodeRole,
1288 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1289 ) -> Result<ValidatorComponents> {
1290 let mut config_clone = config.clone();
1291 let consensus_config = config_clone
1292 .consensus_config
1293 .as_mut()
1294 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1295
1296 let client = Arc::new(UpdatableConsensusClient::new());
1297 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1298 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1299 &committee,
1300 consensus_config,
1301 state.name,
1302 ®istry_service.default_registry(),
1303 client.clone(),
1304 checkpoint_store.clone(),
1305 inflight_slot_freed_notify.clone(),
1306 ));
1307
1308 let consensus_manager = Arc::new(ConsensusManager::new(
1309 &config,
1310 consensus_config,
1311 registry_service,
1312 client,
1313 node_role,
1314 ));
1315
1316 let consensus_store_pruner = ConsensusStorePruner::new(
1318 consensus_manager.get_storage_base_path(),
1319 consensus_config.db_retention_epochs(),
1320 consensus_config.db_pruner_period(),
1321 ®istry_service.default_registry(),
1322 );
1323
1324 let sui_tx_validator_metrics =
1325 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1326
1327 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1328 let (handle, queue) = Self::start_grpc_validator_service(
1329 &config,
1330 state.clone(),
1331 consensus_adapter.clone(),
1332 epoch_store.clone(),
1333 ®istry_service.default_registry(),
1334 inflight_slot_freed_notify,
1335 )
1336 .await?;
1337 (Some(handle), queue)
1338 } else {
1339 (None, None)
1340 };
1341
1342 let validator_overload_monitor_handle = if node_role.is_validator()
1345 && config
1346 .authority_overload_config
1347 .max_load_shedding_percentage
1348 > 0
1349 {
1350 let authority_state = Arc::downgrade(&state);
1351 let overload_config = config.authority_overload_config.clone();
1352 fail_point!("starting_overload_monitor");
1353 Some(spawn_monitored_task!(overload_monitor(
1354 authority_state,
1355 overload_config,
1356 )))
1357 } else {
1358 None
1359 };
1360
1361 Self::start_epoch_specific_validator_components(
1362 &config,
1363 state.clone(),
1364 consensus_adapter,
1365 checkpoint_store,
1366 epoch_store,
1367 state_sync_handle,
1368 randomness_handle,
1369 randomness_receiver_handle,
1370 consensus_manager,
1371 consensus_store_pruner,
1372 global_state_hasher,
1373 backpressure_manager,
1374 validator_server_handle,
1375 validator_overload_monitor_handle,
1376 checkpoint_metrics,
1377 sui_node_metrics,
1378 sui_tx_validator_metrics,
1379 admission_queue,
1380 node_role,
1381 )
1382 .await
1383 }
1384
    /// Starts the components that must be (re)created for every epoch and bundles them,
    /// together with the long-lived ones passed in, into a `ValidatorComponents`.
    ///
    /// In order, this:
    /// 1. builds the checkpoint service,
    /// 2. clears the randomness receiver's public key and, when the role runs consensus
    ///    and randomness is enabled for the epoch, tries to install a `RandomnessManager`,
    /// 3. spawns the `ExecutionTimeObserver` (validators only),
    /// 4. spawns a task that starts the consensus manager,
    /// 5. spawns the checkpoint service,
    /// 6. starts the JWK updater (validators with authenticator state enabled),
    /// 7. rotates the admission queue to the new epoch store, if one exists.
    ///
    /// # Errors
    ///
    /// Propagates the error from `epoch_store.set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        // Reset the receiver's public key before (possibly) creating this epoch's
        // randomness manager, which is handed a clone of the same handle below.
        randomness_receiver_handle.clear_public_key();

        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            // Only validators supply the protocol key pair to the randomness manager.
            let authority_key_pair = if node_role.is_validator() {
                Some(config.protocol_key_pair())
            } else {
                None
            };
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                authority_key_pair,
                randomness_receiver_handle.clone(),
            )
            .await;
            // `try_new` may yield `None`; in that case the epoch runs without a manager.
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Consensus start-up runs in its own task so this function does not wait for it.
        // Everything the task needs is cloned/moved into the block first.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                        Some(randomness_receiver_handle),
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting the DISABLE_REPLAY_WAITER environment variable (to any value) makes the
        // checkpoint service start without the replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        // An existing admission queue is re-pointed at the new epoch store; this consumes
        // the last local handle to `epoch_store`.
        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1531
1532 fn build_checkpoint_service(
1533 config: &NodeConfig,
1534 consensus_adapter: Arc<ConsensusAdapter>,
1535 checkpoint_store: Arc<CheckpointStore>,
1536 epoch_store: Arc<AuthorityPerEpochStore>,
1537 state: Arc<AuthorityState>,
1538 state_sync_handle: state_sync::Handle,
1539 state_hasher: Weak<GlobalStateHasher>,
1540 checkpoint_metrics: Arc<CheckpointMetrics>,
1541 node_role: NodeRole,
1542 ) -> Arc<CheckpointService> {
1543 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1544 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1545
1546 debug!(
1547 "Starting checkpoint service with epoch start timestamp {}
1548 and epoch duration {}",
1549 epoch_start_timestamp_ms, epoch_duration_ms
1550 );
1551
1552 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1553 Box::new(SubmitCheckpointToConsensus {
1554 sender: consensus_adapter,
1555 signer: state.secret.clone(),
1556 authority: config.protocol_public_key(),
1557 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1558 .checked_add(epoch_duration_ms)
1559 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1560 metrics: checkpoint_metrics.clone(),
1561 })
1562 } else {
1563 LogCheckpointOutput::boxed()
1564 };
1565
1566 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1567 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1568 let max_checkpoint_size_bytes =
1569 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1570
1571 CheckpointService::build(
1572 state.clone(),
1573 checkpoint_store,
1574 epoch_store,
1575 state.get_transaction_cache_reader().clone(),
1576 state_hasher,
1577 checkpoint_output,
1578 Box::new(certified_checkpoint_output),
1579 checkpoint_metrics,
1580 max_tx_per_checkpoint,
1581 max_checkpoint_size_bytes,
1582 )
1583 }
1584
1585 fn construct_consensus_adapter(
1586 committee: &Committee,
1587 consensus_config: &ConsensusConfig,
1588 authority: AuthorityName,
1589 prometheus_registry: &Registry,
1590 consensus_client: Arc<dyn ConsensusClient>,
1591 checkpoint_store: Arc<CheckpointStore>,
1592 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1593 ) -> ConsensusAdapter {
1594 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1595 ConsensusAdapter::new(
1598 consensus_client,
1599 checkpoint_store,
1600 authority,
1601 consensus_config.max_pending_transactions(),
1602 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1603 ca_metrics,
1604 inflight_slot_freed_notify,
1605 )
1606 }
1607
1608 async fn start_grpc_validator_service(
1609 config: &NodeConfig,
1610 state: Arc<AuthorityState>,
1611 consensus_adapter: Arc<ConsensusAdapter>,
1612 epoch_store: Arc<AuthorityPerEpochStore>,
1613 prometheus_registry: &Registry,
1614 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1615 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1616 let overload_config = &config.authority_overload_config;
1617 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1618 let manager = Arc::new(AdmissionQueueManager::new(
1619 consensus_adapter.clone(),
1620 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1621 overload_config.admission_queue_capacity_fraction,
1622 overload_config.admission_queue_bypass_fraction,
1623 overload_config.admission_queue_failover_timeout,
1624 inflight_slot_freed_notify,
1625 ));
1626 AdmissionQueueContext::spawn(manager, epoch_store)
1627 });
1628 let validator_service = ValidatorService::new(
1629 state.clone(),
1630 consensus_adapter,
1631 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1632 config.policy_config.clone().map(|p| p.client_id_source),
1633 admission_queue.clone(),
1634 );
1635
1636 let mut server_conf = mysten_network::config::Config::new();
1637 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1638 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1639 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1640 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1641 server_conf.load_shed = config.grpc_load_shed;
1642 let mut server_builder =
1643 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1644
1645 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1646
1647 let tls_config = sui_tls::create_rustls_server_config(
1648 config.network_key_pair().copy().private(),
1649 SUI_TLS_SERVER_NAME.to_string(),
1650 );
1651
1652 let network_address = config.network_address().clone();
1653
1654 let (ready_tx, ready_rx) = oneshot::channel();
1655
1656 let spawn_once = SpawnOnce::new(ready_rx, async move {
1657 let server = server_builder
1658 .bind(&network_address, Some(tls_config))
1659 .await
1660 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1661 let local_addr = server.local_addr();
1662 info!("Listening to traffic on {local_addr}");
1663 ready_tx.send(()).unwrap();
1664 if let Err(err) = server.serve().await {
1665 info!("Server stopped: {err}");
1666 }
1667 info!("Server stopped");
1668 });
1669 Ok((spawn_once, admission_queue))
1670 }
1671
    /// Returns a new `Arc` handle to this node's `AuthorityState`.
    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }
1675
    /// Borrows the node's connection monitor handle.
    ///
    /// Only compiled for tests and `msim` builds.
    #[cfg(any(test, msim))]
    pub fn connection_monitor_handle_for_testing(
        &self,
    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
        &self._connection_monitor_handle
    }
1682
    /// Returns the node role reported by the currently loaded epoch store.
    ///
    /// NOTE(review): the loader is named `load_epoch_store_one_call_per_task`, which
    /// suggests a call-frequency constraint — confirm before calling this in a loop.
    pub fn node_role(&self) -> NodeRole {
        self.state.load_epoch_store_one_call_per_task().node_role()
    }
1686
    /// Test helper: forwards to `AuthorityState::reference_gas_price_for_testing`
    /// and returns its result unchanged.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1691
    /// Returns a new `Arc` handle to the committee store held by the authority state.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1695
    /// Returns a new `Arc` handle to this node's checkpoint store.
    pub fn clone_checkpoint_store(&self) -> Arc<CheckpointStore> {
        self.checkpoint_store.clone()
    }
1699
    /// Returns the `Arc<AuthorityStore>` provided by `AuthorityState::authority_store`.
    pub fn clone_authority_store(&self) -> Arc<AuthorityStore> {
        self.state.authority_store()
    }
1703
    /// Returns the consensus store from the consensus manager, if available.
    ///
    /// Yields `None` when the `validator_components` lock cannot be acquired without
    /// waiting (`try_lock` fails), when there are currently no validator components, or
    /// when `consensus_manager.consensus_store()` itself returns `None`.
    pub fn clone_consensus_store(
        &self,
    ) -> Option<Arc<consensus_core::storage::rocksdb_store::RocksDBStore>> {
        self.validator_components
            .try_lock()
            .ok()?
            .as_ref()?
            .consensus_manager
            .consensus_store()
    }
1714
    /// Returns the authority aggregator owned by the transaction orchestrator, or
    /// `None` when this node has no transaction orchestrator.
    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }
1726
    /// Returns a clone of the optional transaction orchestrator handle.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1732
    /// Drives the node across epoch boundaries.
    ///
    /// Each loop iteration handles one epoch: it creates a checkpoint executor, submits
    /// this node's capabilities to consensus (validators only), runs the executor until
    /// the epoch ends, then reconfigures the authority state and tears down / restarts
    /// the consensus-side components according to the node's role in the next epoch.
    ///
    /// Returns `Ok(())` only when the executor stops with
    /// `StopReason::RunWithRangeCondition`; otherwise the loop continues indefinitely.
    ///
    /// # Errors
    ///
    /// Propagates errors from submitting the capability notification, from
    /// (re)starting validator components, and from the `msim`-only checkpoint pruning.
    ///
    /// # Panics
    ///
    /// Panics if the global state hasher slot is empty at the start of an iteration, if
    /// `supported_protocol_versions` is unset, if the hasher still has other strong
    /// references at reconfiguration time, or if any of the `expect`/`assert_eq!`
    /// invariants below is violated.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher is taken out of its slot for the whole epoch; the guard stays held
            // until a replacement hasher is written back during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Publish the protocol version of the current epoch as a metric.
            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Validators with running components advertise their capabilities
            // (supported protocol versions and available system packages) to consensus.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // NOTE(review): the reason for this 1ms sleep is not evident from this
                // block — confirm before removing.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                // Lower the advertised maximum version step by step while the candidate
                // version enables accumulators but the accumulator root does not exist.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Runs until the epoch ends or the configured run-with-range condition is met.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            // Read the system state object as of the end of the epoch just executed.
            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // In simulator builds, safe mode is tolerated only when the test has
            // flagged it as expected.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed send is only reported on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Swap in an authority aggregator rebuilt for the new epoch's start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            // Epochs must advance by exactly one.
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // Held from here until the new components (or `None`) are stored below, so
            // the components slot cannot be observed mid-reconfiguration.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                // Case 1: consensus components were running during the previous epoch.
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // `Arc::into_inner` succeeds only if this is the last strong reference;
                // the metrics are carried over into a fresh hasher for the next epoch.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    // Long-lived components are reused; only epoch-specific ones restart.
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            self.randomness_receiver_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // Case 2: no consensus components were running during the previous epoch.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                        self.randomness_receiver_handle.clone(),
                    )
                    .await?;

                    if new_role.is_validator() {
                        // `construct_validator_components` always populates the server
                        // handle for validators, so the `unwrap` holds here.
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Drop the previous epoch's DB handles now that reconfiguration is complete.
            cur_epoch_store.release_db_handles();

            // Simulator-only: prune checkpoints when a finite, non-zero retention is set.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2052
    /// Shuts down the consensus manager if validator components currently exist.
    ///
    /// The `validator_components` lock is held while the shutdown is awaited.
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }
2058
    /// Reconfigures the authority state for the next epoch and returns the new epoch store.
    ///
    /// Loads the last checkpoint of the current epoch, checks that it is also the highest
    /// executed checkpoint, builds the `EpochStartConfiguration` for the next epoch and
    /// calls `AuthorityState::reconfigure`. Afterwards the epoch-flag metrics are updated.
    ///
    /// # Panics
    ///
    /// Panics if the last checkpoint of the current epoch cannot be loaded, if it does
    /// not match the highest executed checkpoint, if `supported_protocol_versions` is
    /// unset, if building the epoch start configuration or reconfiguring fails, or if the
    /// resulting epoch store is not for `next_epoch_committee.epoch()`.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        // Outer `expect` covers the store read failing; inner one covers a missing entry.
        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must only happen once every checkpoint of the epoch is executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Report the epoch flags of the old and new epoch to the metrics.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }
2114
2115 pub fn get_config(&self) -> &NodeConfig {
2116 &self.config
2117 }
2118
2119 pub fn randomness_handle(&self) -> randomness::Handle {
2120 self.randomness_handle.clone()
2121 }
2122
2123 pub fn state_sync_handle(&self) -> state_sync::Handle {
2124 self.state_sync_handle.clone()
2125 }
2126
2127 pub fn endpoint_manager(&self) -> &EndpointManager {
2128 &self.endpoint_manager
2129 }
2130
/// Returns a short prefix of `digest`'s `Display` output, at most 8 bytes long.
///
/// Strings of 8 bytes or fewer are returned unchanged. Longer strings are cut at 8 bytes,
/// moving the cut back to the nearest character boundary when byte 8 falls inside a
/// multi-byte character, so the function never panics on non-ASCII input.
fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
    const PREFIX_LEN: usize = 8;

    let mut prefix = digest.to_string();
    if prefix.len() > PREFIX_LEN {
        // Index 0 is always a char boundary, so this loop terminates.
        let mut cut = PREFIX_LEN;
        while !prefix.is_char_boundary(cut) {
            cut -= 1;
        }
        prefix.truncate(cut);
    }
    prefix
}
2140
2141 async fn check_and_recover_forks(
2145 checkpoint_store: &CheckpointStore,
2146 checkpoint_metrics: &CheckpointMetrics,
2147 fork_recovery: Option<&ForkRecoveryConfig>,
2148 ) -> Result<()> {
2149 if let Some(recovery) = fork_recovery {
2151 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2152 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2153 }
2154
2155 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2156 .get_checkpoint_fork_detected()
2157 .map_err(|e| {
2158 error!("Failed to check for checkpoint fork: {:?}", e);
2159 e
2160 })?
2161 {
2162 Self::handle_checkpoint_fork(
2163 checkpoint_seq,
2164 checkpoint_digest,
2165 checkpoint_metrics,
2166 fork_recovery,
2167 )
2168 .await?;
2169 }
2170 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2171 .get_transaction_fork_detected()
2172 .map_err(|e| {
2173 error!("Failed to check for transaction fork: {:?}", e);
2174 e
2175 })?
2176 {
2177 Self::handle_transaction_fork(
2178 tx_digest,
2179 expected_effects,
2180 actual_effects,
2181 checkpoint_metrics,
2182 fork_recovery,
2183 )
2184 .await?;
2185 }
2186
2187 Ok(())
2188 }
2189
2190 fn try_recover_checkpoint_fork(
2191 checkpoint_store: &CheckpointStore,
2192 recovery: &ForkRecoveryConfig,
2193 ) -> Result<()> {
2194 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2197 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2198 anyhow::bail!(
2199 "Invalid checkpoint digest override for seq {}: {}",
2200 seq,
2201 expected_digest_str
2202 );
2203 };
2204
2205 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2206 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2207 if local_digest != expected_digest {
2208 info!(
2209 seq,
2210 local = %Self::get_digest_prefix(local_digest),
2211 expected = %Self::get_digest_prefix(expected_digest),
2212 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2213 seq
2214 );
2215 checkpoint_store
2216 .clear_locally_computed_checkpoints_from(*seq)
2217 .context(
2218 "Failed to clear locally computed checkpoints from override seq",
2219 )?;
2220 }
2221 }
2222 }
2223
2224 if let Some((checkpoint_seq, checkpoint_digest)) =
2225 checkpoint_store.get_checkpoint_fork_detected()?
2226 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2227 {
2228 info!(
2229 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2230 checkpoint_seq, checkpoint_digest
2231 );
2232 checkpoint_store
2233 .clear_checkpoint_fork_detected()
2234 .expect("Failed to clear checkpoint fork detected marker");
2235 }
2236 Ok(())
2237 }
2238
2239 fn try_recover_transaction_fork(
2240 checkpoint_store: &CheckpointStore,
2241 recovery: &ForkRecoveryConfig,
2242 ) -> Result<()> {
2243 if recovery.transaction_overrides.is_empty() {
2244 return Ok(());
2245 }
2246
2247 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2248 && recovery
2249 .transaction_overrides
2250 .contains_key(&tx_digest.to_string())
2251 {
2252 info!(
2253 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2254 tx_digest
2255 );
2256 checkpoint_store
2257 .clear_transaction_fork_detected()
2258 .expect("Failed to clear transaction fork detected marker");
2259 }
2260 Ok(())
2261 }
2262
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics when the system clock reports a time before the Unix epoch.
fn get_current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs()
}
2269
2270 async fn handle_checkpoint_fork(
2271 checkpoint_seq: u64,
2272 checkpoint_digest: CheckpointDigest,
2273 checkpoint_metrics: &CheckpointMetrics,
2274 fork_recovery: Option<&ForkRecoveryConfig>,
2275 ) -> Result<()> {
2276 checkpoint_metrics
2277 .checkpoint_fork_crash_mode
2278 .with_label_values(&[
2279 &checkpoint_seq.to_string(),
2280 &Self::get_digest_prefix(checkpoint_digest),
2281 &Self::get_current_timestamp().to_string(),
2282 ])
2283 .set(1);
2284
2285 let behavior = fork_recovery
2286 .map(|fr| fr.fork_crash_behavior)
2287 .unwrap_or_default();
2288
2289 match behavior {
2290 ForkCrashBehavior::AwaitForkRecovery => {
2291 error!(
2292 checkpoint_seq = checkpoint_seq,
2293 checkpoint_digest = ?checkpoint_digest,
2294 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2295 );
2296 futures::future::pending::<()>().await;
2297 unreachable!("pending() should never return");
2298 }
2299 ForkCrashBehavior::ReturnError => {
2300 error!(
2301 checkpoint_seq = checkpoint_seq,
2302 checkpoint_digest = ?checkpoint_digest,
2303 "Checkpoint fork detected! Returning error."
2304 );
2305 Err(anyhow::anyhow!(
2306 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2307 checkpoint_seq,
2308 checkpoint_digest
2309 ))
2310 }
2311 }
2312 }
2313
2314 async fn handle_transaction_fork(
2315 tx_digest: TransactionDigest,
2316 expected_effects_digest: TransactionEffectsDigest,
2317 actual_effects_digest: TransactionEffectsDigest,
2318 checkpoint_metrics: &CheckpointMetrics,
2319 fork_recovery: Option<&ForkRecoveryConfig>,
2320 ) -> Result<()> {
2321 checkpoint_metrics
2322 .transaction_fork_crash_mode
2323 .with_label_values(&[
2324 &Self::get_digest_prefix(tx_digest),
2325 &Self::get_digest_prefix(expected_effects_digest),
2326 &Self::get_digest_prefix(actual_effects_digest),
2327 &Self::get_current_timestamp().to_string(),
2328 ])
2329 .set(1);
2330
2331 let behavior = fork_recovery
2332 .map(|fr| fr.fork_crash_behavior)
2333 .unwrap_or_default();
2334
2335 match behavior {
2336 ForkCrashBehavior::AwaitForkRecovery => {
2337 error!(
2338 tx_digest = ?tx_digest,
2339 expected_effects_digest = ?expected_effects_digest,
2340 actual_effects_digest = ?actual_effects_digest,
2341 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2342 );
2343 futures::future::pending::<()>().await;
2344 unreachable!("pending() should never return");
2345 }
2346 ForkCrashBehavior::ReturnError => {
2347 error!(
2348 tx_digest = ?tx_digest,
2349 expected_effects_digest = ?expected_effects_digest,
2350 actual_effects_digest = ?actual_effects_digest,
2351 "Transaction fork detected! Returning error."
2352 );
2353 Err(anyhow::anyhow!(
2354 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2355 tx_digest,
2356 expected_effects_digest,
2357 actual_effects_digest
2358 ))
2359 }
2360 }
2361 }
2362}
2363
2364#[cfg(not(msim))]
2365impl SuiNode {
2366 async fn fetch_jwks(
2367 _authority: AuthorityName,
2368 provider: &OIDCProvider,
2369 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2370 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2371 use sui_types::error::SuiErrorKind;
2372 let client = reqwest::Client::new();
2373 fetch_jwks(provider, &client, true)
2374 .await
2375 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2376 }
2377}
2378
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node stored in `sim_state`.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Logs `new_value` and stores it into `sim_state.sim_safe_mode_expected`.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator variant of JWK fetching: delegates to the callable returned by
    /// `get_jwk_injector()` instead of performing a network request.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let inject_jwks = get_jwk_injector();
        inject_jwks(authority, provider)
    }
}
2400
/// A background future that is spawned at most once.
///
/// Created in the `Unstarted` state and turned into `Started` by `SpawnOnce::start`.
enum SpawnOnce {
    /// Not spawned yet: the receiver that `start` awaits after spawning, and the future to
    /// spawn.
    // NOTE(review): the future is presumably wrapped in a `Mutex` to make this type `Sync`;
    // confirm against the users of `SpawnOnce`.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    /// Already spawned: holds the join handle of the spawned task (never read, only kept).
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2407
2408impl SpawnOnce {
2409 pub fn new(
2410 ready_rx: oneshot::Receiver<()>,
2411 future: impl Future<Output = ()> + Send + 'static,
2412 ) -> Self {
2413 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2414 }
2415
2416 pub async fn start(self) -> Self {
2417 match self {
2418 Self::Unstarted(ready_rx, future) => {
2419 let future = future.into_inner();
2420 let handle = tokio::spawn(future);
2421 ready_rx.await.unwrap();
2422 Self::Started(handle)
2423 }
2424 Self::Started(_) => self,
2425 }
2426 }
2427}
2428
2429fn update_peer_addresses(
2433 config: &NodeConfig,
2434 endpoint_manager: &EndpointManager,
2435 epoch_start_state: &EpochStartSystemState,
2436 prev_epoch_start_state: Option<&EpochStartSystemState>,
2437) {
2438 if config.consensus_config().is_none() {
2439 return;
2440 }
2441 let new_peers: HashSet<PeerId> = epoch_start_state
2442 .get_validator_as_p2p_peers(config.protocol_public_key())
2443 .into_iter()
2444 .map(|(peer_id, address)| {
2445 endpoint_manager
2446 .update_endpoint(
2447 EndpointId::P2p(peer_id),
2448 AddressSource::Chain,
2449 vec![address],
2450 )
2451 .expect("Updating peer addresses should not fail");
2452 peer_id
2453 })
2454 .collect();
2455
2456 if let Some(prev) = prev_epoch_start_state {
2458 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2459 if !new_peers.contains(&peer_id) {
2460 endpoint_manager
2461 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2462 .expect("Clearing peer addresses should not fail");
2463 }
2464 }
2465 }
2466}
2467
2468fn build_kv_store(
2469 state: &Arc<AuthorityState>,
2470 config: &NodeConfig,
2471 registry: &Registry,
2472) -> Result<Arc<TransactionKeyValueStore>> {
2473 let metrics = KeyValueStoreMetrics::new(registry);
2474 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2475
2476 let base_url = &config.transaction_kv_store_read_config.base_url;
2477
2478 if base_url.is_empty() {
2479 info!("no http kv store url provided, using local db only");
2480 return Ok(Arc::new(db_store));
2481 }
2482
2483 let base_url: url::Url = base_url.parse().tap_err(|e| {
2484 error!(
2485 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2486 base_url, e
2487 )
2488 })?;
2489
2490 let network_str = match state.get_chain_identifier().chain() {
2491 Chain::Mainnet => "/mainnet",
2492 _ => {
2493 info!("using local db only for kv store");
2494 return Ok(Arc::new(db_store));
2495 }
2496 };
2497
2498 let base_url = base_url.join(network_str)?.to_string();
2499 let http_store = HttpKVStore::new_kv(
2500 &base_url,
2501 config.transaction_kv_store_read_config.cache_size,
2502 metrics.clone(),
2503 )?;
2504 info!("using local key-value store with fallback to http key-value store");
2505 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2506 db_store,
2507 http_store,
2508 metrics,
2509 "json_rpc_fallback",
2510 )))
2511}
2512
/// Builds and starts the node's HTTP (and optionally HTTPS) RPC servers.
///
/// Returns the server handles together with the checkpoint sender produced by
/// `SubscriptionService::build`. When `node_role.should_run_rpc_servers()` is false, nothing
/// is started and `(HttpServers::default(), None)` is returned.
///
/// The served router merges the JSON-RPC router with the `sui_rpc_api` router and wraps both
/// in the shared middleware layers built below.
///
/// # Errors
/// Returns an error when building the kv store, registering a JSON-RPC module, converting the
/// JSON-RPC server into a router, validating the rpc config, or binding a server fails.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if !node_role.should_run_rpc_servers() {
        // This role does not serve RPC: return empty handles and no checkpoint sender.
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // JSON-RPC: register every API module on one server builder, then turn it into a router.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // The transaction builder API is only registered when no `run_with_range` is set.
        // NOTE(review): the reason for this restriction is not visible here; confirm.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator was provided.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Use the explicitly configured name service ids when all three are present;
        // otherwise fall back to the built-in config for the detected chain.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        // `kv_store` and `metrics` are moved here, so this is their last use.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The sender half is returned to the caller; the handle is given to the rpc service.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    // `sui_rpc_api` service backed by a `RestReadStore` over `state` and `store`.
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        // An rpc config is optional; when present it must validate before being applied.
        if let Some(config) = config.rpc.clone() {
            config.validate()?;
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Copy the remote address from `sui_http::ConnectInfo` into an
        // `axum::extract::ConnectInfo` request extension.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            // CORS: GET and POST from any origin, with any request headers allowed and any
            // response headers exposed.
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // HTTPS is only started when the rpc config provides a TLS config; it serves a clone of
    // the same router as the plain HTTP server below.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // The plain HTTP server is always started, bound to `config.json_rpc_address`.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2686
2687#[cfg(not(test))]
2688fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2689 protocol_config.max_transactions_per_checkpoint() as usize
2690}
2691
/// Test-build override: ignores the protocol config and always returns 2.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2696
/// Handles of the RPC servers started by `build_http_servers`.
///
/// The default value holds no handles. The fields are never read; they only keep the
/// handles owned by this struct.
// NOTE(review): presumably dropping a `sui_http::ServerHandle` stops its server, which
// would be why the handles are retained; confirm against `sui_http`.
#[derive(Default)]
struct HttpServers {
    /// Handle of the plain HTTP server, if one was started.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    /// Handle of the HTTPS server, if one was started.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2704
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Exercises `check_and_recover_forks` for both fork kinds: a recorded fork without a
    /// matching override must produce an error, and the same fork with a matching override
    /// must be cleared from the store.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let store = CheckpointStore::new_for_tests();
        let metrics = CheckpointMetrics::new(&Registry::new());

        // Recovery config without any override; shared by both "error" halves of the test.
        let no_overrides = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        // ---- Checkpoint fork: error path ----
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let checkpoint_err =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&no_overrides))
                .await
                .unwrap_err();
        assert!(
            checkpoint_err
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // ---- Checkpoint fork: recovery path ----
        let with_checkpoint_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: BTreeMap::from([(seq_num, digest.to_string())]),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let checkpoint_recovery =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&with_checkpoint_override))
                .await;
        assert!(checkpoint_recovery.is_ok());
        assert!(store.get_checkpoint_fork_detected().unwrap().is_none());

        // ---- Transaction fork: error path ----
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let transaction_err =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&no_overrides))
                .await
                .unwrap_err();
        assert!(
            transaction_err
                .to_string()
                .contains("Transaction fork detected")
        );

        // ---- Transaction fork: recovery path ----
        let with_transaction_override = ForkRecoveryConfig {
            transaction_overrides: BTreeMap::from([(
                tx_digest.to_string(),
                actual_effects.to_string(),
            )]),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let transaction_recovery =
            SuiNode::check_and_recover_forks(&store, &metrics, Some(&with_transaction_override))
                .await;
        assert!(transaction_recovery.is_ok());
        assert!(store.get_transaction_fork_detected().unwrap().is_none());
    }
}