1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::authority::ExecutionEnv;
29use sui_core::authority::RandomnessRoundReceiver;
30use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
31use sui_core::authority::backpressure::BackpressureManager;
32use sui_core::authority::epoch_start_configuration::EpochFlag;
33use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
34use sui_core::consensus_adapter::ConsensusClient;
35use sui_core::consensus_manager::UpdatableConsensusClient;
36use sui_core::epoch::randomness::RandomnessManager;
37use sui_core::execution_cache::build_execution_cache;
38use sui_network::endpoint_manager::{AddressSource, EndpointId};
39use sui_network::validator::server::SUI_TLS_SERVER_NAME;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use sui_core::global_state_hasher::GlobalStateHashMetrics;
43use sui_core::storage::RestReadStore;
44use sui_json_rpc::bridge_api::BridgeReadApi;
45use sui_json_rpc_api::JsonRpcMetrics;
46use sui_network::randomness;
47use sui_rpc_api::RpcMetrics;
48use sui_rpc_api::ServerVersion;
49use sui_rpc_api::subscription::SubscriptionService;
50use sui_types::base_types::ConciseableName;
51use sui_types::crypto::RandomnessRound;
52use sui_types::digests::{
53 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
54};
55use sui_types::messages_consensus::AuthorityCapabilitiesV2;
56use sui_types::sui_system_state::SuiSystemState;
57use tap::tap::TapFallible;
58use tokio::sync::oneshot;
59use tokio::sync::{Mutex, broadcast, mpsc};
60use tokio::task::JoinHandle;
61use tower::ServiceBuilder;
62use tracing::{Instrument, error_span, info};
63use tracing::{debug, error, warn};
64
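/// Logs JWK-updater progress at `debug!` level in test configurations and at `info!`
/// otherwise, so the periodic fetch loop does not flood test output.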
65macro_rules! jwk_log {
69 ($($arg:tt)+) => {
70 if in_test_configuration() {
71 debug!($($arg)+);
72 } else {
73 info!($($arg)+);
74 }
75 };
76}
77
78use fastcrypto_zkp::bn254::zk_login::JWK;
79pub use handle::SuiNodeHandle;
80use mysten_metrics::{RegistryService, spawn_monitored_task};
81use mysten_service::server_timing::server_timing_middleware;
82use sui_config::node::{DBCheckpointConfig, RunWithRange};
83use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
84use sui_config::node_config_metrics::NodeConfigMetrics;
85use sui_config::{ConsensusConfig, NodeConfig};
86use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
87use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
88use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
89use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
90use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
91use sui_core::authority_aggregator::AuthorityAggregator;
92use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
93use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
94use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
95use sui_core::checkpoints::{
96 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
97 SubmitCheckpointToConsensus,
98};
99use sui_core::consensus_adapter::{
100 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
101};
102use sui_core::consensus_manager::ConsensusManager;
103use sui_core::consensus_throughput_calculator::{
104 ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
105};
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
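/// Handles and services that exist only while this node is acting as a validator.
/// They are torn down and rebuilt by `monitor_reconfiguration` at every epoch change.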
164pub struct ValidatorComponents {
165 validator_server_handle: SpawnOnce,
166 validator_overload_monitor_handle: Option<JoinHandle<()>>,
167 consensus_manager: Arc<ConsensusManager>,
168 consensus_store_pruner: ConsensusStorePruner,
169 consensus_adapter: Arc<ConsensusAdapter>,
170 checkpoint_metrics: Arc<CheckpointMetrics>,
171 sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
172}
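/// Handles produced by `create_p2p_network`: the anemo network plus the discovery,
/// state-sync and randomness services running on top of it.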
173pub struct P2pComponents {
174 p2p_network: Network,
175 known_peers: HashMap<PeerId, String>,
176 discovery_handle: discovery::Handle,
177 state_sync_handle: state_sync::Handle,
178 randomness_handle: randomness::Handle,
179 endpoint_manager: EndpointManager,
180}
181
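// Simulator-only support: per-node simulation state plus a thread-local hook that lets
// tests inject a custom JWK fetcher instead of contacting real OIDC providers.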
182#[cfg(msim)]
183mod simulator {
184 use std::sync::atomic::AtomicBool;
185 use sui_types::error::SuiErrorKind;
186
187 use super::*;
188 pub(super) struct SimState {
189 pub sim_node: sui_simulator::runtime::NodeHandle,
190 pub sim_safe_mode_expected: AtomicBool,
191 _leak_detector: sui_simulator::NodeLeakDetector,
192 }
193
194 impl Default for SimState {
195 fn default() -> Self {
196 Self {
197 sim_node: sui_simulator::runtime::NodeHandle::current(),
198 sim_safe_mode_expected: AtomicBool::new(false),
199 _leak_detector: sui_simulator::NodeLeakDetector::new(),
200 }
201 }
202 }
203
204 type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
205 + Send
206 + Sync
207 + 'static;
208
209 fn default_fetch_jwks(
210 _authority: AuthorityName,
211 _provider: &OIDCProvider,
212 ) -> SuiResult<Vec<(JwkId, JWK)>> {
213 use fastcrypto_zkp::bn254::zk_login::parse_jwks;
214 parse_jwks(
216 sui_types::zk_login_util::DEFAULT_JWK_BYTES,
217 &OIDCProvider::Twitch,
218 true,
219 )
220 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
221 }
222
223 thread_local! {
224 static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
225 }
226
227 pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
228 JWK_INJECTOR.with(|injector| injector.borrow().clone())
229 }
230
231 pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
232 JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
233 }
234}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
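/// Connect timeout for the validator gRPC server; also reused as its HTTP/2 keepalive
/// interval and timeout in `start_grpc_validator_service`.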
245const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
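/// The top-level Sui node. Owns the authority state, storage and networking handles, the
/// optional validator components, and the channels used to signal epoch change and shutdown.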
247pub struct SuiNode {
248 config: NodeConfig,
249 validator_components: Mutex<Option<ValidatorComponents>>,
250
251 #[allow(unused)]
253 http_servers: HttpServers,
254
255 state: Arc<AuthorityState>,
256 transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
257 registry_service: RegistryService,
258 metrics: Arc<SuiNodeMetrics>,
259 checkpoint_metrics: Arc<CheckpointMetrics>,
260
261 _discovery: discovery::Handle,
262 _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
263 state_sync_handle: state_sync::Handle,
264 randomness_handle: randomness::Handle,
265 checkpoint_store: Arc<CheckpointStore>,
266 global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
267 connection_monitor_status: Arc<ConnectionMonitorStatus>,
268
269 end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
271
272 endpoint_manager: EndpointManager,
274
275 backpressure_manager: Arc<BackpressureManager>,
276
277 _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
278
279 #[cfg(msim)]
280 sim_state: SimState,
281
282 _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
283 shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
285
286 auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
291
292 subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
293}
294
295impl fmt::Debug for SuiNode {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 f.debug_struct("SuiNode")
298 .field("name", &self.state.name.concise())
299 .finish()
300 }
301}
302
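/// Upper bound on the number of JWKs accepted from a single provider fetch; anything
/// beyond this is truncated in `start_jwk_updater`.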
303static MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306 pub async fn start(
307 config: NodeConfig,
308 registry_service: RegistryService,
309 ) -> Result<Arc<SuiNode>> {
310 Self::start_async(
311 config,
312 registry_service,
313 ServerVersion::new("sui-node", "unknown"),
314 )
315 .await
316 }
317
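/// Spawns one background task per supported OIDC provider that periodically fetches JWKs,
/// drops invalid, oversized, already-active, or already-seen keys, and submits the rest to
/// consensus. The tasks only live for the current epoch (`within_alive_epoch`).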
318 fn start_jwk_updater(
319 config: &NodeConfig,
320 metrics: Arc<SuiNodeMetrics>,
321 authority: AuthorityName,
322 epoch_store: Arc<AuthorityPerEpochStore>,
323 consensus_adapter: Arc<ConsensusAdapter>,
324 ) {
325 let epoch = epoch_store.epoch();
326
327 let supported_providers = config
328 .zklogin_oauth_providers
329 .get(&epoch_store.get_chain_identifier().chain())
330 .unwrap_or(&BTreeSet::new())
331 .iter()
332 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
333 .collect::<Vec<_>>();
334
335 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
336
337 info!(
338 ?fetch_interval,
339 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
340 );
341
342 fn validate_jwk(
343 metrics: &Arc<SuiNodeMetrics>,
344 provider: &OIDCProvider,
345 id: &JwkId,
346 jwk: &JWK,
347 ) -> bool {
348 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
349 warn!(
350 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
351 id.iss, provider
352 );
353 metrics
354 .invalid_jwks
355 .with_label_values(&[&provider.to_string()])
356 .inc();
357 return false;
358 };
359
360 if iss_provider != *provider {
361 warn!(
362 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
363 id.iss, provider, iss_provider
364 );
365 metrics
366 .invalid_jwks
367 .with_label_values(&[&provider.to_string()])
368 .inc();
369 return false;
370 }
371
372 if !check_total_jwk_size(id, jwk) {
373 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
374 metrics
375 .invalid_jwks
376 .with_label_values(&[&provider.to_string()])
377 .inc();
378 return false;
379 }
380
381 true
382 }
383
384 for p in supported_providers.into_iter() {
393 let provider_str = p.to_string();
394 let epoch_store = epoch_store.clone();
395 let consensus_adapter = consensus_adapter.clone();
396 let metrics = metrics.clone();
397 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
398 async move {
399 let mut seen = HashSet::new();
402 loop {
403 jwk_log!("fetching JWK for provider {:?}", p);
404 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
405 match Self::fetch_jwks(authority, &p).await {
406 Err(e) => {
407 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
408 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
409 tokio::time::sleep(Duration::from_secs(30)).await;
411 continue;
412 }
413 Ok(mut keys) => {
414 metrics.total_jwks
415 .with_label_values(&[&provider_str])
416 .inc_by(keys.len() as u64);
417
418 keys.retain(|(id, jwk)| {
419 validate_jwk(&metrics, &p, id, jwk) &&
420 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
421 seen.insert((id.clone(), jwk.clone()))
422 });
423
424 metrics.unique_jwks
425 .with_label_values(&[&provider_str])
426 .inc_by(keys.len() as u64);
427
428 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
431 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
432 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
433 }
434
435 for (id, jwk) in keys.into_iter() {
436 jwk_log!("Submitting JWK to consensus: {:?}", id);
437
438 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
439 consensus_adapter.submit(txn, None, &epoch_store, None, None)
440 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
441 .ok();
442 }
443 }
444 }
445 tokio::time::sleep(fetch_interval).await;
446 }
447 }
448 .instrument(error_span!("jwk_updater_task", epoch)),
449 ));
450 }
451 }
452
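/// Full startup path: loads genesis and the stores, builds the execution cache and epoch
/// store, starts p2p/state-sync/randomness and the HTTP servers, constructs validator
/// components when this node is in the committee, and finally spawns
/// `monitor_reconfiguration`.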
453 pub async fn start_async(
454 config: NodeConfig,
455 registry_service: RegistryService,
456 server_version: ServerVersion,
457 ) -> Result<Arc<SuiNode>> {
458 NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
459 let mut config = config.clone();
460 if config.supported_protocol_versions.is_none() {
461 info!(
462 "populating config.supported_protocol_versions with default {:?}",
463 SupportedProtocolVersions::SYSTEM_DEFAULT
464 );
465 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466 }
467
468 let run_with_range = config.run_with_range;
469 let is_validator = config.consensus_config().is_some();
470 let is_full_node = !is_validator;
471 let prometheus_registry = registry_service.default_registry();
472
473 info!(node =? config.protocol_public_key(),
474 "Initializing sui-node listening on {}", config.network_address
475 );
476
477 DBMetrics::init(registry_service.clone());
479
480 typed_store::init_write_sync(config.enable_db_sync_to_disk);
482
483 mysten_metrics::init_metrics(&prometheus_registry);
485 #[cfg(not(msim))]
487 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
488
489 let genesis = config.genesis()?.clone();
490
491 let secret = Arc::pin(config.protocol_key_pair().copy());
492 let genesis_committee = genesis.committee();
493 let committee_store = Arc::new(CommitteeStore::new(
494 config.db_path().join("epochs"),
495 &genesis_committee,
496 None,
497 ));
498
499 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
500 let checkpoint_store = CheckpointStore::new(
501 &config.db_path().join("checkpoints"),
502 pruner_watermarks.clone(),
503 );
504 let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());
505
506 Self::check_and_recover_forks(
507 &checkpoint_store,
508 &checkpoint_metrics,
509 is_validator,
510 config.fork_recovery.as_ref(),
511 )
512 .await?;
513
514 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
516 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
517 enable_write_stall,
518 is_validator,
519 };
520 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
521 &config.db_path().join("store"),
522 Some(perpetual_tables_options),
523 Some(pruner_watermarks.epoch_id.clone()),
524 ));
525 let is_genesis = perpetual_tables
526 .database_is_empty()
527 .expect("Database read should not fail at init.");
528
529 let backpressure_manager =
530 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
531
532 let store =
533 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
534
535 let cur_epoch = store.get_recovery_epoch_at_restart()?;
536 let committee = committee_store
537 .get_committee(&cur_epoch)?
538 .expect("Committee of the current epoch must exist");
539 let epoch_start_configuration = store
540 .get_epoch_start_configuration()?
541 .expect("EpochStartConfiguration of the current epoch must exist");
542 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
543 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
544
545 let cache_traits = build_execution_cache(
546 &config.execution_cache,
547 &prometheus_registry,
548 &store,
549 backpressure_manager.clone(),
550 );
551
552 let auth_agg = {
553 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
554 Arc::new(ArcSwap::new(Arc::new(
555 AuthorityAggregator::new_from_epoch_start_state(
556 epoch_start_configuration.epoch_start_state(),
557 &committee_store,
558 safe_client_metrics_base,
559 ),
560 )))
561 };
562
563 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
564 let chain = match config.chain_override_for_testing {
565 Some(chain) => chain,
566 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
567 };
568
569 let highest_executed_checkpoint = checkpoint_store
570 .get_highest_executed_checkpoint_seq_number()
571 .expect("checkpoint store read cannot fail")
572 .unwrap_or(0);
573
574 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
575 0
576 } else {
577 checkpoint_store
578 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
579 .expect("checkpoint store read cannot fail")
580 .unwrap_or(highest_executed_checkpoint)
581 };
582
583 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
584 let epoch_store = AuthorityPerEpochStore::new(
585 config.protocol_public_key(),
586 committee.clone(),
587 &config.db_path().join("store"),
588 Some(epoch_options.options),
589 EpochMetrics::new(&registry_service.default_registry()),
590 epoch_start_configuration,
591 cache_traits.backing_package_store.clone(),
592 cache_traits.object_store.clone(),
593 cache_metrics,
594 signature_verifier_metrics,
595 &config.expensive_safety_check_config,
596 (chain_id, chain),
597 highest_executed_checkpoint,
598 previous_epoch_last_checkpoint,
599 Arc::new(SubmittedTransactionCacheMetrics::new(
600 &registry_service.default_registry(),
601 )),
602 )?;
603
604 info!("created epoch store");
605
606 replay_log!(
607 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
608 epoch_store.epoch(),
609 epoch_store.protocol_config()
610 );
611
612 if is_genesis {
614 info!("checking SUI conservation at genesis");
615 cache_traits
620 .reconfig_api
621 .expensive_check_sui_conservation(&epoch_store)
622 .expect("SUI conservation check cannot fail at genesis");
623 }
624
625 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
626 let default_buffer_stake = epoch_store
627 .protocol_config()
628 .buffer_stake_for_protocol_upgrade_bps();
629 if effective_buffer_stake != default_buffer_stake {
630 warn!(
631 ?effective_buffer_stake,
632 ?default_buffer_stake,
633 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
634 );
635 }
636
637 checkpoint_store.insert_genesis_checkpoint(
638 genesis.checkpoint(),
639 genesis.checkpoint_contents().clone(),
640 &epoch_store,
641 );
642
643 info!("creating state sync store");
644 let state_sync_store = RocksDbStore::new(
645 cache_traits.clone(),
646 committee_store.clone(),
647 checkpoint_store.clone(),
648 );
649
650 let index_store = if is_full_node && config.enable_index_processing {
651 info!("creating jsonrpc index store");
652 Some(Arc::new(IndexStore::new(
653 config.db_path().join("indexes"),
654 &prometheus_registry,
655 epoch_store
656 .protocol_config()
657 .max_move_identifier_len_as_option(),
658 config.remove_deprecated_tables,
659 )))
660 } else {
661 None
662 };
663
664 let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
665 info!("creating rpc index store");
666 Some(Arc::new(
667 RpcIndexStore::new(
668 &config.db_path(),
669 &store,
670 &checkpoint_store,
671 &epoch_store,
672 &cache_traits.backing_package_store,
673 pruner_watermarks.checkpoint_id.clone(),
674 config.rpc().cloned().unwrap_or_default(),
675 )
676 .await,
677 ))
678 } else {
679 None
680 };
681
682 let chain_identifier = epoch_store.get_chain_identifier();
683
684 info!("creating archive reader");
685 let (randomness_tx, randomness_rx) = mpsc::channel(
687 config
688 .p2p_config
689 .randomness
690 .clone()
691 .unwrap_or_default()
692 .mailbox_capacity(),
693 );
694 let P2pComponents {
695 p2p_network,
696 known_peers,
697 discovery_handle,
698 state_sync_handle,
699 randomness_handle,
700 endpoint_manager,
701 } = Self::create_p2p_network(
702 &config,
703 state_sync_store.clone(),
704 chain_identifier,
705 randomness_tx,
706 &prometheus_registry,
707 )?;
708
709 for peer in &config.p2p_config.peer_address_overrides {
711 endpoint_manager
712 .update_endpoint(
713 EndpointId::P2p(peer.peer_id),
714 AddressSource::Config,
715 peer.addresses.clone(),
716 )
717 .expect("Updating peer address overrides should not fail");
718 }
719
720 update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
722
723 info!("start snapshot upload");
724 let state_snapshot_handle = Self::start_state_snapshot(
726 &config,
727 &prometheus_registry,
728 checkpoint_store.clone(),
729 chain_identifier,
730 )?;
731
732 info!("start db checkpoint");
734 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
735 &config,
736 &prometheus_registry,
737 state_snapshot_handle.is_some(),
738 )?;
739
740 if !epoch_store
741 .protocol_config()
742 .simplified_unwrap_then_delete()
743 {
744 config
746 .authority_store_pruning_config
747 .set_killswitch_tombstone_pruning(true);
748 }
749
750 let authority_name = config.protocol_public_key();
751
752 info!("create authority state");
753 let state = AuthorityState::new(
754 authority_name,
755 secret,
756 config.supported_protocol_versions.unwrap(),
757 store.clone(),
758 cache_traits.clone(),
759 epoch_store.clone(),
760 committee_store.clone(),
761 index_store.clone(),
762 rpc_index,
763 checkpoint_store.clone(),
764 &prometheus_registry,
765 genesis.objects(),
766 &db_checkpoint_config,
767 config.clone(),
768 chain_identifier,
769 config.policy_config.clone(),
770 config.firewall_config.clone(),
771 pruner_watermarks,
772 )
773 .await;
774 if epoch_store.epoch() == 0 {
776 let txn = &genesis.transaction();
777 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
778 let transaction =
779 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
780 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
781 genesis.transaction().data().clone(),
782 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
783 ),
784 );
785 state
786 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
787 .instrument(span)
788 .await
789 .unwrap();
790 }
791
792 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
794
795 if config
796 .expensive_safety_check_config
797 .enable_secondary_index_checks()
798 && let Some(indexes) = state.indexes.clone()
799 {
800 sui_core::verify_indexes::verify_indexes(
801 state.get_global_state_hash_store().as_ref(),
802 indexes,
803 )
804 .expect("secondary indexes are inconsistent");
805 }
806
807 let (end_of_epoch_channel, end_of_epoch_receiver) =
808 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
809
810 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
811 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
812 auth_agg.load_full(),
813 state.clone(),
814 end_of_epoch_receiver,
815 &config.db_path(),
816 &prometheus_registry,
817 &config,
818 )))
819 } else {
820 None
821 };
822
823 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
824 state.clone(),
825 state_sync_store,
826 &transaction_orchestrator.clone(),
827 &config,
828 &prometheus_registry,
829 server_version,
830 )
831 .await?;
832
833 let global_state_hasher = Arc::new(GlobalStateHasher::new(
834 cache_traits.global_state_hash_store.clone(),
835 GlobalStateHashMetrics::new(&prometheus_registry),
836 ));
837
838 let authority_names_to_peer_ids = epoch_store
839 .epoch_start_state()
840 .get_authority_names_to_peer_ids();
841
842 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
843 "sui",
844 &registry_service.default_registry(),
845 );
846
847 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
848
849 let connection_monitor_handle =
850 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
851 p2p_network.downgrade(),
852 Arc::new(network_connection_metrics),
853 known_peers,
854 );
855
856 let connection_monitor_status = ConnectionMonitorStatus {
857 connection_statuses: connection_monitor_handle.connection_statuses(),
858 authority_names_to_peer_ids,
859 };
860
861 let connection_monitor_status = Arc::new(connection_monitor_status);
862 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));
863
864 sui_node_metrics
865 .binary_max_protocol_version
866 .set(ProtocolVersion::MAX.as_u64() as i64);
867 sui_node_metrics
868 .configured_max_protocol_version
869 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
870
871 let validator_components = if state.is_validator(&epoch_store) {
872 let mut components = Self::construct_validator_components(
873 config.clone(),
874 state.clone(),
875 committee,
876 epoch_store.clone(),
877 checkpoint_store.clone(),
878 state_sync_handle.clone(),
879 randomness_handle.clone(),
880 Arc::downgrade(&global_state_hasher),
881 backpressure_manager.clone(),
882 connection_monitor_status.clone(),
883 &registry_service,
884 sui_node_metrics.clone(),
885 checkpoint_metrics.clone(),
886 )
887 .await?;
888
889 components.consensus_adapter.submit_recovered(&epoch_store);
890
891 components.validator_server_handle = components.validator_server_handle.start().await;
893
894 endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
896
897 Some(components)
898 } else {
899 None
900 };
901
902 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
904
905 let node = Self {
906 config,
907 validator_components: Mutex::new(validator_components),
908 http_servers,
909 state,
910 transaction_orchestrator,
911 registry_service,
912 metrics: sui_node_metrics,
913 checkpoint_metrics,
914
915 _discovery: discovery_handle,
916 _connection_monitor_handle: connection_monitor_handle,
917 state_sync_handle,
918 randomness_handle,
919 checkpoint_store,
920 global_state_hasher: Mutex::new(Some(global_state_hasher)),
921 end_of_epoch_channel,
922 connection_monitor_status,
923 endpoint_manager,
924 backpressure_manager,
925
926 _db_checkpoint_handle: db_checkpoint_handle,
927
928 #[cfg(msim)]
929 sim_state: Default::default(),
930
931 _state_snapshot_uploader_handle: state_snapshot_handle,
932 shutdown_channel_tx: shutdown_channel,
933
934 auth_agg,
935 subscription_service_checkpoint_sender,
936 };
937
938 info!("SuiNode started!");
939 let node = Arc::new(node);
940 let node_copy = node.clone();
941 spawn_monitored_task!(async move {
942 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
943 if let Err(error) = result {
944 warn!("Reconfiguration finished with error {:?}", error);
945 }
946 });
947
948 Ok(node)
949 }
950
951 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
952 self.end_of_epoch_channel.subscribe()
953 }
954
955 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
956 self.shutdown_channel_tx.subscribe()
957 }
958
959 pub fn current_epoch_for_testing(&self) -> EpochId {
960 self.state.current_epoch_for_testing()
961 }
962
963 pub fn db_checkpoint_path(&self) -> PathBuf {
964 self.config.db_checkpoint_path()
965 }
966
967 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
969 info!("close_epoch (current epoch = {})", epoch_store.epoch());
970 self.validator_components
971 .lock()
972 .await
973 .as_ref()
974 .ok_or_else(|| SuiError::from("Node is not a validator"))?
975 .consensus_adapter
976 .close_epoch(epoch_store);
977 Ok(())
978 }
979
980 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
981 self.state
982 .clear_override_protocol_upgrade_buffer_stake(epoch)
983 }
984
985 pub fn set_override_protocol_upgrade_buffer_stake(
986 &self,
987 epoch: EpochId,
988 buffer_stake_bps: u64,
989 ) -> SuiResult {
990 self.state
991 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
992 }
993
994 pub async fn close_epoch_for_testing(&self) -> SuiResult {
997 let epoch_store = self.state.epoch_store_for_testing();
998 self.close_epoch(&epoch_store).await
999 }
1000
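/// Starts the state snapshot uploader when an object store is configured for snapshot
/// writes; returns its broadcast handle, or `None` when snapshots are disabled.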
1001 fn start_state_snapshot(
1002 config: &NodeConfig,
1003 prometheus_registry: &Registry,
1004 checkpoint_store: Arc<CheckpointStore>,
1005 chain_identifier: ChainIdentifier,
1006 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1007 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1008 let snapshot_uploader = StateSnapshotUploader::new(
1009 &config.db_checkpoint_path(),
1010 &config.snapshot_path(),
1011 remote_store_config.clone(),
1012 60,
1013 prometheus_registry,
1014 checkpoint_store,
1015 chain_identifier,
1016 config.state_snapshot_write_config.archive_interval_epochs,
1017 )?;
1018 Ok(Some(snapshot_uploader.start()))
1019 } else {
1020 Ok(None)
1021 }
1022 }
1023
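/// Resolves the effective `DBCheckpointConfig` and starts a `DBCheckpointHandler` whenever
/// an object store is configured or state snapshots are enabled.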
1024 fn start_db_checkpoint(
1025 config: &NodeConfig,
1026 prometheus_registry: &Registry,
1027 state_snapshot_enabled: bool,
1028 ) -> Result<(
1029 DBCheckpointConfig,
1030 Option<tokio::sync::broadcast::Sender<()>>,
1031 )> {
1032 let checkpoint_path = Some(
1033 config
1034 .db_checkpoint_config
1035 .checkpoint_path
1036 .clone()
1037 .unwrap_or_else(|| config.db_checkpoint_path()),
1038 );
1039 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1040 DBCheckpointConfig {
1041 checkpoint_path,
1042 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1043 true
1044 } else {
1045 config
1046 .db_checkpoint_config
1047 .perform_db_checkpoints_at_epoch_end
1048 },
1049 ..config.db_checkpoint_config.clone()
1050 }
1051 } else {
1052 config.db_checkpoint_config.clone()
1053 };
1054
1055 match (
1056 db_checkpoint_config.object_store_config.as_ref(),
1057 state_snapshot_enabled,
1058 ) {
1059 (None, false) => Ok((db_checkpoint_config, None)),
1064 (_, _) => {
1065 let handler = DBCheckpointHandler::new(
1066 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1067 db_checkpoint_config.object_store_config.as_ref(),
1068 60,
1069 db_checkpoint_config
1070 .prune_and_compact_before_upload
1071 .unwrap_or(true),
1072 config.authority_store_pruning_config.clone(),
1073 prometheus_registry,
1074 state_snapshot_enabled,
1075 )?;
1076 Ok((
1077 db_checkpoint_config,
1078 Some(DBCheckpointHandler::start(handler)),
1079 ))
1080 }
1081 }
1082 }
1083
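/// Builds the anemo p2p network together with the discovery, state-sync and randomness
/// services, applying default QUIC tuning for any settings not overridden in the config.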
1084 fn create_p2p_network(
1085 config: &NodeConfig,
1086 state_sync_store: RocksDbStore,
1087 chain_identifier: ChainIdentifier,
1088 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1089 prometheus_registry: &Registry,
1090 ) -> Result<P2pComponents> {
1091 let mut p2p_config = config.p2p_config.clone();
1092 {
1093 let disc = p2p_config.discovery.get_or_insert_with(Default::default);
1094 if disc.peer_addr_store_path.is_none() {
1095 disc.peer_addr_store_path =
1096 Some(config.db_path().join("discovery_peer_cache.yaml"));
1097 }
1098 }
1099 let mut discovery_builder = discovery::Builder::new().config(p2p_config.clone());
1100 if let Some(consensus_config) = &config.consensus_config {
1101 let effective_addr = consensus_config
1102 .external_address
1103 .as_ref()
1104 .or(consensus_config.listen_address.as_ref());
1105 if let Some(addr) = effective_addr {
1106 discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1107 }
1108 }
1109 let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1110 let discovery_sender = discovery.sender();
1111
1112 let (state_sync, state_sync_router) = state_sync::Builder::new()
1113 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1114 .store(state_sync_store)
1115 .archive_config(config.archive_reader_config())
1116 .discovery_sender(discovery_sender)
1117 .with_metrics(prometheus_registry)
1118 .build();
1119
1120 let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1121 let known_peers: HashMap<PeerId, String> = discovery_config
1122 .allowlisted_peers
1123 .clone()
1124 .into_iter()
1125 .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1126 .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1127 peer.peer_id
1128 .map(|peer_id| (peer_id, "seed_peer".to_string()))
1129 }))
1130 .collect();
1131
1132 let (randomness, randomness_router) =
1133 randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1134 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1135 .with_metrics(prometheus_registry)
1136 .build();
1137
1138 let p2p_network = {
1139 let routes = anemo::Router::new()
1140 .add_rpc_service(discovery_server)
1141 .merge(state_sync_router);
1142 let routes = routes.merge(randomness_router);
1143
1144 let inbound_network_metrics =
1145 mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1146 let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1147 "sui",
1148 "outbound",
1149 prometheus_registry,
1150 );
1151
1152 let service = ServiceBuilder::new()
1153 .layer(
1154 TraceLayer::new_for_server_errors()
1155 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1156 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1157 )
1158 .layer(CallbackLayer::new(
1159 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1160 Arc::new(inbound_network_metrics),
1161 config.p2p_config.excessive_message_size(),
1162 ),
1163 ))
1164 .service(routes);
1165
1166 let outbound_layer = ServiceBuilder::new()
1167 .layer(
1168 TraceLayer::new_for_client_and_server_errors()
1169 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1170 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1171 )
1172 .layer(CallbackLayer::new(
1173 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1174 Arc::new(outbound_network_metrics),
1175 config.p2p_config.excessive_message_size(),
1176 ),
1177 ))
1178 .into_inner();
1179
1180 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1181 anemo_config.max_frame_size = Some(1 << 30);
1184
1185 let mut quic_config = anemo_config.quic.unwrap_or_default();
1188 if quic_config.socket_send_buffer_size.is_none() {
1189 quic_config.socket_send_buffer_size = Some(20 << 20);
1190 }
1191 if quic_config.socket_receive_buffer_size.is_none() {
1192 quic_config.socket_receive_buffer_size = Some(20 << 20);
1193 }
1194 quic_config.allow_failed_socket_buffer_size_setting = true;
1195
1196 if quic_config.max_concurrent_bidi_streams.is_none() {
1199 quic_config.max_concurrent_bidi_streams = Some(500);
1200 }
1201 if quic_config.max_concurrent_uni_streams.is_none() {
1202 quic_config.max_concurrent_uni_streams = Some(500);
1203 }
1204 if quic_config.stream_receive_window.is_none() {
1205 quic_config.stream_receive_window = Some(100 << 20);
1206 }
1207 if quic_config.receive_window.is_none() {
1208 quic_config.receive_window = Some(200 << 20);
1209 }
1210 if quic_config.send_window.is_none() {
1211 quic_config.send_window = Some(200 << 20);
1212 }
1213 if quic_config.crypto_buffer_size.is_none() {
1214 quic_config.crypto_buffer_size = Some(1 << 20);
1215 }
1216 if quic_config.max_idle_timeout_ms.is_none() {
1217 quic_config.max_idle_timeout_ms = Some(10_000);
1218 }
1219 if quic_config.keep_alive_interval_ms.is_none() {
1220 quic_config.keep_alive_interval_ms = Some(5_000);
1221 }
1222 anemo_config.quic = Some(quic_config);
1223
1224 let server_name = format!("sui-{}", chain_identifier);
1225 let network = Network::bind(config.p2p_config.listen_address)
1226 .server_name(&server_name)
1227 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1228 .config(anemo_config)
1229 .outbound_request_layer(outbound_layer)
1230 .start(service)?;
1231 info!(
1232 server_name = server_name,
1233 "P2p network started on {}",
1234 network.local_addr()
1235 );
1236
1237 network
1238 };
1239
1240 let discovery_handle =
1241 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1242 let state_sync_handle = state_sync.start(p2p_network.clone());
1243 let randomness_handle = randomness.start(p2p_network.clone());
1244
1245 Ok(P2pComponents {
1246 p2p_network,
1247 known_peers,
1248 discovery_handle,
1249 state_sync_handle,
1250 randomness_handle,
1251 endpoint_manager,
1252 })
1253 }
1254
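/// Creates the validator-lifetime components (consensus adapter and manager, consensus
/// store pruner, gRPC validator service, optional overload monitor) and then delegates to
/// `start_epoch_specific_validator_components` for the per-epoch pieces.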
1255 async fn construct_validator_components(
1256 config: NodeConfig,
1257 state: Arc<AuthorityState>,
1258 committee: Arc<Committee>,
1259 epoch_store: Arc<AuthorityPerEpochStore>,
1260 checkpoint_store: Arc<CheckpointStore>,
1261 state_sync_handle: state_sync::Handle,
1262 randomness_handle: randomness::Handle,
1263 global_state_hasher: Weak<GlobalStateHasher>,
1264 backpressure_manager: Arc<BackpressureManager>,
1265 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1266 registry_service: &RegistryService,
1267 sui_node_metrics: Arc<SuiNodeMetrics>,
1268 checkpoint_metrics: Arc<CheckpointMetrics>,
1269 ) -> Result<ValidatorComponents> {
1270 let mut config_clone = config.clone();
1271 let consensus_config = config_clone
1272 .consensus_config
1273 .as_mut()
1274 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1275
1276 let client = Arc::new(UpdatableConsensusClient::new());
1277 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1278 &committee,
1279 consensus_config,
1280 state.name,
1281 connection_monitor_status.clone(),
1282 &registry_service.default_registry(),
1283 epoch_store.protocol_config().clone(),
1284 client.clone(),
1285 checkpoint_store.clone(),
1286 ));
1287 let consensus_manager = Arc::new(ConsensusManager::new(
1288 &config,
1289 consensus_config,
1290 registry_service,
1291 client,
1292 ));
1293
1294 let consensus_store_pruner = ConsensusStorePruner::new(
1296 consensus_manager.get_storage_base_path(),
1297 consensus_config.db_retention_epochs(),
1298 consensus_config.db_pruner_period(),
1299 &registry_service.default_registry(),
1300 );
1301
1302 let sui_tx_validator_metrics =
1303 SuiTxValidatorMetrics::new(&registry_service.default_registry());
1304
1305 let validator_server_handle = Self::start_grpc_validator_service(
1306 &config,
1307 state.clone(),
1308 consensus_adapter.clone(),
1309 &registry_service.default_registry(),
1310 )
1311 .await?;
1312
1313 let validator_overload_monitor_handle = if config
1316 .authority_overload_config
1317 .max_load_shedding_percentage
1318 > 0
1319 {
1320 let authority_state = Arc::downgrade(&state);
1321 let overload_config = config.authority_overload_config.clone();
1322 fail_point!("starting_overload_monitor");
1323 Some(spawn_monitored_task!(overload_monitor(
1324 authority_state,
1325 overload_config,
1326 )))
1327 } else {
1328 None
1329 };
1330
1331 Self::start_epoch_specific_validator_components(
1332 &config,
1333 state.clone(),
1334 consensus_adapter,
1335 checkpoint_store,
1336 epoch_store,
1337 state_sync_handle,
1338 randomness_handle,
1339 consensus_manager,
1340 consensus_store_pruner,
1341 global_state_hasher,
1342 backpressure_manager,
1343 validator_server_handle,
1344 validator_overload_monitor_handle,
1345 checkpoint_metrics,
1346 sui_node_metrics,
1347 sui_tx_validator_metrics,
1348 )
1349 .await
1350 }
1351
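/// Starts the components that are rebuilt every epoch: the checkpoint service, randomness
/// manager, execution-time observer, consensus handler and manager, and, when the
/// authenticator state is enabled, the JWK updater tasks.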
1352 async fn start_epoch_specific_validator_components(
1353 config: &NodeConfig,
1354 state: Arc<AuthorityState>,
1355 consensus_adapter: Arc<ConsensusAdapter>,
1356 checkpoint_store: Arc<CheckpointStore>,
1357 epoch_store: Arc<AuthorityPerEpochStore>,
1358 state_sync_handle: state_sync::Handle,
1359 randomness_handle: randomness::Handle,
1360 consensus_manager: Arc<ConsensusManager>,
1361 consensus_store_pruner: ConsensusStorePruner,
1362 state_hasher: Weak<GlobalStateHasher>,
1363 backpressure_manager: Arc<BackpressureManager>,
1364 validator_server_handle: SpawnOnce,
1365 validator_overload_monitor_handle: Option<JoinHandle<()>>,
1366 checkpoint_metrics: Arc<CheckpointMetrics>,
1367 sui_node_metrics: Arc<SuiNodeMetrics>,
1368 sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1369 ) -> Result<ValidatorComponents> {
1370 let checkpoint_service = Self::build_checkpoint_service(
1371 config,
1372 consensus_adapter.clone(),
1373 checkpoint_store.clone(),
1374 epoch_store.clone(),
1375 state.clone(),
1376 state_sync_handle,
1377 state_hasher,
1378 checkpoint_metrics.clone(),
1379 );
1380
1381 let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1385
1386 consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1387
1388 if epoch_store.randomness_state_enabled() {
1389 let randomness_manager = RandomnessManager::try_new(
1390 Arc::downgrade(&epoch_store),
1391 Box::new(consensus_adapter.clone()),
1392 randomness_handle,
1393 config.protocol_key_pair(),
1394 )
1395 .await;
1396 if let Some(randomness_manager) = randomness_manager {
1397 epoch_store
1398 .set_randomness_manager(randomness_manager)
1399 .await?;
1400 }
1401 }
1402
1403 ExecutionTimeObserver::spawn(
1404 epoch_store.clone(),
1405 Box::new(consensus_adapter.clone()),
1406 config
1407 .execution_time_observer_config
1408 .clone()
1409 .unwrap_or_default(),
1410 );
1411
1412 let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1413 None,
1414 state.metrics.clone(),
1415 ));
1416
1417 let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1418 throughput_calculator.clone(),
1419 None,
1420 None,
1421 state.metrics.clone(),
1422 ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1423 ));
1424
1425 consensus_adapter.swap_throughput_profiler(throughput_profiler);
1426
1427 let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1428 state.clone(),
1429 checkpoint_service.clone(),
1430 epoch_store.clone(),
1431 consensus_adapter.clone(),
1432 low_scoring_authorities,
1433 throughput_calculator,
1434 backpressure_manager,
1435 config.congestion_log.clone(),
1436 );
1437
1438 info!("Starting consensus manager asynchronously");
1439
1440 tokio::spawn({
1442 let config = config.clone();
1443 let epoch_store = epoch_store.clone();
1444 let sui_tx_validator = SuiTxValidator::new(
1445 state.clone(),
1446 epoch_store.clone(),
1447 checkpoint_service.clone(),
1448 sui_tx_validator_metrics.clone(),
1449 );
1450 let consensus_manager = consensus_manager.clone();
1451 async move {
1452 consensus_manager
1453 .start(
1454 &config,
1455 epoch_store,
1456 consensus_handler_initializer,
1457 sui_tx_validator,
1458 )
1459 .await;
1460 }
1461 });
1462 let replay_waiter = consensus_manager.replay_waiter();
1463
1464 info!("Spawning checkpoint service");
1465 let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1466 None
1467 } else {
1468 Some(replay_waiter)
1469 };
1470 checkpoint_service
1471 .spawn(epoch_store.clone(), replay_waiter)
1472 .await;
1473
1474 if epoch_store.authenticator_state_enabled() {
1475 Self::start_jwk_updater(
1476 config,
1477 sui_node_metrics,
1478 state.name,
1479 epoch_store.clone(),
1480 consensus_adapter.clone(),
1481 );
1482 }
1483
1484 Ok(ValidatorComponents {
1485 validator_server_handle,
1486 validator_overload_monitor_handle,
1487 consensus_manager,
1488 consensus_store_pruner,
1489 consensus_adapter,
1490 checkpoint_metrics,
1491 sui_tx_validator_metrics,
1492 })
1493 }
1494
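/// Builds the checkpoint service that signs locally built checkpoints, submits them to
/// consensus, and forwards certified checkpoints to state sync.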
1495 fn build_checkpoint_service(
1496 config: &NodeConfig,
1497 consensus_adapter: Arc<ConsensusAdapter>,
1498 checkpoint_store: Arc<CheckpointStore>,
1499 epoch_store: Arc<AuthorityPerEpochStore>,
1500 state: Arc<AuthorityState>,
1501 state_sync_handle: state_sync::Handle,
1502 state_hasher: Weak<GlobalStateHasher>,
1503 checkpoint_metrics: Arc<CheckpointMetrics>,
1504 ) -> Arc<CheckpointService> {
1505 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1506 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1507
1508 debug!(
1509 "Starting checkpoint service with epoch start timestamp {}
1510 and epoch duration {}",
1511 epoch_start_timestamp_ms, epoch_duration_ms
1512 );
1513
1514 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1515 sender: consensus_adapter,
1516 signer: state.secret.clone(),
1517 authority: config.protocol_public_key(),
1518 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1519 .checked_add(epoch_duration_ms)
1520 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1521 metrics: checkpoint_metrics.clone(),
1522 });
1523
1524 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1525 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1526 let max_checkpoint_size_bytes =
1527 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1528
1529 CheckpointService::build(
1530 state.clone(),
1531 checkpoint_store,
1532 epoch_store,
1533 state.get_transaction_cache_reader().clone(),
1534 state_hasher,
1535 checkpoint_output,
1536 Box::new(certified_checkpoint_output),
1537 checkpoint_metrics,
1538 max_tx_per_checkpoint,
1539 max_checkpoint_size_bytes,
1540 )
1541 }
1542
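/// Creates the consensus adapter from the consensus config, passing both the configured
/// pending-transaction limit and a per-committee-member share of it.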
1543 fn construct_consensus_adapter(
1544 committee: &Committee,
1545 consensus_config: &ConsensusConfig,
1546 authority: AuthorityName,
1547 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1548 prometheus_registry: &Registry,
1549 protocol_config: ProtocolConfig,
1550 consensus_client: Arc<dyn ConsensusClient>,
1551 checkpoint_store: Arc<CheckpointStore>,
1552 ) -> ConsensusAdapter {
1553 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1554 ConsensusAdapter::new(
1557 consensus_client,
1558 checkpoint_store,
1559 authority,
1560 connection_monitor_status,
1561 consensus_config.max_pending_transactions(),
1562 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1563 consensus_config.max_submit_position,
1564 consensus_config.submit_delay_step_override(),
1565 ca_metrics,
1566 protocol_config,
1567 )
1568 }
1569
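/// Prepares the TLS-enabled gRPC validator service. Binding and serving are deferred until
/// the returned `SpawnOnce` is started, which signals readiness once the socket is bound.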
1570 async fn start_grpc_validator_service(
1571 config: &NodeConfig,
1572 state: Arc<AuthorityState>,
1573 consensus_adapter: Arc<ConsensusAdapter>,
1574 prometheus_registry: &Registry,
1575 ) -> Result<SpawnOnce> {
1576 let validator_service = ValidatorService::new(
1577 state.clone(),
1578 consensus_adapter,
1579 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1580 config.policy_config.clone().map(|p| p.client_id_source),
1581 );
1582
1583 let mut server_conf = mysten_network::config::Config::new();
1584 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1585 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1586 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1587 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1588 server_conf.load_shed = config.grpc_load_shed;
1589 let mut server_builder =
1590 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1591
1592 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1593
1594 let tls_config = sui_tls::create_rustls_server_config(
1595 config.network_key_pair().copy().private(),
1596 SUI_TLS_SERVER_NAME.to_string(),
1597 );
1598
1599 let network_address = config.network_address().clone();
1600
1601 let (ready_tx, ready_rx) = oneshot::channel();
1602
1603 Ok(SpawnOnce::new(ready_rx, async move {
1604 let server = server_builder
1605 .bind(&network_address, Some(tls_config))
1606 .await
1607 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1608 let local_addr = server.local_addr();
1609 info!("Listening to traffic on {local_addr}");
1610 ready_tx.send(()).unwrap();
1611 if let Err(err) = server.serve().await {
1612 info!("Server stopped: {err}");
1613 }
1614 info!("Server stopped");
1615 }))
1616 }
1617
1618 pub fn state(&self) -> Arc<AuthorityState> {
1619 self.state.clone()
1620 }
1621
1622 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1624 self.state.reference_gas_price_for_testing()
1625 }
1626
1627 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1628 self.state.committee_store().clone()
1629 }
1630
1631 pub fn clone_authority_aggregator(
1642 &self,
1643 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1644 self.transaction_orchestrator
1645 .as_ref()
1646 .map(|to| to.clone_authority_aggregator())
1647 }
1648
1649 pub fn transaction_orchestrator(
1650 &self,
1651 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1652 self.transaction_orchestrator.clone()
1653 }
1654
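/// Main reconfiguration loop: runs the checkpoint executor for the current epoch, submits
/// capability notifications when acting as a validator, then tears down and rebuilds the
/// epoch-specific components for the next epoch (or shuts down once a `RunWithRange`
/// condition is met).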
1655 pub async fn monitor_reconfiguration(
1658 self: Arc<Self>,
1659 mut epoch_store: Arc<AuthorityPerEpochStore>,
1660 ) -> Result<()> {
1661 let checkpoint_executor_metrics =
1662 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1663
1664 loop {
1665 let mut hasher_guard = self.global_state_hasher.lock().await;
1666 let hasher = hasher_guard.take().unwrap();
1667 info!(
1668 "Creating checkpoint executor for epoch {}",
1669 epoch_store.epoch()
1670 );
1671 let checkpoint_executor = CheckpointExecutor::new(
1672 epoch_store.clone(),
1673 self.checkpoint_store.clone(),
1674 self.state.clone(),
1675 hasher.clone(),
1676 self.backpressure_manager.clone(),
1677 self.config.checkpoint_executor_config.clone(),
1678 checkpoint_executor_metrics.clone(),
1679 self.subscription_service_checkpoint_sender.clone(),
1680 );
1681
1682 let run_with_range = self.config.run_with_range;
1683
1684 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1685
1686 self.metrics
1688 .current_protocol_version
1689 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1690
1691 if let Some(components) = &*self.validator_components.lock().await {
1693 tokio::time::sleep(Duration::from_millis(1)).await;
1695
1696 let config = cur_epoch_store.protocol_config();
1697 let mut supported_protocol_versions = self
1698 .config
1699 .supported_protocol_versions
1700 .expect("Supported versions should be populated")
1701 .truncate_below(config.version);
1703
1704 while supported_protocol_versions.max > config.version {
1705 let proposed_protocol_config = ProtocolConfig::get_for_version(
1706 supported_protocol_versions.max,
1707 cur_epoch_store.get_chain(),
1708 );
1709
1710 if proposed_protocol_config.enable_accumulators()
1711 && !epoch_store.accumulator_root_exists()
1712 {
1713 error!(
1714 "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1715 supported_protocol_versions.max
1716 );
1717 supported_protocol_versions.max = supported_protocol_versions.max.prev();
1718 } else {
1719 break;
1720 }
1721 }
1722
1723 let binary_config = config.binary_config(None);
1724 let transaction = ConsensusTransaction::new_capability_notification_v2(
1725 AuthorityCapabilitiesV2::new(
1726 self.state.name,
1727 cur_epoch_store.get_chain_identifier().chain(),
1728 supported_protocol_versions,
1729 self.state
1730 .get_available_system_packages(&binary_config)
1731 .await,
1732 ),
1733 );
1734 info!(?transaction, "submitting capabilities to consensus");
1735 components.consensus_adapter.submit(
1736 transaction,
1737 None,
1738 &cur_epoch_store,
1739 None,
1740 None,
1741 )?;
1742 }
1743
1744 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1745
1746 if stop_condition == StopReason::RunWithRangeCondition {
1747 SuiNode::shutdown(&self).await;
1748 self.shutdown_channel_tx
1749 .send(run_with_range)
1750 .expect("RunWithRangeCondition met but failed to send shutdown message");
1751 return Ok(());
1752 }
1753
1754 let latest_system_state = self
1756 .state
1757 .get_object_cache_reader()
1758 .get_sui_system_state_object_unsafe()
1759 .expect("Read Sui System State object cannot fail");
1760
1761 #[cfg(msim)]
1762 if !self
1763 .sim_state
1764 .sim_safe_mode_expected
1765 .load(Ordering::Relaxed)
1766 {
1767 debug_assert!(!latest_system_state.safe_mode());
1768 }
1769
1770 #[cfg(not(msim))]
1771 debug_assert!(!latest_system_state.safe_mode());
1772
1773 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1774 && self.state.is_fullnode(&cur_epoch_store)
1775 {
1776 warn!(
1777 "Failed to send end of epoch notification to subscriber: {:?}",
1778 err
1779 );
1780 }
1781
1782 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1783 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1784
1785 self.auth_agg.store(Arc::new(
1786 self.auth_agg
1787 .load()
1788 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1789 ));
1790
1791 let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1792 let next_epoch = next_epoch_committee.epoch();
1793 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1794
1795 info!(
1796 next_epoch,
1797 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1798 );
1799
1800 fail_point_async!("reconfig_delay");
1801
1802 let authority_names_to_peer_ids =
1807 new_epoch_start_state.get_authority_names_to_peer_ids();
1808 self.connection_monitor_status
1809 .update_mapping_for_epoch(authority_names_to_peer_ids);
1810
1811 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1812
1813 update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1814
1815 let mut validator_components_lock_guard = self.validator_components.lock().await;
1816
1817 let new_epoch_store = self
1821 .reconfigure_state(
1822 &self.state,
1823 &cur_epoch_store,
1824 next_epoch_committee.clone(),
1825 new_epoch_start_state,
1826 hasher.clone(),
1827 )
1828 .await;
1829
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

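    /// Shuts down consensus if validator components are currently running; a no-op on fullnodes.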
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

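    /// Builds the `EpochStartConfiguration` for the next epoch from the epoch-start system state
    /// and the last checkpoint of the current epoch, then reconfigures the authority state and
    /// returns the new per-epoch store.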
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

    pub fn state_sync_handle(&self) -> state_sync::Handle {
        self.state_sync_handle.clone()
    }

    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }

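    /// Returns the first eight characters of a digest's string form (or the whole string if it is
    /// shorter), used to keep fork log lines and metric labels compact.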
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

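    /// Startup fork check for validators: applies any configured fork-recovery overrides, then, if
    /// a checkpoint or transaction fork is still recorded, reports it via metrics and either halts
    /// or returns an error depending on the configured `ForkCrashBehavior`. Fullnodes skip this
    /// check entirely.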
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

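    /// Current Unix time in seconds, used to timestamp the fork metrics below.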
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

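    /// Records a detected checkpoint fork in metrics and then either sleeps forever (awaiting
    /// operator-driven recovery) or returns an error, per the configured crash behavior.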
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

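    /// Records a detected transaction effects fork in metrics and then either sleeps forever
    /// (awaiting operator-driven recovery) or returns an error, per the configured crash behavior.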
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

#[cfg(not(msim))]
impl SuiNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

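/// A future that is spawned at most once: `Unstarted` holds a readiness receiver together with the
/// boxed future, and `start` spawns the future onto the runtime, waits for the readiness signal,
/// then transitions to `Started`.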
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

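/// Pushes the validator p2p addresses from the new epoch-start state into the endpoint manager,
/// marking them as chain-sourced.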
fn update_peer_addresses(
    config: &NodeConfig,
    endpoint_manager: &EndpointManager,
    epoch_start_state: &EpochStartSystemState,
) {
    for (peer_id, address) in
        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
    {
        endpoint_manager
            .update_endpoint(
                EndpointId::P2p(peer_id),
                AddressSource::Chain,
                vec![address],
            )
            .expect("Updating peer addresses should not fail");
    }
}

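/// Builds the transaction key-value store used by JSON-RPC: always backed by the local RocksDB
/// store, with an HTTP store fallback when a base URL is configured and the chain is mainnet.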
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

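/// Starts the JSON-RPC and `sui_rpc_api` routers for fullnodes (nodes with a consensus config
/// return early without serving), listening over HTTP and, when a TLS config is present, HTTPS;
/// also returns the checkpoint sender that feeds the RPC subscription service.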
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address = ?https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address = ?http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

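// In test builds checkpoints are capped at two transactions; otherwise the protocol config's
// per-checkpoint limit applies.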
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
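
    // A minimal sketch, not from the original source: exercises the digest-prefix helper used in
    // fork logs and metric labels. Truncation to eight characters and pass-through of shorter
    // strings follow directly from get_digest_prefix above.
    #[test]
    fn test_get_digest_prefix_keeps_at_most_eight_chars() {
        assert_eq!(SuiNode::get_digest_prefix("abcdefghij"), "abcdefgh");
        assert_eq!(SuiNode::get_digest_prefix("abc"), "abc");
    }

    // A minimal sketch, not from the original source: fork checks are skipped entirely on
    // non-validators, so a previously recorded checkpoint fork does not produce an error when
    // `is_validator` is false.
    #[tokio::test]
    async fn test_fork_checks_skipped_for_fullnodes() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());
        checkpoint_store
            .record_checkpoint_fork_detected(7, CheckpointDigest::random())
            .unwrap();

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            false,
            None,
        )
        .await;
        assert!(r.is_ok());
    }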
}