1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::admission_queue::{
29 AdmissionQueueContext, AdmissionQueueManager, AdmissionQueueMetrics,
30};
31use sui_core::authority::ExecutionEnv;
32use sui_core::authority::RandomnessRoundReceiver;
33use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
34use sui_core::authority::backpressure::BackpressureManager;
35use sui_core::authority::epoch_start_configuration::EpochFlag;
36use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
37use sui_core::consensus_adapter::ConsensusClient;
38use sui_core::consensus_manager::UpdatableConsensusClient;
39use sui_core::epoch::randomness::RandomnessManager;
40use sui_core::execution_cache::build_execution_cache;
41use sui_network::endpoint_manager::{AddressSource, EndpointId};
42use sui_network::validator::server::SUI_TLS_SERVER_NAME;
43use sui_types::full_checkpoint_content::Checkpoint;
44
45use sui_core::global_state_hasher::GlobalStateHashMetrics;
46use sui_core::storage::RestReadStore;
47use sui_json_rpc::bridge_api::BridgeReadApi;
48use sui_json_rpc_api::JsonRpcMetrics;
49use sui_network::randomness;
50use sui_rpc_api::RpcMetrics;
51use sui_rpc_api::ServerVersion;
52use sui_rpc_api::subscription::SubscriptionService;
53use sui_types::base_types::ConciseableName;
54use sui_types::crypto::RandomnessRound;
55use sui_types::digests::{
56 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
57};
58use sui_types::messages_consensus::AuthorityCapabilitiesV2;
59use sui_types::sui_system_state::SuiSystemState;
60use tap::tap::TapFallible;
61use tokio::sync::oneshot;
62use tokio::sync::{Mutex, broadcast, mpsc};
63use tokio::task::JoinHandle;
64use tower::ServiceBuilder;
65use tracing::{Instrument, error_span, info};
66use tracing::{debug, error, warn};
67
/// Logging helper for the JWK updater tasks: emits at `debug!` level when
/// running under a test configuration (keeps test output quiet) and at
/// `info!` level otherwise. Accepts the same arguments as the `tracing`
/// macros.
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}
80
81use fastcrypto_zkp::bn254::zk_login::JWK;
82pub use handle::SuiNodeHandle;
83use mysten_metrics::{RegistryService, spawn_monitored_task};
84use mysten_service::server_timing::server_timing_middleware;
85use sui_config::node::{DBCheckpointConfig, RunWithRange};
86use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
87use sui_config::node_config_metrics::NodeConfigMetrics;
88use sui_config::{ConsensusConfig, NodeConfig};
89use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
90use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
91use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
92use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
93use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
94use sui_core::authority_aggregator::AuthorityAggregator;
95use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
96use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
97use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
98use sui_core::checkpoints::{
99 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
100 SubmitCheckpointToConsensus,
101};
102use sui_core::consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics};
103use sui_core::consensus_manager::ConsensusManager;
104use sui_core::consensus_throughput_calculator::ConsensusThroughputCalculator;
105use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
106use sui_core::db_checkpoint_handler::DBCheckpointHandler;
107use sui_core::epoch::committee_store::CommitteeStore;
108use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
109use sui_core::epoch::epoch_metrics::EpochMetrics;
110use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
111use sui_core::global_state_hasher::GlobalStateHasher;
112use sui_core::jsonrpc_index::IndexStore;
113use sui_core::module_cache_metrics::ResolverMetrics;
114use sui_core::overload_monitor::overload_monitor;
115use sui_core::rpc_index::RpcIndexStore;
116use sui_core::signature_verifier::SignatureVerifierMetrics;
117use sui_core::storage::RocksDbStore;
118use sui_core::transaction_orchestrator::TransactionOrchestrator;
119use sui_core::{
120 authority::{AuthorityState, AuthorityStore},
121 authority_client::NetworkAuthorityClient,
122};
123use sui_json_rpc::JsonRpcServerBuilder;
124use sui_json_rpc::coin_api::CoinReadApi;
125use sui_json_rpc::governance_api::GovernanceReadApi;
126use sui_json_rpc::indexer_api::IndexerApi;
127use sui_json_rpc::move_utils::MoveUtils;
128use sui_json_rpc::read_api::ReadApi;
129use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
130use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
131use sui_macros::fail_point;
132use sui_macros::{fail_point_async, replay_log};
133use sui_network::api::ValidatorServer;
134use sui_network::discovery;
135use sui_network::endpoint_manager::EndpointManager;
136use sui_network::state_sync;
137use sui_network::validator::server::ServerBuilder;
138use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
139use sui_snapshot::uploader::StateSnapshotUploader;
140use sui_storage::{
141 http_key_value_store::HttpKVStore,
142 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
143 key_value_store_metrics::KeyValueStoreMetrics,
144};
145use sui_types::base_types::{AuthorityName, EpochId};
146use sui_types::committee::Committee;
147use sui_types::crypto::KeypairTraits;
148use sui_types::error::{SuiError, SuiResult};
149use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
150use sui_types::sui_system_state::SuiSystemStateTrait;
151use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
153use sui_types::supported_protocol_versions::SupportedProtocolVersions;
154use typed_store::DBMetrics;
155use typed_store::rocks::default_db_options;
156
157use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
158
159pub mod admin;
160mod handle;
161pub mod metrics;
162
/// Components that exist only while this node is a validator in the current
/// epoch; built by `construct_validator_components` (see `start_async`) and
/// swapped in/out across reconfigurations.
pub struct ValidatorComponents {
    // Deferred validator server task; `start_async` calls `.start().await`
    // on it once the rest of node construction has completed.
    validator_server_handle: SpawnOnce,
    // Background overload-monitor task, when one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    // Registered with the endpoint manager as the consensus address updater.
    consensus_manager: Arc<ConsensusManager>,
    // NOTE(review): presumably prunes per-epoch consensus storage — confirm.
    consensus_store_pruner: ConsensusStorePruner,
    // Used to submit transactions to consensus (e.g. fetched JWKs) and to
    // recover end-of-publish state after restart.
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    // Present when admission queueing is enabled for this validator.
    admission_queue: Option<AdmissionQueueContext>,
}
/// Bundle of handles produced by `Self::create_p2p_network` for the node's
/// anemo-based peer-to-peer stack; destructured in `start_async`.
pub struct P2pComponents {
    p2p_network: Network,
    // Peer-id -> label map; handed to the anemo connection monitor.
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    endpoint_manager: EndpointManager,
}
181
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    /// Simulator-only per-node state, held by `SuiNode` under msim builds.
    pub(super) struct SimState {
        // Handle to the simulator node this SuiNode runs on.
        pub sim_node: sui_simulator::runtime::NodeHandle,
        // Expected-safe-mode flag; readers/writers are outside this chunk.
        pub sim_safe_mode_expected: AtomicBool,
        // Detects leaked simulator nodes when dropped.
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Signature of a test-injected JWK fetcher: maps an authority and an
    /// OIDC provider to the JWKs it claims to have fetched.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: ignores the requested authority/provider and parses
    /// the bundled static JWK bytes as the Twitch set.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        // Currently installed injector for this thread; tests swap it via
        // `set_jwk_injector`.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK injector currently installed on this thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Replaces the thread-local JWK injector (used by simulation tests).
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
// Default gRPC connect timeout (60s); usage sites are outside this view.
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
/// A running Sui node (validator or full node). Constructed by
/// [`SuiNode::start_async`], which wires together storage, networking,
/// consensus (for validators), and the HTTP/RPC servers.
pub struct SuiNode {
    config: NodeConfig,
    // Present only while the node is a validator; rebuilt on reconfiguration.
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    // Present on full nodes not running with a fixed range (see start_async).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    // Underscore-prefixed handles are held only to keep their background
    // tasks alive for the lifetime of the node.
    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,

    // Broadcasts the new system state at epoch boundaries; see
    // `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Shutdown notifications; see `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    // Hot-swappable authority aggregator shared with the orchestrator.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}
293
294impl fmt::Debug for SuiNode {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 f.debug_struct("SuiNode")
297 .field("name", &self.state.name.concise())
298 .finish()
299 }
300}
301
/// Upper bound on JWKs accepted from a single provider fetch; the JWK
/// updater truncates anything beyond this (see `start_jwk_updater`).
// A plain compile-time usize has no need for runtime identity, so `const`
// is the idiomatic item here rather than `static`.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
303
304impl SuiNode {
305 pub async fn start(
306 config: NodeConfig,
307 registry_service: RegistryService,
308 ) -> Result<Arc<SuiNode>> {
309 Self::start_async(
310 config,
311 registry_service,
312 ServerVersion::new("sui-node", "unknown"),
313 )
314 .await
315 }
316
    /// Spawns one background task per configured zkLogin OIDC provider that
    /// periodically fetches that provider's JWKs, filters out invalid /
    /// already-known keys, and submits new ones to consensus. Tasks are tied
    /// to the epoch via `within_alive_epoch`, so they stop at epoch change.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers come from config, keyed by chain; an unknown chain means
        // an empty provider set (no tasks spawned).
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Returns false (and bumps the invalid_jwks metric) when the key's
        // issuer is unknown, doesn't match the provider it was fetched from,
        // or the key exceeds the size limit.
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // Keys already submitted by this task; avoids re-submitting
                    // the same (id, jwk) pair across fetch iterations.
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Back off briefly before retrying a failed fetch.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only valid keys that are neither active in
                                // the current epoch nor already submitted by us.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Cap per-fetch submissions to bound consensus load.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    // Submission failures are logged and dropped; the
                                    // key will be retried only if a later fetch sees it
                                    // again (it stays in `seen` otherwise).
                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
451
452 pub async fn start_async(
453 config: NodeConfig,
454 registry_service: RegistryService,
455 server_version: ServerVersion,
456 ) -> Result<Arc<SuiNode>> {
457 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
458 let mut config = config.clone();
459 if config.supported_protocol_versions.is_none() {
460 info!(
461 "populating config.supported_protocol_versions with default {:?}",
462 SupportedProtocolVersions::SYSTEM_DEFAULT
463 );
464 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
465 }
466
467 let run_with_range = config.run_with_range;
468 let is_validator = config.consensus_config().is_some();
469 let is_full_node = !is_validator;
470 let prometheus_registry = registry_service.default_registry();
471
472 info!(node =? config.protocol_public_key(),
473 "Initializing sui-node listening on {}", config.network_address
474 );
475
476 DBMetrics::init(registry_service.clone());
478
479 typed_store::init_write_sync(config.enable_db_sync_to_disk);
481
482 mysten_metrics::init_metrics(&prometheus_registry);
484 #[cfg(not(msim))]
486 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
487
488 let genesis = config.genesis()?.clone();
489
490 let secret = Arc::pin(config.protocol_key_pair().copy());
491 let genesis_committee = genesis.committee();
492 let committee_store = Arc::new(CommitteeStore::new(
493 config.db_path().join("epochs"),
494 &genesis_committee,
495 None,
496 ));
497
498 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
499 let checkpoint_store = CheckpointStore::new(
500 &config.db_path().join("checkpoints"),
501 pruner_watermarks.clone(),
502 );
503 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
504
505 Self::check_and_recover_forks(
506 &checkpoint_store,
507 &checkpoint_metrics,
508 is_validator,
509 config.fork_recovery.as_ref(),
510 )
511 .await?;
512
513 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
515 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
516 enable_write_stall,
517 is_validator,
518 };
519 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
520 &config.db_path().join("store"),
521 Some(perpetual_tables_options),
522 Some(pruner_watermarks.epoch_id.clone()),
523 ));
524 let is_genesis = perpetual_tables
525 .database_is_empty()
526 .expect("Database read should not fail at init.");
527
528 let backpressure_manager =
529 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
530
531 let store =
532 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
533
534 let cur_epoch = store.get_recovery_epoch_at_restart()?;
535 let committee = committee_store
536 .get_committee(&cur_epoch)?
537 .expect("Committee of the current epoch must exist");
538 let epoch_start_configuration = store
539 .get_epoch_start_configuration()?
540 .expect("EpochStartConfiguration of the current epoch must exist");
541 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
542 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
543
544 let cache_traits = build_execution_cache(
545 &config.execution_cache,
546 &prometheus_registry,
547 &store,
548 backpressure_manager.clone(),
549 );
550
551 let auth_agg = {
552 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
553 Arc::new(ArcSwap::new(Arc::new(
554 AuthorityAggregator::new_from_epoch_start_state(
555 epoch_start_configuration.epoch_start_state(),
556 &committee_store,
557 safe_client_metrics_base,
558 ),
559 )))
560 };
561
562 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
563 let chain = match config.chain_override_for_testing {
564 Some(chain) => chain,
565 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
566 };
567
568 let highest_executed_checkpoint = checkpoint_store
569 .get_highest_executed_checkpoint_seq_number()
570 .expect("checkpoint store read cannot fail")
571 .unwrap_or(0);
572
573 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
574 0
575 } else {
576 checkpoint_store
577 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
578 .expect("checkpoint store read cannot fail")
579 .unwrap_or(highest_executed_checkpoint)
580 };
581
582 let epoch_options = default_db_options().optimize_db_for_write_throughput(4, false);
583 let epoch_store = AuthorityPerEpochStore::new(
584 config.protocol_public_key(),
585 committee.clone(),
586 &config.db_path().join("store"),
587 Some(epoch_options.options),
588 EpochMetrics::new(®istry_service.default_registry()),
589 epoch_start_configuration,
590 cache_traits.backing_package_store.clone(),
591 cache_traits.object_store.clone(),
592 cache_metrics,
593 signature_verifier_metrics,
594 &config.expensive_safety_check_config,
595 (chain_id, chain),
596 highest_executed_checkpoint,
597 previous_epoch_last_checkpoint,
598 Arc::new(SubmittedTransactionCacheMetrics::new(
599 ®istry_service.default_registry(),
600 )),
601 )?;
602
603 info!("created epoch store");
604
605 replay_log!(
606 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
607 epoch_store.epoch(),
608 epoch_store.protocol_config()
609 );
610
611 if is_genesis {
613 info!("checking SUI conservation at genesis");
614 cache_traits
619 .reconfig_api
620 .expensive_check_sui_conservation(&epoch_store)
621 .expect("SUI conservation check cannot fail at genesis");
622 }
623
624 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
625 let default_buffer_stake = epoch_store
626 .protocol_config()
627 .buffer_stake_for_protocol_upgrade_bps();
628 if effective_buffer_stake != default_buffer_stake {
629 warn!(
630 ?effective_buffer_stake,
631 ?default_buffer_stake,
632 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
633 );
634 }
635
636 checkpoint_store.insert_genesis_checkpoint(
637 genesis.checkpoint(),
638 genesis.checkpoint_contents().clone(),
639 &epoch_store,
640 );
641
642 info!("creating state sync store");
643 let state_sync_store = RocksDbStore::new(
644 cache_traits.clone(),
645 committee_store.clone(),
646 checkpoint_store.clone(),
647 );
648
649 let index_store = if is_full_node && config.enable_index_processing {
650 info!("creating jsonrpc index store");
651 Some(Arc::new(IndexStore::new(
652 config.db_path().join("indexes"),
653 &prometheus_registry,
654 epoch_store
655 .protocol_config()
656 .max_move_identifier_len_as_option(),
657 config.remove_deprecated_tables,
658 )))
659 } else {
660 None
661 };
662
663 let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
664 info!("creating rpc index store");
665 Some(Arc::new(
666 RpcIndexStore::new(
667 &config.db_path(),
668 &store,
669 &checkpoint_store,
670 &epoch_store,
671 &cache_traits.backing_package_store,
672 pruner_watermarks.checkpoint_id.clone(),
673 config.rpc().cloned().unwrap_or_default(),
674 )
675 .await,
676 ))
677 } else {
678 None
679 };
680
681 let chain_identifier = epoch_store.get_chain_identifier();
682
683 info!("creating archive reader");
684 let (randomness_tx, randomness_rx) = mpsc::channel(
686 config
687 .p2p_config
688 .randomness
689 .clone()
690 .unwrap_or_default()
691 .mailbox_capacity(),
692 );
693 let P2pComponents {
694 p2p_network,
695 known_peers,
696 discovery_handle,
697 state_sync_handle,
698 randomness_handle,
699 endpoint_manager,
700 } = Self::create_p2p_network(
701 &config,
702 state_sync_store.clone(),
703 chain_identifier,
704 randomness_tx,
705 &prometheus_registry,
706 )?;
707
708 for peer in &config.p2p_config.peer_address_overrides {
710 endpoint_manager
711 .update_endpoint(
712 EndpointId::P2p(peer.peer_id),
713 AddressSource::Config,
714 peer.addresses.clone(),
715 )
716 .expect("Updating peer address overrides should not fail");
717 }
718
719 update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
721
722 info!("start snapshot upload");
723 let state_snapshot_handle = Self::start_state_snapshot(
725 &config,
726 &prometheus_registry,
727 checkpoint_store.clone(),
728 chain_identifier,
729 )?;
730
731 info!("start db checkpoint");
733 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
734 &config,
735 &prometheus_registry,
736 state_snapshot_handle.is_some(),
737 )?;
738
739 if !epoch_store
740 .protocol_config()
741 .simplified_unwrap_then_delete()
742 {
743 config
745 .authority_store_pruning_config
746 .set_killswitch_tombstone_pruning(true);
747 }
748
749 let authority_name = config.protocol_public_key();
750
751 info!("create authority state");
752 let state = AuthorityState::new(
753 authority_name,
754 secret,
755 config.supported_protocol_versions.unwrap(),
756 store.clone(),
757 cache_traits.clone(),
758 epoch_store.clone(),
759 committee_store.clone(),
760 index_store.clone(),
761 rpc_index,
762 checkpoint_store.clone(),
763 &prometheus_registry,
764 genesis.objects(),
765 &db_checkpoint_config,
766 config.clone(),
767 chain_identifier,
768 config.policy_config.clone(),
769 config.firewall_config.clone(),
770 pruner_watermarks,
771 )
772 .await;
773 if epoch_store.epoch() == 0 {
775 let txn = &genesis.transaction();
776 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
777 let transaction =
778 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
779 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
780 genesis.transaction().data().clone(),
781 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
782 ),
783 );
784 state
785 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
786 .instrument(span)
787 .await
788 .unwrap();
789 }
790
791 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
793
794 if config
795 .expensive_safety_check_config
796 .enable_secondary_index_checks()
797 && let Some(indexes) = state.indexes.clone()
798 {
799 sui_core::verify_indexes::verify_indexes(
800 state.get_global_state_hash_store().as_ref(),
801 indexes,
802 )
803 .expect("secondary indexes are inconsistent");
804 }
805
806 let (end_of_epoch_channel, end_of_epoch_receiver) =
807 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
808
809 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
810 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
811 auth_agg.load_full(),
812 state.clone(),
813 end_of_epoch_receiver,
814 &config.db_path(),
815 &prometheus_registry,
816 &config,
817 )))
818 } else {
819 None
820 };
821
822 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
823 state.clone(),
824 state_sync_store,
825 &transaction_orchestrator.clone(),
826 &config,
827 &prometheus_registry,
828 server_version,
829 )
830 .await?;
831
832 let global_state_hasher = Arc::new(GlobalStateHasher::new(
833 cache_traits.global_state_hash_store.clone(),
834 GlobalStateHashMetrics::new(&prometheus_registry),
835 ));
836
837 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
838 "sui",
839 ®istry_service.default_registry(),
840 );
841
842 let connection_monitor_handle =
843 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
844 p2p_network.downgrade(),
845 Arc::new(network_connection_metrics),
846 known_peers,
847 );
848
849 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
850
851 sui_node_metrics
852 .binary_max_protocol_version
853 .set(ProtocolVersion::MAX.as_u64() as i64);
854 sui_node_metrics
855 .configured_max_protocol_version
856 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
857
858 let validator_components = if state.is_validator(&epoch_store) {
859 let mut components = Self::construct_validator_components(
860 config.clone(),
861 state.clone(),
862 committee,
863 epoch_store.clone(),
864 checkpoint_store.clone(),
865 state_sync_handle.clone(),
866 randomness_handle.clone(),
867 Arc::downgrade(&global_state_hasher),
868 backpressure_manager.clone(),
869 ®istry_service,
870 sui_node_metrics.clone(),
871 checkpoint_metrics.clone(),
872 )
873 .await?;
874
875 components
876 .consensus_adapter
877 .recover_end_of_publish(&epoch_store);
878
879 components.validator_server_handle = components.validator_server_handle.start().await;
881
882 endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
884
885 Some(components)
886 } else {
887 None
888 };
889
890 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
892
893 let node = Self {
894 config,
895 validator_components: Mutex::new(validator_components),
896 http_servers,
897 state,
898 transaction_orchestrator,
899 registry_service,
900 metrics: sui_node_metrics,
901 checkpoint_metrics,
902
903 _discovery: discovery_handle,
904 _connection_monitor_handle: connection_monitor_handle,
905 state_sync_handle,
906 randomness_handle,
907 checkpoint_store,
908 global_state_hasher: Mutex::new(Some(global_state_hasher)),
909 end_of_epoch_channel,
910 endpoint_manager,
911 backpressure_manager,
912
913 _db_checkpoint_handle: db_checkpoint_handle,
914
915 #[cfg(msim)]
916 sim_state: Default::default(),
917
918 _state_snapshot_uploader_handle: state_snapshot_handle,
919 shutdown_channel_tx: shutdown_channel,
920
921 auth_agg,
922 subscription_service_checkpoint_sender,
923 };
924
925 info!("SuiNode started!");
926 let node = Arc::new(node);
927 let node_copy = node.clone();
928 spawn_monitored_task!(async move {
929 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
930 if let Err(error) = result {
931 warn!("Reconfiguration finished with error {:?}", error);
932 }
933 });
934
935 Ok(node)
936 }
937
938 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
939 self.end_of_epoch_channel.subscribe()
940 }
941
942 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
943 self.shutdown_channel_tx.subscribe()
944 }
945
946 pub fn current_epoch_for_testing(&self) -> EpochId {
947 self.state.current_epoch_for_testing()
948 }
949
950 pub fn db_checkpoint_path(&self) -> PathBuf {
951 self.config.db_checkpoint_path()
952 }
953
954 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
956 info!("close_epoch (current epoch = {})", epoch_store.epoch());
957 self.validator_components
958 .lock()
959 .await
960 .as_ref()
961 .ok_or_else(|| SuiError::from("Node is not a validator"))?
962 .consensus_adapter
963 .close_epoch(epoch_store);
964 Ok(())
965 }
966
967 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
968 self.state
969 .clear_override_protocol_upgrade_buffer_stake(epoch)
970 }
971
972 pub fn set_override_protocol_upgrade_buffer_stake(
973 &self,
974 epoch: EpochId,
975 buffer_stake_bps: u64,
976 ) -> SuiResult {
977 self.state
978 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
979 }
980
981 pub async fn close_epoch_for_testing(&self) -> SuiResult {
984 let epoch_store = self.state.epoch_store_for_testing();
985 self.close_epoch(&epoch_store).await
986 }
987
988 fn start_state_snapshot(
989 config: &NodeConfig,
990 prometheus_registry: &Registry,
991 checkpoint_store: Arc<CheckpointStore>,
992 chain_identifier: ChainIdentifier,
993 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
994 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
995 let snapshot_uploader = StateSnapshotUploader::new(
996 &config.db_checkpoint_path(),
997 &config.snapshot_path(),
998 remote_store_config.clone(),
999 60,
1000 prometheus_registry,
1001 checkpoint_store,
1002 chain_identifier,
1003 config.state_snapshot_write_config.archive_interval_epochs,
1004 )?;
1005 Ok(Some(snapshot_uploader.start()))
1006 } else {
1007 Ok(None)
1008 }
1009 }
1010
1011 fn start_db_checkpoint(
1012 config: &NodeConfig,
1013 prometheus_registry: &Registry,
1014 state_snapshot_enabled: bool,
1015 ) -> Result<(
1016 DBCheckpointConfig,
1017 Option<tokio::sync::broadcast::Sender<()>>,
1018 )> {
1019 let checkpoint_path = Some(
1020 config
1021 .db_checkpoint_config
1022 .checkpoint_path
1023 .clone()
1024 .unwrap_or_else(|| config.db_checkpoint_path()),
1025 );
1026 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1027 DBCheckpointConfig {
1028 checkpoint_path,
1029 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1030 true
1031 } else {
1032 config
1033 .db_checkpoint_config
1034 .perform_db_checkpoints_at_epoch_end
1035 },
1036 ..config.db_checkpoint_config.clone()
1037 }
1038 } else {
1039 config.db_checkpoint_config.clone()
1040 };
1041
1042 match (
1043 db_checkpoint_config.object_store_config.as_ref(),
1044 state_snapshot_enabled,
1045 ) {
1046 (None, false) => Ok((db_checkpoint_config, None)),
1051 (_, _) => {
1052 let handler = DBCheckpointHandler::new(
1053 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1054 db_checkpoint_config.object_store_config.as_ref(),
1055 60,
1056 db_checkpoint_config
1057 .prune_and_compact_before_upload
1058 .unwrap_or(true),
1059 config.authority_store_pruning_config.clone(),
1060 prometheus_registry,
1061 state_snapshot_enabled,
1062 )?;
1063 Ok((
1064 db_checkpoint_config,
1065 Some(DBCheckpointHandler::start(handler)),
1066 ))
1067 }
1068 }
1069 }
1070
    /// Builds the anemo-based p2p network and starts the discovery,
    /// state-sync, and randomness subsystems on it.
    ///
    /// Returns the network handle plus the per-subsystem handles bundled in
    /// `P2pComponents`.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let mut p2p_config = config.p2p_config.clone();
        {
            // Default the discovery peer-address cache file to live under the
            // node's db path when not explicitly configured.
            let disc = p2p_config.discovery.get_or_insert_with(Default::default);
            if disc.peer_addr_store_path.is_none() {
                disc.peer_addr_store_path =
                    Some(config.db_path().join("discovery_peer_cache.yaml"));
            }
        }
        let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
        if let Some(consensus_config) = &config.consensus_config {
            // Advertise a consensus address via discovery, preferring the
            // configured external address over the listen address.
            let effective_addr = consensus_config
                .external_address
                .as_ref()
                .or(consensus_config.listen_address.as_ref());
            if let Some(addr) = effective_addr {
                discovery_builder = discovery_builder.consensus_external_address(addr.clone());
            }
        }
        let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
        let discovery_sender = discovery.sender();

        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .discovery_sender(discovery_sender)
            .with_metrics(prometheus_registry)
            .build();

        // Label known peers by how we learned about them (allowlist vs seed),
        // for reporting purposes.
        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All p2p RPC services share one anemo router/network.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            // Inbound stack: tracing for server errors + per-request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            // Outbound stack mirrors the inbound one with its own metrics.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Frame-size caps: 1 MiB requests, 128 MiB responses.
            anemo_config.max_request_frame_size = Some(1 << 20);
            anemo_config.max_response_frame_size = Some(128 << 20);

            // Fill in QUIC tuning defaults only where the operator has not
            // already configured a value.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The TLS server name is chain-specific so nodes on different
            // chains do not connect to each other.
            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
            endpoint_manager,
        })
    }
