1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::RandomnessRoundReceiver;
33use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
34use sui_core::authority::backpressure::BackpressureManager;
35use sui_core::authority::epoch_start_configuration::EpochFlag;
36use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
37use sui_core::consensus_adapter::ConsensusClient;
38use sui_core::consensus_manager::UpdatableConsensusClient;
39use sui_core::epoch::randomness::RandomnessManager;
40use sui_core::execution_cache::build_execution_cache;
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44use sui_types::node_role::NodeRole;
45
46use sui_core::global_state_hasher::GlobalStateHashMetrics;
47use sui_core::storage::RestReadStore;
48use sui_json_rpc::bridge_api::BridgeReadApi;
49use sui_json_rpc_api::JsonRpcMetrics;
50use sui_network::randomness;
51use sui_rpc_api::RpcMetrics;
52use sui_rpc_api::ServerVersion;
53use sui_rpc_api::subscription::SubscriptionService;
54use sui_types::base_types::ConciseableName;
55use sui_types::crypto::RandomnessRound;
56use sui_types::digests::{
57 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
58};
59use sui_types::messages_consensus::AuthorityCapabilitiesV2;
60use sui_types::sui_system_state::SuiSystemState;
61use tap::tap::TapFallible;
62use tokio::sync::oneshot;
63use tokio::sync::{Mutex, broadcast, mpsc};
64use tokio::task::JoinHandle;
65use tower::ServiceBuilder;
66use tracing::{Instrument, error_span, info};
67use tracing::{debug, error, warn};
68
/// Emits a JWK-updater log line.
///
/// The message goes to `debug` when `in_test_configuration()` reports a test setup, and to `info`
/// otherwise. Presumably this keeps tests quiet while production nodes keep the output — confirm.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        match in_test_configuration() {
            true => {
                debug!($($arg)+);
            }
            false => {
                info!($($arg)+);
            }
        }
    };
}
81
82use fastcrypto_zkp::bn254::zk_login::JWK;
83pub use handle::SuiNodeHandle;
84use mysten_metrics::{RegistryService, spawn_monitored_task};
85use mysten_service::server_timing::server_timing_middleware;
86use sui_config::node::{DBCheckpointConfig, RunWithRange};
87use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
88use sui_config::node_config_metrics::NodeConfigMetrics;
89use sui_config::{ConsensusConfig, NodeConfig};
90use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
91use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
92use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
93use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
94use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
95use sui_core::authority_aggregator::AuthorityAggregator;
96use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
97use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
98use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
99use sui_core::checkpoints::{
100 CheckpointMetrics, CheckpointOutput, CheckpointService, CheckpointStore, LogCheckpointOutput,
101 SendCheckpointToStateSync, SubmitCheckpointToConsensus,
102};
103use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
104use sui_core::consensus_manager::ConsensusManager;
105use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
/// Components that exist only on nodes whose role runs consensus.
///
/// `SuiNode::start_async` builds this through `construct_validator_components` when
/// `node_role.runs_consensus()` is true. It stores the result in `SuiNode::validator_components`.
// NOTE(review): struct field order is also drop order; do not reorder these fields without
// checking shutdown dependencies between the components.
pub struct ValidatorComponents {
    // Validator gRPC server. On validator (non-observer) nodes, `start_async` takes the value
    // out, calls `.start().await` on it, and stores the result back here.
    validator_server_handle: Option<SpawnOnce>,
    // Join handle of the overload-monitor task, if one was spawned (spawn site not in this view).
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    // Consensus manager. On validators, `start_async` registers it as the endpoint manager's
    // consensus address updater.
    consensus_manager: Arc<ConsensusManager>,
    // Pruner for consensus storage (its use is outside this view).
    consensus_store_pruner: ConsensusStorePruner,
    // Adapter to consensus. It is used by `SuiNode::close_epoch` and, on validators, by
    // `recover_end_of_publish` at startup.
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    // Admission-queue state, when one was configured (construction not in this view).
    admission_queue: Option<AdmissionQueueContext>,
}
/// Everything `SuiNode::create_p2p_network` produces for the anemo P2P layer.
///
/// `SuiNode::start_async` destructures this right after creation.
// NOTE(review): field order is drop order; keep it stable.
pub struct P2pComponents {
    // The anemo network itself. `start_async` hands a weak (downgraded) reference of it to the
    // connection monitor.
    p2p_network: Network,
    // Peers known up front, passed to the connection monitor in `start_async`.
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    // Endpoint/address manager. `start_async` seeds it with config overrides and the epoch's
    // peer addresses.
    endpoint_manager: EndpointManager,
}
182
/// Simulator-only (`msim`) helpers: per-node simulation state and an overridable JWK source.
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    /// Per-node state that only exists in simulator builds.
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node handle. The safe-mode expectation starts at
        /// `false`, and a fresh leak detector is created.
        fn default() -> Self {
            let sim_node = sui_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = sui_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Signature of a pluggable JWK fetcher used in simulation instead of real HTTP fetches.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector. It ignores both arguments and parses the bundled
    /// `DEFAULT_JWK_BYTES` as Twitch JWKs. Any parse failure becomes `JWKRetrievalError`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        let parsed = parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        );
        parsed.map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of this thread's current JWK injector.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Replaces this thread's JWK injector.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            let mut current = slot.borrow_mut();
            *current = injector;
        });
    }
}
236
237#[cfg(msim)]
238pub use simulator::set_jwk_injector;
239#[cfg(msim)]
240use simulator::*;
241use sui_core::authority::authority_store_pruner::PrunerWatermarks;
242use sui_core::{
243 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
244};
245
/// Default timeout for establishing gRPC connections: 60 seconds (its users are outside this view).
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_millis(60_000);
247
/// A running Sui node (fullnode, observer or validator), assembled by `SuiNode::start_async`.
// NOTE(review): field order is drop order; several fields are held purely so that their
// background tasks stay alive, so reordering may change shutdown sequencing.
pub struct SuiNode {
    // Node configuration as finalized during startup (e.g. with supported protocol versions
    // defaulted).
    config: NodeConfig,
    // Present only when the node's role runs consensus (see `start_async`). It sits behind a
    // tokio `Mutex` so async methods like `close_epoch` can lock it.
    validator_components: Mutex<Option<ValidatorComponents>>,

