1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
33use sui_core::authority::backpressure::BackpressureManager;
34use sui_core::authority::epoch_start_configuration::EpochFlag;
35use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
36use sui_core::consensus_adapter::ConsensusClient;
37use sui_core::consensus_manager::UpdatableConsensusClient;
38use sui_core::epoch::randomness::RandomnessManager;
39use sui_core::execution_cache::build_execution_cache;
40use sui_core::randomness_round_receiver::{RandomnessRoundReceiver, RandomnessRoundReceiverHandle};
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Logs a JWK-updater message, choosing the level from the runtime configuration.
///
/// When `in_test_configuration()` is true the message is emitted with `debug!`;
/// otherwise it is emitted with `info!`. Arguments are forwarded verbatim to the
/// chosen `tracing` macro, so any `format!`-style argument list is accepted.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        match in_test_configuration() {
            true => debug!($($arg)+),
            false => info!($($arg)+),
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
/// Components that only exist on nodes whose role runs consensus.
///
/// Built by `SuiNode::construct_validator_components` in `SuiNode::start_async`
/// and stored (wrapped in `Option`) in `SuiNode::validator_components`.
/// Field declaration order is also drop order; keep it stable.
pub struct ValidatorComponents {
    /// Validator server. `start_async` takes this value, calls `.start().await`
    /// on it and stores the result back when the node is a validator.
    validator_server_handle: Option<SpawnOnce>,
    /// Join handle of the validator overload monitor task, if one was spawned
    /// (presumably from `overload_monitor` — confirm in `construct_validator_components`).
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Consensus manager; also registered as the endpoint manager's consensus
    /// address updater for validators.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit to consensus; `close_epoch` and
    /// `recover_end_of_publish` are called on it.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Checkpoint metrics shared with the checkpoint service.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics for the consensus transaction validator.
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    /// Admission queue context, when the admission queue is enabled.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Networking pieces returned by `SuiNode::create_p2p_network` and
/// destructured field-by-field in `SuiNode::start_async`.
pub struct P2pComponents {
    /// The anemo p2p network.
    p2p_network: Network,
    /// Peers handed to the anemo connection monitor, keyed by `PeerId`
    /// (the `String` is presumably a human-readable label — confirm).
    known_peers: HashMap<PeerId, String>,
    /// Handle to the discovery service.
    discovery_handle: discovery::Handle,
    /// Handle to the state sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle to the randomness network service.
    randomness_handle: randomness::Handle,
    /// Manager used to register peer/consensus endpoint addresses.
    endpoint_manager: EndpointManager,
}
182
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Per-node state that only exists when built for the `msim` simulator.
    ///
    /// Field order is kept as-is because it determines drop order.
    pub(super) struct SimState {
        /// Handle of the simulator node that was current when this state was created.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        /// Whether the simulation expects safe mode; initialised to `false`.
        pub sim_safe_mode_expected: AtomicBool,
        /// Kept alive for the lifetime of the node; never read directly.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher used in simulation.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default simulated fetcher: ignores its arguments and parses the built-in
    /// `DEFAULT_JWK_BYTES` as Twitch JWKs.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        let jwk_bytes = sui_types::zk_login_util::DEFAULT_JWK_BYTES;
        parse_jwks(jwk_bytes, &OIDCProvider::Twitch, true)
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK fetcher installed for the current thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Installs `injector` as the JWK fetcher for the current thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| drop(slot.replace(injector)));
    }
}
236
237#[cfg(msim)]
238pub use simulator::set_jwk_injector;
239#[cfg(msim)]
240use simulator::*;
241use sui_core::authority::authority_store_pruner::PrunerWatermarks;
242use sui_core::{
243 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
244};
245
/// Default gRPC connect timeout (60 seconds). Not referenced in this part of
/// the file; presumably used when dialing gRPC endpoints — confirm at call sites.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
247
/// A running Sui node (validator, observer or fullnode), created by
/// `SuiNode::start` / `SuiNode::start_async`.
///
/// Several fields are held only to keep background services alive; field
/// declaration order is also drop order, so do not reorder casually.
pub struct SuiNode {
    /// Node configuration (with defaults filled in during startup).
    config: NodeConfig,
    /// `Some` only when the node's role runs consensus.
    validator_components: Mutex<Option<ValidatorComponents>>,

    // Never read in the visible code; presumably stored only so the HTTP
    // servers stay alive as long as the node — confirm before removing.
    #[allow(unused)]
    http_servers: HttpServers,

    /// Authority state shared with most subsystems.
    state: Arc<AuthorityState>,
    /// Present only on fullnodes that are not running with `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    /// Discovery handle; held but not read here.
    _discovery: discovery::Handle,
    /// Connection monitor handle; held but not read here.
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    /// Sender side of the end-of-epoch broadcast; see `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    /// Sender returned when a DB checkpoint handler was started; held but not read here.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    /// Sender returned when a state snapshot uploader was started; held but not read here.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Shutdown broadcast (capacity 1); see `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,

