1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::RandomnessRoundReceiver;
33use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
34use sui_core::authority::backpressure::BackpressureManager;
35use sui_core::authority::epoch_start_configuration::EpochFlag;
36use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
37use sui_core::consensus_adapter::ConsensusClient;
38use sui_core::consensus_manager::UpdatableConsensusClient;
39use sui_core::epoch::randomness::RandomnessManager;
40use sui_core::execution_cache::build_execution_cache;
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Logging helper for the JWK updater.
///
/// Outside of test configurations (as reported by `in_test_configuration()`) the message
/// is emitted at `info` level; under test configuration it is demoted to `debug`.
/// Accepts exactly the same arguments as the `tracing` logging macros.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if !in_test_configuration() {
            info!($($arg)+);
        } else {
            debug!($($arg)+);
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
/// Components that `SuiNode::start_async` builds only when the node's role runs
/// consensus (validators and, per the `else` branch there, observer nodes).
///
/// Note: fields are dropped in declaration order, so the order below is significant.
pub struct ValidatorComponents {
    /// Validator server handle; for validators `start_async` takes it out, calls
    /// `.start().await` on it, and stores the result back.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle for the overload-monitor task, if one was spawned.
    /// NOTE(review): spawn site not visible in this chunk — confirm.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Consensus manager; validators register it with the endpoint manager via
    /// `set_consensus_address_updater`.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for consensus storage (its use is not visible in this chunk).
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit consensus transactions (e.g. fetched JWKs), to recover
    /// end-of-publish state, and to close epochs.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Checkpoint metrics shared with the rest of the node.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics for the consensus transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Optional admission-queue context (construction not visible in this chunk).
    admission_queue: Option<AdmissionQueueContext>,
}
/// Everything produced by `SuiNode::create_p2p_network`; `start_async` destructures it
/// immediately.
pub struct P2pComponents {
    /// The anemo P2P network; `start_async` hands a downgraded (weak) reference to the
    /// connection monitor.
    p2p_network: Network,
    /// Known peers, passed to `AnemoConnectionMonitor::spawn`.
    known_peers: HashMap<PeerId, String>,
    /// Handle to the discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness service.
    randomness_handle: randomness::Handle,
    /// Endpoint manager; `start_async` seeds it with configured peer address overrides.
    endpoint_manager: EndpointManager,
}
182
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Per-node state that exists only in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle to the simulator node that was current when this state was created.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Flag initialised to `false`; presumably set when safe mode is expected in a
        /// simulation — usage not visible here, confirm.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; declared last, so it is dropped after the
        /// fields above.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            // Built in the same order as the fields are declared.
            let node_handle = sui_simulator::runtime::NodeHandle::current();
            let safe_mode_flag = AtomicBool::new(false);
            let leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node: node_handle,
                sim_safe_mode_expected: safe_mode_flag,
                _leak_detector: leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher used in simulation.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default simulated fetcher.
    ///
    /// Ignores both arguments and parses the bundled `DEFAULT_JWK_BYTES` as Twitch keys.
    /// Any parse failure is mapped to `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of this thread's current JWK injector.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with_borrow(Arc::clone)
    }

    /// Replaces this thread's JWK injector with `injector`.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with_borrow_mut(|slot| *slot = injector);
    }
}
236
237#[cfg(msim)]
238pub use simulator::set_jwk_injector;
239#[cfg(msim)]
240use simulator::*;
241use sui_core::authority::authority_store_pruner::PrunerWatermarks;
242use sui_core::{
243 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
244};
245
/// Default connect timeout for gRPC connections: 60 seconds.
/// NOTE(review): its use site is not visible in this chunk.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_millis(60_000);
247
/// A running Sui node, as assembled by `SuiNode::start_async`.
///
/// Note: fields are dropped in declaration order, and several of them are task or
/// service handles, so the field order is significant.
pub struct SuiNode {
    /// Configuration the node was started with (after defaults were filled in).
    config: NodeConfig,
    /// Present only when the node's role runs consensus.
    validator_components: Mutex<Option<ValidatorComponents>>,

    // Never read in this chunk. NOTE(review): presumably held so the servers live as
    // long as the node — confirm.
    #[allow(unused)]
    http_servers: HttpServers,