    // Not read directly in this view. Presumably it is held only to keep the HTTP servers
    // running for the node's lifetime, which is why `unused` is allowed — confirm.
    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    // Built only for fullnodes that are not running with a `run_with_range` limit.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    // Underscore-prefixed handles are kept alive but not read in this view.
    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    // Broadcast of `SuiSystemState`. Its receiver from creation goes to the transaction
    // orchestrator, and further receivers come from `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    // Sender returned by `start_db_checkpoint` when a DB checkpoint handler was started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Sender returned by `start_state_snapshot` when a snapshot uploader was started.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Shutdown broadcast. Receivers are obtained via `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    // Swappable authority aggregator, initially built from the epoch start state.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    // Checkpoint feed for the RPC subscription service, as returned by `build_http_servers`.
    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
294
295impl fmt::Debug for SuiNode {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 f.debug_struct("SuiNode")
298 .field("name", &self.state.name.concise())
299 .finish()
300 }
301}
302
/// Upper bound on the number of new JWKs from one fetch that are submitted to consensus.
///
/// Extra keys beyond this are dropped with a warning. This is a plain compile-time value, so it
/// is a `const` rather than a `static` (no fixed address or interior mutability is needed).
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306 pub async fn start(
307 config: NodeConfig,
308 registry_service: RegistryService,
309 ) -> Result<Arc<SuiNode>> {
310 Self::start_async(
311 config,
312 registry_service,
313 ServerVersion::new("sui-node", "unknown"),
314 )
315 .await
316 }
317
    /// Spawns one JWK-fetching task per OIDC provider configured for this node's chain.
    ///
    /// Providers are read from `config.zklogin_oauth_providers`, keyed by the chain of
    /// `epoch_store`'s chain identifier. A missing entry spawns no tasks.
    ///
    /// Each task is wrapped in `epoch_store.within_alive_epoch(..)`; presumably it ends with the
    /// epoch (confirm against `within_alive_epoch`). Each loop iteration does the following:
    /// 1. Fetches keys via `Self::fetch_jwks`.
    /// 2. Discards keys that are invalid, already active this epoch, or already seen by this task.
    /// 3. Caps the remainder at `MAX_JWK_KEYS_PER_FETCH`.
    /// 4. Submits each remaining key to consensus as `ConsensusTransaction::new_jwk_fetched`.
    /// 5. Sleeps for `config.jwk_fetch_interval_seconds`.
    ///
    /// # Panics
    /// Panics if a configured provider string does not parse as an `OIDCProvider`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Returns true iff the key's `iss` maps to a known provider, that provider equals the one
        // it was fetched from, and the (id, jwk) pair passes `check_total_jwk_size`. Each
        // rejection is logged and counted in `invalid_jwks` under the fetching provider's label.
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            // Per-task clones: each spawned task owns its own handles.
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (JwkId, JWK) pairs this task has already accepted, so repeated fetches do
                    // not resubmit the same key.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // On error, retry after a fixed 30s back-off. `continue` skips
                                // the `fetch_interval` sleep at the bottom of the loop.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // `&&` short-circuits, so a key is inserted into `seen` only if it
                                // is valid and not already active in the current epoch.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                // Counted before the cap below is applied.
                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // NOTE(review): keys removed by this truncate were already
                                // inserted into `seen`, so later fetches by this task will not
                                // submit them either. Confirm this is intended.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Submission failures are logged and otherwise ignored.
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
451 }
452
453 pub async fn start_async(
454 config: NodeConfig,
455 registry_service: RegistryService,
456 server_version: ServerVersion,
457 ) -> Result<Arc<SuiNode>> {
458 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
459 let mut config = config.clone();
460 if config.supported_protocol_versions.is_none() {
461 info!(
462 "populating config.supported_protocol_versions with default {:?}",
463 SupportedProtocolVersions::SYSTEM_DEFAULT
464 );
465 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466 }
467
468 let run_with_range = config.run_with_range;
469 let prometheus_registry = registry_service.default_registry();
470 let node_role = config.intended_node_role();
471
472 info!(node =? config.protocol_public_key(),
473 "Initializing sui-node listening on {} with role {:?}", config.network_address, node_role
474 );
475
476 DBMetrics::init(registry_service.clone());
478
479 typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482 mysten_metrics::init_metrics(&prometheus_registry);
484 #[cfg(not(msim))]
486 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488 let genesis = config.genesis()?.clone();
489
490 let secret = Arc::pin(config.protocol_key_pair().copy());
491 let genesis_committee = genesis.committee();
492 let committee_store = Arc::new(CommitteeStore::new(
493 config.db_path().join("epochs"),
494 &genesis_committee,
495 None,
496 ));
497
498 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499 let checkpoint_store = CheckpointStore::new(
500 &config.db_path().join("checkpoints"),
501 pruner_watermarks.clone(),
502 );
503 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
504
505 if node_role.runs_consensus() {
506 Self::check_and_recover_forks(
507 &checkpoint_store,
508 &checkpoint_metrics,
509 config.fork_recovery.as_ref(),
510 )
511 .await?;
512 }
513
514 let enable_write_stall = config
516 .enable_db_write_stall
517 .unwrap_or(node_role.runs_consensus());
518 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
519 enable_write_stall,
520 is_validator: node_role.is_validator(),
521 };
522 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
523 &config.db_store_path(),
524 Some(perpetual_tables_options),
525 Some(pruner_watermarks.epoch_id.clone()),
526 ));
527 let is_genesis = perpetual_tables
528 .database_is_empty()
529 .expect("Database read should not fail at init.");
530
531 let backpressure_manager =
532 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
533
534 let store =
535 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
536
537 let cur_epoch = store.get_recovery_epoch_at_restart()?;
538 let committee = committee_store
539 .get_committee(&cur_epoch)?
540 .expect("Committee of the current epoch must exist");
541 let epoch_start_configuration = store
542 .get_epoch_start_configuration()?
543 .expect("EpochStartConfiguration of the current epoch must exist");
544 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
545 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
546
547 let cache_traits = build_execution_cache(
548 &config.execution_cache,
549 &prometheus_registry,
550 &store,
551 backpressure_manager.clone(),
552 );
553
554 let auth_agg = {
555 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
556 Arc::new(ArcSwap::new(Arc::new(
557 AuthorityAggregator::new_from_epoch_start_state(
558 epoch_start_configuration.epoch_start_state(),
559 &committee_store,
560 safe_client_metrics_base,
561 ),
562 )))
563 };
564
565 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
566 let chain = match config.chain_override_for_testing {
567 Some(chain) => chain,
568 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
569 };
570
571 let highest_executed_checkpoint = checkpoint_store
572 .get_highest_executed_checkpoint_seq_number()
573 .expect("checkpoint store read cannot fail")
574 .unwrap_or(0);
575
576 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
577 0
578 } else {
579 checkpoint_store
580 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
581 .expect("checkpoint store read cannot fail")
582 .unwrap_or(highest_executed_checkpoint)
583 };
584
585 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
586 let epoch_store = AuthorityPerEpochStore::new(
587 config.protocol_public_key(),
588 committee.clone(),
589 &config.db_store_path(),
590 Some(epoch_options.options),
591 EpochMetrics::new(®istry_service.default_registry()),
592 epoch_start_configuration,
593 cache_traits.backing_package_store.clone(),
594 cache_traits.object_store.clone(),
595 cache_metrics,
596 signature_verifier_metrics,
597 &config.expensive_safety_check_config,
598 (chain_id, chain),
599 highest_executed_checkpoint,
600 previous_epoch_last_checkpoint,
601 Arc::new(SubmittedTransactionCacheMetrics::new(
602 ®istry_service.default_registry(),
603 )),
604 config.fullnode_sync_mode,
605 )?;
606
607 info!("created epoch store");
608
609 replay_log!(
610 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
611 epoch_store.epoch(),
612 epoch_store.protocol_config()
613 );
614
615 if is_genesis {
617 info!("checking SUI conservation at genesis");
618 cache_traits
623 .reconfig_api
624 .expensive_check_sui_conservation(&epoch_store)
625 .expect("SUI conservation check cannot fail at genesis");
626 }
627
628 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
629 let default_buffer_stake = epoch_store
630 .protocol_config()
631 .buffer_stake_for_protocol_upgrade_bps();
632 if effective_buffer_stake != default_buffer_stake {
633 warn!(
634 ?effective_buffer_stake,
635 ?default_buffer_stake,
636 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
637 );
638 }
639
640 checkpoint_store.insert_genesis_checkpoint(
641 genesis.checkpoint(),
642 genesis.checkpoint_contents().clone(),
643 &epoch_store,
644 );
645
646 info!("creating state sync store");
647 let state_sync_store = RocksDbStore::new(
648 cache_traits.clone(),
649 committee_store.clone(),
650 checkpoint_store.clone(),
651 );
652
653 let index_store =
654 if node_role.should_enable_index_processing() && config.enable_index_processing {
655 info!("creating jsonrpc index store");
656 Some(Arc::new(IndexStore::new(
657 config.db_path().join("indexes"),
658 &prometheus_registry,
659 epoch_store
660 .protocol_config()
661 .max_move_identifier_len_as_option(),
662 config.remove_deprecated_tables,
663 )))
664 } else {
665 None
666 };
667
668 let rpc_index = if node_role.should_enable_index_processing()
669 && config.rpc().is_some_and(|rpc| rpc.enable_indexing())
670 {
671 info!("creating rpc index store");
672 Some(Arc::new(
673 RpcIndexStore::new(
674 &config.db_path(),
675 &store,
676 &checkpoint_store,
677 &epoch_store,
678 &cache_traits.backing_package_store,
679 pruner_watermarks.checkpoint_id.clone(),
680 config.rpc().cloned().unwrap_or_default(),
681 )
682 .await,
683 ))
684 } else {
685 None
686 };
687
688 let chain_identifier = epoch_store.get_chain_identifier();
689
690 info!("creating archive reader");
691 let (randomness_tx, randomness_rx) = mpsc::channel(
693 config
694 .p2p_config
695 .randomness
696 .clone()
697 .unwrap_or_default()
698 .mailbox_capacity(),
699 );
700 let P2pComponents {
701 p2p_network,
702 known_peers,
703 discovery_handle,
704 state_sync_handle,
705 randomness_handle,
706 endpoint_manager,
707 } = Self::create_p2p_network(
708 &config,
709 state_sync_store.clone(),
710 chain_identifier,
711 randomness_tx,
712 &prometheus_registry,
713 )?;
714
715 for peer in &config.p2p_config.peer_address_overrides {
717 endpoint_manager
718 .update_endpoint(
719 EndpointId::P2p(peer.peer_id),
720 AddressSource::Config,
721 peer.addresses.clone(),
722 )
723 .expect("Updating peer address overrides should not fail");
724 }
725
726 update_peer_addresses(
728 &config,
729 &endpoint_manager,
730 epoch_store.epoch_start_state(),
731 None,
732 );
733
734 info!("start snapshot upload");
735 let state_snapshot_handle = Self::start_state_snapshot(
737 &config,
738 &prometheus_registry,
739 checkpoint_store.clone(),
740 chain_identifier,
741 )?;
742
743 info!("start db checkpoint");
745 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
746 &config,
747 &prometheus_registry,
748 state_snapshot_handle.is_some(),
749 )?;
750
751 if !epoch_store
752 .protocol_config()
753 .simplified_unwrap_then_delete()
754 {
755 config
757 .authority_store_pruning_config
758 .set_killswitch_tombstone_pruning(true);
759 }
760
761 let authority_name = config.protocol_public_key();
762
763 info!("create authority state");
764 let state = AuthorityState::new(
765 authority_name,
766 secret,
767 config.supported_protocol_versions.unwrap(),
768 store.clone(),
769 cache_traits.clone(),
770 epoch_store.clone(),
771 committee_store.clone(),
772 index_store.clone(),
773 rpc_index,
774 checkpoint_store.clone(),
775 &prometheus_registry,
776 genesis.objects(),
777 &db_checkpoint_config,
778 config.clone(),
779 chain_identifier,
780 config.policy_config.clone(),
781 config.firewall_config.clone(),
782 pruner_watermarks,
783 )
784 .await;
785 if epoch_store.epoch() == 0 {
787 let txn = &genesis.transaction();
788 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
789 let transaction =
790 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
791 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
792 genesis.transaction().data().clone(),
793 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
794 ),
795 );
796 state
797 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
798 .instrument(span)
799 .await
800 .unwrap();
801 }
802
803 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
805
806 let (end_of_epoch_channel, end_of_epoch_receiver) =
807 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
809 let transaction_orchestrator = if node_role.is_fullnode() && run_with_range.is_none() {
810 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811 auth_agg.load_full(),
812 state.clone(),
813 end_of_epoch_receiver,
814 &config.db_path(),
815 &prometheus_registry,
816 &config,
817 )))
818 } else {
819 None
820 };
821
822 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
823 state.clone(),
824 state_sync_store,
825 &transaction_orchestrator.clone(),
826 &config,
827 &prometheus_registry,
828 server_version,
829 node_role,
830 )
831 .await?;
832
833 let global_state_hasher = Arc::new(GlobalStateHasher::new(
834 cache_traits.global_state_hash_store.clone(),
835 GlobalStateHashMetrics::new(&prometheus_registry),
836 ));
837
838 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
839 "sui",
840 ®istry_service.default_registry(),
841 );
842
843 let connection_monitor_handle =
844 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
845 p2p_network.downgrade(),
846 Arc::new(network_connection_metrics),
847 known_peers,
848 );
849
850 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
851
852 sui_node_metrics
853 .binary_max_protocol_version
854 .set(ProtocolVersion::MAX.as_u64() as i64);
855 sui_node_metrics
856 .configured_max_protocol_version
857 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
858
859 let node_role = epoch_store.node_role();
860 let validator_components = if node_role.runs_consensus() {
861 let mut components = Self::construct_validator_components(
862 config.clone(),
863 state.clone(),
864 committee,
865 epoch_store.clone(),
866 checkpoint_store.clone(),
867 state_sync_handle.clone(),
868 randomness_handle.clone(),
869 Arc::downgrade(&global_state_hasher),
870 backpressure_manager.clone(),
871 ®istry_service,
872 sui_node_metrics.clone(),
873 checkpoint_metrics.clone(),
874 node_role,
875 )
876 .await?;
877
878 if node_role.is_validator() {
879 components
880 .consensus_adapter
881 .recover_end_of_publish(&epoch_store);
882
883 components.validator_server_handle = Some(
885 components
886 .validator_server_handle
887 .take()
888 .unwrap()
889 .start()
890 .await,
891 );
892
893 endpoint_manager
895 .set_consensus_address_updater(components.consensus_manager.clone());
896 } else {
897 info!("Starting node as Observer — connecting to configured peers");
898 }
899
900 Some(components)
901 } else {
902 None
903 };
904
905 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
907
908 let node = Self {
909 config,
910 validator_components: Mutex::new(validator_components),
911 http_servers,
912 state,
913 transaction_orchestrator,
914 registry_service,
915 metrics: sui_node_metrics,
916 checkpoint_metrics,
917
918 _discovery: discovery_handle,
919 _connection_monitor_handle: connection_monitor_handle,
920 state_sync_handle,
921 randomness_handle,
922 checkpoint_store,
923 global_state_hasher: Mutex::new(Some(global_state_hasher)),
924 end_of_epoch_channel,
925 endpoint_manager,
926 backpressure_manager,
927
928 _db_checkpoint_handle: db_checkpoint_handle,
929
930 #[cfg(msim)]
931 sim_state: Default::default(),
932
933 _state_snapshot_uploader_handle: state_snapshot_handle,
934 shutdown_channel_tx: shutdown_channel,
935
936 auth_agg,
937 subscription_service_checkpoint_sender,
938 };
939
940 info!("SuiNode started!");
941 let node = Arc::new(node);
942 let node_copy = node.clone();
943 spawn_monitored_task!(async move {
944 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
945 if let Err(error) = result {
946 warn!("Reconfiguration finished with error {:?}", error);
947 }
948 });
949
950 Ok(node)
951 }
952
953 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
954 self.end_of_epoch_channel.subscribe()
955 }
956
957 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
958 self.shutdown_channel_tx.subscribe()
959 }
960
961 pub fn current_epoch_for_testing(&self) -> EpochId {
962 self.state.current_epoch_for_testing()
963 }
964
965 pub fn db_checkpoint_path(&self) -> PathBuf {
966 self.config.db_checkpoint_path()
967 }
968
969 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
971 info!("close_epoch (current epoch = {})", epoch_store.epoch());
972 self.validator_components
973 .lock()
974 .await
975 .as_ref()
976 .ok_or_else(|| SuiError::from("Node is not a validator"))?
977 .consensus_adapter
978 .close_epoch(epoch_store);
979 Ok(())
980 }
981
982 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
983 self.state
984 .clear_override_protocol_upgrade_buffer_stake(epoch)
985 }
986
987 pub fn set_override_protocol_upgrade_buffer_stake(
988 &self,
989 epoch: EpochId,
990 buffer_stake_bps: u64,
991 ) -> SuiResult {
992 self.state
993 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
994 }
995
996 pub async fn close_epoch_for_testing(&self) -> SuiResult {
999 let epoch_store = self.state.epoch_store_for_testing();
1000 self.close_epoch(&epoch_store).await
1001 }
1002
1003 fn start_state_snapshot(
1004 config: &NodeConfig,
1005 prometheus_registry: &Registry,
1006 checkpoint_store: Arc<CheckpointStore>,
1007 chain_identifier: ChainIdentifier,
1008 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1009 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1010 let snapshot_uploader = StateSnapshotUploader::new(
1011 &config.db_checkpoint_path(),
1012 &config.snapshot_path(),
1013 remote_store_config.clone(),
1014 60,
1015 prometheus_registry,
1016 checkpoint_store,
1017 chain_identifier,
1018 config.state_snapshot_write_config.archive_interval_epochs,
1019 )?;
1020 Ok(Some(snapshot_uploader.start()))
1021 } else {
1022 Ok(None)
1023 }
1024 }
1025
1026 fn start_db_checkpoint(
1027 config: &NodeConfig,
1028 prometheus_registry: &Registry,
1029 state_snapshot_enabled: bool,
1030 ) -> Result<(
1031 DBCheckpointConfig,
1032 Option<tokio::sync::broadcast::Sender<()>>,
1033 )> {
1034 let checkpoint_path = Some(
1035 config
1036 .db_checkpoint_config
1037 .checkpoint_path
1038 .clone()
1039 .unwrap_or_else(|| config.db_checkpoint_path()),
1040 );
1041 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1042 DBCheckpointConfig {
1043 checkpoint_path,
1044 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1045 true
1046 } else {
1047 config
1048 .db_checkpoint_config
1049 .perform_db_checkpoints_at_epoch_end
1050 },
1051 ..config.db_checkpoint_config.clone()
1052 }
1053 } else {
1054 config.db_checkpoint_config.clone()
1055 };
1056
1057 match (
1058 db_checkpoint_config.object_store_config.as_ref(),
1059 state_snapshot_enabled,
1060 ) {
1061 (None, false) => Ok((db_checkpoint_config, None)),
1066 (_, _) => {
1067 let handler = DBCheckpointHandler::new(
1068 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1069 db_checkpoint_config.object_store_config.as_ref(),
1070 60,
1071 db_checkpoint_config
1072 .prune_and_compact_before_upload
1073 .unwrap_or(true),
1074 config.authority_store_pruning_config.clone(),
1075 prometheus_registry,
1076 state_snapshot_enabled,
1077 )?;
1078 Ok((
1079 db_checkpoint_config,
1080 Some(DBCheckpointHandler::start(handler)),
1081 ))
1082 }
1083 }
1084 }
1085
1086 fn create_p2p_network(
1087 config: &NodeConfig,
1088 state_sync_store: RocksDbStore,
1089 chain_identifier: ChainIdentifier,
1090 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1091 prometheus_registry: &Registry,
1092 ) -> Result<P2pComponents> {
1093 let mut p2p_config = config.p2p_config.clone();
1094 {
1095 let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1096 if disc.peer_addr_store_path.is_none() {
1097 disc.peer_addr_store_path =
1098 Some(config.db_path().join("discovery_peer_cache.yaml"));
1099 }
1100 }
1101 let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1102 if let Some(consensus_config) = &config.consensus_config {
1103 let effective_addr = consensus_config
1104 .external_address
1105 .as_ref()
1106 .or(consensus_config.listen_address.as_ref());
1107 if let Some(addr) = effective_addr {
1108 discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1109 }
1110 }
1111 let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1112 let discovery_sender = discovery.sender();
1113
1114 let (state_sync, state_sync_router) = state_sync::Builder::new()
1115 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1116 .store(state_sync_store)
1117 .archive_config(config.archive_reader_config())
1118 .discovery_sender(discovery_sender)
1119 .with_metrics(prometheus_registry)
1120 .build();
1121
1122 let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1123 let known_peers: HashMap<PeerId, String> = discovery_config
1124 .allowlisted_peers
1125 .clone()
1126 .into_iter()
1127 .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1128 .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1129 peer.peer_id
1130 .map(|peer_id| (peer_id, "seed_peer".to_string()))
1131 }))
1132 .collect();
1133
1134 let (randomness, randomness_router) =
1135 randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1136 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1137 .with_metrics(prometheus_registry)
1138 .build();
1139
1140 let p2p_network = {
1141 let routes = anemo::Router::new()
1142 .add_rpc_service(discovery_server)
1143 .merge(state_sync_router);
1144 let routes = routes.merge(randomness_router);
1145
1146 let inbound_network_metrics =
1147 mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1148 let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1149 "sui",
1150 "outbound",
1151 prometheus_registry,
1152 );
1153
1154 let service = ServiceBuilder::new()
1155 .layer(
1156 TraceLayer::new_for_server_errors()
1157 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1158 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1159 )
1160 .layer(CallbackLayer::new(
1161 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1162 Arc::new(inbound_network_metrics),
1163 config.p2p_config.excessive_message_size(),
1164 ),
1165 ))
1166 .service(routes);
1167
1168 let outbound_layer = ServiceBuilder::new()
1169 .layer(
1170 TraceLayer::new_for_client_and_server_errors()
1171 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1172 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1173 )
1174 .layer(CallbackLayer::new(
1175 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1176 Arc::new(outbound_network_metrics),
1177 config.p2p_config.excessive_message_size(),
1178 ),
1179 ))
1180 .into_inner();
1181
1182 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1183 anemo_config.max_request_frame_size = Some(1 << 20);
1186 anemo_config.max_response_frame_size = Some(128 << 20);
1189
1190 let mut quic_config = anemo_config.quic.unwrap_or_default();
1193 if quic_config.socket_send_buffer_size.is_none() {
1194 quic_config.socket_send_buffer_size = Some(20 << 20);
1195 }
1196 if quic_config.socket_receive_buffer_size.is_none() {
1197 quic_config.socket_receive_buffer_size = Some(20 << 20);
1198 }
1199 quic_config.allow_failed_socket_buffer_size_setting = true;
1200
1201 if quic_config.max_concurrent_bidi_streams.is_none() {
1204 quic_config.max_concurrent_bidi_streams = Some(500);
1205 }
1206 if quic_config.max_concurrent_uni_streams.is_none() {
1207 quic_config.max_concurrent_uni_streams = Some(500);
1208 }
1209 if quic_config.stream_receive_window.is_none() {
1210 quic_config.stream_receive_window = Some(100 << 20);
1211 }
1212 if quic_config.receive_window.is_none() {
1213 quic_config.receive_window = Some(200 << 20);
1214 }
1215 if quic_config.send_window.is_none() {
1216 quic_config.send_window = Some(200 << 20);
1217 }
1218 if quic_config.crypto_buffer_size.is_none() {
1219 quic_config.crypto_buffer_size = Some(1 << 20);
1220 }
1221 if quic_config.max_idle_timeout_ms.is_none() {
1222 quic_config.max_idle_timeout_ms = Some(10_000);
1223 }
1224 if quic_config.keep_alive_interval_ms.is_none() {
1225 quic_config.keep_alive_interval_ms = Some(5_000);
1226 }
1227 anemo_config.quic = Some(quic_config);
1228
1229 let server_name = format!("sui-{}", chain_identifier);
1230 let network = Network::bind(config.p2p_config.listen_address)
1231 .server_name(&server_name)
1232 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1233 .config(anemo_config)
1234 .outbound_request_layer(outbound_layer)
1235 .start(service)?;
1236 info!(
1237 server_name = server_name,
1238 "P2p network started on {}",
1239 network.local_addr()
1240 );
1241
1242 network
1243 };
1244
1245 let discovery_handle =
1246 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1247 let state_sync_handle = state_sync.start(p2p_network.clone());
1248 let randomness_handle = randomness.start(p2p_network.clone());
1249
1250 Ok(P2pComponents {
1251 p2p_network,
1252 known_peers,
1253 discovery_handle,
1254 state_sync_handle,
1255 randomness_handle,
1256 endpoint_manager,
1257 })
1258 }
1259
1260 async fn construct_validator_components(
1261 config: NodeConfig,
1262 state: Arc<AuthorityState>,
1263 committee: Arc<Committee>,
1264 epoch_store: Arc<AuthorityPerEpochStore>,
1265 checkpoint_store: Arc<CheckpointStore>,
1266 state_sync_handle: state_sync::Handle,
1267 randomness_handle: randomness::Handle,
1268 global_state_hasher: Weak<GlobalStateHasher>,
1269 backpressure_manager: Arc<BackpressureManager>,
1270 registry_service: &RegistryService,
1271 sui_node_metrics: Arc<SuiNodeMetrics>,
1272 checkpoint_metrics: Arc<CheckpointMetrics>,
1273 node_role: NodeRole,
1274 ) -> Result<ValidatorComponents> {
1275 let mut config_clone = config.clone();
1276 let consensus_config = config_clone
1277 .consensus_config
1278 .as_mut()
1279 .ok_or_else(|| anyhow!("Node is missing consensus config"))?;
1280
1281 let client = Arc::new(UpdatableConsensusClient::new());
1282 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1283 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1284 &committee,
1285 consensus_config,
1286 state.name,
1287 ®istry_service.default_registry(),
1288 client.clone(),
1289 checkpoint_store.clone(),
1290 inflight_slot_freed_notify.clone(),
1291 ));
1292
1293 let consensus_manager = Arc::new(ConsensusManager::new(
1294 &config,
1295 consensus_config,
1296 registry_service,
1297 client,
1298 node_role,
1299 ));
1300
1301 let consensus_store_pruner = ConsensusStorePruner::new(
1303 consensus_manager.get_storage_base_path(),
1304 consensus_config.db_retention_epochs(),
1305 consensus_config.db_pruner_period(),
1306 ®istry_service.default_registry(),
1307 );
1308
1309 let sui_tx_validator_metrics =
1310 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1311
1312 let (validator_server_handle, admission_queue) = if node_role.is_validator() {
1313 let (handle, queue) = Self::start_grpc_validator_service(
1314 &config,
1315 state.clone(),
1316 consensus_adapter.clone(),
1317 epoch_store.clone(),
1318 ®istry_service.default_registry(),
1319 inflight_slot_freed_notify,
1320 )
1321 .await?;
1322 (Some(handle), queue)
1323 } else {
1324 (None, None)
1325 };
1326
1327 let validator_overload_monitor_handle = if node_role.is_validator()
1330 && config
1331 .authority_overload_config
1332 .max_load_shedding_percentage
1333 > 0
1334 {
1335 let authority_state = Arc::downgrade(&state);
1336 let overload_config = config.authority_overload_config.clone();
1337 fail_point!("starting_overload_monitor");
1338 Some(spawn_monitored_task!(overload_monitor(
1339 authority_state,
1340 overload_config,
1341 )))
1342 } else {
1343 None
1344 };
1345
1346 Self::start_epoch_specific_validator_components(
1347 &config,
1348 state.clone(),
1349 consensus_adapter,
1350 checkpoint_store,
1351 epoch_store,
1352 state_sync_handle,
1353 randomness_handle,
1354 consensus_manager,
1355 consensus_store_pruner,
1356 global_state_hasher,
1357 backpressure_manager,
1358 validator_server_handle,
1359 validator_overload_monitor_handle,
1360 checkpoint_metrics,
1361 sui_node_metrics,
1362 sui_tx_validator_metrics,
1363 admission_queue,
1364 node_role,
1365 )
1366 .await
1367 }
1368
    /// Starts the components that are rebuilt for every epoch on a node that runs consensus.
    /// It reuses the long-lived handles passed in and bundles everything into a
    /// `ValidatorComponents`.
    ///
    /// The steps, in order:
    /// 1. Build the checkpoint service.
    /// 2. Install a randomness manager, if the role runs consensus and randomness state is
    ///    enabled.
    /// 3. Spawn the execution-time observer (validators only).
    /// 4. Start the consensus manager on a detached tokio task.
    /// 5. Spawn the checkpoint service.
    /// 6. Start the JWK updater (validators with authenticator state enabled).
    /// 7. Rotate the admission queue, if any, to `epoch_store`.
    ///
    /// # Errors
    ///
    /// Returns an error if installing the randomness manager on `epoch_store` fails.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: Option<SpawnOnce>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
        node_role: NodeRole,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
            node_role,
        );

        if node_role.runs_consensus() && epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            // `try_new` may yield `None`; in that case no randomness manager is installed.
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        if node_role.is_validator() {
            ExecutionTimeObserver::spawn(
                epoch_store.clone(),
                Box::new(consensus_adapter.clone()),
                config
                    .execution_time_observer_config
                    .clone()
                    .unwrap_or_default(),
            );
        }

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Consensus is started on a detached task (the JoinHandle is dropped), so this
        // function does not wait for consensus to finish starting.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting DISABLE_REPLAY_WAITER (to any valid-Unicode value) spawns the checkpoint
        // service without a replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if node_role.is_validator() && epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1503