1244
1245 async fn construct_validator_components(
1246 config: NodeConfig,
1247 state: Arc<AuthorityState>,
1248 committee: Arc<Committee>,
1249 epoch_store: Arc<AuthorityPerEpochStore>,
1250 checkpoint_store: Arc<CheckpointStore>,
1251 state_sync_handle: state_sync::Handle,
1252 randomness_handle: randomness::Handle,
1253 global_state_hasher: Weak<GlobalStateHasher>,
1254 backpressure_manager: Arc<BackpressureManager>,
1255 registry_service: &RegistryService,
1256 sui_node_metrics: Arc<SuiNodeMetrics>,
1257 checkpoint_metrics: Arc<CheckpointMetrics>,
1258 ) -> Result<ValidatorComponents> {
1259 let mut config_clone = config.clone();
1260 let consensus_config = config_clone
1261 .consensus_config
1262 .as_mut()
1263 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1264
1265 let client = Arc::new(UpdatableConsensusClient::new());
1266 let inflight_slot_freed_notify = Arc::new(tokio::sync::Notify::new());
1267 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1268 &committee,
1269 consensus_config,
1270 state.name,
1271 ®istry_service.default_registry(),
1272 client.clone(),
1273 checkpoint_store.clone(),
1274 inflight_slot_freed_notify.clone(),
1275 ));
1276 let consensus_manager = Arc::new(ConsensusManager::new(
1277 &config,
1278 consensus_config,
1279 registry_service,
1280 client,
1281 ));
1282
1283 let consensus_store_pruner = ConsensusStorePruner::new(
1285 consensus_manager.get_storage_base_path(),
1286 consensus_config.db_retention_epochs(),
1287 consensus_config.db_pruner_period(),
1288 ®istry_service.default_registry(),
1289 );
1290
1291 let sui_tx_validator_metrics =
1292 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1293
1294 let (validator_server_handle, admission_queue) = Self::start_grpc_validator_service(
1295 &config,
1296 state.clone(),
1297 consensus_adapter.clone(),
1298 epoch_store.clone(),
1299 ®istry_service.default_registry(),
1300 inflight_slot_freed_notify,
1301 )
1302 .await?;
1303
1304 let validator_overload_monitor_handle = if config
1307 .authority_overload_config
1308 .max_load_shedding_percentage
1309 > 0
1310 {
1311 let authority_state = Arc::downgrade(&state);
1312 let overload_config = config.authority_overload_config.clone();
1313 fail_point!("starting_overload_monitor");
1314 Some(spawn_monitored_task!(overload_monitor(
1315 authority_state,
1316 overload_config,
1317 )))
1318 } else {
1319 None
1320 };
1321
1322 Self::start_epoch_specific_validator_components(
1323 &config,
1324 state.clone(),
1325 consensus_adapter,
1326 checkpoint_store,
1327 epoch_store,
1328 state_sync_handle,
1329 randomness_handle,
1330 consensus_manager,
1331 consensus_store_pruner,
1332 global_state_hasher,
1333 backpressure_manager,
1334 validator_server_handle,
1335 validator_overload_monitor_handle,
1336 checkpoint_metrics,
1337 sui_node_metrics,
1338 sui_tx_validator_metrics,
1339 admission_queue,
1340 )
1341 .await
1342 }
1343
    /// Starts the validator components that are scoped to a single epoch:
    /// checkpoint service, randomness manager (when enabled), execution time
    /// observer, consensus (started asynchronously), JWK updater (when
    /// authenticator state is enabled), and admission-queue rotation.
    ///
    /// Called both at node start (via `construct_validator_components`) and on
    /// every reconfiguration.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
        admission_queue: Option<AdmissionQueueContext>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        // Attach a randomness manager to the epoch store when the protocol
        // has randomness enabled for this epoch.
        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        // Consensus startup runs in a spawned task; this function does not
        // wait for it to finish.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Escape hatch: the replay waiter can be disabled via env var.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        if let Some(ctx) = &admission_queue {
            ctx.rotate_for_epoch(epoch_store);
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
            admission_queue,
        })
    }