    /// Core authority state.
    state: Arc<AuthorityState>,
    /// Present only on full nodes started without a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    /// Discovery service handle (stored but not read in this chunk).
    _discovery: discovery::Handle,
    /// Handle to the spawned anemo connection monitor (stored but not read here).
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Initialised to `Some` at startup; validator components receive only a `Weak`.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Broadcasts system state at end of epoch; see `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    /// Endpoint manager seeded with configured peer address overrides.
    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    /// Sender returned by the DB-checkpoint handler, if one was started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    /// Sender returned by the state-snapshot uploader, if one was started.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Shutdown broadcast channel (capacity 1); see `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Authority aggregator behind an `ArcSwap` so it can be replaced atomically.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Checkpoint sender returned by `build_http_servers`, if any.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
294
295impl fmt::Debug for SuiNode {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 f.debug_struct("SuiNode")
298 .field("name", &self.state.name.concise())
299 .finish()
300 }
301}
302
/// Maximum number of new JWKs from a single provider fetch that are submitted to
/// consensus. Keys beyond this limit are truncated away, with a warning.
///
/// Declared `const` rather than `static`: it is a plain value with no need for a fixed
/// address or interior mutability.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306 pub async fn start(
307 config: NodeConfig,
308 registry_service: RegistryService,
309 ) -> Result<Arc<SuiNode>> {
310 Self::start_async(
311 config,
312 registry_service,
313 ServerVersion::new("sui-node", "unknown"),
314 )
315 .await
316 }
317
318 fn start_jwk_updater(
319 config: &NodeConfig,
320 metrics: Arc<SuiNodeMetrics>,
321 authority: AuthorityName,
322 epoch_store: Arc<AuthorityPerEpochStore>,
323 consensus_adapter: Arc<ConsensusAdapter>,
324 ) {
325 let epoch = epoch_store.epoch();
326
327 let supported_providers = config
328 .zklogin_oauth_providers
329 .get(&epoch_store.get_chain_identifier().chain())
330 .unwrap_or(&BTreeSet::new())
331 .iter()
332 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
333 .collect::<Vec<_>>();
334
335 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
336
337 info!(
338 ?fetch_interval,
339 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
340 );
341
342 fn validate_jwk(
343 metrics: &Arc<SuiNodeMetrics>,
344 provider: &OIDCProvider,
345 id: &JwkId,
346 jwk: &JWK,
347 ) -> bool {
348 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
349 warn!(
350 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
351 id.iss, provider
352 );
353 metrics
354 .invalid_jwks
355 .with_label_values(&[&provider.to_string()])
356 .inc();
357 return false;
358 };
359
360 if iss_provider != *provider {
361 warn!(
362 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
363 id.iss, provider, iss_provider
364 );
365 metrics
366 .invalid_jwks
367 .with_label_values(&[&provider.to_string()])
368 .inc();
369 return false;
370 }
371
372 if !check_total_jwk_size(id, jwk) {
373 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
374 metrics
375 .invalid_jwks
376 .with_label_values(&[&provider.to_string()])
377 .inc();
378 return false;
379 }
380
381 true
382 }
383
384 for p in supported_providers.into_iter() {
393 let provider_str = p.to_string();
394 let epoch_store = epoch_store.clone();
395 let consensus_adapter = consensus_adapter.clone();
396 let metrics = metrics.clone();
397 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
398 async move {
399 let mut seen = HashSet::new();
402 loop {
403 jwk_log!("fetching JWK for provider {:?}", p);
404 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
405 match Self::fetch_jwks(authority, &p).await {
406 Err(e) => {
407 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
408 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
409 tokio::time::sleep(Duration::from_secs(30)).await;
411 continue;
412 }
413 Ok(mut keys) => {
414 metrics.total_jwks
415 .with_label_values(&[&provider_str])
416 .inc_by(keys.len() as u64);
417
418 keys.retain(|(id, jwk)| {
419 validate_jwk(&metrics, &p, id, jwk) &&
420 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
421 seen.insert((id.clone(), jwk.clone()))
422 });
423
424 metrics.unique_jwks
425 .with_label_values(&[&provider_str])
426 .inc_by(keys.len() as u64);
427
428 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
431 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
432 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
433 }
434
435 for (id, jwk) in keys.into_iter() {
436 jwk_log!("Submitting JWK to consensus: {:?}", id);
437
438 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
439 consensus_adapter.submit(txn, None, &epoch_store, None, None)
440 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
441 .ok();
442 }
443 }
444 }
445 tokio::time::sleep(fetch_interval).await;
446 }
447 }
448 .instrument(error_span!("jwk_updater_task", epoch)),
449 ));
450 }
451 }
452
453 pub async fn start_async(
454 config: NodeConfig,
455 registry_service: RegistryService,
456 server_version: ServerVersion,
457 ) -> Result<Arc<SuiNode>> {
458 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
459 let mut config = config.clone();
460 if config.supported_protocol_versions.is_none() {
461 info!(
462 "populating config.supported_protocol_versions with default {:?}",
463 SupportedProtocolVersions::SYSTEM_DEFAULT
464 );
465 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466 }
467
468 let run_with_range = config.run_with_range;
469 let prometheus_registry = registry_service.default_registry();
470 let node_role = config.intended_node_role();
471
472 info!(node =? config.protocol_public_key(),
473 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
474 );
475
476 DBMetrics::init(registry_service.clone());
478
479 typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482 mysten_metrics::init_metrics(&prometheus_registry);
484 #[cfg(not(msim))]
486 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488 let genesis = config.genesis()?.clone();
489
490 let secret = Arc::pin(config.protocol_key_pair().copy());
491 let genesis_committee = genesis.committee();
492 let committee_store = Arc::new(CommitteeStore::new(
493 config.db_path().join("epochs"),
494 &genesis_committee,
495 None,
496 ));
497
498 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499 let checkpoint_store = CheckpointStore::new(
500 &config.db_path().join("checkpoints"),
501 pruner_watermarks.clone(),
502 );
503 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
504
505 if node_role.runs_consensus() {
506 Self::check_and_recover_forks(
507 &checkpoint_store,
508 &checkpoint_metrics,
509 config.fork_recovery.as_ref(),
510 )
511 .await?;
512 }
513
514 let enable_write_stall = config
516 .enable_db_write_stall
517 .unwrap_or(node_role.runs_consensus());
518 let enable_objects_compactor = node_role.is_validator()
525 || config.authority_store_pruning_config.num_epochs_to_retain == 0;
526 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
527 enable_write_stall,
528 enable_objects_compactor,
529 };
530 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
531 &config.db_store_path(),
532 Some(perpetual_tables_options),
533 Some(pruner_watermarks.epoch_id.clone()),
534 ));
535 let is_genesis = perpetual_tables
536 .database_is_empty()
537 .expect("Database read should not fail at init.");
538
539 let backpressure_manager =
540 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
541
542 let store =
543 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
544
545 let cur_epoch = store.get_recovery_epoch_at_restart()?;
546 let committee = committee_store
547 .get_committee(&cur_epoch)?
548 .expect("Committee of the current epoch must exist");
549 let epoch_start_configuration = store
550 .get_epoch_start_configuration()?
551 .expect("EpochStartConfiguration of the current epoch must exist");
552 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
553 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
554
555 let cache_traits = build_execution_cache(
556 &config.execution_cache,
557 &prometheus_registry,
558 &store,
559 backpressure_manager.clone(),
560 );
561
562 let auth_agg = {
563 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
564 Arc::new(ArcSwap::new(Arc::new(
565 AuthorityAggregator::new_from_epoch_start_state(
566 epoch_start_configuration.epoch_start_state(),
567 &committee_store,
568 safe_client_metrics_base,
569 ),
570 )))
571 };
572
573 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
574 let chain = match config.chain_override_for_testing {
575 Some(chain) => chain,
576 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
577 };
578
579 let highest_executed_checkpoint = checkpoint_store
580 .get_highest_executed_checkpoint_seq_number()
581 .expect("checkpoint store read cannot fail")
582 .unwrap_or(0);
583
584 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
585 0
586 } else {
587 checkpoint_store
588 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
589 .expect("checkpoint store read cannot fail")
590 .unwrap_or(highest_executed_checkpoint)
591 };
592
593 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
594 let epoch_store = AuthorityPerEpochStore::new(
595 config.protocol_public_key(),
596 committee.clone(),
597 &config.db_store_path(),
598 Some(epoch_options.options),
599 EpochMetrics::new(®istry_service.default_registry()),
600 epoch_start_configuration,
601 cache_traits.backing_package_store.clone(),
602 cache_traits.object_store.clone(),
603 cache_metrics,
604 signature_verifier_metrics,
605 &config.expensive_safety_check_config,
606 (chain_id, chain),
607 highest_executed_checkpoint,
608 previous_epoch_last_checkpoint,
609 Arc::new(SubmittedTransactionCacheMetrics::new(
610 ®istry_service.default_registry(),
611 )),
612 config.fullnode_sync_mode,
613 )?;
614
615 info!("created epoch store");
616
617 replay_log!(
618 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
619 epoch_store.epoch(),
620 epoch_store.protocol_config()
621 );
622
623 if is_genesis {
625 info!("checking SUI conservation at genesis");
626 cache_traits
631 .reconfig_api
632 .expensive_check_sui_conservation(&epoch_store)
633 .expect("SUI conservation check cannot fail at genesis");
634 }
635
636 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
637 let default_buffer_stake = epoch_store
638 .protocol_config()
639 .buffer_stake_for_protocol_upgrade_bps();
640 if effective_buffer_stake != default_buffer_stake {
641 warn!(
642 ?effective_buffer_stake,
643 ?default_buffer_stake,
644 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
645 );
646 }
647
648 checkpoint_store.insert_genesis_checkpoint(
649 genesis.checkpoint(),
650 genesis.checkpoint_contents().clone(),
651 &epoch_store,
652 );
653
654 info!("creating state sync store");
655 let state_sync_store = RocksDbStore::new(
656 cache_traits.clone(),
657 committee_store.clone(),
658 checkpoint_store.clone(),
659 );
660
661 let index_store =
662 if node_role.should_enable_index_processing() && config.enable_index_processing {
663 info!("creating jsonrpc index store");
664 Some(Arc::new(IndexStore::new(
665 config.db_path().join("indexes"),
666 &prometheus_registry,
667 epoch_store
668 .protocol_config()
669 .max_move_identifier_len_as_option(),
670 config.remove_deprecated_tables,
671 )))
672 } else {
673 None
674 };
675
676 let rpc_index = if node_role.should_enable_index_processing()
677 && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
678 {
679 info!("creating rpc index store");
680 Some(Arc::new(
681 RpcIndexStore::new(
682 &config.db_path(),
683 &store,
684 &checkpoint_store,
685 &epoch_store,
686 &cache_traits.backing_package_store,
687 pruner_watermarks.checkpoint_id.clone(),
688 config.rpc().cloned().unwrap_or_default(),
689 )
690 .await,
691 ))
692 } else {
693 None
694 };
695
696 let chain_identifier = epoch_store.get_chain_identifier();
697
698 info!("creating archive reader");
699 let (randomness_tx, randomness_rx) = mpsc::channel(
701 config
702 .p2p_config
703 .randomness
704 .clone()
705 .unwrap_or_default()
706 .mailbox_capacity(),
707 );
708 let P2pComponents {
709 p2p_network,
710 known_peers,
711 discovery_handle,
712 state_sync_handle,
713 randomness_handle,
714 endpoint_manager,
715 } = Self::create_p2p_network(
716 &config,
717 state_sync_store.clone(),
718 chain_identifier,
719 randomness_tx,
720 &prometheus_registry,
721 )?;
722
723 for peer in &config.p2p_config.peer_address_overrides {
725 endpoint_manager
726 .update_endpoint(
727 EndpointId::P2p(peer.peer_id),
728 AddressSource::Config,
729 peer.addresses.clone(),
730 )
731 .expect("Updating peer address overrides should not fail");
732 }
733
734 update_peer_addresses(
736 &config,
737 &endpoint_manager,
738 epoch_store.epoch_start_state(),
739 None,
740 );
741
742 info!("start snapshot upload");
743 let state_snapshot_handle = Self::start_state_snapshot(
745 &config,
746 &prometheus_registry,
747 checkpoint_store.clone(),
748 chain_identifier,
749 )?;
750
751 info!("start db checkpoint");
753 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
754 &config,
755 &prometheus_registry,
756 state_snapshot_handle.is_some(),
757 )?;
758
759 if !epoch_store
760 .protocol_config()
761 .simplified_unwrap_then_delete()
762 {
763 config
765 .authority_store_pruning_config
766 .set_killswitch_tombstone_pruning(true);
767 }
768
769 let authority_name = config.protocol_public_key();
770
771 info!("create authority state");
772 let state = AuthorityState::new(
773 authority_name,
774 secret,
775 config.supported_protocol_versions.unwrap(),
776 store.clone(),
777 cache_traits.clone(),
778 epoch_store.clone(),
779 committee_store.clone(),
780 index_store.clone(),
781 rpc_index,
782 checkpoint_store.clone(),
783 &prometheus_registry,
784 genesis.objects(),
785 &db_checkpoint_config,
786 config.clone(),
787 chain_identifier,
788 config.policy_config.clone(),
789 config.firewall_config.clone(),
790 pruner_watermarks,
791 )
792 .await;
793 if epoch_store.epoch() == 0 {
795 let txn = &genesis.transaction();
796 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
797 let transaction =
798 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
799 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
800 genesis.transaction().data().clone(),
801 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
802 ),
803 );
804 state
805 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
806 .instrument(span)
807 .await
808 .unwrap();
809 }
810
811 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
813
814 let (end_of_epoch_channel, end_of_epoch_receiver) =
815 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
816
817 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
818 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
819 auth_agg.load_full(),
820 state.clone(),
821 end_of_epoch_receiver,
822 &config.db_path(),
823 &prometheus_registry,
824 &config,
825 )))
826 } else {
827 None
828 };
829
830 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
831 state.clone(),
832 state_sync_store,
833 &transaction_orchestrator.clone(),
834 &config,
835 &prometheus_registry,
836 server_version,
837 node_role,
838 )
839 .await?;
840
841 let global_state_hasher = Arc::new(GlobalStateHasher::new(
842 cache_traits.global_state_hash_store.clone(),
843 GlobalStateHashMetrics::new(&prometheus_registry),
844 ));
845
846 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
847 "sui",
848 ®istry_service.default_registry(),
849 );
850
851 let connection_monitor_handle =
852 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
853 p2p_network.downgrade(),
854 Arc::new(network_connection_metrics),
855 known_peers,
856 );
857
858 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
859
860 sui_node_metrics
861 .binary_max_protocol_version
862 .set(ProtocolVersion::MAX.as_u64() as i64);
863 sui_node_metrics
864 .configured_max_protocol_version
865 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
866
867 let node_role = epoch_store.node_role();
868 let validator_components = if node_role.runs_consensus() {
869 let mut components = Self::construct_validator_components(
870 config.clone(),
871 state.clone(),
872 committee,
873 epoch_store.clone(),
874 checkpoint_store.clone(),
875 state_sync_handle.clone(),
876 randomness_handle.clone(),
877 Arc::downgrade(&global_state_hasher),
878 backpressure_manager.clone(),
879 ®istry_service,
880 sui_node_metrics.clone(),
881 checkpoint_metrics.clone(),
882 node_role,
883 )
884 .await?;
885
886 if node_role.is_validator() {
887 components
888 .consensus_adapter
889 .recover_end_of_publish(&epoch_store);
890
891 components.validator_server_handle = Some(
893 components
894 .validator_server_handle
895 .take()
896 .unwrap()
897 .start()
898 .await,
899 );
900
901 endpoint_manager
903 .set_consensus_address_updater(components.consensus_manager.clone());
904 } else {
905 info!("Starting node as Observer — connecting to configured peers");
906 }
907
908 Some(components)
909 } else {
910 None
911 };
912
913 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
915
916 let node = Self {
917 config,
918 validator_components: Mutex::new(validator_components),
919 http_servers,
920 state,
921 transaction_orchestrator,
922 registry_service,
923 metrics: sui_node_metrics,
924 checkpoint_metrics,
925
926 _discovery: discovery_handle,
927 _connection_monitor_handle: connection_monitor_handle,
928 state_sync_handle,
929 randomness_handle,
930 checkpoint_store,
931 global_state_hasher: Mutex::new(Some(global_state_hasher)),
932 end_of_epoch_channel,
933 endpoint_manager,
934 backpressure_manager,
935
936 _db_checkpoint_handle: db_checkpoint_handle,
937
938 #[cfg(msim)]
939 sim_state: Default::default(),
940
941 _state_snapshot_uploader_handle: state_snapshot_handle,
942 shutdown_channel_tx: shutdown_channel,
943
944 auth_agg,
945 subscription_service_checkpoint_sender,
946 };
947
948 info!("SuiNode started!");
949 let node = Arc::new(node);
950 let node_copy = node.clone();
951 spawn_monitored_task!(async move {
952 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
953 if let Err(error) = result {
954 warn!("Reconfiguration finished with error {:?}", error);
955 }
956 });
957
958 Ok(node)
959 }
960
961 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
962 self.end_of_epoch_channel.subscribe()
963 }
964
965 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
966 self.shutdown_channel_tx.subscribe()
967 }
968
969 pub fn current_epoch_for_testing(&self) -> EpochId {
970 self.state.current_epoch_for_testing()
971 }
972
973 pub fn db_checkpoint_path(&self) -> PathBuf {
974 self.config.db_checkpoint_path()
975 }
976
977 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
979 info!("close_epoch (current epoch = {})", epoch_store.epoch());
980 self.validator_components
981 .lock()
982 .await
983 .as_ref()
984 .ok_or_else(|| SuiError::from("Node is not a validator"))?
985 .consensus_adapter
986 .close_epoch(epoch_store);
987 Ok(())
988 }
989
990 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
991 self.state
992 .clear_override_protocol_upgrade_buffer_stake(epoch)
993 }
994
995 pub fn set_override_protocol_upgrade_buffer_stake(
996 &self,
997 epoch: EpochId,
998 buffer_stake_bps: u64,
999 ) -> SuiResult {
1000 self.state
1001 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1002 }
1003
1004 pub async fn close_epoch_for_testing(&self) -> SuiResult {
1007 let epoch_store = self.state.epoch_store_for_testing();
1008 self.close_epoch(&epoch_store).await
1009 }
1010
1011 fn start_state_snapshot(
1012 config: &NodeConfig,
1013 prometheus_registry: &Registry,
1014 checkpoint_store: Arc<CheckpointStore>,
1015 chain_identifier: ChainIdentifier,
1016 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1017 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1018 let snapshot_uploader = StateSnapshotUploader::new(
1019 &config.db_checkpoint_path(),
1020 &config.snapshot_path(),
1021 remote_store_config.clone(),
1022 60,
1023 prometheus_registry,
1024 checkpoint_store,
1025 chain_identifier,
1026 config.state_snapshot_write_config.archive_interval_epochs,
1027 )?;
1028 Ok(Some(snapshot_uploader.start()))
1029 } else {
1030 Ok(None)
1031 }
1032 }
1033
1034 fn start_db_checkpoint(
1035 config: &NodeConfig,
1036 prometheus_registry: &Registry,
1037 state_snapshot_enabled: bool,
1038 ) -> Result<(
1039 DBCheckpointConfig,
1040 Option<tokio::sync::broadcast::Sender<()>>,
1041 )> {
1042 let checkpoint_path = Some(
1043 config
1044 .db_checkpoint_config
1045 .checkpoint_path
1046 .clone()
1047 .unwrap_or_else(|| config.db_checkpoint_path()),
1048 );
1049 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1050 DBCheckpointConfig {
1051 checkpoint_path,
1052 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1053 true
1054 } else {
1055 config
1056 .db_checkpoint_config
1057 .perform_db_checkpoints_at_epoch_end
1058 },
1059 ..config.db_checkpoint_config.clone()
1060 }
1061 } else {
1062 config.db_checkpoint_config.clone()
1063 };
1064
1065 match (
1066 db_checkpoint_config.object_store_config.as_ref(),
1067 state_snapshot_enabled,
1068 ) {
1069 (None, false) => Ok((db_checkpoint_config, None)),
1074 (_, _) => {
1075 let handler = DBCheckpointHandler::new(
1076 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1077 db_checkpoint_config.object_store_config.as_ref(),
1078 60,
1079 db_checkpoint_config
1080 .prune_and_compact_before_upload
1081 .unwrap_or(true),
1082 config.authority_store_pruning_config.clone(),
1083 prometheus_registry,
1084 state_snapshot_enabled,
1085 )?;
1086 Ok((
1087 db_checkpoint_config,
1088 Some(DBCheckpointHandler::start(handler)),
1089 ))
1090 }
1091 }
1092 }
1093
/// Builds the anemo P2P network and starts the discovery, state-sync and randomness
/// services on it.
///
/// Configuration applied here:
/// - The copy of `p2p_config` handed to the discovery builder gets a discovery section.
///   If `peer_addr_store_path` is unset, it defaults to
///   `<db_path>/discovery_peer_cache.yaml`.
/// - When a consensus config exists, its `external_address` is given to discovery.
///   If that is missing, its `listen_address` is used instead.
/// - Request/response frame sizes are always overwritten (1 MiB / 128 MiB).
/// - QUIC settings left unset in config receive the defaults below.
///
/// Returns the network plus `known_peers` (allowlisted peers and seed peers that carry
/// a `peer_id`, labelled by origin) and the handles of the started services.
///
/// # Errors
/// Returns an error if starting the anemo network (`Network::bind(..).start(..)`) fails.
fn create_p2p_network(
    config: &NodeConfig,
    state_sync_store: RocksDbStore,
    chain_identifier: ChainIdentifier,
    randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
    prometheus_registry: &Registry,
) -> Result<P2pComponents> {
    // Work on a copy: the discovery default below is only visible to the discovery
    // builder, not to `config`.
    let mut p2p_config = config.p2p_config.clone();
    {
        let disc = p2p_config.discovery.get_or_insert_with(Default::default);
        if disc.peer_addr_store_path.is_none() {
            disc.peer_addr_store_path =
                Some(config.db_path().join("discovery_peer_cache.yaml"));
        }
    }
    let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
    // Prefer the consensus external address; fall back to the listen address.
    if let Some(consensus_config) = &config.consensus_config {
        let effective_addr = consensus_config
            .external_address
            .as_ref()
            .or(consensus_config.listen_address.as_ref());
        if let Some(addr) = effective_addr {
            discovery_builder = discovery_builder.consensus_external_address(addr.clone());
        }
    }
    let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
    let discovery_sender = discovery.sender();

    // NOTE: state sync (and randomness below) read `config.p2p_config` directly, not the
    // locally modified `p2p_config` copy.
    let (state_sync, state_sync_router) = state_sync::Builder::new()
        .config(config.p2p_config.state_sync.clone().unwrap_or_default())
        .store(state_sync_store)
        .archive_config(config.archive_reader_config())
        .discovery_sender(discovery_sender)
        .with_metrics(prometheus_registry)
        .build();

    // Peers known from static config.
    // Allowlisted peers come first, then seed peers that specify a `peer_id`.
    // If a `PeerId` appears in both, the later "seed_peer" entry wins when collected
    // into the map.
    let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
    let known_peers: HashMap<PeerId, String> = discovery_config
        .allowlisted_peers
        .clone()
        .into_iter()
        .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
        .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
            peer.peer_id
                .map(|peer_id| (peer_id, "seed_peer".to_string()))
        }))
        .collect();

    let (randomness, randomness_router) =
        randomness::Builder::new(config.protocol_public_key(), randomness_tx)
            .config(config.p2p_config.randomness.clone().unwrap_or_default())
            .with_metrics(prometheus_registry)
            .build();

    let p2p_network = {
        // A single router serves discovery, state sync and randomness RPCs.
        let routes = anemo::Router::new()
            .add_rpc_service(discovery_server)
            .merge(state_sync_router);
        let routes = routes.merge(randomness_router);

        let inbound_network_metrics =
            mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
        let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
            "sui",
            "outbound",
            prometheus_registry,
        );

        // Inbound stack:
        // - trace server errors (INFO spans, WARN on failure);
        // - record inbound metrics;
        // - dispatch to the merged routes.
        let service = ServiceBuilder::new()
            .layer(
                TraceLayer::new_for_server_errors()
                    .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                    .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
            )
            .layer(CallbackLayer::new(
                mysten_network::metrics::MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                ),
            ))
            .service(routes);

        // Outbound layers mirror the inbound stack, but trace both client and server
        // errors.
        let outbound_layer = ServiceBuilder::new()
            .layer(
                TraceLayer::new_for_client_and_server_errors()
                    .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                    .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
            )
            .layer(CallbackLayer::new(
                mysten_network::metrics::MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                ),
            ))
            .into_inner();

        let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
        // Frame-size limits are set unconditionally, overriding any configured values:
        // 1 MiB requests, 128 MiB responses.
        anemo_config.max_request_frame_size = Some(1 << 20);
        anemo_config.max_response_frame_size = Some(128 << 20);

        // QUIC defaults are applied only to fields left unset in config.
        let mut quic_config = anemo_config.quic.unwrap_or_default();
        // Socket buffers: 20 MiB each.
        if quic_config.socket_send_buffer_size.is_none() {
            quic_config.socket_send_buffer_size = Some(20 << 20);
        }
        if quic_config.socket_receive_buffer_size.is_none() {
            quic_config.socket_receive_buffer_size = Some(20 << 20);
        }
        // Forced on regardless of config. By its name, this tolerates failing to apply
        // the buffer sizes above.
        quic_config.allow_failed_socket_buffer_size_setting = true;

        // Concurrent streams: 500 each.
        if quic_config.max_concurrent_bidi_streams.is_none() {
            quic_config.max_concurrent_bidi_streams = Some(500);
        }
        if quic_config.max_concurrent_uni_streams.is_none() {
            quic_config.max_concurrent_uni_streams = Some(500);
        }
        // Windows: 100 MiB per stream, 200 MiB receive/send.
        if quic_config.stream_receive_window.is_none() {
            quic_config.stream_receive_window = Some(100 << 20);
        }
        if quic_config.receive_window.is_none() {
            quic_config.receive_window = Some(200 << 20);
        }
        if quic_config.send_window.is_none() {
            quic_config.send_window = Some(200 << 20);
        }
        if quic_config.crypto_buffer_size.is_none() {
            quic_config.crypto_buffer_size = Some(1 << 20);
        }
        // Timeouts (ms): 10 s idle timeout, 5 s keep-alive.
        if quic_config.max_idle_timeout_ms.is_none() {
            quic_config.max_idle_timeout_ms = Some(10_000);
        }
        if quic_config.keep_alive_interval_ms.is_none() {
            quic_config.keep_alive_interval_ms = Some(5_000);
        }
        anemo_config.quic = Some(quic_config);

        // The anemo server name is derived from the chain identifier.
        let server_name = format!("sui-{}", chain_identifier);
        let network = Network::bind(config.p2p_config.listen_address)
            .server_name(&server_name)
            .private_key(config.network_key_pair().copy().private().0.to_bytes())
            .config(anemo_config)
            .outbound_request_layer(outbound_layer)
            .start(service)?;
        info!(
            server_name = server_name,
            "P2p network started on {}",
            network.local_addr()
        );

        network
    };

    // Start the services on the now-running network.
    let discovery_handle =
        discovery.start(p2p_network.clone(), config.network_key_pair().copy());
    let state_sync_handle = state_sync.start(p2p_network.clone());
    let randomness_handle = randomness.start(p2p_network.clone());

    Ok(P2pComponents {
        p2p_network,
        known_peers,
        discovery_handle,
        state_sync_handle,
        randomness_handle,
        endpoint_manager,
    })
}
1267
1268 async fn construct_validator_components(
1269 config: NodeConfig,
1270 state: Arc<AuthorityState>,
1271 committee: Arc<Committee>,
1272 epoch_store: Arc<AuthorityPerEpochStore>,
1273 checkpoint_store: Arc<CheckpointStore>,
1274 state_sync_handle: state_sync::Handle,
1275 randomness_handle: randomness::Handle,
1276 global_state_hasher: Weak<GlobalStateHasher>,
1277 backpressure_manager: Arc<BackpressureManager>,
1278 registry_service: &RegistryService,
1279 sui_node_metrics: Arc<SuiNodeMetrics>,
1280 checkpoint_metrics: Arc<CheckpointMetrics>,
1281 node_role: NodeRole,
1282 ) -> Result<ValidatorComponents> {
1283 let mut config_clone = config.clone();
1284 let consensus_config = config_clone
1285 .consensus_config
1286 .as_mut()
1287 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1288
1289 let client = Arc::new(UpdatableConsensusClient::new());
1290 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1291 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1292 &committee,
1293 consensus_config,
1294 state.name,
1295 ®istry_service.default_registry(),
1296 client.clone(),
1297 checkpoint_store.clone(),
1298 inflight_slot_freed_notify.clone(),
1299 ));
1300
1301 let consensus_manager = Arc::new(ConsensusManager::new(
1302 &config,
1303 consensus_config,
1304 registry_service,
1305 client,
1306 node_role,
1307 ));
1308
1309 let consensus_store_pruner = ConsensusStorePruner::new(
1311 consensus_manager.get_storage_base_path(),
1312 consensus_config.db_retention_epochs(),
1313 consensus_config.db_pruner_period(),
1314 ®istry_service.default_registry(),
1315 );
1316
1317 let sui_tx_validator_metrics =
1318 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1319
1320 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1321 let (handle, queue) = Self::start_grpc_validator_service(
1322 &config,
1323 state.clone(),
1324 consensus_adapter.clone(),
1325 epoch_store.clone(),
1326 ®istry_service.default_registry(),
1327 inflight_slot_freed_notify,
1328 )
1329 .await?;
1330 (Some(handle), queue)
1331 } else {
1332 (None, None)
1333 };
1334
1335 let validator_overload_monitor_handle = if node_role.is_validator()
1338 && config
1339 .authority_overload_config
1340 .max_load_shedding_percentage
1341 > 0
1342 {
1343 let authority_state = Arc::downgrade(&state);
1344 let overload_config = config.authority_overload_config.clone();
1345 fail_point!("starting_overload_monitor");
1346 Some(spawn_monitored_task!(overload_monitor(
1347 authority_state,
1348 overload_config,
1349 )))
1350 } else {
1351 None
1352 };
1353
1354 Self::start_epoch_specific_validator_components(
1355 &config,
1356 state.clone(),
1357 consensus_adapter,
1358 checkpoint_store,
1359 epoch_store,
1360 state_sync_handle,
1361 randomness_handle,
1362 consensus_manager,
1363 consensus_store_pruner,
1364 global_state_hasher,
1365 backpressure_manager,
1366 validator_server_handle,
1367 validator_overload_monitor_handle,
1368 checkpoint_metrics,
1369 sui_node_metrics,
1370 sui_tx_validator_metrics,
1371 admission_queue,
1372 node_role,
1373 )
1374 .await
1375 }
1376
/// Starts the per-epoch consensus-side components for `epoch_store`, reusing the
/// long-lived handles passed in. It returns them bundled as `ValidatorComponents`.
///
/// In order:
/// 1. Builds the checkpoint service.
/// 2. If the role runs consensus and randomness state is enabled, installs a randomness
///    manager when `RandomnessManager::try_new` produces one.
/// 3. For validators, spawns the execution time observer.
/// 4. Starts the consensus manager on a detached Tokio task.
/// 5. Spawns the checkpoint service, with the replay waiter unless
///    `DISABLE_REPLAY_WAITER` is set.
/// 6. For validators with authenticator state enabled, starts the JWK updater.
/// 7. Rotates the admission queue (if any) to the new epoch.
///
/// # Errors
/// Returns an error if installing the randomness manager on the epoch store fails.
async fn start_epoch_specific_validator_components(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    state_hasher: Weak<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    validator_server_handle: Option<SpawnOnce>,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_node_metrics: Arc<SuiNodeMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    admission_queue: Option<AdmissionQueueContext>,
    node_role: NodeRole,
) -> Result<ValidatorComponents> {
    let checkpoint_service = Self::build_checkpoint_service(
        config,
        consensus_adapter.clone(),
        checkpoint_store.clone(),
        epoch_store.clone(),
        state.clone(),
        state_sync_handle,
        state_hasher,
        checkpoint_metrics.clone(),
        node_role,
    );

    if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
        // `try_new` may yield no manager; in that case none is installed.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.protocol_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }
    }

    if node_role.is_validator() {
        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );
    }

    let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
        None,
        state.metrics.clone(),
    ));

    let consensus_handler_initializer = ConsensusHandlerInitializer::new(
        state.clone(),
        checkpoint_service.clone(),
        epoch_store.clone(),
        consensus_adapter.clone(),
        throughput_calculator,
        backpressure_manager,
        config.congestion_log.clone(),
    );

    info!("Starting consensus manager asynchronously");

    // The JoinHandle is discarded: consensus startup runs on a detached task, so this
    // function does not wait for it.
    tokio::spawn({
        let config = config.clone();
        let epoch_store = epoch_store.clone();
        let sui_tx_validator = SuiTxValidator::new(
            state.clone(),
            epoch_store.clone(),
            checkpoint_service.clone(),
            sui_tx_validator_metrics.clone(),
        );
        let consensus_manager = consensus_manager.clone();
        async move {
            consensus_manager
                .start(
                    &config,
                    epoch_store,
                    consensus_handler_initializer,
                    sui_tx_validator,
                )
                .await;
        }
    });
    let replay_waiter = consensus_manager.replay_waiter();

    info!("Spawning checkpoint service");
    // Setting DISABLE_REPLAY_WAITER (to any valid-Unicode value) makes the checkpoint
    // service spawn without a replay waiter.
    let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
        None
    } else {
        Some(replay_waiter)
    };
    checkpoint_service
        .spawn(epoch_store.clone(), replay_waiter)
        .await;

    if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
        Self::start_jwk_updater(
            config,
            sui_node_metrics,
            state.name,
            epoch_store.clone(),
            consensus_adapter.clone(),
        );
    }

    if let Some(ctx) = &admission_queue {
        ctx.rotate_for_epoch(epoch_store);
    }

    Ok(ValidatorComponents {
        validator_server_handle,
        validator_overload_monitor_handle,
        consensus_manager,
        consensus_store_pruner,
        consensus_adapter,
        checkpoint_metrics,
        sui_tx_validator_metrics,
        admission_queue,
    })
}
1511
1512 fn build_checkpoint_service(
1513 config: &NodeConfig,
1514 consensus_adapter: Arc<ConsensusAdapter>,
1515 checkpoint_store: Arc<CheckpointStore>,
1516 epoch_store: Arc<AuthorityPerEpochStore>,
1517 state: Arc<AuthorityState>,
1518 state_sync_handle: state_sync::Handle,
1519 state_hasher: Weak<GlobalStateHasher>,
1520 checkpoint_metrics: Arc<CheckpointMetrics>,
1521 node_role: NodeRole,
1522 ) -> Arc<CheckpointService> {
1523 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1524 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1525
1526 debug!(
1527 "Starting checkpoint service with epoch start timestamp {}
1528 and epoch duration {}",
1529 epoch_start_timestamp_ms, epoch_duration_ms
1530 );
1531
1532 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1533 Box::new(SubmitCheckpointToConsensus {
1534 sender: consensus_adapter,
1535 signer: state.secret.clone(),
1536 authority: config.protocol_public_key(),
1537 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1538 .checked_add(epoch_duration_ms)
1539 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1540 metrics: checkpoint_metrics.clone(),
1541 })
1542 } else {
1543 LogCheckpointOutput::boxed()
1544 };
1545
1546 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1547 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1548 let max_checkpoint_size_bytes =
1549 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1550
1551 CheckpointService::build(
1552 state.clone(),
1553 checkpoint_store,
1554 epoch_store,
1555 state.get_transaction_cache_reader().clone(),
1556 state_hasher,
1557 checkpoint_output,
1558 Box::new(certified_checkpoint_output),
1559 checkpoint_metrics,
1560 max_tx_per_checkpoint,
1561 max_checkpoint_size_bytes,
1562 )
1563 }
1564
1565 fn construct_consensus_adapter(
1566 committee: &Committee,
1567 consensus_config: &ConsensusConfig,
1568 authority: AuthorityName,
1569 prometheus_registry: &Registry,
1570 consensus_client: Arc<dyn ConsensusClient>,
1571 checkpoint_store: Arc<CheckpointStore>,
1572 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1573 ) -> ConsensusAdapter {
1574 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1575 ConsensusAdapter::new(
1578 consensus_client,
1579 checkpoint_store,
1580 authority,
1581 consensus_config.max_pending_transactions(),
1582 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1583 ca_metrics,
1584 inflight_slot_freed_notify,
1585 )
1586 }
1587
1588 async fn start_grpc_validator_service(
1589 config: &NodeConfig,
1590 state: Arc<AuthorityState>,
1591 consensus_adapter: Arc<ConsensusAdapter>,
1592 epoch_store: Arc<AuthorityPerEpochStore>,
1593 prometheus_registry: &Registry,
1594 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1595 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1596 let overload_config = &config.authority_overload_config;
1597 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1598 let manager = Arc::new(AdmissionQueueManager::new(
1599 consensus_adapter.clone(),
1600 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1601 overload_config.admission_queue_capacity_fraction,
1602 overload_config.admission_queue_bypass_fraction,
1603 overload_config.admission_queue_failover_timeout,
1604 inflight_slot_freed_notify,
1605 ));
1606 AdmissionQueueContext::spawn(manager, epoch_store)
1607 });
1608 let validator_service = ValidatorService::new(
1609 state.clone(),
1610 consensus_adapter,
1611 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1612 config.policy_config.clone().map(|p| p.client_id_source),
1613 admission_queue.clone(),
1614 );
1615
1616 let mut server_conf = mysten_network::config::Config::new();
1617 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1618 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1619 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1620 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1621 server_conf.load_shed = config.grpc_load_shed;
1622 let mut server_builder =
1623 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1624
1625 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1626
1627 let tls_config = sui_tls::create_rustls_server_config(
1628 config.network_key_pair().copy().private(),
1629 SUI_TLS_SERVER_NAME.to_string(),
1630 );
1631
1632 let network_address = config.network_address().clone();
1633
1634 let (ready_tx, ready_rx) = oneshot::channel();
1635
1636 let spawn_once = SpawnOnce::new(ready_rx, async move {
1637 let server = server_builder
1638 .bind(&network_address, Some(tls_config))
1639 .await
1640 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1641 let local_addr = server.local_addr();
1642 info!("Listening to traffic on {local_addr}");
1643 ready_tx.send(()).unwrap();
1644 if let Err(err) = server.serve().await {
1645 info!("Server stopped: {err}");
1646 }
1647 info!("Server stopped");
1648 });
1649 Ok((spawn_once, admission_queue))
1650 }
1651
1652 pub fn state(&self) -> Arc<AuthorityState> {
1653 self.state.clone()
1654 }
1655
/// Test/simulation-only accessor for the anemo connection monitor handle.
#[cfg(any(test, msim))]
pub fn connection_monitor_handle_for_testing(
    &self,
) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
    let Self {
        _connection_monitor_handle: monitor_handle,
        ..
    } = self;
    monitor_handle
}
1662
1663 pub fn node_role(&self) -> NodeRole {
1664 self.state.load_epoch_store_one_call_per_task().node_role()
1665 }
1666
1667 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1669 self.state.reference_gas_price_for_testing()
1670 }
1671
1672 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1673 self.state.committee_store().clone()
1674 }
1675
1676 pub fn clone_authority_aggregator(
1687 &self,
1688 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1689 self.transaction_orchestrator
1690 .as_ref()
1691 .map(|to| to.clone_authority_aggregator())
1692 }
1693
1694 pub fn transaction_orchestrator(
1695 &self,
1696 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1697 self.transaction_orchestrator.clone()
1698 }
1699
/// Main per-epoch loop of the node. Each iteration:
/// 1. takes the global state hasher and builds a `CheckpointExecutor` for `epoch_store`;
/// 2. if consensus components exist and the node is a validator, submits its
///    capabilities (supported protocol versions and system packages) to consensus;
/// 3. executes the epoch's checkpoints;
/// 4. reconfigures into the next epoch, then shuts down, restarts, or newly creates
///    consensus components according to the node's new role.
///
/// Returns `Ok(())` only when the `run_with_range` stop condition is reached (after
/// shutting down consensus and notifying `shutdown_channel_tx`). Otherwise it loops
/// until an error is propagated.
///
/// # Errors
/// Propagates errors from capability submission, from (re)starting consensus
/// components, and (msim only) from test checkpoint pruning.
///
/// # Panics
/// Panics on several invariant violations, including:
/// - a missing hasher or supported protocol versions;
/// - a system state that cannot be read;
/// - a non-consecutive next epoch;
/// - a hasher still shared when it is rebuilt.
pub async fn monitor_reconfiguration(
    self: Arc<Self>,
    mut epoch_store: Arc<AuthorityPerEpochStore>,
) -> Result<()> {
    let checkpoint_executor_metrics =
        CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

    loop {
        // The hasher lock guard is held for the whole iteration. The slot is emptied
        // here and refilled with a fresh hasher during reconfiguration below.
        let mut hasher_guard = self.global_state_hasher.lock().await;
        let hasher = hasher_guard.take().unwrap();
        info!(
            "Creating checkpoint executor for epoch {}",
            epoch_store.epoch()
        );
        let checkpoint_executor = CheckpointExecutor::new(
            epoch_store.clone(),
            self.checkpoint_store.clone(),
            self.state.clone(),
            hasher.clone(),
            self.backpressure_manager.clone(),
            self.config.checkpoint_executor_config.clone(),
            checkpoint_executor_metrics.clone(),
            self.subscription_service_checkpoint_sender.clone(),
        );

        let run_with_range = self.config.run_with_range;

        let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

        self.metrics
            .current_protocol_version
            .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

        // Only validators that currently hold consensus components announce capabilities.
        if let Some(components) = &*self.validator_components.lock().await
            && cur_epoch_store.is_validator()
        {
            // NOTE(review): the purpose of this 1 ms sleep is not evident from this code
            // (presumably a yield before submitting); confirm before changing it.
            tokio::time::sleep(Duration::from_millis(1)).await;

            let config = cur_epoch_store.protocol_config();
            let mut supported_protocol_versions = self
                .config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version);

            // Lower the advertised max version while the proposed version would enable
            // accumulators but the accumulator root object does not exist yet.
            while supported_protocol_versions.max > config.version {
                let proposed_protocol_config = ProtocolConfig::get_for_version(
                    supported_protocol_versions.max,
                    cur_epoch_store.get_chain(),
                );

                if proposed_protocol_config.enable_accumulators()
                    && !epoch_store.accumulator_root_exists()
                {
                    error!(
                        "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                        supported_protocol_versions.max
                    );
                    supported_protocol_versions.max = supported_protocol_versions.max.prev();
                } else {
                    break;
                }
            }

            let binary_config = config.binary_config(None);
            let transaction = ConsensusTransaction::new_capability_notification_v2(
                AuthorityCapabilitiesV2::new(
                    self.state.name,
                    cur_epoch_store.get_chain_identifier().chain(),
                    supported_protocol_versions,
                    self.state
                        .get_available_system_packages(&binary_config)
                        .await,
                ),
            );
            info!(?transaction, "submitting capabilities to consensus");
            components.consensus_adapter.submit(
                transaction,
                None,
                &cur_epoch_store,
                None,
                None,
            )?;
        }

        let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

        // A configured run range was reached: stop consensus, notify, and exit the loop.
        if stop_condition == StopReason::RunWithRangeCondition {
            SuiNode::shutdown(&self).await;
            self.shutdown_channel_tx
                .send(run_with_range)
                .expect("RunWithRangeCondition met but failed to send shutdown message");
            return Ok(());
        }

        let latest_system_state = self
            .state
            .get_object_cache_reader()
            .get_sui_system_state_object_unsafe()
            .expect("Read Sui System State object cannot fail");

        // In simulation, safe mode may be expected by the test; otherwise it must not
        // be active (debug builds only).
        #[cfg(msim)]
        if !self
            .sim_state
            .sim_safe_mode_expected
            .load(Ordering::Relaxed)
        {
            debug_assert!(!latest_system_state.safe_mode());
        }

        #[cfg(not(msim))]
        debug_assert!(!latest_system_state.safe_mode());

        // A failed end-of-epoch notification is only reported on fullnodes.
        if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
            && self.state.is_fullnode(&cur_epoch_store)
        {
            warn!(
                "Failed to send end of epoch notification to subscriber: {:?}",
                err
            );
        }

        cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
        let new_epoch_start_state = latest_system_state.into_epoch_start_state();

        // Swap in an authority aggregator rebuilt for the new epoch's start state.
        self.auth_agg.store(Arc::new(
            self.auth_agg
                .load()
                .recreate_with_new_epoch_start_state(&new_epoch_start_state),
        ));

        let next_epoch_committee = new_epoch_start_state.get_sui_committee();
        let next_epoch = next_epoch_committee.epoch();
        assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

        info!(
            next_epoch,
            "Finished executing all checkpoints in epoch. About to reconfigure the system."
        );

        fail_point_async!("reconfig_delay");

        cur_epoch_store.record_epoch_reconfig_start_time_metric();

        update_peer_addresses(
            &self.config,
            &self.endpoint_manager,
            &new_epoch_start_state,
            Some(cur_epoch_store.epoch_start_state()),
        );

        // Held until the new components (or `None`) are written back below.
        let mut validator_components_lock_guard = self.validator_components.lock().await;

        let new_epoch_store = self
            .reconfigure_state(
                &self.state,
                &cur_epoch_store,
                next_epoch_committee.clone(),
                new_epoch_start_state,
                hasher.clone(),
            )
            .await;

        let new_role = new_epoch_store.node_role();

        let new_validator_components = if let Some(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        }) = validator_components_lock_guard.take()
        {
            // The node ran consensus last epoch: shut it down, then either restart it
            // for the new epoch or drop the components if the new role doesn't need them.
            info!("Reconfiguring node (was running consensus).");

            consensus_manager.shutdown().await;
            info!("Consensus has shut down.");

            info!("Epoch store finished reconfiguration.");

            // `Arc::into_inner` requires this to be the last strong reference to the
            // old hasher; a fresh hasher reusing its metrics is stored for the next
            // iteration.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            consensus_store_pruner.prune(next_epoch).await;

            if new_role.runs_consensus() {
                info!("Restarting consensus as {new_role}");
                // NOTE(review): `validator_server_handle` is carried over unchanged here.
                // If the previous role ran consensus without being a validator and the
                // new role is a validator, it is `None` and nothing in this branch starts
                // the gRPC server or sets the endpoint manager's consensus address
                // updater (unlike the promotion branch below). Confirm this transition is
                // impossible or handled elsewhere.
                Some(
                    Self::start_epoch_specific_validator_components(
                        &self.config,
                        self.state.clone(),
                        consensus_adapter,
                        self.checkpoint_store.clone(),
                        new_epoch_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        consensus_manager,
                        consensus_store_pruner,
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        validator_server_handle,
                        validator_overload_monitor_handle,
                        checkpoint_metrics,
                        self.metrics.clone(),
                        sui_tx_validator_metrics,
                        admission_queue,
                        new_role,
                    )
                    .await?,
                )
            } else {
                info!(
                    "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                );
                None
            }
        } else {
            // The node did not run consensus last epoch. Rebuild the hasher the same way
            // as above.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            if new_role.runs_consensus() {
                // Promotion: build the consensus components from scratch.
                info!("Promoting node to {new_role}, starting consensus components");

                let mut components = Self::construct_validator_components(
                    self.config.clone(),
                    self.state.clone(),
                    Arc::new(next_epoch_committee.clone()),
                    new_epoch_store.clone(),
                    self.checkpoint_store.clone(),
                    self.state_sync_handle.clone(),
                    self.randomness_handle.clone(),
                    weak_hasher,
                    self.backpressure_manager.clone(),
                    &self.registry_service,
                    self.metrics.clone(),
                    self.checkpoint_metrics.clone(),
                    new_role,
                )
                .await?;

                if new_role.is_validator() {
                    // `construct_validator_components` prepares the gRPC server handle for
                    // validators; start it here and let the endpoint manager push
                    // consensus address updates to the new consensus manager.
                    components.validator_server_handle = Some(
                        components
                            .validator_server_handle
                            .take()
                            .unwrap()
                            .start()
                            .await,
                    );

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());
                }

                Some(components)
            } else {
                None
            }
        };
        *validator_components_lock_guard = new_validator_components;

        cur_epoch_store.release_db_handles();

        // Simulation-only: prune checkpoints for eligible epochs when a finite, non-zero
        // checkpoint retention is configured.
        if cfg!(msim)
            && !matches!(
                self.config
                    .authority_store_pruning_config
                    .num_epochs_to_retain_for_checkpoints(),
                None | Some(u64::MAX) | Some(0)
            )
        {
            self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
        }

        epoch_store = new_epoch_store;
        info!("Reconfiguration finished");
    }
}
2017
2018 async fn shutdown(&self) {
2019 if let Some(validator_components) = &*self.validator_components.lock().await {
2020 validator_components.consensus_manager.shutdown().await;
2021 }
2022 }
2023
2024 async fn reconfigure_state(
2025 &self,
2026 state: &Arc<AuthorityState>,
2027 cur_epoch_store: &AuthorityPerEpochStore,
2028 next_epoch_committee: Committee,
2029 next_epoch_start_system_state: EpochStartSystemState,
2030 global_state_hasher: Arc<GlobalStateHasher>,
2031 ) -> Arc<AuthorityPerEpochStore> {
2032 let next_epoch = next_epoch_committee.epoch();
2033
2034 let last_checkpoint = self
2035 .checkpoint_store
2036 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2037 .expect("Error loading last checkpoint for current epoch")
2038 .expect("Could not load last checkpoint for current epoch");
2039
2040 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2041
2042 assert_eq!(
2043 Some(last_checkpoint_seq),
2044 self.checkpoint_store
2045 .get_highest_executed_checkpoint_seq_number()
2046 .expect("Error loading highest executed checkpoint sequence number")
2047 );
2048
2049 let epoch_start_configuration = EpochStartConfiguration::new(
2050 next_epoch_start_system_state,
2051 *last_checkpoint.digest(),
2052 state.get_object_store().as_ref(),
2053 EpochFlag::default_flags_for_new_epoch(&state.config),
2054 )
2055 .expect("EpochStartConfiguration construction cannot fail");
2056
2057 let new_epoch_store = self
2058 .state
2059 .reconfigure(
2060 cur_epoch_store,
2061 self.config.supported_protocol_versions.unwrap(),
2062 next_epoch_committee,
2063 epoch_start_configuration,
2064 global_state_hasher,
2065 &self.config.expensive_safety_check_config,
2066 last_checkpoint_seq,
2067 )
2068 .await
2069 .expect("Reconfigure authority state cannot fail");
2070 info!(next_epoch, "Node State has been reconfigured");
2071 assert_eq!(next_epoch, new_epoch_store.epoch());
2072 self.state.get_reconfig_api().update_epoch_flags_metrics(
2073 cur_epoch_store.epoch_start_config().flags(),
2074 new_epoch_store.epoch_start_config().flags(),
2075 );
2076
2077 new_epoch_store
2078 }
2079
2080 pub fn get_config(&self) -> &NodeConfig {
2081 &self.config
2082 }
2083
2084 pub fn randomness_handle(&self) -> randomness::Handle {
2085 self.randomness_handle.clone()
2086 }
2087
2088 pub fn state_sync_handle(&self) -> state_sync::Handle {
2089 self.state_sync_handle.clone()
2090 }
2091
2092 pub fn endpoint_manager(&self) -> &EndpointManager {
2093 &self.endpoint_manager
2094 }
2095
/// Returns at most the first eight characters of `digest`'s `Display` form, for
/// compact log output.
///
/// Truncation happens on character boundaries. Any `Display` value is therefore
/// accepted without risking the slicing panic that `s[0..8]` causes when byte 8 falls
/// inside a multi-byte character. For ASCII digests the result is unchanged.
fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
    let mut digest_str = digest.to_string();
    // Byte offset of the 9th character, if there is one. Cutting there keeps exactly
    // eight characters.
    if let Some((cut, _)) = digest_str.char_indices().nth(8) {
        digest_str.truncate(cut);
    }
    digest_str
}
2105
2106 async fn check_and_recover_forks(
2110 checkpoint_store: &CheckpointStore,
2111 checkpoint_metrics: &CheckpointMetrics,
2112 fork_recovery: Option<&ForkRecoveryConfig>,
2113 ) -> Result<()> {
2114 if let Some(recovery) = fork_recovery {
2116 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2117 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2118 }
2119
2120 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2121 .get_checkpoint_fork_detected()
2122 .map_err(|e| {
2123 error!("Failed to check for checkpoint fork: {:?}", e);
2124 e
2125 })?
2126 {
2127 Self::handle_checkpoint_fork(
2128 checkpoint_seq,
2129 checkpoint_digest,
2130 checkpoint_metrics,
2131 fork_recovery,
2132 )
2133 .await?;
2134 }
2135 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2136 .get_transaction_fork_detected()
2137 .map_err(|e| {
2138 error!("Failed to check for transaction fork: {:?}", e);
2139 e
2140 })?
2141 {
2142 Self::handle_transaction_fork(
2143 tx_digest,
2144 expected_effects,
2145 actual_effects,
2146 checkpoint_metrics,
2147 fork_recovery,
2148 )
2149 .await?;
2150 }
2151
2152 Ok(())
2153 }
2154
2155 fn try_recover_checkpoint_fork(
2156 checkpoint_store: &CheckpointStore,
2157 recovery: &ForkRecoveryConfig,
2158 ) -> Result<()> {
2159 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2162 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2163 anyhow::bail!(
2164 "Invalid checkpoint digest override for seq {}: {}",
2165 seq,
2166 expected_digest_str
2167 );
2168 };
2169
2170 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2171 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2172 if local_digest != expected_digest {
2173 info!(
2174 seq,
2175 local = %Self::get_digest_prefix(local_digest),
2176 expected = %Self::get_digest_prefix(expected_digest),
2177 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2178 seq
2179 );
2180 checkpoint_store
2181 .clear_locally_computed_checkpoints_from(*seq)
2182 .context(
2183 "Failed to clear locally computed checkpoints from override seq",
2184 )?;
2185 }
2186 }
2187 }
2188
2189 if let Some((checkpoint_seq, checkpoint_digest)) =
2190 checkpoint_store.get_checkpoint_fork_detected()?
2191 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2192 {
2193 info!(
2194 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2195 checkpoint_seq, checkpoint_digest
2196 );
2197 checkpoint_store
2198 .clear_checkpoint_fork_detected()
2199 .expect("Failed to clear checkpoint fork detected marker");
2200 }
2201 Ok(())
2202 }
2203
2204 fn try_recover_transaction_fork(
2205 checkpoint_store: &CheckpointStore,
2206 recovery: &ForkRecoveryConfig,
2207 ) -> Result<()> {
2208 if recovery.transaction_overrides.is_empty() {
2209 return Ok(());
2210 }
2211
2212 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2213 && recovery
2214 .transaction_overrides
2215 .contains_key(&tx_digest.to_string())
2216 {
2217 info!(
2218 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2219 tx_digest
2220 );
2221 checkpoint_store
2222 .clear_transaction_fork_detected()
2223 .expect("Failed to clear transaction fork detected marker");
2224 }
2225 Ok(())
2226 }
2227
2228 fn get_current_timestamp() -> u64 {
2229 std::time::SystemTime::now()
2230 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2231 .unwrap()
2232 .as_secs()
2233 }
2234
2235 async fn handle_checkpoint_fork(
2236 checkpoint_seq: u64,
2237 checkpoint_digest: CheckpointDigest,
2238 checkpoint_metrics: &CheckpointMetrics,
2239 fork_recovery: Option<&ForkRecoveryConfig>,
2240 ) -> Result<()> {
2241 checkpoint_metrics
2242 .checkpoint_fork_crash_mode
2243 .with_label_values(&[
2244 &checkpoint_seq.to_string(),
2245 &Self::get_digest_prefix(checkpoint_digest),
2246 &Self::get_current_timestamp().to_string(),
2247 ])
2248 .set(1);
2249
2250 let behavior = fork_recovery
2251 .map(|fr| fr.fork_crash_behavior)
2252 .unwrap_or_default();
2253
2254 match behavior {
2255 ForkCrashBehavior::AwaitForkRecovery => {
2256 error!(
2257 checkpoint_seq = checkpoint_seq,
2258 checkpoint_digest = ?checkpoint_digest,
2259 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2260 );
2261 futures::future::pending::<()>().await;
2262 unreachable!("pending() should never return");
2263 }
2264 ForkCrashBehavior::ReturnError => {
2265 error!(
2266 checkpoint_seq = checkpoint_seq,
2267 checkpoint_digest = ?checkpoint_digest,
2268 "Checkpoint fork detected! Returning error."
2269 );
2270 Err(anyhow::anyhow!(
2271 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2272 checkpoint_seq,
2273 checkpoint_digest
2274 ))
2275 }
2276 }
2277 }
2278
2279 async fn handle_transaction_fork(
2280 tx_digest: TransactionDigest,
2281 expected_effects_digest: TransactionEffectsDigest,
2282 actual_effects_digest: TransactionEffectsDigest,
2283 checkpoint_metrics: &CheckpointMetrics,
2284 fork_recovery: Option<&ForkRecoveryConfig>,
2285 ) -> Result<()> {
2286 checkpoint_metrics
2287 .transaction_fork_crash_mode
2288 .with_label_values(&[
2289 &Self::get_digest_prefix(tx_digest),
2290 &Self::get_digest_prefix(expected_effects_digest),
2291 &Self::get_digest_prefix(actual_effects_digest),
2292 &Self::get_current_timestamp().to_string(),
2293 ])
2294 .set(1);
2295
2296 let behavior = fork_recovery
2297 .map(|fr| fr.fork_crash_behavior)
2298 .unwrap_or_default();
2299
2300 match behavior {
2301 ForkCrashBehavior::AwaitForkRecovery => {
2302 error!(
2303 tx_digest = ?tx_digest,
2304 expected_effects_digest = ?expected_effects_digest,
2305 actual_effects_digest = ?actual_effects_digest,
2306 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2307 );
2308 futures::future::pending::<()>().await;
2309 unreachable!("pending() should never return");
2310 }
2311 ForkCrashBehavior::ReturnError => {
2312 error!(
2313 tx_digest = ?tx_digest,
2314 expected_effects_digest = ?expected_effects_digest,
2315 actual_effects_digest = ?actual_effects_digest,
2316 "Transaction fork detected! Returning error."
2317 );
2318 Err(anyhow::anyhow!(
2319 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2320 tx_digest,
2321 expected_effects_digest,
2322 actual_effects_digest
2323 ))
2324 }
2325 }
2326 }
2327}
2328
2329#[cfg(not(msim))]
2330impl SuiNode {
2331 async fn fetch_jwks(
2332 _authority: AuthorityName,
2333 provider: &OIDCProvider,
2334 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2335 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2336 use sui_types::error::SuiErrorKind;
2337 let client = reqwest::Client::new();
2338 fetch_jwks(provider, &client, true)
2339 .await
2340 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2341 }
2342}
2343
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node this `SuiNode` belongs to.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether safe mode is expected in this simulation.
    ///
    /// Logs the new value, then stores it with `Ordering::Relaxed`.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let safe_mode_flag = &self.sim_state.sim_safe_mode_expected;
        safe_mode_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator replacement for JWK fetching: delegates to the injector returned
    /// by `get_jwk_injector()`, so tests control which JWKs are produced.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2365