1504 fn build_checkpoint_service(
1505 config: &NodeConfig,
1506 consensus_adapter: Arc<ConsensusAdapter>,
1507 checkpoint_store: Arc<CheckpointStore>,
1508 epoch_store: Arc<AuthorityPerEpochStore>,
1509 state: Arc<AuthorityState>,
1510 state_sync_handle: state_sync::Handle,
1511 state_hasher: Weak<GlobalStateHasher>,
1512 checkpoint_metrics: Arc<CheckpointMetrics>,
1513 node_role: NodeRole,
1514 ) -> Arc<CheckpointService> {
1515 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1516 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1517
1518 debug!(
1519 "Starting checkpoint service with epoch start timestamp {}
1520 and epoch duration {}",
1521 epoch_start_timestamp_ms, epoch_duration_ms
1522 );
1523
1524 let checkpoint_output: Box<dyn CheckpointOutput> = if node_role.is_validator() {
1525 Box::new(SubmitCheckpointToConsensus {
1526 sender: consensus_adapter,
1527 signer: state.secret.clone(),
1528 authority: config.protocol_public_key(),
1529 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1530 .checked_add(epoch_duration_ms)
1531 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1532 metrics: checkpoint_metrics.clone(),
1533 })
1534 } else {
1535 LogCheckpointOutput::boxed()
1536 };
1537
1538 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1539 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1540 let max_checkpoint_size_bytes =
1541 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1542
1543 CheckpointService::build(
1544 state.clone(),
1545 checkpoint_store,
1546 epoch_store,
1547 state.get_transaction_cache_reader().clone(),
1548 state_hasher,
1549 checkpoint_output,
1550 Box::new(certified_checkpoint_output),
1551 checkpoint_metrics,
1552 max_tx_per_checkpoint,
1553 max_checkpoint_size_bytes,
1554 )
1555 }
1556
1557 fn construct_consensus_adapter(
1558 committee: &Committee,
1559 consensus_config: &ConsensusConfig,
1560 authority: AuthorityName,
1561 prometheus_registry: &Registry,
1562 consensus_client: Arc<dyn ConsensusClient>,
1563 checkpoint_store: Arc<CheckpointStore>,
1564 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1565 ) -> ConsensusAdapter {
1566 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1567 ConsensusAdapter::new(
1570 consensus_client,
1571 checkpoint_store,
1572 authority,
1573 consensus_config.max_pending_transactions(),
1574 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1575 ca_metrics,
1576 inflight_slot_freed_notify,
1577 )
1578 }
1579
1580 async fn start_grpc_validator_service(
1581 config: &NodeConfig,
1582 state: Arc<AuthorityState>,
1583 consensus_adapter: Arc<ConsensusAdapter>,
1584 epoch_store: Arc<AuthorityPerEpochStore>,
1585 prometheus_registry: &Registry,
1586 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1587 ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
1588 let overload_config = &config.authority_overload_config;
1589 let admission_queue = overload_config.admission_queue_enabled.then(|| {
1590 let manager = Arc::new(AdmissionQueueManager::new(
1591 consensus_adapter.clone(),
1592 Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
1593 overload_config.admission_queue_capacity_fraction,
1594 overload_config.admission_queue_bypass_fraction,
1595 overload_config.admission_queue_failover_timeout,
1596 inflight_slot_freed_notify,
1597 ));
1598 AdmissionQueueContext::spawn(manager, epoch_store)
1599 });
1600 let validator_service = ValidatorService::new(
1601 state.clone(),
1602 consensus_adapter,
1603 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1604 config.policy_config.clone().map(|p| p.client_id_source),
1605 admission_queue.clone(),
1606 );
1607
1608 let mut server_conf = mysten_network::config::Config::new();
1609 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1610 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1611 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1612 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1613 server_conf.load_shed = config.grpc_load_shed;
1614 let mut server_builder =
1615 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1616
1617 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1618
1619 let tls_config = sui_tls::create_rustls_server_config(
1620 config.network_key_pair().copy().private(),
1621 SUI_TLS_SERVER_NAME.to_string(),
1622 );
1623
1624 let network_address = config.network_address().clone();
1625
1626 let (ready_tx, ready_rx) = oneshot::channel();
1627
1628 let spawn_once = SpawnOnce::new(ready_rx, async move {
1629 let server = server_builder
1630 .bind(&network_address, Some(tls_config))
1631 .await
1632 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1633 let local_addr = server.local_addr();
1634 info!("Listening to traffic on {local_addr}");
1635 ready_tx.send(()).unwrap();
1636 if let Err(err) = server.serve().await {
1637 info!("Server stopped: {err}");
1638 }
1639 info!("Server stopped");
1640 });
1641 Ok((spawn_once, admission_queue))
1642 }
1643
1644 pub fn state(&self) -> Arc<AuthorityState> {
1645 self.state.clone()
1646 }
1647
    /// Test/simulation-only accessor that borrows the node's anemo connection-monitor handle.
    #[cfg(any(test, msim))]
    pub fn connection_monitor_handle_for_testing(
        &self,
    ) -> &mysten_network::anemo_connection_monitor::ConnectionMonitorHandle {
        &self._connection_monitor_handle
    }