1474
    /// Builds the `CheckpointService` for the current epoch, wiring checkpoint
    /// output to consensus submission and certified checkpoints to state sync.
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {}
               and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        // Locally built checkpoints are signed and submitted to consensus.
        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_store.next_reconfiguration_timestamp_ms(),
            metrics: checkpoint_metrics.clone(),
        });

        // Certified checkpoints are handed to state sync for distribution.
        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        // Checkpoint size limits come from the current protocol config.
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }
1520
1521 fn construct_consensus_adapter(
1522 committee: &Committee,
1523 consensus_config: &ConsensusConfig,
1524 authority: AuthorityName,
1525 prometheus_registry: &Registry,
1526 consensus_client: Arc<dyn ConsensusClient>,
1527 checkpoint_store: Arc<CheckpointStore>,
1528 inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
1529 ) -> ConsensusAdapter {
1530 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1531 ConsensusAdapter::new(
1534 consensus_client,
1535 checkpoint_store,
1536 authority,
1537 consensus_config.max_pending_transactions(),
1538 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1539 ca_metrics,
1540 inflight_slot_freed_notify,
1541 )
1542 }
1543
    /// Prepares the validator gRPC service (and optional admission queue) and
    /// returns a `SpawnOnce` that binds and serves it when started, plus the
    /// admission-queue context when enabled in the overload config.
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        prometheus_registry: &Registry,
        inflight_slot_freed_notify: Arc<tokio::sync::Notify>,
    ) -> Result<(SpawnOnce, Option<AdmissionQueueContext>)> {
        let overload_config = &config.authority_overload_config;
        // Admission queue is opt-in via the overload config.
        let admission_queue = overload_config.admission_queue_enabled.then(|| {
            let manager = Arc::new(AdmissionQueueManager::new(
                consensus_adapter.clone(),
                Arc::new(AdmissionQueueMetrics::new(prometheus_registry)),
                overload_config.admission_queue_capacity_fraction,
                overload_config.admission_queue_bypass_fraction,
                overload_config.admission_queue_failover_timeout,
                inflight_slot_freed_notify,
            ));
            AdmissionQueueContext::spawn(manager, epoch_store)
        });
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
            admission_queue.clone(),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        // `ready_rx` lets the caller of SpawnOnce::start wait until the server
        // has bound before proceeding.
        let (ready_tx, ready_rx) = oneshot::channel();

        let spawn_once = SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        });
        Ok((spawn_once, admission_queue))
    }