    /// Authority aggregator behind an `ArcSwap` (presumably swapped on
    /// reconfiguration — confirm).
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    /// Checkpoint feed for the subscription service, when one was built.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
297
298impl fmt::Debug for SuiNode {
299 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
300 f.debug_struct("SuiNode")
301 .field("name", &self.state.name.concise())
302 .finish()
303 }
304}
305
/// Upper bound on how many new JWKs a single provider fetch may submit to consensus.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
307
308impl SuiNode {
309 pub async fn start(
310 config: NodeConfig,
311 registry_service: RegistryService,
312 ) -> Result<Arc<SuiNode>> {
313 Self::start_async(
314 config,
315 registry_service,
316 ServerVersion::new("sui-node", "unknown"),
317 )
318 .await
319 }
320
    /// Spawns one background task per zkLogin OAuth provider configured for
    /// this chain. Each task repeatedly fetches the provider's JWKs, filters
    /// them, and submits the survivors to consensus.
    ///
    /// * `config` - supplies `zklogin_oauth_providers` (per chain) and
    ///   `jwk_fetch_interval_seconds`.
    /// * `metrics` - JWK request/error/total/unique/invalid counters.
    /// * `authority` - this node's name, used as the submitter of JWK transactions.
    /// * `epoch_store` - the current epoch; tasks are wrapped in
    ///   `within_alive_epoch` (presumably ending when the epoch ends — confirm).
    /// * `consensus_adapter` - used to submit `new_jwk_fetched` transactions.
    ///
    /// # Panics
    /// Panics if a configured provider string does not parse as an `OIDCProvider`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers configured for this chain; no entry means no providers.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        /// Returns `true` if `(id, jwk)` may be submitted for `provider`: its `iss`
        /// must parse to a provider, that provider must equal `provider`, and
        /// `check_total_jwk_size` must pass. Each rejection logs a warning and
        /// increments `invalid_jwks` for `provider`.
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            // Reject keys whose issuer belongs to a different provider than
            // the one we fetched from.
            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (id, jwk) pairs already accepted by this task, so each
                    // key is submitted at most once per task.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // On error, retry after a fixed 30 seconds instead of `fetch_interval`.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep keys that are valid, not already active in
                                // this epoch, and not seen before. `seen.insert`
                                // runs last because `&&` short-circuits, so only
                                // keys passing the first two checks are recorded.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Cap the number of keys a single fetch can submit.
                                // NOTE(review): keys dropped here were already
                                // inserted into `seen`, so later fetches in this
                                // task will not submit them either — confirm intended.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Submission errors are logged and otherwise ignored.
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
455
456 pub async fn start_async(
457 config: NodeConfig,
458 registry_service: RegistryService,
459 server_version: ServerVersion,
460 ) -> Result<Arc<SuiNode>> {
461 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
462 let mut config = config.clone();
463 if config.supported_protocol_versions.is_none() {
464 info!(
465 "populating config.supported_protocol_versions with default {:?}",
466 SupportedProtocolVersions::SYSTEM_DEFAULT
467 );
468 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
469 }
470
471 let run_with_range = config.run_with_range;
472 let prometheus_registry = registry_service.default_registry();
473 let node_role = config.intended_node_role();
474
475 info!(node =? config.protocol_public_key(),
476 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
477 );
478
479 DBMetrics::init(registry_service.clone());
481
482 typed_store::init_write_sync(config.enable_db_sync_to_disk);
484
485 mysten_metrics::init_metrics(&prometheus_registry);
487 #[cfg(not(msim))]
489 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
490
491 let genesis = config.genesis()?.clone();
492
493 let secret = Arc::pin(config.protocol_key_pair().copy());
494 let genesis_committee = genesis.committee();
495 let committee_store = Arc::new(CommitteeStore::new(
496 config.db_path().join("epochs"),
497 &genesis_committee,
498 None,
499 ));
500
501 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
502 let checkpoint_store = CheckpointStore::new(
503 &config.db_path().join("checkpoints"),
504 pruner_watermarks.clone(),
505 );
506 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
507
508 if node_role.runs_consensus() {
509 Self::check_and_recover_forks(
510 &checkpoint_store,
511 &checkpoint_metrics,
512 config.fork_recovery.as_ref(),
513 )
514 .await?;
515 }
516
517 let enable_write_stall = config
519 .enable_db_write_stall
520 .unwrap_or(node_role.runs_consensus());
521 let enable_objects_compactor = node_role.is_validator()
528 || config.authority_store_pruning_config.num_epochs_to_retain == 0;
529 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
530 enable_write_stall,
531 enable_objects_compactor,
532 };
533 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
534 &config.db_store_path(),
535 Some(perpetual_tables_options),
536 Some(pruner_watermarks.epoch_id.clone()),
537 ));
538 let is_genesis = perpetual_tables
539 .database_is_empty()
540 .expect("Database read should not fail at init.");
541
542 let backpressure_manager =
543 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
544
545 let store =
546 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
547
548 let cur_epoch = store.get_recovery_epoch_at_restart()?;
549 let committee = committee_store
550 .get_committee(&cur_epoch)?
551 .expect("Committee of the current epoch must exist");
552 let epoch_start_configuration = store
553 .get_epoch_start_configuration()?
554 .expect("EpochStartConfiguration of the current epoch must exist");
555 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
556 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
557
558 let cache_traits = build_execution_cache(
559 &config.execution_cache,
560 &prometheus_registry,
561 &store,
562 backpressure_manager.clone(),
563 );
564
565 let auth_agg = {
566 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
567 Arc::new(ArcSwap::new(Arc::new(
568 AuthorityAggregator::new_from_epoch_start_state(
569 epoch_start_configuration.epoch_start_state(),
570 &committee_store,
571 safe_client_metrics_base,
572 ),
573 )))
574 };
575
576 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
577 let chain = match config.chain_override_for_testing {
578 Some(chain) => chain,
579 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
580 };
581
582 let highest_executed_checkpoint = checkpoint_store
583 .get_highest_executed_checkpoint_seq_number()
584 .expect("checkpoint store read cannot fail")
585 .unwrap_or(0);
586
587 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
588 0
589 } else {
590 checkpoint_store
591 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
592 .expect("checkpoint store read cannot fail")
593 .unwrap_or(highest_executed_checkpoint)
594 };
595
596 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
597 let epoch_store = AuthorityPerEpochStore::new(
598 config.protocol_public_key(),
599 committee.clone(),
600 &config.db_store_path(),
601 Some(epoch_options.options),
602 EpochMetrics::new(®istry_service.default_registry()),
603 epoch_start_configuration,
604 cache_traits.backing_package_store.clone(),
605 cache_traits.object_store.clone(),
606 cache_metrics,
607 signature_verifier_metrics,
608 &config.expensive_safety_check_config,
609 (chain_id, chain),
610 highest_executed_checkpoint,
611 previous_epoch_last_checkpoint,
612 Arc::new(SubmittedTransactionCacheMetrics::new(
613 ®istry_service.default_registry(),
614 )),
615 config.fullnode_sync_mode,
616 )?;
617
618 info!("created epoch store");
619
620 replay_log!(
621 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
622 epoch_store.epoch(),
623 epoch_store.protocol_config()
624 );
625
626 if is_genesis {
628 info!("checking SUI conservation at genesis");
629 cache_traits
634 .reconfig_api
635 .expensive_check_sui_conservation(&epoch_store)
636 .expect("SUI conservation check cannot fail at genesis");
637 }
638
639 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
640 let default_buffer_stake = epoch_store
641 .protocol_config()
642 .buffer_stake_for_protocol_upgrade_bps();
643 if effective_buffer_stake != default_buffer_stake {
644 warn!(
645 ?effective_buffer_stake,
646 ?default_buffer_stake,
647 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
648 );
649 }
650
651 checkpoint_store.insert_genesis_checkpoint(
652 genesis.checkpoint(),
653 genesis.checkpoint_contents().clone(),
654 &epoch_store,
655 );
656
657 info!("creating state sync store");
658 let state_sync_store = RocksDbStore::new(
659 cache_traits.clone(),
660 committee_store.clone(),
661 checkpoint_store.clone(),
662 );
663
664 let index_store =
665 if node_role.should_enable_index_processing() && config.enable_index_processing {
666 info!("creating jsonrpc index store");
667 Some(Arc::new(IndexStore::new(
668 config.db_path().join("indexes"),
669 &prometheus_registry,
670 epoch_store
671 .protocol_config()
672 .max_move_identifier_len_as_option(),
673 config.remove_deprecated_tables,
674 )))
675 } else {
676 None
677 };
678
679 let rpc_index = if node_role.should_enable_index_processing()
680 && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
681 {
682 info!("creating rpc index store");
683 Some(Arc::new(
684 RpcIndexStore::new(
685 &config.db_path(),
686 &store,
687 &checkpoint_store,
688 &epoch_store,
689 &cache_traits.backing_package_store,
690 config.rpc().cloned().unwrap_or_default(),
691 )
692 .await,
693 ))
694 } else {
695 None
696 };
697
698 let chain_identifier = epoch_store.get_chain_identifier();
699
700 info!("creating archive reader");
701 let (randomness_tx, randomness_rx) = mpsc::channel(
703 config
704 .p2p_config
705 .randomness
706 .clone()
707 .unwrap_or_default()
708 .mailbox_capacity(),
709 );
710 let P2pComponents {
711 p2p_network,
712 known_peers,
713 discovery_handle,
714 state_sync_handle,
715 randomness_handle,
716 endpoint_manager,
717 } = Self::create_p2p_network(
718 &config,
719 state_sync_store.clone(),
720 chain_identifier,
721 randomness_tx,
722 &prometheus_registry,
723 )?;
724
725 for peer in &config.p2p_config.peer_address_overrides {
727 endpoint_manager
728 .update_endpoint(
729 EndpointId::P2p(peer.peer_id),
730 AddressSource::Config,
731 peer.addresses.clone(),
732 )
733 .expect("Updating peer address overrides should not fail");
734 }
735
736 update_peer_addresses(
738 &config,
739 &endpoint_manager,
740 epoch_store.epoch_start_state(),
741 None,
742 );
743
744 info!("start snapshot upload");
745 let state_snapshot_handle = Self::start_state_snapshot(
747 &config,
748 &prometheus_registry,
749 checkpoint_store.clone(),
750 chain_identifier,
751 )?;
752
753 info!("start db checkpoint");
755 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
756 &config,
757 &prometheus_registry,
758 state_snapshot_handle.is_some(),
759 )?;
760
761 if !epoch_store
762 .protocol_config()
763 .simplified_unwrap_then_delete()
764 {
765 config
767 .authority_store_pruning_config
768 .set_killswitch_tombstone_pruning(true);
769 }
770
771 let authority_name = config.protocol_public_key();
772
773 info!("create authority state");
774 let state = AuthorityState::new(
775 authority_name,
776 secret,
777 config.supported_protocol_versions.unwrap(),
778 store.clone(),
779 cache_traits.clone(),
780 epoch_store.clone(),
781 committee_store.clone(),
782 index_store.clone(),
783 rpc_index,
784 checkpoint_store.clone(),
785 &prometheus_registry,
786 genesis.objects(),
787 &db_checkpoint_config,
788 config.clone(),
789 chain_identifier,
790 config.policy_config.clone(),
791 config.firewall_config.clone(),
792 pruner_watermarks,
793 )
794 .await;
795 if epoch_store.epoch() == 0 {
797 let txn = &genesis.transaction();
798 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
799 let transaction =
800 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
801 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
802 genesis.transaction().data().clone(),
803 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
804 ),
805 );
806 let _enter = span.enter();
807 state
808 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
809 .unwrap();
810 }
811
812 let randomness_receiver_handle =
815 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
816
817 let (end_of_epoch_channel, end_of_epoch_receiver) =
818 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
819
820 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
821 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
822 auth_agg.load_full(),
823 state.clone(),
824 end_of_epoch_receiver,
825 &config.db_path(),
826 &prometheus_registry,
827 &config,
828 )))
829 } else {
830 None
831 };
832
833 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
834 state.clone(),
835 state_sync_store,
836 &transaction_orchestrator.clone(),
837 &config,
838 &prometheus_registry,
839 server_version,
840 node_role,
841 )
842 .await?;
843
844 let global_state_hasher = Arc::new(GlobalStateHasher::new(
845 cache_traits.global_state_hash_store.clone(),
846 GlobalStateHashMetrics::new(&prometheus_registry),
847 ));
848
849 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
850 "sui",
851 ®istry_service.default_registry(),
852 );
853
854 let connection_monitor_handle =
855 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
856 p2p_network.downgrade(),
857 Arc::new(network_connection_metrics),
858 known_peers,
859 );
860
861 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
862
863 sui_node_metrics
864 .binary_max_protocol_version
865 .set(ProtocolVersion::MAX.as_u64() as i64);
866 sui_node_metrics
867 .configured_max_protocol_version
868 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
869
870 let node_role = epoch_store.node_role();
871 let validator_components = if node_role.runs_consensus() {
872 let mut components = Self::construct_validator_components(
873 config.clone(),
874 state.clone(),
875 committee,
876 epoch_store.clone(),
877 checkpoint_store.clone(),
878 state_sync_handle.clone(),
879 randomness_handle.clone(),
880 Arc::downgrade(&global_state_hasher),
881 backpressure_manager.clone(),
882 ®istry_service,
883 sui_node_metrics.clone(),
884 checkpoint_metrics.clone(),
885 node_role,
886 randomness_receiver_handle.clone(),
887 )
888 .await?;
889
890 if node_role.is_validator() {
891 components
892 .consensus_adapter
893 .recover_end_of_publish(&epoch_store);
894
895 components.validator_server_handle = Some(
897 components
898 .validator_server_handle
899 .take()
900 .unwrap()
901 .start()
902 .await,
903 );
904
905 endpoint_manager
907 .set_consensus_address_updater(components.consensus_manager.clone());
908 } else {
909 info!("Starting node as Observer — connecting to configured peers");
910 }
911
912 Some(components)
913 } else {
914 None
915 };
916
917 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
919
920 let node = Self {
921 config,
922 validator_components: Mutex::new(validator_components),
923 http_servers,
924 state,
925 transaction_orchestrator,
926 registry_service,
927 metrics: sui_node_metrics,
928 checkpoint_metrics,
929
930 _discovery: discovery_handle,
931 _connection_monitor_handle: connection_monitor_handle,
932 state_sync_handle,
933 randomness_handle,
934 checkpoint_store,
935 global_state_hasher: Mutex::new(Some(global_state_hasher)),
936 end_of_epoch_channel,
937 endpoint_manager,
938 backpressure_manager,
939
940 _db_checkpoint_handle: db_checkpoint_handle,
941
942 #[cfg(msim)]
943 sim_state: Default::default(),
944
945 _state_snapshot_uploader_handle: state_snapshot_handle,
946 shutdown_channel_tx: shutdown_channel,
947 randomness_receiver_handle,
948
949 auth_agg,
950 subscription_service_checkpoint_sender,
951 };
952
953 info!("SuiNode started!");
954 let node = Arc::new(node);
955 let node_copy = node.clone();
956 spawn_monitored_task!(async move {
957 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
958 if let Err(error) = result {
959 warn!("Reconfiguration finished with error {:?}", error);
960 }
961 });
962
963 Ok(node)
964 }
965
966 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
967 self.end_of_epoch_channel.subscribe()
968 }
969
970 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
971 self.shutdown_channel_tx.subscribe()
972 }
973
974 pub fn current_epoch_for_testing(&self) -> EpochId {
975 self.state.current_epoch_for_testing()
976 }
977
978 pub fn db_checkpoint_path(&self) -> PathBuf {
979 self.config.db_checkpoint_path()
980 }
981
982 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
984 info!("close_epoch (current epoch = {})", epoch_store.epoch());
985 self.validator_components
986 .lock()
987 .await
988 .as_ref()
989 .ok_or_else(|| SuiError::from("Node is not a validator"))?
990 .consensus_adapter
991 .close_epoch(epoch_store);
992 Ok(())
993 }
994
995 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
996 self.state
997 .clear_override_protocol_upgrade_buffer_stake(epoch)
998 }
999
1000 pub fn set_override_protocol_upgrade_buffer_stake(
1001 &self,
1002 epoch: EpochId,
1003 buffer_stake_bps: u64,
1004 ) -> SuiResult {
1005 self.state
1006 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1007 }
1008
1009 pub async fn close_epoch_for_testing(&self) -> SuiResult {
1012 let epoch_store = self.state.epoch_store_for_testing();
1013 self.close_epoch(&epoch_store).await
1014 }
1015
1016 fn start_state_snapshot(
1017 config: &NodeConfig,
1018 prometheus_registry: &Registry,
1019 checkpoint_store: Arc<CheckpointStore>,
1020 chain_identifier: ChainIdentifier,
1021 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1022 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1023 let snapshot_uploader = StateSnapshotUploader::new(
1024 &config.db_checkpoint_path(),
1025 &config.snapshot_path(),
1026 remote_store_config.clone(),
1027 60,
1028 prometheus_registry,
1029 checkpoint_store,
1030 chain_identifier,
1031 config.state_snapshot_write_config.archive_interval_epochs,
1032 )?;
1033 Ok(Some(snapshot_uploader.start()))
1034 } else {
1035 Ok(None)
1036 }
1037 }
1038
1039 fn start_db_checkpoint(
1040 config: &NodeConfig,
1041 prometheus_registry: &Registry,
1042 state_snapshot_enabled: bool,
1043 ) -> Result<(
1044 DBCheckpointConfig,
1045 Option<tokio::sync::broadcast::Sender<()>>,
1046 )> {
1047 let checkpoint_path = Some(
1048 config
1049 .db_checkpoint_config
1050 .checkpoint_path
1051 .clone()
1052 .unwrap_or_else(|| config.db_checkpoint_path()),
1053 );
1054 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1055 DBCheckpointConfig {
1056 checkpoint_path,
1057 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1058 true
1059 } else {
1060 config
1061 .db_checkpoint_config
1062 .perform_db_checkpoints_at_epoch_end
1063 },
1064 ..config.db_checkpoint_config.clone()
1065 }
1066 } else {
1067 config.db_checkpoint_config.clone()
1068 };
1069
1070 match (
1071 db_checkpoint_config.object_store_config.as_ref(),
1072 state_snapshot_enabled,
1073 ) {
1074 (None, false) => Ok((db_checkpoint_config, None)),
1079 (_, _) => {
1080 let handler = DBCheckpointHandler::new(
1081 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1082 db_checkpoint_config.object_store_config.as_ref(),
1083 60,
1084 db_checkpoint_config
1085 .prune_and_compact_before_upload
1086 .unwrap_or(true),
1087 config.authority_store_pruning_config.clone(),
1088 prometheus_registry,
1089 state_snapshot_enabled,
1090 )?;
1091 Ok((
1092 db_checkpoint_config,
1093 Some(DBCheckpointHandler::start(handler)),
1094 ))
1095 }
1096 }
1097 }
1098
1099 fn create_p2p_network(
1100 config: &NodeConfig,
1101 state_sync_store: RocksDbStore,
1102 chain_identifier: ChainIdentifier,
1103 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1104 prometheus_registry: &Registry,
1105 ) -> Result<P2pComponents> {
1106 let mut p2p_config = config.p2p_config.clone();
1107 {
1108 let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1109 if disc.peer_addr_store_path.is_none() {
1110 disc.peer_addr_store_path =
1111 Some(config.db_path().join("discovery_peer_cache.yaml"));
1112 }
1113 }
1114 let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1115 if let Some(consensus_config) = &config.consensus_config {
1116 let effective_addr = consensus_config
1117 .external_address
1118 .as_ref()
1119 .or(consensus_config.listen_address.as_ref());
1120 if let Some(addr) = effective_addr {
1121 discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1122 }
1123 }
1124 let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1125 let discovery_sender = discovery.sender();
1126
1127 let (state_sync, state_sync_router) = state_sync::Builder::new()
1128 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1129 .store(state_sync_store)
1130 .archive_config(config.archive_reader_config())
1131 .discovery_sender(discovery_sender)
1132 .with_metrics(prometheus_registry)
1133 .build();
1134
1135 let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1136 let known_peers: HashMap<PeerId, String> = discovery_config
1137 .allowlisted_peers
1138 .clone()
1139 .into_iter()
1140 .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1141 .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1142 peer.peer_id
1143 .map(|peer_id| (peer_id, "seed_peer".to_string()))
1144 }))
1145 .collect();
1146
1147 let (randomness, randomness_router) =
1148 randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1149 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1150 .with_metrics(prometheus_registry)
1151 .build();
1152
1153 let p2p_network = {
1154 let routes = anemo::Router::new()
1155 .add_rpc_service(discovery_server)
1156 .merge(state_sync_router);
1157 let routes = routes.merge(randomness_router);
1158
1159 let inbound_network_metrics =
1160 mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1161 let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1162 "sui",
1163 "outbound",
1164 prometheus_registry,
1165 );
1166
1167 let service = ServiceBuilder::new()
1168 .layer(
1169 TraceLayer::new_for_server_errors()
1170 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1171 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1172 )
1173 .layer(CallbackLayer::new(
1174 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1175 Arc::new(inbound_network_metrics),
1176 config.p2p_config.excessive_message_size(),
1177 ),
1178 ))
1179 .service(routes);
1180
1181 let outbound_layer = ServiceBuilder::new()
1182 .layer(
1183 TraceLayer::new_for_client_and_server_errors()
1184 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1185 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1186 )
1187 .layer(CallbackLayer::new(
1188 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1189 Arc::new(outbound_network_metrics),
1190 config.p2p_config.excessive_message_size(),
1191 ),
1192 ))
1193 .into_inner();
1194
1195 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1196 anemo_config.max_request_frame_size = Some(1 << 20);
1199 anemo_config.max_response_frame_size = Some(128 << 20);
1202
1203 let mut quic_config = anemo_config.quic.unwrap_or_default();
1206 if quic_config.socket_send_buffer_size.is_none() {
1207 quic_config.socket_send_buffer_size = Some(20 << 20);
1208 }
1209 if quic_config.socket_receive_buffer_size.is_none() {
1210 quic_config.socket_receive_buffer_size = Some(20 << 20);
1211 }
1212 quic_config.allow_failed_socket_buffer_size_setting = true;
1213
1214 if quic_config.max_concurrent_bidi_streams.is_none() {
1217 quic_config.max_concurrent_bidi_streams = Some(500);
1218 }
1219 if quic_config.max_concurrent_uni_streams.is_none() {
1220 quic_config.max_concurrent_uni_streams = Some(500);
1221 }
1222 if quic_config.stream_receive_window.is_none() {
1223 quic_config.stream_receive_window = Some(100 << 20);
1224 }
1225 if quic_config.receive_window.is_none() {
1226 quic_config.receive_window = Some(200 << 20);
1227 }
1228 if quic_config.send_window.is_none() {
1229 quic_config.send_window = Some(200 << 20);
1230 }
1231 if quic_config.crypto_buffer_size.is_none() {
1232 quic_config.crypto_buffer_size = Some(1 << 20);
1233 }
1234 if quic_config.max_idle_timeout_ms.is_none() {
1235 quic_config.max_idle_timeout_ms = Some(10_000);
1236 }
1237 if quic_config.keep_alive_interval_ms.is_none() {
1238 quic_config.keep_alive_interval_ms = Some(5_000);
1239 }
1240 anemo_config.quic = Some(quic_config);
1241
1242 let server_name = format!("sui-{}", chain_identifier);
1243 let network = Network::bind(config.p2p_config.listen_address)
1244 .server_name(&server_name)
1245 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1246 .config(anemo_config)
1247 .outbound_request_layer(outbound_layer)
1248 .start(service)?;
1249 info!(
1250 server_name = server_name,
1251 "P2p network started on {}",
1252 network.local_addr()
1253 );
1254
1255 network
1256 };
1257
1258 let discovery_handle =
1259 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1260 let state_sync_handle = state_sync.start(p2p_network.clone());
1261 let randomness_handle = randomness.start(p2p_network.clone());
1262
1263 Ok(P2pComponents {
1264 p2p_network,
1265 known_peers,
1266 discovery_handle,
1267 state_sync_handle,
1268 randomness_handle,
1269 endpoint_manager,
1270 })
1271 }
1272
1273 async fn construct_validator_components(
1274 config: NodeConfig,
1275 state: Arc<AuthorityState>,
1276 committee: Arc<Committee>,
1277 epoch_store: Arc<AuthorityPerEpochStore>,
1278 checkpoint_store: Arc<CheckpointStore>,
1279 state_sync_handle: state_sync::Handle,
1280 randomness_handle: randomness::Handle,
1281 global_state_hasher: Weak<GlobalStateHasher>,
1282 backpressure_manager: Arc<BackpressureManager>,
1283 registry_service: &RegistryService,
1284 sui_node_metrics: Arc<SuiNodeMetrics>,
1285 checkpoint_metrics: Arc<CheckpointMetrics>,
1286 node_role: NodeRole,
1287 randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
1288 ) -> Result<ValidatorComponents> {
1289 let mut config_clone = config.clone();
1290 let consensus_config = config_clone
1291 .consensus_config
1292 .as_mut()
1293 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1294
1295 let client = Arc::new(UpdatableConsensusClient::new());
1296 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1297 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1298 &committee,
1299 consensus_config,
1300 state.name,
1301 ®istry_service.default_registry(),
1302 client.clone(),
1303 checkpoint_store.clone(),
1304 inflight_slot_freed_notify.clone(),
1305 ));
1306
1307 let consensus_manager = Arc::new(ConsensusManager::new(
1308 &config,
1309 consensus_config,
1310 registry_service,
1311 client,
1312 node_role,
1313 ));
1314
1315 let consensus_store_pruner = ConsensusStorePruner::new(
1317 consensus_manager.get_storage_base_path(),
1318 consensus_config.db_retention_epochs(),
1319 consensus_config.db_pruner_period(),
1320 ®istry_service.default_registry(),
1321 );
1322
1323 let sui_tx_validator_metrics =
1324 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1325
1326 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1327 let (handle, queue) = Self::start_grpc_validator_service(
1328 &config,
1329 state.clone(),
1330 consensus_adapter.clone(),
1331 epoch_store.clone(),
1332 ®istry_service.default_registry(),
1333 inflight_slot_freed_notify,
1334 )
1335 .await?;
1336 (Some(handle), queue)
1337 } else {
1338 (None, None)
1339 };
1340
1341 let validator_overload_monitor_handle = if node_role.is_validator()
1344 && config
1345 .authority_overload_config
1346 .max_load_shedding_percentage
1347 > 0
1348 {
1349 let authority_state = Arc::downgrade(&state);
1350 let overload_config = config.authority_overload_config.clone();
1351 fail_point!("starting_overload_monitor");
1352 Some(spawn_monitored_task!(overload_monitor(
1353 authority_state,
1354 overload_config,
1355 )))
1356 } else {
1357 None
1358 };
1359
1360 Self::start_epoch_specific_validator_components(
1361 &config,
1362 state.clone(),
1363 consensus_adapter,
1364 checkpoint_store,
1365 epoch_store,
1366 state_sync_handle,
1367 randomness_handle,
1368 randomness_receiver_handle,
1369 consensus_manager,
1370 consensus_store_pruner,
1371 global_state_hasher,
1372 backpressure_manager,
1373 validator_server_handle,
1374 validator_overload_monitor_handle,
1375 checkpoint_metrics,
1376 sui_node_metrics,
1377 sui_tx_validator_metrics,
1378 admission_queue,
1379 node_role,
1380 )
1381 .await
1382 }
1383
/// Starts the consensus-related components that must be rebuilt for every epoch, and
/// bundles them with the long-lived ones into a `ValidatorComponents`.
///
/// The steps run in this order:
/// 1. Build the checkpoint service.
/// 2. Clear the randomness receiver's public key.
/// 3. Optionally install a `RandomnessManager`.
/// 4. For validators, spawn the `ExecutionTimeObserver`.
/// 5. Spawn `ConsensusManager::start` on a separate tokio task (it is not awaited here).
/// 6. Spawn the checkpoint service.
/// 7. For validators with authenticator state enabled, start the JWK updater.
/// 8. Rotate the admission queue (if any) to the new epoch store.
///
/// # Errors
/// Propagates a failure from `set_randomness_manager`.
async fn start_epoch_specific_validator_components(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    randomness_receiver_handle: Arc<RandomnessRoundReceiverHandle>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    state_hasher: Weak<GlobalStateHasher>,
    backpressure_manager: Arc<BackpressureManager>,
    validator_server_handle: Option<SpawnOnce>,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_node_metrics: Arc<SuiNodeMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    admission_queue: Option<AdmissionQueueContext>,
    node_role: NodeRole,
) -> Result<ValidatorComponents> {
    let checkpoint_service = Self::build_checkpoint_service(
        config,
        consensus_adapter.clone(),
        checkpoint_store.clone(),
        epoch_store.clone(),
        state.clone(),
        state_sync_handle,
        state_hasher,
        checkpoint_metrics.clone(),
        node_role,
    );

    // Drop any public key left over from the previous epoch before a new randomness
    // manager (if any) is created below.
    randomness_receiver_handle.clear_public_key();

    if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
        // Only validators hand their protocol key pair to the randomness manager; other
        // consensus-running roles pass `None`.
        let authority_key_pair = if node_role.is_validator() {
            Some(config.protocol_key_pair())
        } else {
            None
        };
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            authority_key_pair,
            randomness_receiver_handle.clone(),
        )
        .await;
        // `try_new` may yield `None`; in that case no manager is installed this epoch.
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }
    }

    if node_role.is_validator() {
        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );
    }

    let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
        None,
        state.metrics.clone(),
    ));

    let consensus_handler_initializer = ConsensusHandlerInitializer::new(
        state.clone(),
        checkpoint_service.clone(),
        epoch_store.clone(),
        consensus_adapter.clone(),
        throughput_calculator,
        backpressure_manager,
        config.congestion_log.clone(),
    );

    info!("Starting consensus manager asynchronously");

    // Consensus start-up runs on its own task, so this function does not wait for it.
    // Everything the task needs is cloned or moved into the block below.
    tokio::spawn({
        let config = config.clone();
        let epoch_store = epoch_store.clone();
        let sui_tx_validator = SuiTxValidator::new(
            state.clone(),
            epoch_store.clone(),
            checkpoint_service.clone(),
            sui_tx_validator_metrics.clone(),
        );
        let consensus_manager = consensus_manager.clone();
        async move {
            consensus_manager
                .start(
                    &config,
                    epoch_store,
                    consensus_handler_initializer,
                    sui_tx_validator,
                    Some(randomness_receiver_handle),
                )
                .await;
        }
    });
    let replay_waiter = consensus_manager.replay_waiter();

    info!("Spawning checkpoint service");
    // Setting the DISABLE_REPLAY_WAITER environment variable (to any valid Unicode value)
    // makes the checkpoint service start without the consensus replay waiter.
    let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
        None
    } else {
        Some(replay_waiter)
    };
    checkpoint_service
        .spawn(epoch_store.clone(), replay_waiter)
        .await;

    if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
        Self::start_jwk_updater(
            config,
            sui_node_metrics,
            state.name,
            epoch_store.clone(),
            consensus_adapter.clone(),
        );
    }

    // Point the admission queue (validators only) at the new epoch store.
    if let Some(ctx) = &admission_queue {
        ctx.rotate_for_epoch(epoch_store);
    }

    Ok(ValidatorComponents {
        validator_server_handle,
        validator_overload_monitor_handle,
        consensus_manager,
        consensus_store_pruner,
        consensus_adapter,
        checkpoint_metrics,
        sui_tx_validator_metrics,
        admission_queue,
    })
}
1530
1531 fn build_checkpoint_service(
1532 config: &NodeConfig,
1533 consensus_adapter: Arc<ConsensusAdapter>,
1534 checkpoint_store: Arc<CheckpointStore>,
1535 epoch_store: Arc<AuthorityPerEpochStore>,
1536 state: Arc<AuthorityState>,
1537 state_sync_handle: state_sync::Handle,
1538 state_hasher: Weak<GlobalStateHasher>,
1539 checkpoint_metrics: Arc<CheckpointMetrics>,
1540 node_role: NodeRole,
1541 ) -> Arc<CheckpointService> {
1542 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1543 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1544
1545 debug!(
1546 "Starting checkpoint service with epoch start timestamp {}
1547 and epoch duration {}",
1548 epoch_start_timestamp_ms, epoch_duration_ms
1549 );
1550
1551 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1552 Box::new(SubmitCheckpointToConsensus {
1553 sender: consensus_adapter,
1554 signer: state.secret.clone(),
1555 authority: config.protocol_public_key(),
1556 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1557 .checked_add(epoch_duration_ms)
1558 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1559 metrics: checkpoint_metrics.clone(),
1560 })
1561 } else {
1562 LogCheckpointOutput::boxed()
1563 };
1564
1565 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1566 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1567 let max_checkpoint_size_bytes =
1568 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1569
1570 CheckpointService::build(
1571 state.clone(),
1572 checkpoint_store,
1573 epoch_store,
1574 state.get_transaction_cache_reader().clone(),
1575 state_hasher,
1576 checkpoint_output,
1577 Box::new(certified_checkpoint_output),
1578 checkpoint_metrics,
1579 max_tx_per_checkpoint,
1580 max_checkpoint_size_bytes,
1581 )
1582 }
1583
1584 fn construct_consensus_adapter(
1585 committee: &Committee,
1586 consensus_config: &ConsensusConfig,
1587 authority: AuthorityName,
1588 prometheus_registry: &Registry,
1589 consensus_client: Arc<dyn ConsensusClient>,
1590 checkpoint_store: Arc<CheckpointStore>,
1591 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1592 ) -> ConsensusAdapter {
1593 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1594 ConsensusAdapter::new(
1597 consensus_client,
1598 checkpoint_store,
1599 authority,
1600 consensus_config.max_pending_transactions(),
1601 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1602 ca_metrics,
1603 inflight_slot_freed_notify,
1604 )
1605 }
1606
1607 async fn start_grpc_validator_service(
1608 config: &NodeConfig,
1609 state: Arc<AuthorityState>,
1610 consensus_adapter: Arc<ConsensusAdapter>,
1611 epoch_store: Arc<AuthorityPerEpochStore>,
1612 prometheus_registry: &Registry,
1613 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1614 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1615 let overload_config = &config.authority_overload_config;
1616 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1617 let manager = Arc::new(AdmissionQueueManager::new(
1618 consensus_adapter.clone(),
1619 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1620 overload_config.admission_queue_capacity_fraction,
1621 overload_config.admission_queue_bypass_fraction,
1622 overload_config.admission_queue_failover_timeout,
1623 inflight_slot_freed_notify,
1624 ));
1625 AdmissionQueueContext::spawn(manager, epoch_store)
1626 });
1627 let validator_service = ValidatorService::new(
1628 state.clone(),
1629 consensus_adapter,
1630 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1631 config.policy_config.clone().map(|p| p.client_id_source),
1632 admission_queue.clone(),
1633 );
1634
1635 let mut server_conf = mysten_network::config::Config::new();
1636 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1637 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1638 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1639 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1640 server_conf.load_shed = config.grpc_load_shed;
1641 let mut server_builder =
1642 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1643
1644 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1645
1646 let tls_config = sui_tls::create_rustls_server_config(
1647 config.network_key_pair().copy().private(),
1648 SUI_TLS_SERVER_NAME.to_string(),
1649 );
1650
1651 let network_address = config.network_address().clone();
1652
1653 let (ready_tx, ready_rx) = oneshot::channel();
1654
1655 let spawn_once = SpawnOnce::new(ready_rx, async move {
1656 let server = server_builder
1657 .bind(&network_address, Some(tls_config))
1658 .await
1659 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1660 let local_addr = server.local_addr();
1661 info!("Listening to traffic on {local_addr}");
1662 ready_tx.send(()).unwrap();
1663 if let Err(err) = server.serve().await {
1664 info!("Server stopped: {err}");
1665 }
1666 info!("Server stopped");
1667 });
1668 Ok((spawn_once, admission_queue))
1669 }
1670
1671 pub fn state(&self) -> Arc<AuthorityState> {
1672 self.state.clone()
1673 }
1674
/// Borrows the anemo connection-monitor handle held by this node.
///
/// Compiled only for unit tests and simulator (`msim`) builds.
#[cfg(any(test, msim))]
pub fn connection_monitor_handle_for_testing(
    &self,
) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
    &self._connection_monitor_handle
}
1681
1682 pub fn node_role(&self) -> NodeRole {
1683 self.state.load_epoch_store_one_call_per_task().node_role()
1684 }
1685
1686 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1688 self.state.reference_gas_price_for_testing()
1689 }
1690
1691 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1692 self.state.committee_store().clone()
1693 }
1694
1695 pub fn clone_authority_aggregator(
1706 &self,
1707 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1708 self.transaction_orchestrator
1709 .as_ref()
1710 .map(|to| to.clone_authority_aggregator())
1711 }
1712
1713 pub fn transaction_orchestrator(
1714 &self,
1715 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1716 self.transaction_orchestrator.clone()
1717 }
1718
/// Drives the node from epoch to epoch.
///
/// Each iteration of the loop:
/// 1. Creates a `CheckpointExecutor` for the current epoch.
/// 2. For validators that already hold consensus components, submits capabilities.
/// 3. Executes checkpoints until the epoch ends or the `run_with_range` stop condition
///    is met.
/// 4. Reads the final system state, refreshes the authority aggregator and peer addresses,
///    and reconfigures the authority state.
/// 5. Shuts down, restarts, creates or drops the consensus components according to the
///    node's role in the new epoch.
///
/// Returns `Ok(())` only when the `run_with_range` stop condition is met.
///
/// # Errors
/// Propagates errors from capability submission, from (re)starting consensus components,
/// and (msim only) from test checkpoint pruning.
pub async fn monitor_reconfiguration(
    self: Arc<Self>,
    mut epoch_store: Arc<AuthorityPerEpochStore>,
) -> Result<()> {
    let checkpoint_executor_metrics =
        CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

    loop {
        // Take the hasher out of its shared slot. The guard stays alive for the whole
        // iteration: a fresh hasher is written back into it after reconfiguration.
        let mut hasher_guard = self.global_state_hasher.lock().await;
        let hasher = hasher_guard.take().unwrap();
        info!(
            "Creating checkpoint executor for epoch {}",
            epoch_store.epoch()
        );
        let checkpoint_executor = CheckpointExecutor::new(
            epoch_store.clone(),
            self.checkpoint_store.clone(),
            self.state.clone(),
            hasher.clone(),
            self.backpressure_manager.clone(),
            self.config.checkpoint_executor_config.clone(),
            checkpoint_executor_metrics.clone(),
            self.subscription_service_checkpoint_sender.clone(),
        );

        let run_with_range = self.config.run_with_range;

        let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

        self.metrics
            .current_protocol_version
            .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

        // Validators that already hold consensus components advertise their supported
        // protocol versions and system packages to consensus. The
        // `validator_components` mutex guard is a temporary of the `if let` scrutinee,
        // so it stays held for this whole block, including the awaits inside it.
        if let Some(components) = &*self.validator_components.lock().await
            && cur_epoch_store.is_validator()
        {
            // NOTE(review): the purpose of this 1ms sleep is not evident from here.
            tokio::time::sleep(Duration::from_millis(1)).await;

            let config = cur_epoch_store.protocol_config();
            let mut supported_protocol_versions = self
                .config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version);

            // Lower the advertised maximum version while the proposed version enables
            // accumulators but the accumulator root object does not exist yet.
            while supported_protocol_versions.max > config.version {
                let proposed_protocol_config = ProtocolConfig::get_for_version(
                    supported_protocol_versions.max,
                    cur_epoch_store.get_chain(),
                );

                if proposed_protocol_config.enable_accumulators()
                    && !epoch_store.accumulator_root_exists()
                {
                    error!(
                        "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                        supported_protocol_versions.max
                    );
                    supported_protocol_versions.max = supported_protocol_versions.max.prev();
                } else {
                    break;
                }
            }

            let binary_config = config.binary_config(None);
            let transaction = ConsensusTransaction::new_capability_notification_v2(
                AuthorityCapabilitiesV2::new(
                    self.state.name,
                    cur_epoch_store.get_chain_identifier().chain(),
                    supported_protocol_versions,
                    self.state
                        .get_available_system_packages(&binary_config)
                        .await,
                ),
            );
            info!(?transaction, "submitting capabilities to consensus");
            components.consensus_adapter.submit(
                transaction,
                None,
                &cur_epoch_store,
                None,
                None,
            )?;
        }

        // Run checkpoint execution for this epoch.
        let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

        // A configured run range was reached: shut down consensus, notify the shutdown
        // channel, and leave the loop.
        if stop_condition == StopReason::RunWithRangeCondition {
            SuiNode::shutdown(&self).await;
            self.shutdown_channel_tx
                .send(run_with_range)
                .expect("RunWithRangeCondition met but failed to send shutdown message");
            return Ok(());
        }

        let latest_system_state = self
            .state
            .get_object_cache_reader()
            .get_sui_system_state_object_unsafe()
            .expect("Read Sui System State object cannot fail");

        // Safe mode is unexpected. Simulator tests can mark it as expected, which skips
        // the debug assertion.
        #[cfg(msim)]
        if !self
            .sim_state
            .sim_safe_mode_expected
            .load(Ordering::Relaxed)
        {
            debug_assert!(!latest_system_state.safe_mode());
        }

        #[cfg(not(msim))]
        debug_assert!(!latest_system_state.safe_mode());

        // A failed end-of-epoch notification is only reported (as a warning) on fullnodes.
        if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
            && self.state.is_fullnode(&cur_epoch_store)
        {
            warn!(
                "Failed to send end of epoch notification to subscriber: {:?}",
                err
            );
        }

        cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
        let new_epoch_start_state = latest_system_state.into_epoch_start_state();

        // Swap in an authority aggregator rebuilt for the next epoch's start state.
        self.auth_agg.store(Arc::new(
            self.auth_agg
                .load()
                .recreate_with_new_epoch_start_state(&new_epoch_start_state),
        ));

        let next_epoch_committee = new_epoch_start_state.get_sui_committee();
        let next_epoch = next_epoch_committee.epoch();
        assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

        info!(
            next_epoch,
            "Finished executing all checkpoints in epoch. About to reconfigure the system."
        );

        fail_point_async!("reconfig_delay");

        cur_epoch_store.record_epoch_reconfig_start_time_metric();

        update_peer_addresses(
            &self.config,
            &self.endpoint_manager,
            &new_epoch_start_state,
            Some(cur_epoch_store.epoch_start_state()),
        );

        // Hold the validator-components lock across state reconfiguration and the
        // component restart below; it is released at the end of this iteration.
        let mut validator_components_lock_guard = self.validator_components.lock().await;

        let new_epoch_store = self
            .reconfigure_state(
                &self.state,
                &cur_epoch_store,
                next_epoch_committee.clone(),
                new_epoch_start_state,
                hasher.clone(),
            )
            .await;

        let new_role = new_epoch_store.node_role();

        let new_validator_components = if let Some(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        }) = validator_components_lock_guard.take()
        {
            info!("Reconfiguring node (was running consensus).");

            consensus_manager.shutdown().await;
            info!("Consensus has shut down.");

            info!("Epoch store finished reconfiguration.");

            // `into_inner` succeeds only if every other clone of `hasher` (e.g. those
            // handed to the checkpoint executor and to `reconfigure_state`) has been
            // dropped by now.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            consensus_store_pruner.prune(next_epoch).await;

            if new_role.runs_consensus() {
                info!("Restarting consensus as {new_role}");
                // NOTE(review): this path reuses `validator_server_handle` as-is. If the
                // role changed from a non-validator consensus role to validator, no gRPC
                // server handle exists here — confirm whether that transition can occur.
                Some(
                    Self::start_epoch_specific_validator_components(
                        &self.config,
                        self.state.clone(),
                        consensus_adapter,
                        self.checkpoint_store.clone(),
                        new_epoch_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        self.randomness_receiver_handle.clone(),
                        consensus_manager,
                        consensus_store_pruner,
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        validator_server_handle,
                        validator_overload_monitor_handle,
                        checkpoint_metrics,
                        self.metrics.clone(),
                        sui_tx_validator_metrics,
                        admission_queue,
                        new_role,
                    )
                    .await?,
                )
            } else {
                info!(
                    "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                );
                None
            }
        } else {
            // Node was not running consensus: still rotate in a fresh hasher.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            if new_role.runs_consensus() {
                info!("Promoting node to {new_role}, starting consensus components");

                let mut components = Self::construct_validator_components(
                    self.config.clone(),
                    self.state.clone(),
                    Arc::new(next_epoch_committee.clone()),
                    new_epoch_store.clone(),
                    self.checkpoint_store.clone(),
                    self.state_sync_handle.clone(),
                    self.randomness_handle.clone(),
                    weak_hasher,
                    self.backpressure_manager.clone(),
                    &self.registry_service,
                    self.metrics.clone(),
                    self.checkpoint_metrics.clone(),
                    new_role,
                    self.randomness_receiver_handle.clone(),
                )
                .await?;

                // Newly promoted validators start the gRPC server now and register the
                // consensus manager as the endpoint manager's consensus-address updater.
                if new_role.is_validator() {
                    components.validator_server_handle = Some(
                        components
                            .validator_server_handle
                            .take()
                            .unwrap()
                            .start()
                            .await,
                    );

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());
                }

                Some(components)
            } else {
                None
            }
        };
        *validator_components_lock_guard = new_validator_components;

        cur_epoch_store.release_db_handles();

        // Simulator-only: prune old checkpoints when a finite, non-zero retention is set.
        if cfg!(msim)
            && !matches!(
                self.config
                    .authority_store_pruning_config
                    .num_epochs_to_retain_for_checkpoints(),
                None | Some(u64::MAX) | Some(0)
            )
        {
            self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
        }

        epoch_store = new_epoch_store;
        info!("Reconfiguration finished");
    }
}
2038
2039 async fn shutdown(&self) {
2040 if let Some(validator_components) = &*self.validator_components.lock().await {
2041 validator_components.consensus_manager.shutdown().await;
2042 }
2043 }
2044
2045 async fn reconfigure_state(
2046 &self,
2047 state: &Arc<AuthorityState>,
2048 cur_epoch_store: &AuthorityPerEpochStore,
2049 next_epoch_committee: Committee,
2050 next_epoch_start_system_state: EpochStartSystemState,
2051 global_state_hasher: Arc<GlobalStateHasher>,
2052 ) -> Arc<AuthorityPerEpochStore> {
2053 let next_epoch = next_epoch_committee.epoch();
2054
2055 let last_checkpoint = self
2056 .checkpoint_store
2057 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2058 .expect("Error loading last checkpoint for current epoch")
2059 .expect("Could not load last checkpoint for current epoch");
2060
2061 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2062
2063 assert_eq!(
2064 Some(last_checkpoint_seq),
2065 self.checkpoint_store
2066 .get_highest_executed_checkpoint_seq_number()
2067 .expect("Error loading highest executed checkpoint sequence number")
2068 );
2069
2070 let epoch_start_configuration = EpochStartConfiguration::new(
2071 next_epoch_start_system_state,
2072 *last_checkpoint.digest(),
2073 state.get_object_store().as_ref(),
2074 EpochFlag::default_flags_for_new_epoch(&state.config),
2075 )
2076 .expect("EpochStartConfiguration construction cannot fail");
2077
2078 let new_epoch_store = self
2079 .state
2080 .reconfigure(
2081 cur_epoch_store,
2082 self.config.supported_protocol_versions.unwrap(),
2083 next_epoch_committee,
2084 epoch_start_configuration,
2085 global_state_hasher,
2086 &self.config.expensive_safety_check_config,
2087 last_checkpoint_seq,
2088 )
2089 .await
2090 .expect("Reconfigure authority state cannot fail");
2091 info!(next_epoch, "Node State has been reconfigured");
2092 assert_eq!(next_epoch, new_epoch_store.epoch());
2093 self.state.get_reconfig_api().update_epoch_flags_metrics(
2094 cur_epoch_store.epoch_start_config().flags(),
2095 new_epoch_store.epoch_start_config().flags(),
2096 );
2097
2098 new_epoch_store
2099 }
2100
2101 pub fn get_config(&self) -> &NodeConfig {
2102 &self.config
2103 }
2104
2105 pub fn randomness_handle(&self) -> randomness::Handle {
2106 self.randomness_handle.clone()
2107 }
2108
2109 pub fn state_sync_handle(&self) -> state_sync::Handle {
2110 self.state_sync_handle.clone()
2111 }
2112
2113 pub fn endpoint_manager(&self) -> &EndpointManager {
2114 &self.endpoint_manager
2115 }
2116
2117 fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2119 let digest_str = digest.to_string();
2120 if digest_str.len() >= 8 {
2121 digest_str[0..8].to_string()
2122 } else {
2123 digest_str
2124 }
2125 }
2126
2127 async fn check_and_recover_forks(
2131 checkpoint_store: &CheckpointStore,
2132 checkpoint_metrics: &CheckpointMetrics,
2133 fork_recovery: Option<&ForkRecoveryConfig>,
2134 ) -> Result<()> {
2135 if let Some(recovery) = fork_recovery {
2137 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2138 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2139 }
2140
2141 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2142 .get_checkpoint_fork_detected()
2143 .map_err(|e| {
2144 error!("Failed to check for checkpoint fork: {:?}", e);
2145 e
2146 })?
2147 {
2148 Self::handle_checkpoint_fork(
2149 checkpoint_seq,
2150 checkpoint_digest,
2151 checkpoint_metrics,
2152 fork_recovery,
2153 )
2154 .await?;
2155 }
2156 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2157 .get_transaction_fork_detected()
2158 .map_err(|e| {
2159 error!("Failed to check for transaction fork: {:?}", e);
2160 e
2161 })?
2162 {
2163 Self::handle_transaction_fork(
2164 tx_digest,
2165 expected_effects,
2166 actual_effects,
2167 checkpoint_metrics,
2168 fork_recovery,
2169 )
2170 .await?;
2171 }
2172
2173 Ok(())
2174 }
2175
2176 fn try_recover_checkpoint_fork(
2177 checkpoint_store: &CheckpointStore,
2178 recovery: &ForkRecoveryConfig,
2179 ) -> Result<()> {
2180 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2183 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2184 anyhow::bail!(
2185 "Invalid checkpoint digest override for seq {}: {}",
2186 seq,
2187 expected_digest_str
2188 );
2189 };
2190
2191 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2192 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2193 if local_digest != expected_digest {
2194 info!(
2195 seq,
2196 local = %Self::get_digest_prefix(local_digest),
2197 expected = %Self::get_digest_prefix(expected_digest),
2198 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2199 seq
2200 );
2201 checkpoint_store
2202 .clear_locally_computed_checkpoints_from(*seq)
2203 .context(
2204 "Failed to clear locally computed checkpoints from override seq",
2205 )?;
2206 }
2207 }
2208 }
2209
2210 if let Some((checkpoint_seq, checkpoint_digest)) =
2211 checkpoint_store.get_checkpoint_fork_detected()?
2212 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2213 {
2214 info!(
2215 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2216 checkpoint_seq, checkpoint_digest
2217 );
2218 checkpoint_store
2219 .clear_checkpoint_fork_detected()
2220 .expect("Failed to clear checkpoint fork detected marker");
2221 }
2222 Ok(())
2223 }
2224
2225 fn try_recover_transaction_fork(
2226 checkpoint_store: &CheckpointStore,
2227 recovery: &ForkRecoveryConfig,
2228 ) -> Result<()> {
2229 if recovery.transaction_overrides.is_empty() {
2230 return Ok(());
2231 }
2232
2233 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2234 && recovery
2235 .transaction_overrides
2236 .contains_key(&tx_digest.to_string())
2237 {
2238 info!(
2239 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2240 tx_digest
2241 );
2242 checkpoint_store
2243 .clear_transaction_fork_detected()
2244 .expect("Failed to clear transaction fork detected marker");
2245 }
2246 Ok(())
2247 }
2248
2249 fn get_current_timestamp() -> u64 {
2250 std::time::SystemTime::now()
2251 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2252 .unwrap()
2253 .as_secs()
2254 }
2255
2256 async fn handle_checkpoint_fork(
2257 checkpoint_seq: u64,
2258 checkpoint_digest: CheckpointDigest,
2259 checkpoint_metrics: &CheckpointMetrics,
2260 fork_recovery: Option<&ForkRecoveryConfig>,
2261 ) -> Result<()> {
2262 checkpoint_metrics
2263 .checkpoint_fork_crash_mode
2264 .with_label_values(&[
2265 &checkpoint_seq.to_string(),
2266 &Self::get_digest_prefix(checkpoint_digest),
2267 &Self::get_current_timestamp().to_string(),
2268 ])
2269 .set(1);
2270
2271 let behavior = fork_recovery
2272 .map(|fr| fr.fork_crash_behavior)
2273 .unwrap_or_default();
2274
2275 match behavior {
2276 ForkCrashBehavior::AwaitForkRecovery => {
2277 error!(
2278 checkpoint_seq = checkpoint_seq,
2279 checkpoint_digest = ?checkpoint_digest,
2280 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2281 );
2282 futures::future::pending::<()>().await;
2283 unreachable!("pending() should never return");
2284 }
2285 ForkCrashBehavior::ReturnError => {
2286 error!(
2287 checkpoint_seq = checkpoint_seq,
2288 checkpoint_digest = ?checkpoint_digest,
2289 "Checkpoint fork detected! Returning error."
2290 );
2291 Err(anyhow::anyhow!(
2292 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2293 checkpoint_seq,
2294 checkpoint_digest
2295 ))
2296 }
2297 }
2298 }
2299
2300 async fn handle_transaction_fork(
2301 tx_digest: TransactionDigest,
2302 expected_effects_digest: TransactionEffectsDigest,
2303 actual_effects_digest: TransactionEffectsDigest,
2304 checkpoint_metrics: &CheckpointMetrics,
2305 fork_recovery: Option<&ForkRecoveryConfig>,
2306 ) -> Result<()> {
2307 checkpoint_metrics
2308 .transaction_fork_crash_mode
2309 .with_label_values(&[
2310 &Self::get_digest_prefix(tx_digest),
2311 &Self::get_digest_prefix(expected_effects_digest),
2312 &Self::get_digest_prefix(actual_effects_digest),
2313 &Self::get_current_timestamp().to_string(),
2314 ])
2315 .set(1);
2316
2317 let behavior = fork_recovery
2318 .map(|fr| fr.fork_crash_behavior)
2319 .unwrap_or_default();
2320
2321 match behavior {
2322 ForkCrashBehavior::AwaitForkRecovery => {
2323 error!(
2324 tx_digest = ?tx_digest,
2325 expected_effects_digest = ?expected_effects_digest,
2326 actual_effects_digest = ?actual_effects_digest,
2327 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2328 );
2329 futures::future::pending::<()>().await;
2330 unreachable!("pending() should never return");
2331 }
2332 ForkCrashBehavior::ReturnError => {
2333 error!(
2334 tx_digest = ?tx_digest,
2335 expected_effects_digest = ?expected_effects_digest,
2336 actual_effects_digest = ?actual_effects_digest,
2337 "Transaction fork detected! Returning error."
2338 );
2339 Err(anyhow::anyhow!(
2340 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2341 tx_digest,
2342 expected_effects_digest,
2343 actual_effects_digest
2344 ))
2345 }
2346 }
2347 }
2348}
2349
2350#[cfg(not(msim))]
2351impl SuiNode {
2352 async fn fetch_jwks(
2353 _authority: AuthorityName,
2354 provider: &OIDCProvider,
2355 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2356 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2357 use sui_types::error::SuiErrorKind;
2358 let client = reqwest::Client::new();
2359 fetch_jwks(provider, &client, true)
2360 .await
2361 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2362 }
2363}
2364
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node this `SuiNode` is running on.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Logs and stores `new_value` into `sim_state.sim_safe_mode_expected`
    /// using relaxed ordering.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator variant of `fetch_jwks`. It delegates to the injector returned
    /// by `get_jwk_injector()` instead of the HTTP fetch used in non-msim builds.
    // NOTE(review): both parameters are used below, so this allow looks stale.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