1654
1655 pub fn node_role(&self) -> NodeRole {
1656 self.state.load_epoch_store_one_call_per_task().node_role()
1657 }
1658
1659 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1661 self.state.reference_gas_price_for_testing()
1662 }
1663
1664 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1665 self.state.committee_store().clone()
1666 }
1667
1668 pub fn clone_authority_aggregator(
1679 &self,
1680 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1681 self.transaction_orchestrator
1682 .as_ref()
1683 .map(|to| to.clone_authority_aggregator())
1684 }
1685
1686 pub fn transaction_orchestrator(
1687 &self,
1688 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1689 self.transaction_orchestrator.clone()
1690 }
1691
    /// Drives the node through successive epochs.
    ///
    /// The loop only exits when a `run_with_range` stop condition is reached (returning
    /// `Ok(())`) or when an error is propagated.
    ///
    /// Each iteration does the following:
    /// 1. Runs a `CheckpointExecutor` for the current epoch. Validators that run consensus
    ///    first submit their capabilities to consensus.
    /// 2. Reconfigures the authority state to the next epoch.
    /// 3. Replaces the global state hasher.
    /// 4. Stops, restarts, or newly starts the consensus-side `ValidatorComponents`,
    ///    according to the node's new role.
    ///
    /// # Errors
    ///
    /// Propagates errors from:
    /// - submitting capabilities to consensus;
    /// - restarting or constructing validator components;
    /// - in msim builds with a finite, non-zero checkpoint retention, test-only checkpoint
    ///   pruning.
    ///
    /// # Panics
    ///
    /// Panics on invariant violations, for example:
    /// - the global state hasher is missing, or still shared when it is replaced;
    /// - the next epoch is not `current + 1`.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher lock is held for the rest of this iteration. The hasher is taken
            // out here, and a fresh one is stored back during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // A node that runs consensus and is a validator in this epoch advertises its
            // supported protocol versions and available system packages to consensus.
            if let Some(components) = &*self.validator_components.lock().await
                && cur_epoch_store.is_validator()
            {
                // NOTE(review): the purpose of this 1ms sleep is not evident here — confirm.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                // Step the advertised max version down while that version would enable
                // accumulators but the accumulator root does not exist.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // Hitting the configured run-with-range bound ends the node. Shut down
            // consensus, report the bound on the shutdown channel, then leave the loop.
            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            // Debug builds assert the system is not in safe mode. Under msim this is
            // skipped when the simulation flags safe mode as expected.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A failed end-of-epoch notification is only logged on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the shared authority aggregator from the next epoch's start state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(
                &self.config,
                &self.endpoint_manager,
                &new_epoch_start_state,
                Some(cur_epoch_store.epoch_start_state()),
            );

            // The validator-components lock is held from here until the new components are
            // stored back after reconfiguration.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_role = new_epoch_store.node_role();

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                // The node ran consensus last epoch: stop it before deciding what runs next.
                info!("Reconfiguring node (was running consensus).");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // Replace the hasher with a fresh one that reuses the old one's metrics.
                // The `expect` encodes the invariant that all other clones are gone by now.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if new_role.runs_consensus() {
                    // Reuse the long-lived components; only epoch-specific ones restart.
                    info!("Restarting consensus as {new_role}");
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                            new_role,
                        )
                        .await?,
                    )
                } else {
                    info!(
                        "This node has new role {new_role} and no longer runs consensus after reconfiguration"
                    );
                    None
                }
            } else {
                // The node did not run consensus last epoch. Replace the hasher, and build
                // the full set of consensus components if the node was promoted.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if new_role.runs_consensus() {
                    info!("Promoting node to {new_role}, starting consensus components");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                        new_role,
                    )
                    .await?;

                    // A newly promoted validator also starts its gRPC validator server. It
                    // then registers the consensus manager for consensus address updates.
                    if new_role.is_validator() {
                        components.validator_server_handle = Some(
                            components
                                .validator_server_handle
                                .take()
                                .unwrap()
                                .start()
                                .await,
                        );

                        self.endpoint_manager
                            .set_consensus_address_updater(components.consensus_manager.clone());
                    }

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Release the DB handles of the epoch store that was just reconfigured away from.
            cur_epoch_store.release_db_handles();

            // Simulation-only: exercise checkpoint pruning when a finite, non-zero retention
            // is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2009