/// A future that `start` spawns onto the tokio runtime at most once.
///
/// `Unstarted` holds the future together with a oneshot receiver that `start`
/// awaits after spawning. `Started` holds the `JoinHandle` of the spawned task.
enum SpawnOnce {
    // NOTE(review): the future is wrapped in a `Mutex`, presumably so that
    // `SpawnOnce` is `Sync` even though `BoxFuture` is not — confirm.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // The handle is stored but not read anywhere visible here, hence `allow(unused)`.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2372
2373impl SpawnOnce {
2374 pub fn new(
2375 ready_rx: oneshot::Receiver<()>,
2376 future: impl Future<Output = ()> + Send + 'static,
2377 ) -> Self {
2378 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2379 }
2380
2381 pub async fn start(self) -> Self {
2382 match self {
2383 Self::Unstarted(ready_rx, future) => {
2384 let future = future.into_inner();
2385 let handle = tokio::spawn(future);
2386 ready_rx.await.unwrap();
2387 Self::Started(handle)
2388 }
2389 Self::Started(_) => self,
2390 }
2391 }
2392}
2393
2394fn update_peer_addresses(
2398 config: &NodeConfig,
2399 endpoint_manager: &EndpointManager,
2400 epoch_start_state: &EpochStartSystemState,
2401 prev_epoch_start_state: Option<&EpochStartSystemState>,
2402) {
2403 if config.consensus_config().is_none() {
2404 return;
2405 }
2406 let new_peers: HashSet<PeerId> = epoch_start_state
2407 .get_validator_as_p2p_peers(config.protocol_public_key())
2408 .into_iter()
2409 .map(|(peer_id, address)| {
2410 endpoint_manager
2411 .update_endpoint(
2412 EndpointId::P2p(peer_id),
2413 AddressSource::Chain,
2414 vec![address],
2415 )
2416 .expect("Updating peer addresses should not fail");
2417 peer_id
2418 })
2419 .collect();
2420
2421 if let Some(prev) = prev_epoch_start_state {
2423 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2424 if !new_peers.contains(&peer_id) {
2425 endpoint_manager
2426 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2427 .expect("Clearing peer addresses should not fail");
2428 }
2429 }
2430 }
2431}
2432
2433fn build_kv_store(
2434 state: &Arc<AuthorityState>,
2435 config: &NodeConfig,
2436 registry: &Registry,
2437) -> Result<Arc<TransactionKeyValueStore>> {
2438 let metrics = KeyValueStoreMetrics::new(registry);
2439 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2440
2441 let base_url = &config.transaction_kv_store_read_config.base_url;
2442
2443 if base_url.is_empty() {
2444 info!("no http kv store url provided, using local db only");
2445 return Ok(Arc::new(db_store));
2446 }
2447
2448 let base_url: url::Url = base_url.parse().tap_err(|e| {
2449 error!(
2450 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2451 base_url, e
2452 )
2453 })?;
2454
2455 let network_str = match state.get_chain_identifier().chain() {
2456 Chain::Mainnet => "/mainnet",
2457 _ => {
2458 info!("using local db only for kv store");
2459 return Ok(Arc::new(db_store));
2460 }
2461 };
2462
2463 let base_url = base_url.join(network_str)?.to_string();
2464 let http_store = HttpKVStore::new_kv(
2465 &base_url,
2466 config.transaction_kv_store_read_config.cache_size,
2467 metrics.clone(),
2468 )?;
2469 info!("using local key-value store with fallback to http key-value store");
2470 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2471 db_store,
2472 http_store,
2473 metrics,
2474 "json_rpc_fallback",
2475 )))
2476}
2477
/// Builds and starts the node's HTTP (and optionally HTTPS) RPC servers.
///
/// When `node_role.should_run_rpc_servers()` is false, nothing is started and the
/// function returns `HttpServers::default()` and `None`. Otherwise it:
/// 1. builds a JSON-RPC router with the read, coin, governance, bridge, indexer
///    and Move-utils modules. The transaction-builder module is added only when
///    `config.run_with_range` is unset. The transaction-execution module is added
///    only when a `transaction_orchestrator` is supplied.
/// 2. builds the `sui_rpc_api` router backed by `store`, wired to a freshly built
///    `SubscriptionService`.
/// 3. merges both routers under one layer stack: connect-info bridging,
///    server-timing middleware, and CORS allowing GET/POST from any origin.
/// 4. serves the merged router over HTTPS when `config.rpc` carries a TLS config,
///    and always over plain HTTP on `config.json_rpc_address`.
///
/// On success, returns the server handles together with the checkpoint sender
/// that feeds the subscription service.
///
/// # Errors
/// Propagates failures from any of these steps:
/// - building the kv store;
/// - registering a JSON-RPC module;
/// - building the JSON-RPC router;
/// - validating `config.rpc`;
/// - binding either server.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if !node_role.should_run_rpc_servers() {
        // RPC servers are disabled for this node role: start nothing.
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // NOTE(review): the transaction builder is only registered when the node is
        // not running with a fixed `run_with_range` — presumably because such a
        // node stops at a bounded range; confirm the rationale.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator was supplied.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // An explicit name-service config is used only when all three IDs are set;
        // otherwise fall back to the default for the detected chain.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The checkpoint sender is returned to the caller; the handle is wired into
    // the RPC service below.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        // Note: this `config` shadows the `NodeConfig` parameter inside the block.
        if let Some(config) = config.rpc.clone() {
            config.validate()?;
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Copy sui_http's ConnectInfo into axum's ConnectInfo extension so that
        // downstream axum handlers can read the remote address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    // The layer stack wraps both the JSON-RPC routes merged above and the rpc_api routes.
    router = router.merge(rpc_router).layer(layers);

    // HTTPS is optional and needs a TLS config under `config.rpc`. It serves a
    // clone of the router, because the plain HTTP server below consumes the original.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2651
2652#[cfg(not(test))]
2653fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2654 protocol_config.max_transactions_per_checkpoint() as usize
2655}
2656
/// Test-build override: caps checkpoints at two transactions regardless of the
/// protocol config. Presumably this lets tests exercise multi-checkpoint
/// behaviour with small workloads.
#[cfg(test)]
fn max_tx_per_checkpoint(_protocol_config: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2661
/// Handles for the RPC servers started by `build_http_servers`.
///
/// The `allow(unused)` attributes indicate the fields are held rather than read.
/// NOTE(review): presumably dropping a `sui_http::ServerHandle` stops its server,
/// so these handles keep the servers alive — confirm before removing either field.
#[derive(Default)]
struct HttpServers {
    // Plain-HTTP server on `config.json_rpc_address`. `None` when RPC servers are
    // disabled for the node role (the `HttpServers::default()` path).
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // HTTPS server. Present only when `config.rpc` carries a TLS config.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2669
#[cfg(test)]
mod tests {
    // Tests for startup fork detection and operator-driven fork recovery.
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // Checkpoint fork without an override: startup must fail.
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // Same fork with an override for its sequence number: marker is cleared.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // Transaction fork without an override: startup must fail.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // Same fork with an override for that transaction: marker is cleared.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_invalid_checkpoint_override_is_rejected() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // '-' is not a valid digest character, so parsing the override must fail.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(7, "not-a-valid-digest".to_string());
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let err = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await
        .expect_err("an unparseable checkpoint digest override must be rejected");
        assert!(
            err.to_string()
                .contains("Invalid checkpoint digest override for seq 7")
        );
    }

    #[tokio::test]
    async fn test_non_matching_transaction_override_keeps_fork() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        checkpoint_store
            .record_transaction_fork_detected(
                TransactionDigest::random(),
                TransactionEffectsDigest::random(),
                TransactionEffectsDigest::random(),
            )
            .unwrap();

        // The override targets a different transaction, so the fork must survive.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(
            TransactionDigest::random().to_string(),
            TransactionEffectsDigest::random().to_string(),
        );
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_some()
        );
    }
}