2386
/// A future that `SpawnOnce::start` spawns onto the tokio runtime at most once.
///
/// `Unstarted` holds the not-yet-spawned future plus a oneshot receiver that
/// `start` awaits after spawning. `Started` holds the resulting task handle;
/// calling `start` again on it is a no-op.
enum SpawnOnce {
    // NOTE(review): the boxed future is wrapped in a `Mutex`, presumably so that
    // `SpawnOnce` is `Sync` even though `BoxFuture` is not — confirm.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // The handle is never read; it is only stored after spawning.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2393
2394impl SpawnOnce {
2395 pub fn new(
2396 ready_rx: oneshot::Receiver<()>,
2397 future: impl Future<Output = ()> + Send + 'static,
2398 ) -> Self {
2399 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2400 }
2401
2402 pub async fn start(self) -> Self {
2403 match self {
2404 Self::Unstarted(ready_rx, future) => {
2405 let future = future.into_inner();
2406 let handle = tokio::spawn(future);
2407 ready_rx.await.unwrap();
2408 Self::Started(handle)
2409 }
2410 Self::Started(_) => self,
2411 }
2412 }
2413}
2414
2415fn update_peer_addresses(
2419 config: &NodeConfig,
2420 endpoint_manager: &EndpointManager,
2421 epoch_start_state: &EpochStartSystemState,
2422 prev_epoch_start_state: Option<&EpochStartSystemState>,
2423) {
2424 if config.consensus_config().is_none() {
2425 return;
2426 }
2427 let new_peers: HashSet<PeerId> = epoch_start_state
2428 .get_validator_as_p2p_peers(config.protocol_public_key())
2429 .into_iter()
2430 .map(|(peer_id, address)| {
2431 endpoint_manager
2432 .update_endpoint(
2433 EndpointId::P2p(peer_id),
2434 AddressSource::Chain,
2435 vec![address],
2436 )
2437 .expect("Updating peer addresses should not fail");
2438 peer_id
2439 })
2440 .collect();
2441
2442 if let Some(prev) = prev_epoch_start_state {
2444 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2445 if !new_peers.contains(&peer_id) {
2446 endpoint_manager
2447 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2448 .expect("Clearing peer addresses should not fail");
2449 }
2450 }
2451 }
2452}
2453
2454fn build_kv_store(
2455 state: &Arc<AuthorityState>,
2456 config: &NodeConfig,
2457 registry: &Registry,
2458) -> Result<Arc<TransactionKeyValueStore>> {
2459 let metrics = KeyValueStoreMetrics::new(registry);
2460 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2461
2462 let base_url = &config.transaction_kv_store_read_config.base_url;
2463
2464 if base_url.is_empty() {
2465 info!("no http kv store url provided, using local db only");
2466 return Ok(Arc::new(db_store));
2467 }
2468
2469 let base_url: url::Url = base_url.parse().tap_err(|e| {
2470 error!(
2471 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2472 base_url, e
2473 )
2474 })?;
2475
2476 let network_str = match state.get_chain_identifier().chain() {
2477 Chain::Mainnet => "/mainnet",
2478 _ => {
2479 info!("using local db only for kv store");
2480 return Ok(Arc::new(db_store));
2481 }
2482 };
2483
2484 let base_url = base_url.join(network_str)?.to_string();
2485 let http_store = HttpKVStore::new_kv(
2486 &base_url,
2487 config.transaction_kv_store_read_config.cache_size,
2488 metrics.clone(),
2489 )?;
2490 info!("using local key-value store with fallback to http key-value store");
2491 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2492 db_store,
2493 http_store,
2494 metrics,
2495 "json_rpc_fallback",
2496 )))
2497}
2498
/// Builds and starts the node's HTTP (and optionally HTTPS) RPC servers.
///
/// When `node_role.should_run_rpc_servers()` is false, nothing is started and
/// `(HttpServers::default(), None)` is returned. Otherwise a single axum router
/// is assembled from:
/// - the JSON-RPC server, with the read, coin, governance, bridge, indexer and
///   Move-utils modules; the transaction builder module is added only when
///   `config.run_with_range` is `None`, and the execution module only when a
///   transaction orchestrator is provided;
/// - the `sui_rpc_api` service, wired to a newly built subscription service.
///
/// Both routers share one layer stack: connect-info propagation, server-timing
/// middleware and a permissive CORS policy. The HTTP server always listens on
/// `config.json_rpc_address`. An HTTPS server is also started when the RPC
/// config carries a TLS config.
///
/// Returns the server handles and `Some` checkpoint sender feeding the
/// subscription service.
///
/// # Errors
/// Propagates failures from building the KV store, registering JSON-RPC
/// modules, building the JSON-RPC router, validating the RPC config, and
/// starting either server.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if !node_role.should_run_rpc_servers() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // --- JSON-RPC router ---
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // The transaction builder is only exposed when the node is not running
        // with a `run_with_range` bound.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Execution endpoints require a transaction orchestrator.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Explicit name-service settings win only when all three are set;
        // otherwise fall back to the per-chain preset.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        // IndexerApi gets its own ReadApi instance; `kv_store` and `metrics`
        // are moved here since this is their last use.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // --- sui_rpc_api router ---
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        // A supplied RPC config must validate before it is applied.
        if let Some(config) = config.rpc.clone() {
            config.validate()?;
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    // Shared middleware for both routers.
    let layers = ServiceBuilder::new()
        // Copy sui_http's ConnectInfo into axum's ConnectInfo extension so
        // axum extractors can see the remote address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            // Permissive CORS: GET/POST from any origin, any headers.
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // Optional HTTPS server, started only when the RPC config has a TLS config.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // Plain HTTP server is always started.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2672
2673#[cfg(not(test))]
2674fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2675 protocol_config.max_transactions_per_checkpoint() as usize
2676}
2677
#[cfg(test)]
fn max_tx_per_checkpoint(_protocol_config: &ProtocolConfig) -> usize {
    // Test builds ignore the protocol config and use a fixed, very small limit.
    // NOTE(review): presumably chosen so tests exercise multi-checkpoint paths.
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2682
/// Handles for the RPC servers started by `build_http_servers`.
///
/// The `Default` value (both `None`) is what `build_http_servers` returns when
/// RPC servers are not run. `https` is only populated when the RPC config
/// provides a TLS config.
#[derive(Default)]
struct HttpServers {
    // Never read; the handle is only held by this struct.
    // NOTE(review): presumably dropping a `ServerHandle` stops its server — confirm.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2690
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Exercises `SuiNode::check_and_recover_forks` end to end on one store.
    ///
    /// For checkpoint forks and then transaction forks, in sequence:
    /// 1. record a fork marker;
    /// 2. with `ReturnError` and no overrides, expect an error naming the fork;
    /// 3. with a matching override, expect `Ok` and the marker cleared.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // --- Checkpoint fork ---
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // No overrides: the recorded fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // Override for the forked seq: the marker is cleared and the check passes.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // --- Transaction fork ---
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        // No overrides: the recorded fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // Override keyed by the tx digest string: the marker is cleared.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}