2010 async fn shutdown(&self) {
2011 if let Some(validator_components) = &*self.validator_components.lock().await {
2012 validator_components.consensus_manager.shutdown().await;
2013 }
2014 }
2015
2016 async fn reconfigure_state(
2017 &self,
2018 state: &Arc<AuthorityState>,
2019 cur_epoch_store: &AuthorityPerEpochStore,
2020 next_epoch_committee: Committee,
2021 next_epoch_start_system_state: EpochStartSystemState,
2022 global_state_hasher: Arc<GlobalStateHasher>,
2023 ) -> Arc<AuthorityPerEpochStore> {
2024 let next_epoch = next_epoch_committee.epoch();
2025
2026 let last_checkpoint = self
2027 .checkpoint_store
2028 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
2029 .expect("Error loading last checkpoint for current epoch")
2030 .expect("Could not load last checkpoint for current epoch");
2031
2032 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2033
2034 assert_eq!(
2035 Some(last_checkpoint_seq),
2036 self.checkpoint_store
2037 .get_highest_executed_checkpoint_seq_number()
2038 .expect("Error loading highest executed checkpoint sequence number")
2039 );
2040
2041 let epoch_start_configuration = EpochStartConfiguration::new(
2042 next_epoch_start_system_state,
2043 *last_checkpoint.digest(),
2044 state.get_object_store().as_ref(),
2045 EpochFlag::default_flags_for_new_epoch(&state.config),
2046 )
2047 .expect("EpochStartConfiguration construction cannot fail");
2048
2049 let new_epoch_store = self
2050 .state
2051 .reconfigure(
2052 cur_epoch_store,
2053 self.config.supported_protocol_versions.unwrap(),
2054 next_epoch_committee,
2055 epoch_start_configuration,
2056 global_state_hasher,
2057 &self.config.expensive_safety_check_config,
2058 last_checkpoint_seq,
2059 )
2060 .await
2061 .expect("Reconfigure authority state cannot fail");
2062 info!(next_epoch, "Node State has been reconfigured");
2063 assert_eq!(next_epoch, new_epoch_store.epoch());
2064 self.state.get_reconfig_api().update_epoch_flags_metrics(
2065 cur_epoch_store.epoch_start_config().flags(),
2066 new_epoch_store.epoch_start_config().flags(),
2067 );
2068
2069 new_epoch_store
2070 }
2071
    /// Borrows the `NodeConfig` this node was built with.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2075