1607
1608 pub fn state(&self) -> Arc<AuthorityState> {
1609 self.state.clone()
1610 }
1611
    /// Test-only helper: returns the reference gas price, delegating to
    /// `AuthorityState`.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1616
    /// Returns a clone of the `Arc` handle to the committee store.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1620
1621 pub fn clone_authority_aggregator(
1632 &self,
1633 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1634 self.transaction_orchestrator
1635 .as_ref()
1636 .map(|to| to.clone_authority_aggregator())
1637 }
1638
    /// Returns the transaction orchestrator, if this node runs one.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1644
    /// Main reconfiguration loop: for each epoch, runs checkpoint execution to
    /// completion, then transitions the node — and its validator components,
    /// if any — into the next epoch. Returns only when a `run_with_range` stop
    /// condition is met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The global state hasher is taken out of the shared slot for the
            // duration of the epoch and replaced with a fresh one at the end.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Validators advertise their supported protocol versions and
            // available system packages to consensus.
            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                // Walk the advertised max version down past any version that
                // requires the accumulator root object when it doesn't exist.
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            // Execute all checkpoints of this epoch; blocks until the epoch
            // ends or the run-with-range condition is hit.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Best-effort notification; a send error is only warned about when
            // the node is a fullnode.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            // Was a validator last epoch: shut consensus down and, if still a
            // validator, restart the epoch-specific components.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
                admission_queue,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // Carry the hasher's metrics over into a fresh hasher for the
                // new epoch; the old hasher must have no other references.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                            admission_queue,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                // Was a fullnode last epoch: same hasher swap, then promote to
                // validator if the new committee includes this node.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // In simtests, exercise checkpoint pruning across epochs when the
            // config retains a bounded, nonzero number of epochs.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1941
    /// Shuts down the consensus manager; a no-op when no validator components
    /// are present.
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }
1947
    /// Reconfigures `AuthorityState` into the next epoch and returns the new
    /// epoch store.
    ///
    /// Panics (via `expect`/`assert`) on any inconsistency: missing last
    /// checkpoint, last checkpoint not fully executed, or a failed reconfigure
    /// — all of which indicate an unrecoverable state.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // The epoch's last checkpoint must be the highest executed one before
        // reconfiguration can proceed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }
2003
    /// Returns a reference to the node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2007
    /// Returns a clone of the randomness subsystem handle.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2011
    /// Returns a clone of the state-sync subsystem handle.
    pub fn state_sync_handle(&self) -> state_sync::Handle {
        self.state_sync_handle.clone()
    }
2015
    /// Returns a reference to the p2p endpoint manager.
    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }
2019
2020 fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
2022 let digest_str = digest.to_string();
2023 if digest_str.len() >= 8 {
2024 digest_str[0..8].to_string()
2025 } else {
2026 digest_str
2027 }
2028 }
2029
    /// On validators, applies any configured fork-recovery overrides and then
    /// halts startup (via the fork handlers) if a checkpoint or transaction
    /// fork marker is still present. No-op on non-validators.
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Fork detection/recovery only applies to validators.
        if !is_validator {
            return Ok(());
        }

        // Recovery overrides run first so they can clear fork markers before
        // the checks below.
        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }
2085
    /// Applies configured checkpoint-digest overrides: clears locally computed
    /// checkpoints that mismatch an override digest, and clears the checkpoint
    /// fork marker when an override exists for the forked sequence number.
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    // Drop the mismatching local checkpoint and everything
                    // after it so they can be recomputed.
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        // Only clear the fork marker when an override covers the forked seq.
        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }
2134
2135 fn try_recover_transaction_fork(
2136 checkpoint_store: &CheckpointStore,
2137 recovery: &ForkRecoveryConfig,
2138 ) -> Result<()> {
2139 if recovery.transaction_overrides.is_empty() {
2140 return Ok(());
2141 }
2142
2143 if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
2144 && recovery
2145 .transaction_overrides
2146 .contains_key(&tx_digest.to_string())
2147 {
2148 info!(
2149 "Fork recovery enabled: clearing transaction fork for tx {:?}",
2150 tx_digest
2151 );
2152 checkpoint_store
2153 .clear_transaction_fork_detected()
2154 .expect("Failed to clear transaction fork detected marker");
2155 }
2156 Ok(())
2157 }
2158
2159 fn get_current_timestamp() -> u64 {
2160 std::time::SystemTime::now()
2161 .duration_since(std::time::SystemTime::UNIX_EPOCH)
2162 .unwrap()
2163 .as_secs()
2164 }
2165
2166 async fn handle_checkpoint_fork(
2167 checkpoint_seq: u64,
2168 checkpoint_digest: CheckpointDigest,
2169 checkpoint_metrics: &CheckpointMetrics,
2170 fork_recovery: Option<&ForkRecoveryConfig>,
2171 ) -> Result<()> {
2172 checkpoint_metrics
2173 .checkpoint_fork_crash_mode
2174 .with_label_values(&[
2175 &checkpoint_seq.to_string(),
2176 &Self::get_digest_prefix(checkpoint_digest),
2177 &Self::get_current_timestamp().to_string(),
2178 ])
2179 .set(1);
2180
2181 let behavior = fork_recovery
2182 .map(|fr| fr.fork_crash_behavior)
2183 .unwrap_or_default();
2184
2185 match behavior {
2186 ForkCrashBehavior::AwaitForkRecovery => {
2187 error!(
2188 checkpoint_seq = checkpoint_seq,
2189 checkpoint_digest = ?checkpoint_digest,
2190 "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
2191 );
2192 futures::future::pending::<()>().await;
2193 unreachable!("pending() should never return");
2194 }
2195 ForkCrashBehavior::ReturnError => {
2196 error!(
2197 checkpoint_seq = checkpoint_seq,
2198 checkpoint_digest = ?checkpoint_digest,
2199 "Checkpoint fork detected! Returning error."
2200 );
2201 Err(anyhow::anyhow!(
2202 "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
2203 checkpoint_seq,
2204 checkpoint_digest
2205 ))
2206 }
2207 }
2208 }
2209
2210 async fn handle_transaction_fork(
2211 tx_digest: TransactionDigest,
2212 expected_effects_digest: TransactionEffectsDigest,
2213 actual_effects_digest: TransactionEffectsDigest,
2214 checkpoint_metrics: &CheckpointMetrics,
2215 fork_recovery: Option<&ForkRecoveryConfig>,
2216 ) -> Result<()> {
2217 checkpoint_metrics
2218 .transaction_fork_crash_mode
2219 .with_label_values(&[
2220 &Self::get_digest_prefix(tx_digest),
2221 &Self::get_digest_prefix(expected_effects_digest),
2222 &Self::get_digest_prefix(actual_effects_digest),
2223 &Self::get_current_timestamp().to_string(),
2224 ])
2225 .set(1);
2226
2227 let behavior = fork_recovery
2228 .map(|fr| fr.fork_crash_behavior)
2229 .unwrap_or_default();
2230
2231 match behavior {
2232 ForkCrashBehavior::AwaitForkRecovery => {
2233 error!(
2234 tx_digest = ?tx_digest,
2235 expected_effects_digest = ?expected_effects_digest,
2236 actual_effects_digest = ?actual_effects_digest,
2237 "Transaction fork detected! Node startup halted. Sleeping indefinitely."
2238 );
2239 futures::future::pending::<()>().await;
2240 unreachable!("pending() should never return");
2241 }
2242 ForkCrashBehavior::ReturnError => {
2243 error!(
2244 tx_digest = ?tx_digest,
2245 expected_effects_digest = ?expected_effects_digest,
2246 actual_effects_digest = ?actual_effects_digest,
2247 "Transaction fork detected! Returning error."
2248 );
2249 Err(anyhow::anyhow!(
2250 "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
2251 tx_digest,
2252 expected_effects_digest,
2253 actual_effects_digest
2254 ))
2255 }
2256 }
2257 }
2258}
2259
2260#[cfg(not(msim))]
2261impl SuiNode {
2262 async fn fetch_jwks(
2263 _authority: AuthorityName,
2264 provider: &OIDCProvider,
2265 ) -> SuiResult<Vec<(JwkId, JWK)>> {
2266 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2267 use sui_types::error::SuiErrorKind;
2268 let client = reqwest::Client::new();
2269 fetch_jwks(provider, &client, true)
2270 .await
2271 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
2272 }
2273}
2274
#[cfg(msim)]
impl SuiNode {
    /// Returns the simulator node id assigned to this node (msim builds only).
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether tests expect this node to enter safe mode. Stored with
    /// a relaxed atomic ordering — it is a standalone test-only flag with no
    /// other memory ordering requirements.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulator stand-in for JWK fetching: delegates to the test-configured
    /// injector instead of performing network I/O.
    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2296
/// A future paired with a readiness signal that can be spawned at most once.
enum SpawnOnce {
    // Not yet spawned: the readiness receiver plus the future to run.
    // NOTE(review): the Mutex appears to exist only to make the variant
    // shareable across threads — confirm before removing.
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    // Spawned: the handle of the running task (kept alive, not awaited here).
    #[allow(unused)]
    Started(JoinHandle<()>),
}
2303
2304impl SpawnOnce {
2305 pub fn new(
2306 ready_rx: oneshot::Receiver<()>,
2307 future: impl Future<Output = ()> + Send + 'static,
2308 ) -> Self {
2309 Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
2310 }
2311
2312 pub async fn start(self) -> Self {
2313 match self {
2314 Self::Unstarted(ready_rx, future) => {
2315 let future = future.into_inner();
2316 let handle = tokio::spawn(future);
2317 ready_rx.await.unwrap();
2318 Self::Started(handle)
2319 }
2320 Self::Started(_) => self,
2321 }
2322 }
2323}
2324
2325fn update_peer_addresses(
2327 config: &NodeConfig,
2328 endpoint_manager: &EndpointManager,
2329 epoch_start_state: &EpochStartSystemState,
2330) {
2331 for (peer_id, address) in
2332 epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
2333 {
2334 endpoint_manager
2335 .update_endpoint(
2336 EndpointId::P2p(peer_id),
2337 AddressSource::Chain,
2338 vec![address],
2339 )
2340 .expect("Updating peer addresses should not fail");
2341 }
2342}
2343
2344fn build_kv_store(
2345 state: &Arc<AuthorityState>,
2346 config: &NodeConfig,
2347 registry: &Registry,
2348) -> Result<Arc<TransactionKeyValueStore>> {
2349 let metrics = KeyValueStoreMetrics::new(registry);
2350 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2351
2352 let base_url = &config.transaction_kv_store_read_config.base_url;
2353
2354 if base_url.is_empty() {
2355 info!("no http kv store url provided, using local db only");
2356 return Ok(Arc::new(db_store));
2357 }
2358
2359 let base_url: url::Url = base_url.parse().tap_err(|e| {
2360 error!(
2361 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2362 base_url, e
2363 )
2364 })?;
2365
2366 let network_str = match state.get_chain_identifier().chain() {
2367 Chain::Mainnet => "/mainnet",
2368 _ => {
2369 info!("using local db only for kv store");
2370 return Ok(Arc::new(db_store));
2371 }
2372 };
2373
2374 let base_url = base_url.join(network_str)?.to_string();
2375 let http_store = HttpKVStore::new_kv(
2376 &base_url,
2377 config.transaction_kv_store_read_config.cache_size,
2378 metrics.clone(),
2379 )?;
2380 info!("using local key-value store with fallback to http key-value store");
2381 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2382 db_store,
2383 http_store,
2384 metrics,
2385 "json_rpc_fallback",
2386 )))
2387}
2388
/// Builds the JSON-RPC and gRPC/REST HTTP(S) servers.
///
/// Returns the server handles plus, when servers are started, a sender for
/// feeding executed checkpoints to the RPC subscription service. Nodes with a
/// consensus config get neither — presumably validators do not expose the
/// public RPC surface (NOTE(review): confirm intent).
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Early out for nodes running consensus: no HTTP servers at all.
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    // Assemble the JSON-RPC module set: read APIs, governance, bridge,
    // indexer, move utils, and — when an orchestrator exists — execution.
    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // The transaction builder is only registered when no run_with_range
        // is configured.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        // Execution API requires an orchestrator to submit through.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Name-service config: explicit node-config values win; otherwise
        // fall back to chain-specific defaults.
        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // The subscription service pushes executed checkpoints to subscribers;
    // the checkpoint sender half is returned to the caller.
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    // Shared middleware stack: re-expose sui_http connection info for axum
    // extractors, add server-timing headers, and allow permissive CORS for
    // GET/POST.
    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    // Optionally serve the same router over HTTPS when TLS is configured.
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    // Plain HTTP is always served on the configured JSON-RPC address.
    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}
2560
/// Maximum number of transactions allowed in a single checkpoint, taken from
/// the protocol config (production builds).
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2565
/// Test builds cap checkpoints at 2 transactions so small workloads still
/// exercise checkpoint-boundary behavior. NOTE(review): the specific value 2
/// is assumed intentional for tests — confirm before changing.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}
2570
/// Handles for the optional HTTP and HTTPS RPC servers. Held only to keep the
/// servers alive for the lifetime of the node; the fields are never read.
#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}
2578
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    // End-to-end coverage of check_and_recover_forks: for both checkpoint and
    // transaction forks, verify that (1) a recorded fork surfaces as an error
    // under ForkCrashBehavior::ReturnError, and (2) a matching override clears
    // the persisted marker so startup proceeds.
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // Phase 1: a recorded checkpoint fork with no overrides must error.
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // Phase 2: an override naming the forked sequence clears the marker.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // Phase 3: a recorded transaction fork with no overrides must error.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // Phase 4: an override keyed by the forked tx digest clears the marker.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}