2076 pub fn randomness_handle(&self) -> randomness::Handle {
2077 self.randomness_handle.clone()
2078 }
2079
2080 pub fn state_sync_handle(&self) -> state_sync::Handle {
2081 self.state_sync_handle.clone()
2082 }
2083
    /// Borrows the endpoint manager that this node uses for peer address updates.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2087
2088 fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2090 let digest_str = digest.to_string();
2091 if digest_str.len() >= 8 {
2092 digest_str[0..8].to_string()
2093 } else {
2094 digest_str
2095 }
2096 }
2097
2098 async fn check_and_recover_forks(
2102 checkpoint_store: &CheckpointStore,
2103 checkpoint_metrics: &CheckpointMetrics,
2104 fork_recovery: Option<&ForkRecoveryConfig>,
2105 ) -> Result<()> {
2106 if let Some(recovery) = fork_recovery {
2108 Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
2109 Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
2110 }
2111
2112 if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
2113 .get_checkpoint_fork_detected()
2114 .map_err(|e| {
2115 error!("Failed to check for checkpoint fork: {:?}", e);
2116 e
2117 })?
2118 {
2119 Self::handle_checkpoint_fork(
2120 checkpoint_seq,
2121 checkpoint_digest,
2122 checkpoint_metrics,
2123 fork_recovery,
2124 )
2125 .await?;
2126 }
2127 if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
2128 .get_transaction_fork_detected()
2129 .map_err(|e| {
2130 error!("Failed to check for transaction fork: {:?}", e);
2131 e
2132 })?
2133 {
2134 Self::handle_transaction_fork(
2135 tx_digest,
2136 expected_effects,
2137 actual_effects,
2138 checkpoint_metrics,
2139 fork_recovery,
2140 )
2141 .await?;
2142 }
2143
2144 Ok(())
2145 }
2146
2147 fn try_recover_checkpoint_fork(
2148 checkpoint_store: &CheckpointStore,
2149 recovery: &ForkRecoveryConfig,
2150 ) -> Result<()> {
2151 for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
2154 let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
2155 anyhow::bail!(
2156 "Invalid checkpoint digest override for seq {}: {}",
2157 seq,
2158 expected_digest_str
2159 );
2160 };
2161
2162 if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
2163 let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
2164 if local_digest != expected_digest {
2165 info!(
2166 seq,
2167 local = %Self::get_digest_prefix(local_digest),
2168 expected = %Self::get_digest_prefix(expected_digest),
2169 "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
2170 seq
2171 );
2172 checkpoint_store
2173 .clear_locally_computed_checkpoints_from(*seq)
2174 .context(
2175 "Failed to clear locally computed checkpoints from override seq",
2176 )?;
2177 }
2178 }
2179 }
2180
2181 if let Some((checkpoint_seq, checkpoint_digest)) =
2182 checkpoint_store.get_checkpoint_fork_detected()?
2183 && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
2184 {
2185 info!(
2186 "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
2187 checkpoint_seq, checkpoint_digest
2188 );
2189 checkpoint_store
2190 .clear_checkpoint_fork_detected()
2191 .expect("Failed to clear checkpoint fork detected marker");
2192 }
2193 Ok(())
2194 }
2195
2196 fn try_recover_transaction_fork(
2197 checkpoint_store: &CheckpointStore,
2198 recovery: &ForkRecoveryConfig,
2199 ) -> Result<()> {
2200 if recovery.transaction_overrides.is_empty() {
2201 return Ok(());
2202 }
2203
2204 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2205 && recovery
2206 .transaction_overrides
2207 .contains_key(&tx_digest.to_string())
2208 {
2209 info!(
2210 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2211 tx_digest
2212 );
2213 checkpoint_store
2214 .clear_transaction_fork_detected()
2215 .expect("Failed to clear transaction fork detected marker");
2216 }
2217 Ok(())
2218 }
2219
2220 fn get_current_timestamp() -> u64 {
2221 std::time::SystemTime::now()
2222 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2223 .unwrap()
2224 .as_secs()
2225 }
2226
2227 async fn handle_checkpoint_fork(
2228 checkpoint_seq: u64,
2229 checkpoint_digest: CheckpointDigest,
2230 checkpoint_metrics: &CheckpointMetrics,
2231 fork_recovery: Option<&ForkRecoveryConfig>,
2232 ) -> Result<()> {
2233 checkpoint_metrics
2234 .checkpoint_fork_crash_mode
2235 .with_label_values(&[
2236 &checkpoint_seq.to_string(),
2237 &Self::get_digest_prefix(checkpoint_digest),
2238 &Self::get_current_timestamp().to_string(),
2239 ])
2240 .set(1);
2241
2242 let behavior = fork_recovery
2243 .map(|fr| fr.fork_crash_behavior)
2244 .unwrap_or_default();
2245
2246 match behavior {
2247 ForkCrashBehavior::AwaitForkRecovery => {
2248 error!(
2249 checkpoint_seq = checkpoint_seq,
2250 checkpoint_digest = ?checkpoint_digest,
2251 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2252 );
2253 futures::future::pending::<()>().await;
2254 unreachable!("pending() should never return");
2255 }
2256 ForkCrashBehavior::ReturnError => {
2257 error!(
2258 checkpoint_seq = checkpoint_seq,
2259 checkpoint_digest = ?checkpoint_digest,
2260 "Checkpoint fork detected! Returning error."
2261 );
2262 Err(anyhow::anyhow!(
2263 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2264 checkpoint_seq,
2265 checkpoint_digest
2266 ))
2267 }
2268 }
2269 }
2270
2271 async fn handle_transaction_fork(
2272 tx_digest: TransactionDigest,
2273 expected_effects_digest: TransactionEffectsDigest,
2274 actual_effects_digest: TransactionEffectsDigest,
2275 checkpoint_metrics: &CheckpointMetrics,
2276 fork_recovery: Option<&ForkRecoveryConfig>,
2277 ) -> Result<()> {
2278 checkpoint_metrics
2279 .transaction_fork_crash_mode
2280 .with_label_values(&[
2281 &Self::get_digest_prefix(tx_digest),
2282 &Self::get_digest_prefix(expected_effects_digest),
2283 &Self::get_digest_prefix(actual_effects_digest),
2284 &Self::get_current_timestamp().to_string(),
2285 ])
2286 .set(1);
2287
2288 let behavior = fork_recovery
2289 .map(|fr| fr.fork_crash_behavior)
2290 .unwrap_or_default();
2291
2292 match behavior {
2293 ForkCrashBehavior::AwaitForkRecovery => {
2294 error!(
2295 tx_digest = ?tx_digest,
2296 expected_effects_digest = ?expected_effects_digest,
2297 actual_effects_digest = ?actual_effects_digest,
2298 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2299 );
2300 futures::future::pending::<()>().await;
2301 unreachable!("pending() should never return");
2302 }
2303 ForkCrashBehavior::ReturnError => {
2304 error!(
2305 tx_digest = ?tx_digest,
2306 expected_effects_digest = ?expected_effects_digest,
2307 actual_effects_digest = ?actual_effects_digest,
2308 "Transaction fork detected! Returning error."
2309 );
2310 Err(anyhow::anyhow!(
2311 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2312 tx_digest,
2313 expected_effects_digest,
2314 actual_effects_digest
2315 ))
2316 }
2317 }
2318 }
2319}
2320
2321#[cfg(not(msim))]
2322impl SuiNode {
2323 async fn fetch_jwks(
2324 _authority: AuthorityName,
2325 provider: &OIDCProvider,
2326 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2327 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2328 use sui_types::error::SuiErrorKind;
2329 let client = reqwest::Client::new();
2330 fetch_jwks(provider, &client, true)
2331 .await
2332 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2333 }
2334}
2335
#[cfg(msim)]
impl SuiNode {
    /// Returns the id of the simulator node this `SuiNode` is running on.
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Logs and stores `new_value` into the simulator's "safe mode expected"
    /// flag, using `Ordering::Relaxed`.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let flag = &self.sim_state.sim_safe_mode_expected;
        flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator replacement for the HTTP JWK fetch: delegates to the function
    /// returned by `get_jwk_injector()`, passing `authority` and `provider`.
    // NOTE(review): both parameters are used, so this `allow` looks unnecessary.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
2357
/// A future that is spawned at most once, on demand.
///
/// `Unstarted` holds a oneshot receiver that `SpawnOnce::start` awaits after
/// spawning, plus the boxed future itself. `Started` holds the `JoinHandle` of
/// the spawned task.
enum SpawnOnce {
    // Not yet spawned: the readiness receiver and the pending future.
    // NOTE(review): the Mutex is never locked here (only `into_inner` is used);
    // presumably it exists to make the type `Sync`, since a `BoxFuture` is not — confirm.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // The handle is never read, hence `allow(unused)`. Dropping a tokio
    // `JoinHandle` detaches the task rather than aborting it.
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2364
2365impl SpawnOnce {
2366 pub fn new(
2367 ready_rx: oneshot::Receiver<()>,
2368 future: impl Future<Output = ()> + Send + 'static,
2369 ) -> Self {
2370 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2371 }
2372
2373 pub async fn start(self) -> Self {
2374 match self {
2375 Self::Unstarted(ready_rx, future) => {
2376 let future = future.into_inner();
2377 let handle = tokio::spawn(future);
2378 ready_rx.await.unwrap();
2379 Self::Started(handle)
2380 }
2381 Self::Started(_) => self,
2382 }
2383 }
2384}
2385
2386fn update_peer_addresses(
2390 config: &NodeConfig,
2391 endpoint_manager: &EndpointManager,
2392 epoch_start_state: &EpochStartSystemState,
2393 prev_epoch_start_state: Option<&EpochStartSystemState>,
2394) {
2395 if config.consensus_config().is_none() {
2396 return;
2397 }
2398 let new_peers: HashSet<PeerId> = epoch_start_state
2399 .get_validator_as_p2p_peers(config.protocol_public_key())
2400 .into_iter()
2401 .map(|(peer_id, address)| {
2402 endpoint_manager
2403 .update_endpoint(
2404 EndpointId::P2p(peer_id),
2405 AddressSource::Chain,
2406 vec![address],
2407 )
2408 .expect("Updating peer addresses should not fail");
2409 peer_id
2410 })
2411 .collect();
2412
2413 if let Some(prev) = prev_epoch_start_state {
2415 for (peer_id, _) in prev.get_validator_as_p2p_peers(config.protocol_public_key()) {
2416 if !new_peers.contains(&peer_id) {
2417 endpoint_manager
2418 .update_endpoint(EndpointId::P2p(peer_id), AddressSource::Chain, vec![])
2419 .expect("Clearing peer addresses should not fail");
2420 }
2421 }
2422 }
2423}
2424
2425fn build_kv_store(
2426 state: &Arc<AuthorityState>,
2427 config: &NodeConfig,
2428 registry: &Registry,
2429) -> Result<Arc<TransactionKeyValueStore>> {
2430 let metrics = KeyValueStoreMetrics::new(registry);
2431 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2432
2433 let base_url = &config.transaction_kv_store_read_config.base_url;
2434
2435 if base_url.is_empty() {
2436 info!("no http kv store url provided, using local db only");
2437 return Ok(Arc::new(db_store));
2438 }
2439
2440 let base_url: url::Url = base_url.parse().tap_err(|e| {
2441 error!(
2442 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2443 base_url, e
2444 )
2445 })?;
2446
2447 let network_str = match state.get_chain_identifier().chain() {
2448 Chain::Mainnet => "/mainnet",
2449 _ => {
2450 info!("using local db only for kv store");
2451 return Ok(Arc::new(db_store));
2452 }
2453 };
2454
2455 let base_url = base_url.join(network_str)?.to_string();
2456 let http_store = HttpKVStore::new_kv(
2457 &base_url,
2458 config.transaction_kv_store_read_config.cache_size,
2459 metrics.clone(),
2460 )?;
2461 info!("using local key-value store with fallback to http key-value store");
2462 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2463 db_store,
2464 http_store,
2465 metrics,
2466 "json_rpc_fallback",
2467 )))
2468}
2469
/// Builds and starts the node's HTTP (and optionally HTTPS) RPC servers.
///
/// When `node_role.should_run_rpc_servers()` is false, nothing is started and
/// `(HttpServers::default(), None)` is returned. Otherwise a single axum router
/// is assembled from:
/// - the JSON-RPC modules: read, coin, governance, bridge, indexer and
///   move-utils APIs, plus the transaction-builder API when `run_with_range` is
///   unset and the transaction-execution API when a `transaction_orchestrator`
///   is supplied; and
/// - the `sui_rpc_api` service, wired to a freshly built `SubscriptionService`.
///
/// The merged router is wrapped in a request-mapping layer, the server-timing
/// middleware and a permissive CORS layer. It is served on
/// `config.json_rpc_address`, and additionally on the RPC config's HTTPS address
/// when that config carries a TLS config.
///
/// Returns the server handles and the sender half of the subscription service's
/// checkpoint channel.
///
/// # Errors
/// Propagates failures from building the kv store, registering JSON-RPC
/// modules, building the JSON-RPC router, and binding either server.
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
    node_role: NodeRole,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Roles that don't serve RPC get empty handles and no checkpoint sender.
    if !node_role.should_run_rpc_servers() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // --- JSON-RPC ---
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        // One metrics instance shared by all JSON-RPC modules.
        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // The transaction-builder API is only registered when `run_with_range`
        // is unset. NOTE(review): rationale not visible here — confirm.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution requires an orchestrator.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Explicit name-service settings win only when all three are configured;
        // otherwise use the per-chain defaults.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // --- sui_rpc_api service ---
    // The checkpoint sender is returned to the caller; the handle goes into the
    // RPC service.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        // NOTE: `config` here shadows the outer `NodeConfig`; it is the RPC config.
        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        // Mirror `sui_http::ConnectInfo` into axum's `ConnectInfo` extension,
        // presumably so axum extractors can read the peer address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            // Permissive CORS: GET/POST from any origin, any request headers,
            // and any response headers exposed.
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    // `router` already contains the JSON-RPC routes, so the layers apply to both.
    router = router.merge(rpc_router).layer(layers);

    // HTTPS is served only when the RPC config provides a TLS config.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // Plain HTTP is always served on the JSON-RPC address.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2642
2643#[cfg(not(test))]
2644fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2645 protocol_config.max_transactions_per_checkpoint() as usize
2646}
2647
/// Test-build override: ignores the protocol config and always returns 2.
/// NOTE(review): presumably kept tiny so per-checkpoint limits are hit with
/// only a few transactions in tests — confirm.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}
2652
/// Handles for the RPC servers started by `build_http_servers`.
///
/// The `Default` value (both fields `None`) is what `build_http_servers`
/// returns when the node role does not run RPC servers.
#[derive(Default)]
struct HttpServers {
    // Neither handle is read, hence `allow(unused)`.
    // NOTE(review): presumably held so the servers live as long as this value —
    // confirm `sui_http::ServerHandle`'s drop behavior.
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    // Present only when the RPC config provides a TLS config.
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2660
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    /// Exercises `SuiNode::check_and_recover_forks` for both fork kinds.
    ///
    /// All steps share one `CheckpointStore`, so they must run in this order.
    /// For each kind, a recorded fork with no matching override yields an error
    /// under `ForkCrashBehavior::ReturnError`, and a matching override clears the
    /// fork marker so the check succeeds.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // --- Checkpoint fork ---
        // Record a checkpoint fork at an arbitrary sequence number.
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        // No overrides: the recorded fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // An override for the forked sequence number clears the marker.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // --- Transaction fork ---
        // Record a transaction fork with distinct expected/actual effects.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        // No overrides: the recorded fork must surface as an error.
        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // An override keyed by the transaction digest clears the marker. Recovery
        // only checks the key; the effects-digest value is not consulted.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}