1use anemo::Network;
5use anemo::PeerId;
6use anemo_tower::callback::CallbackLayer;
7use anemo_tower::trace::DefaultMakeSpan;
8use anemo_tower::trace::DefaultOnFailure;
9use anemo_tower::trace::TraceLayer;
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::anyhow;
13use arc_swap::ArcSwap;
14use fastcrypto_zkp::bn254::zk_login::JwkId;
15use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
16use futures::future::BoxFuture;
17use mysten_common::in_test_configuration;
18use prometheus::Registry;
19use std::collections::{BTreeSet, HashMap, HashSet};
20use std::fmt;
21use std::future::Future;
22use std::path::PathBuf;
23use std::str::FromStr;
24#[cfg(msim)]
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use sui_core::authority::ExecutionEnv;
29use sui_core::authority::RandomnessRoundReceiver;
30use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
31use sui_core::authority::backpressure::BackpressureManager;
32use sui_core::authority::epoch_start_configuration::EpochFlag;
33use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
34use sui_core::consensus_adapter::ConsensusClient;
35use sui_core::consensus_manager::UpdatableConsensusClient;
36use sui_core::epoch::randomness::RandomnessManager;
37use sui_core::execution_cache::build_execution_cache;
38use sui_network::endpoint_manager::{AddressSource, EndpointId};
39use sui_network::validator::server::SUI_TLS_SERVER_NAME;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use sui_core::global_state_hasher::GlobalStateHashMetrics;
43use sui_core::storage::RestReadStore;
44use sui_json_rpc::bridge_api::BridgeReadApi;
45use sui_json_rpc_api::JsonRpcMetrics;
46use sui_network::randomness;
47use sui_rpc_api::RpcMetrics;
48use sui_rpc_api::ServerVersion;
49use sui_rpc_api::subscription::SubscriptionService;
50use sui_types::base_types::ConciseableName;
51use sui_types::crypto::RandomnessRound;
52use sui_types::digests::{
53 ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
54};
55use sui_types::messages_consensus::AuthorityCapabilitiesV2;
56use sui_types::sui_system_state::SuiSystemState;
57use tap::tap::TapFallible;
58use tokio::sync::oneshot;
59use tokio::sync::{Mutex, broadcast, mpsc};
60use tokio::task::JoinHandle;
61use tower::ServiceBuilder;
62use tracing::{Instrument, error_span, info};
63use tracing::{debug, error, warn};
64
65macro_rules! jwk_log {
69 ($($arg:tt)+) => {
70 if in_test_configuration() {
71 debug!($($arg)+);
72 } else {
73 info!($($arg)+);
74 }
75 };
76}
77
78use fastcrypto_zkp::bn254::zk_login::JWK;
79pub use handle::SuiNodeHandle;
80use mysten_metrics::{RegistryService, spawn_monitored_task};
81use mysten_service::server_timing::server_timing_middleware;
82use sui_config::node::{DBCheckpointConfig, RunWithRange};
83use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
84use sui_config::node_config_metrics::NodeConfigMetrics;
85use sui_config::{ConsensusConfig, NodeConfig};
86use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
87use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
88use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
89use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
90use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
91use sui_core::authority_aggregator::AuthorityAggregator;
92use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
93use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
94use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
95use sui_core::checkpoints::{
96 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
97 SubmitCheckpointToConsensus,
98};
99use sui_core::consensus_adapter::{
100 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
101};
102use sui_core::consensus_manager::ConsensusManager;
103use sui_core::consensus_throughput_calculator::{
104 ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
105};
106use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
107use sui_core::db_checkpoint_handler::DBCheckpointHandler;
108use sui_core::epoch::committee_store::CommitteeStore;
109use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
110use sui_core::epoch::epoch_metrics::EpochMetrics;
111use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
112use sui_core::global_state_hasher::GlobalStateHasher;
113use sui_core::jsonrpc_index::IndexStore;
114use sui_core::module_cache_metrics::ResolverMetrics;
115use sui_core::overload_monitor::overload_monitor;
116use sui_core::rpc_index::RpcIndexStore;
117use sui_core::signature_verifier::SignatureVerifierMetrics;
118use sui_core::storage::RocksDbStore;
119use sui_core::transaction_orchestrator::TransactionOrchestrator;
120use sui_core::{
121 authority::{AuthorityState, AuthorityStore},
122 authority_client::NetworkAuthorityClient,
123};
124use sui_json_rpc::JsonRpcServerBuilder;
125use sui_json_rpc::coin_api::CoinReadApi;
126use sui_json_rpc::governance_api::GovernanceReadApi;
127use sui_json_rpc::indexer_api::IndexerApi;
128use sui_json_rpc::move_utils::MoveUtils;
129use sui_json_rpc::read_api::ReadApi;
130use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
131use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
132use sui_macros::fail_point;
133use sui_macros::{fail_point_async, replay_log};
134use sui_network::api::ValidatorServer;
135use sui_network::discovery;
136use sui_network::endpoint_manager::EndpointManager;
137use sui_network::state_sync;
138use sui_network::validator::server::ServerBuilder;
139use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
140use sui_snapshot::uploader::StateSnapshotUploader;
141use sui_storage::{
142 http_key_value_store::HttpKVStore,
143 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
144 key_value_store_metrics::KeyValueStoreMetrics,
145};
146use sui_types::base_types::{AuthorityName, EpochId};
147use sui_types::committee::Committee;
148use sui_types::crypto::KeypairTraits;
149use sui_types::error::{SuiError, SuiResult};
150use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
151use sui_types::sui_system_state::SuiSystemStateTrait;
152use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
153use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
154use sui_types::supported_protocol_versions::SupportedProtocolVersions;
155use typed_store::DBMetrics;
156use typed_store::rocks::default_db_options;
157
158use crate::metrics::{GrpcMetrics, SuiNodeMetrics};
159
160pub mod admin;
161mod handle;
162pub mod metrics;
163
164pub struct ValidatorComponents {
165 validator_server_handle: SpawnOnce,
166 validator_overload_monitor_handle: Option<JoinHandle<()>>,
167 consensus_manager: Arc<ConsensusManager>,
168 consensus_store_pruner: ConsensusStorePruner,
169 consensus_adapter: Arc<ConsensusAdapter>,
170 checkpoint_metrics: Arc<CheckpointMetrics>,
171 sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
172}
173pub struct P2pComponents {
174 p2p_network: Network,
175 known_peers: HashMap<PeerId, String>,
176 discovery_handle: discovery::Handle,
177 state_sync_handle: state_sync::Handle,
178 randomness_handle: randomness::Handle,
179 endpoint_manager: EndpointManager,
180}
181
182#[cfg(msim)]
183mod simulator {
184 use std::sync::atomic::AtomicBool;
185 use sui_types::error::SuiErrorKind;
186
187 use super::*;
188 pub(super) struct SimState {
189 pub sim_node: sui_simulator::runtime::NodeHandle,
190 pub sim_safe_mode_expected: AtomicBool,
191 _leak_detector: sui_simulator::NodeLeakDetector,
192 }
193
194 impl Default for SimState {
195 fn default() -> Self {
196 Self {
197 sim_node: sui_simulator::runtime::NodeHandle::current(),
198 sim_safe_mode_expected: AtomicBool::new(false),
199 _leak_detector: sui_simulator::NodeLeakDetector::new(),
200 }
201 }
202 }
203
204 type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
205 + Send
206 + Sync
207 + 'static;
208
209 fn default_fetch_jwks(
210 _authority: AuthorityName,
211 _provider: &OIDCProvider,
212 ) -> SuiResult<Vec<(JwkId, JWK)>> {
213 use fastcrypto_zkp::bn254::zk_login::parse_jwks;
214 parse_jwks(
216 sui_types::zk_login_util::DEFAULT_JWK_BYTES,
217 &OIDCProvider::Twitch,
218 true,
219 )
220 .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
221 }
222
223 thread_local! {
224 static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
225 }
226
227 pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
228 JWK_INJECTOR.with(|injector| injector.borrow().clone())
229 }
230
231 pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
232 JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
233 }
234}
235
236#[cfg(msim)]
237pub use simulator::set_jwk_injector;
238#[cfg(msim)]
239use simulator::*;
240use sui_core::authority::authority_store_pruner::PrunerWatermarks;
241use sui_core::{
242 consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
243};
244
245const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
246
247pub struct SuiNode {
248 config: NodeConfig,
249 validator_components: Mutex<Option<ValidatorComponents>>,
250
251 #[allow(unused)]
253 http_servers: HttpServers,
254
255 state: Arc<AuthorityState>,
256 transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
257 registry_service: RegistryService,
258 metrics: Arc<SuiNodeMetrics>,
259 checkpoint_metrics: Arc<CheckpointMetrics>,
260
261 _discovery: discovery::Handle,
262 _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
263 state_sync_handle: state_sync::Handle,
264 randomness_handle: randomness::Handle,
265 checkpoint_store: Arc<CheckpointStore>,
266 global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
267 connection_monitor_status: Arc<ConnectionMonitorStatus>,
268
269 end_of_epoch_channel: broadcast::Sender<SuiSystemState>,
271
272 endpoint_manager: EndpointManager,
274
275 backpressure_manager: Arc<BackpressureManager>,
276
277 _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,
278
279 #[cfg(msim)]
280 sim_state: SimState,
281
282 _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
283 shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,
285
286 auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
291
292 subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
293}
294
295impl fmt::Debug for SuiNode {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 f.debug_struct("SuiNode")
298 .field("name", &self.state.name.concise())
299 .finish()
300 }
301}
302
303static MAX_JWK_KEYS_PER_FETCH: usize = 100;
304
305impl SuiNode {
306 pub async fn start(
307 config: NodeConfig,
308 registry_service: RegistryService,
309 ) -> Result<Arc<SuiNode>> {
310 Self::start_async(
311 config,
312 registry_service,
313 ServerVersion::new("sui-node", "unknown"),
314 )
315 .await
316 }
317
318 fn start_jwk_updater(
319 config: &NodeConfig,
320 metrics: Arc<SuiNodeMetrics>,
321 authority: AuthorityName,
322 epoch_store: Arc<AuthorityPerEpochStore>,
323 consensus_adapter: Arc<ConsensusAdapter>,
324 ) {
325 let epoch = epoch_store.epoch();
326
327 let supported_providers = config
328 .zklogin_oauth_providers
329 .get(&epoch_store.get_chain_identifier().chain())
330 .unwrap_or(&BTreeSet::new())
331 .iter()
332 .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
333 .collect::<Vec<_>>();
334
335 let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);
336
337 info!(
338 ?fetch_interval,
339 "Starting JWK updater tasks with supported providers: {:?}", supported_providers
340 );
341
342 fn validate_jwk(
343 metrics: &Arc<SuiNodeMetrics>,
344 provider: &OIDCProvider,
345 id: &JwkId,
346 jwk: &JWK,
347 ) -> bool {
348 let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
349 warn!(
350 "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
351 id.iss, provider
352 );
353 metrics
354 .invalid_jwks
355 .with_label_values(&[&provider.to_string()])
356 .inc();
357 return false;
358 };
359
360 if iss_provider != *provider {
361 warn!(
362 "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
363 id.iss, provider, iss_provider
364 );
365 metrics
366 .invalid_jwks
367 .with_label_values(&[&provider.to_string()])
368 .inc();
369 return false;
370 }
371
372 if !check_total_jwk_size(id, jwk) {
373 warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
374 metrics
375 .invalid_jwks
376 .with_label_values(&[&provider.to_string()])
377 .inc();
378 return false;
379 }
380
381 true
382 }
383
384 for p in supported_providers.into_iter() {
393 let provider_str = p.to_string();
394 let epoch_store = epoch_store.clone();
395 let consensus_adapter = consensus_adapter.clone();
396 let metrics = metrics.clone();
397 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
398 async move {
399 let mut seen = HashSet::new();
402 loop {
403 jwk_log!("fetching JWK for provider {:?}", p);
404 metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
405 match Self::fetch_jwks(authority, &p).await {
406 Err(e) => {
407 metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
408 warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
409 tokio::time::sleep(Duration::from_secs(30)).await;
411 continue;
412 }
413 Ok(mut keys) => {
414 metrics.total_jwks
415 .with_label_values(&[&provider_str])
416 .inc_by(keys.len() as u64);
417
418 keys.retain(|(id, jwk)| {
419 validate_jwk(&metrics, &p, id, jwk) &&
420 !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
421 seen.insert((id.clone(), jwk.clone()))
422 });
423
424 metrics.unique_jwks
425 .with_label_values(&[&provider_str])
426 .inc_by(keys.len() as u64);
427
428 if keys.len() > MAX_JWK_KEYS_PER_FETCH {
431 warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
432 keys.truncate(MAX_JWK_KEYS_PER_FETCH);
433 }
434
435 for (id, jwk) in keys.into_iter() {
436 jwk_log!("Submitting JWK to consensus: {:?}", id);
437
438 let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
439 consensus_adapter.submit(txn, None, &epoch_store, None, None)
440 .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
441 .ok();
442 }
443 }
444 }
445 tokio::time::sleep(fetch_interval).await;
446 }
447 }
448 .instrument(error_span!("jwk_updater_task", epoch)),
449 ));
450 }
451 }
452
453 pub async fn start_async(
454 config: NodeConfig,
455 registry_service: RegistryService,
456 server_version: ServerVersion,
457 ) -> Result<Arc<SuiNode>> {
458 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
459 let mut config = config.clone();
460 if config.supported_protocol_versions.is_none() {
461 info!(
462 "populating config.supported_protocol_versions with default {:?}",
463 SupportedProtocolVersions::SYSTEM_DEFAULT
464 );
465 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
466 }
467
468 let run_with_range = config.run_with_range;
469 let is_validator = config.consensus_config().is_some();
470 let is_full_node = !is_validator;
471 let prometheus_registry = registry_service.default_registry();
472
473 info!(node =? config.protocol_public_key(),
474 "Initializing sui-node listening on {}", config.network_address
475 );
476
477 DBMetrics::init(registry_service.clone());
479
480 typed_store::init_write_sync(config.enable_db_sync_to_disk);
482
483 mysten_metrics::init_metrics(&prometheus_registry);
485 #[cfg(not(msim))]
487 mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();
488
489 let genesis = config.genesis()?.clone();
490
491 let secret = Arc::pin(config.protocol_key_pair().copy());
492 let genesis_committee = genesis.committee();
493 let committee_store = Arc::new(CommitteeStore::new(
494 config.db_path().join("epochs"),
495 &genesis_committee,
496 None,
497 ));
498
499 let pruner_watermarks = Arc::new(PrunerWatermarks::default());
500 let checkpoint_store = CheckpointStore::new(
501 &config.db_path().join("checkpoints"),
502 pruner_watermarks.clone(),
503 );
504 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
505
506 Self::check_and_recover_forks(
507 &checkpoint_store,
508 &checkpoint_metrics,
509 is_validator,
510 config.fork_recovery.as_ref(),
511 )
512 .await?;
513
514 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
516 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
517 enable_write_stall,
518 is_validator,
519 };
520 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
521 &config.db_path().join("store"),
522 Some(perpetual_tables_options),
523 Some(pruner_watermarks.epoch_id.clone()),
524 ));
525 let is_genesis = perpetual_tables
526 .database_is_empty()
527 .expect("Database read should not fail at init.");
528
529 let backpressure_manager =
530 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
531
532 let store =
533 AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;
534
535 let cur_epoch = store.get_recovery_epoch_at_restart()?;
536 let committee = committee_store
537 .get_committee(&cur_epoch)?
538 .expect("Committee of the current epoch must exist");
539 let epoch_start_configuration = store
540 .get_epoch_start_configuration()?
541 .expect("EpochStartConfiguration of the current epoch must exist");
542 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
543 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
544
545 let cache_traits = build_execution_cache(
546 &config.execution_cache,
547 &prometheus_registry,
548 &store,
549 backpressure_manager.clone(),
550 );
551
552 let auth_agg = {
553 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
554 Arc::new(ArcSwap::new(Arc::new(
555 AuthorityAggregator::new_from_epoch_start_state(
556 epoch_start_configuration.epoch_start_state(),
557 &committee_store,
558 safe_client_metrics_base,
559 ),
560 )))
561 };
562
563 let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
564 let chain = match config.chain_override_for_testing {
565 Some(chain) => chain,
566 None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
567 };
568
569 let highest_executed_checkpoint = checkpoint_store
570 .get_highest_executed_checkpoint_seq_number()
571 .expect("checkpoint store read cannot fail")
572 .unwrap_or(0);
573
574 let previous_epoch_last_checkpoint = if cur_epoch == 0 {
575 0
576 } else {
577 checkpoint_store
578 .get_epoch_last_checkpoint_seq_number(cur_epoch - 1)
579 .expect("checkpoint store read cannot fail")
580 .unwrap_or(highest_executed_checkpoint)
581 };
582
583 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
584 let epoch_store = AuthorityPerEpochStore::new(
585 config.protocol_public_key(),
586 committee.clone(),
587 &config.db_path().join("store"),
588 Some(epoch_options.options),
589 EpochMetrics::new(®istry_service.default_registry()),
590 epoch_start_configuration,
591 cache_traits.backing_package_store.clone(),
592 cache_traits.object_store.clone(),
593 cache_metrics,
594 signature_verifier_metrics,
595 &config.expensive_safety_check_config,
596 (chain_id, chain),
597 highest_executed_checkpoint,
598 previous_epoch_last_checkpoint,
599 Arc::new(SubmittedTransactionCacheMetrics::new(
600 ®istry_service.default_registry(),
601 )),
602 )?;
603
604 info!("created epoch store");
605
606 replay_log!(
607 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
608 epoch_store.epoch(),
609 epoch_store.protocol_config()
610 );
611
612 if is_genesis {
614 info!("checking SUI conservation at genesis");
615 cache_traits
620 .reconfig_api
621 .expensive_check_sui_conservation(&epoch_store)
622 .expect("SUI conservation check cannot fail at genesis");
623 }
624
625 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
626 let default_buffer_stake = epoch_store
627 .protocol_config()
628 .buffer_stake_for_protocol_upgrade_bps();
629 if effective_buffer_stake != default_buffer_stake {
630 warn!(
631 ?effective_buffer_stake,
632 ?default_buffer_stake,
633 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
634 );
635 }
636
637 checkpoint_store.insert_genesis_checkpoint(
638 genesis.checkpoint(),
639 genesis.checkpoint_contents().clone(),
640 &epoch_store,
641 );
642
643 info!("creating state sync store");
644 let state_sync_store = RocksDbStore::new(
645 cache_traits.clone(),
646 committee_store.clone(),
647 checkpoint_store.clone(),
648 );
649
650 let index_store = if is_full_node && config.enable_index_processing {
651 info!("creating jsonrpc index store");
652 Some(Arc::new(IndexStore::new(
653 config.db_path().join("indexes"),
654 &prometheus_registry,
655 epoch_store
656 .protocol_config()
657 .max_move_identifier_len_as_option(),
658 config.remove_deprecated_tables,
659 )))
660 } else {
661 None
662 };
663
664 let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
665 info!("creating rpc index store");
666 Some(Arc::new(
667 RpcIndexStore::new(
668 &config.db_path(),
669 &store,
670 &checkpoint_store,
671 &epoch_store,
672 &cache_traits.backing_package_store,
673 pruner_watermarks.checkpoint_id.clone(),
674 config.rpc().cloned().unwrap_or_default(),
675 )
676 .await,
677 ))
678 } else {
679 None
680 };
681
682 let chain_identifier = epoch_store.get_chain_identifier();
683
684 info!("creating archive reader");
685 let (randomness_tx, randomness_rx) = mpsc::channel(
687 config
688 .p2p_config
689 .randomness
690 .clone()
691 .unwrap_or_default()
692 .mailbox_capacity(),
693 );
694 let P2pComponents {
695 p2p_network,
696 known_peers,
697 discovery_handle,
698 state_sync_handle,
699 randomness_handle,
700 endpoint_manager,
701 } = Self::create_p2p_network(
702 &config,
703 state_sync_store.clone(),
704 chain_identifier,
705 randomness_tx,
706 &prometheus_registry,
707 )?;
708
709 for peer in &config.p2p_config.peer_address_overrides {
711 endpoint_manager
712 .update_endpoint(
713 EndpointId::P2p(peer.peer_id),
714 AddressSource::Config,
715 peer.addresses.clone(),
716 )
717 .expect("Updating peer address overrides should not fail");
718 }
719
720 update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());
722
723 info!("start snapshot upload");
724 let state_snapshot_handle = Self::start_state_snapshot(
726 &config,
727 &prometheus_registry,
728 checkpoint_store.clone(),
729 chain_identifier,
730 )?;
731
732 info!("start db checkpoint");
734 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
735 &config,
736 &prometheus_registry,
737 state_snapshot_handle.is_some(),
738 )?;
739
740 if !epoch_store
741 .protocol_config()
742 .simplified_unwrap_then_delete()
743 {
744 config
746 .authority_store_pruning_config
747 .set_killswitch_tombstone_pruning(true);
748 }
749
750 let authority_name = config.protocol_public_key();
751
752 info!("create authority state");
753 let state = AuthorityState::new(
754 authority_name,
755 secret,
756 config.supported_protocol_versions.unwrap(),
757 store.clone(),
758 cache_traits.clone(),
759 epoch_store.clone(),
760 committee_store.clone(),
761 index_store.clone(),
762 rpc_index,
763 checkpoint_store.clone(),
764 &prometheus_registry,
765 genesis.objects(),
766 &db_checkpoint_config,
767 config.clone(),
768 chain_identifier,
769 config.policy_config.clone(),
770 config.firewall_config.clone(),
771 pruner_watermarks,
772 )
773 .await;
774 if epoch_store.epoch() == 0 {
776 let txn = &genesis.transaction();
777 let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
778 let transaction =
779 sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
780 sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
781 genesis.transaction().data().clone(),
782 sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
783 ),
784 );
785 state
786 .try_execute_immediately(&transaction, ExecutionEnv::new(), &epoch_store)
787 .instrument(span)
788 .await
789 .unwrap();
790 }
791
792 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
794
795 if config
796 .expensive_safety_check_config
797 .enable_secondary_index_checks()
798 && let Some(indexes) = state.indexes.clone()
799 {
800 sui_core::verify_indexes::verify_indexes(
801 state.get_global_state_hash_store().as_ref(),
802 indexes,
803 )
804 .expect("secondary indexes are inconsistent");
805 }
806
807 let (end_of_epoch_channel, end_of_epoch_receiver) =
808 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
809
810 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
811 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
812 auth_agg.load_full(),
813 state.clone(),
814 end_of_epoch_receiver,
815 &config.db_path(),
816 &prometheus_registry,
817 &config,
818 )))
819 } else {
820 None
821 };
822
823 let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
824 state.clone(),
825 state_sync_store,
826 &transaction_orchestrator.clone(),
827 &config,
828 &prometheus_registry,
829 server_version,
830 )
831 .await?;
832
833 let global_state_hasher = Arc::new(GlobalStateHasher::new(
834 cache_traits.global_state_hash_store.clone(),
835 GlobalStateHashMetrics::new(&prometheus_registry),
836 ));
837
838 let authority_names_to_peer_ids = epoch_store
839 .epoch_start_state()
840 .get_authority_names_to_peer_ids();
841
842 let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
843 "sui",
844 ®istry_service.default_registry(),
845 );
846
847 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
848
849 let connection_monitor_handle =
850 mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
851 p2p_network.downgrade(),
852 Arc::new(network_connection_metrics),
853 known_peers,
854 );
855
856 let connection_monitor_status = ConnectionMonitorStatus {
857 connection_statuses: connection_monitor_handle.connection_statuses(),
858 authority_names_to_peer_ids,
859 };
860
861 let connection_monitor_status = Arc::new(connection_monitor_status);
862 let sui_node_metrics = Arc::new(SuiNodeMetrics::new(®istry_service.default_registry()));
863
864 sui_node_metrics
865 .binary_max_protocol_version
866 .set(ProtocolVersion::MAX.as_u64() as i64);
867 sui_node_metrics
868 .configured_max_protocol_version
869 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
870
871 let validator_components = if state.is_validator(&epoch_store) {
872 let mut components = Self::construct_validator_components(
873 config.clone(),
874 state.clone(),
875 committee,
876 epoch_store.clone(),
877 checkpoint_store.clone(),
878 state_sync_handle.clone(),
879 randomness_handle.clone(),
880 Arc::downgrade(&global_state_hasher),
881 backpressure_manager.clone(),
882 connection_monitor_status.clone(),
883 ®istry_service,
884 sui_node_metrics.clone(),
885 checkpoint_metrics.clone(),
886 )
887 .await?;
888
889 components.consensus_adapter.submit_recovered(&epoch_store);
890
891 components.validator_server_handle = components.validator_server_handle.start().await;
893
894 endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());
896
897 Some(components)
898 } else {
899 None
900 };
901
902 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
904
905 let node = Self {
906 config,
907 validator_components: Mutex::new(validator_components),
908 http_servers,
909 state,
910 transaction_orchestrator,
911 registry_service,
912 metrics: sui_node_metrics,
913 checkpoint_metrics,
914
915 _discovery: discovery_handle,
916 _connection_monitor_handle: connection_monitor_handle,
917 state_sync_handle,
918 randomness_handle,
919 checkpoint_store,
920 global_state_hasher: Mutex::new(Some(global_state_hasher)),
921 end_of_epoch_channel,
922 connection_monitor_status,
923 endpoint_manager,
924 backpressure_manager,
925
926 _db_checkpoint_handle: db_checkpoint_handle,
927
928 #[cfg(msim)]
929 sim_state: Default::default(),
930
931 _state_snapshot_uploader_handle: state_snapshot_handle,
932 shutdown_channel_tx: shutdown_channel,
933
934 auth_agg,
935 subscription_service_checkpoint_sender,
936 };
937
938 info!("SuiNode started!");
939 let node = Arc::new(node);
940 let node_copy = node.clone();
941 spawn_monitored_task!(async move {
942 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
943 if let Err(error) = result {
944 warn!("Reconfiguration finished with error {:?}", error);
945 }
946 });
947
948 Ok(node)
949 }
950
951 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
952 self.end_of_epoch_channel.subscribe()
953 }
954
955 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
956 self.shutdown_channel_tx.subscribe()
957 }
958
959 pub fn current_epoch_for_testing(&self) -> EpochId {
960 self.state.current_epoch_for_testing()
961 }
962
963 pub fn db_checkpoint_path(&self) -> PathBuf {
964 self.config.db_checkpoint_path()
965 }
966
967 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
969 info!("close_epoch (current epoch = {})", epoch_store.epoch());
970 self.validator_components
971 .lock()
972 .await
973 .as_ref()
974 .ok_or_else(|| SuiError::from("Node is not a validator"))?
975 .consensus_adapter
976 .close_epoch(epoch_store);
977 Ok(())
978 }
979
980 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
981 self.state
982 .clear_override_protocol_upgrade_buffer_stake(epoch)
983 }
984
985 pub fn set_override_protocol_upgrade_buffer_stake(
986 &self,
987 epoch: EpochId,
988 buffer_stake_bps: u64,
989 ) -> SuiResult {
990 self.state
991 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
992 }
993
994 pub async fn close_epoch_for_testing(&self) -> SuiResult {
997 let epoch_store = self.state.epoch_store_for_testing();
998 self.close_epoch(&epoch_store).await
999 }
1000
1001 fn start_state_snapshot(
1002 config: &NodeConfig,
1003 prometheus_registry: &Registry,
1004 checkpoint_store: Arc<CheckpointStore>,
1005 chain_identifier: ChainIdentifier,
1006 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1007 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1008 let snapshot_uploader = StateSnapshotUploader::new(
1009 &config.db_checkpoint_path(),
1010 &config.snapshot_path(),
1011 remote_store_config.clone(),
1012 60,
1013 prometheus_registry,
1014 checkpoint_store,
1015 chain_identifier,
1016 config.state_snapshot_write_config.archive_interval_epochs,
1017 )?;
1018 Ok(Some(snapshot_uploader.start()))
1019 } else {
1020 Ok(None)
1021 }
1022 }
1023
1024 fn start_db_checkpoint(
1025 config: &NodeConfig,
1026 prometheus_registry: &Registry,
1027 state_snapshot_enabled: bool,
1028 ) -> Result<(
1029 DBCheckpointConfig,
1030 Option<tokio::sync::broadcast::Sender<()>>,
1031 )> {
1032 let checkpoint_path = Some(
1033 config
1034 .db_checkpoint_config
1035 .checkpoint_path
1036 .clone()
1037 .unwrap_or_else(|| config.db_checkpoint_path()),
1038 );
1039 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1040 DBCheckpointConfig {
1041 checkpoint_path,
1042 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1043 true
1044 } else {
1045 config
1046 .db_checkpoint_config
1047 .perform_db_checkpoints_at_epoch_end
1048 },
1049 ..config.db_checkpoint_config.clone()
1050 }
1051 } else {
1052 config.db_checkpoint_config.clone()
1053 };
1054
1055 match (
1056 db_checkpoint_config.object_store_config.as_ref(),
1057 state_snapshot_enabled,
1058 ) {
1059 (None, false) => Ok((db_checkpoint_config, None)),
1064 (_, _) => {
1065 let handler = DBCheckpointHandler::new(
1066 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1067 db_checkpoint_config.object_store_config.as_ref(),
1068 60,
1069 db_checkpoint_config
1070 .prune_and_compact_before_upload
1071 .unwrap_or(true),
1072 config.authority_store_pruning_config.clone(),
1073 prometheus_registry,
1074 state_snapshot_enabled,
1075 )?;
1076 Ok((
1077 db_checkpoint_config,
1078 Some(DBCheckpointHandler::start(handler)),
1079 ))
1080 }
1081 }
1082 }
1083
1084 fn create_p2p_network(
1085 config: &NodeConfig,
1086 state_sync_store: RocksDbStore,
1087 chain_identifier: ChainIdentifier,
1088 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1089 prometheus_registry: &Registry,
1090 ) -> Result<P2pComponents> {
1091 let (state_sync, state_sync_router) = state_sync::Builder::new()
1092 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1093 .store(state_sync_store)
1094 .archive_config(config.archive_reader_config())
1095 .with_metrics(prometheus_registry)
1096 .build();
1097
1098 let mut discovery_builder = discovery::Builder::new().config(config.p2p_config.clone());
1099 if let Some(consensus_config) = &config.consensus_config {
1100 let effective_addr = consensus_config
1101 .external_address
1102 .as_ref()
1103 .or(consensus_config.listen_address.as_ref());
1104 if let Some(addr) = effective_addr {
1105 discovery_builder = discovery_builder.consensus_external_address(addr.clone());
1106 }
1107 }
1108 let (discovery, discovery_server, endpoint_manager) = discovery_builder.build();
1109
1110 let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
1111 let known_peers: HashMap<PeerId, String> = discovery_config
1112 .allowlisted_peers
1113 .clone()
1114 .into_iter()
1115 .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
1116 .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
1117 peer.peer_id
1118 .map(|peer_id| (peer_id, "seed_peer".to_string()))
1119 }))
1120 .collect();
1121
1122 let (randomness, randomness_router) =
1123 randomness::Builder::new(config.protocol_public_key(), randomness_tx)
1124 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1125 .with_metrics(prometheus_registry)
1126 .build();
1127
1128 let p2p_network = {
1129 let routes = anemo::Router::new()
1130 .add_rpc_service(discovery_server)
1131 .merge(state_sync_router);
1132 let routes = routes.merge(randomness_router);
1133
1134 let inbound_network_metrics =
1135 mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
1136 let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
1137 "sui",
1138 "outbound",
1139 prometheus_registry,
1140 );
1141
1142 let service = ServiceBuilder::new()
1143 .layer(
1144 TraceLayer::new_for_server_errors()
1145 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1146 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1147 )
1148 .layer(CallbackLayer::new(
1149 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1150 Arc::new(inbound_network_metrics),
1151 config.p2p_config.excessive_message_size(),
1152 ),
1153 ))
1154 .service(routes);
1155
1156 let outbound_layer = ServiceBuilder::new()
1157 .layer(
1158 TraceLayer::new_for_client_and_server_errors()
1159 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1160 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1161 )
1162 .layer(CallbackLayer::new(
1163 mysten_network::metrics::MetricsMakeCallbackHandler::new(
1164 Arc::new(outbound_network_metrics),
1165 config.p2p_config.excessive_message_size(),
1166 ),
1167 ))
1168 .into_inner();
1169
1170 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1171 anemo_config.max_frame_size = Some(1 << 30);
1174
1175 let mut quic_config = anemo_config.quic.unwrap_or_default();
1178 if quic_config.socket_send_buffer_size.is_none() {
1179 quic_config.socket_send_buffer_size = Some(20 << 20);
1180 }
1181 if quic_config.socket_receive_buffer_size.is_none() {
1182 quic_config.socket_receive_buffer_size = Some(20 << 20);
1183 }
1184 quic_config.allow_failed_socket_buffer_size_setting = true;
1185
1186 if quic_config.max_concurrent_bidi_streams.is_none() {
1189 quic_config.max_concurrent_bidi_streams = Some(500);
1190 }
1191 if quic_config.max_concurrent_uni_streams.is_none() {
1192 quic_config.max_concurrent_uni_streams = Some(500);
1193 }
1194 if quic_config.stream_receive_window.is_none() {
1195 quic_config.stream_receive_window = Some(100 << 20);
1196 }
1197 if quic_config.receive_window.is_none() {
1198 quic_config.receive_window = Some(200 << 20);
1199 }
1200 if quic_config.send_window.is_none() {
1201 quic_config.send_window = Some(200 << 20);
1202 }
1203 if quic_config.crypto_buffer_size.is_none() {
1204 quic_config.crypto_buffer_size = Some(1 << 20);
1205 }
1206 if quic_config.max_idle_timeout_ms.is_none() {
1207 quic_config.max_idle_timeout_ms = Some(10_000);
1208 }
1209 if quic_config.keep_alive_interval_ms.is_none() {
1210 quic_config.keep_alive_interval_ms = Some(5_000);
1211 }
1212 anemo_config.quic = Some(quic_config);
1213
1214 let server_name = format!("sui-{}", chain_identifier);
1215 let network = Network::bind(config.p2p_config.listen_address)
1216 .server_name(&server_name)
1217 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1218 .config(anemo_config)
1219 .outbound_request_layer(outbound_layer)
1220 .start(service)?;
1221 info!(
1222 server_name = server_name,
1223 "P2p network started on {}",
1224 network.local_addr()
1225 );
1226
1227 network
1228 };
1229
1230 let discovery_handle =
1231 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1232 let state_sync_handle = state_sync.start(p2p_network.clone());
1233 let randomness_handle = randomness.start(p2p_network.clone());
1234
1235 Ok(P2pComponents {
1236 p2p_network,
1237 known_peers,
1238 discovery_handle,
1239 state_sync_handle,
1240 randomness_handle,
1241 endpoint_manager,
1242 })
1243 }
1244
1245 async fn construct_validator_components(
1246 config: NodeConfig,
1247 state: Arc<AuthorityState>,
1248 committee: Arc<Committee>,
1249 epoch_store: Arc<AuthorityPerEpochStore>,
1250 checkpoint_store: Arc<CheckpointStore>,
1251 state_sync_handle: state_sync::Handle,
1252 randomness_handle: randomness::Handle,
1253 global_state_hasher: Weak<GlobalStateHasher>,
1254 backpressure_manager: Arc<BackpressureManager>,
1255 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1256 registry_service: &RegistryService,
1257 sui_node_metrics: Arc<SuiNodeMetrics>,
1258 checkpoint_metrics: Arc<CheckpointMetrics>,
1259 ) -> Result<ValidatorComponents> {
1260 let mut config_clone = config.clone();
1261 let consensus_config = config_clone
1262 .consensus_config
1263 .as_mut()
1264 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1265
1266 let client = Arc::new(UpdatableConsensusClient::new());
1267 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1268 &committee,
1269 consensus_config,
1270 state.name,
1271 connection_monitor_status.clone(),
1272 ®istry_service.default_registry(),
1273 epoch_store.protocol_config().clone(),
1274 client.clone(),
1275 checkpoint_store.clone(),
1276 ));
1277 let consensus_manager = Arc::new(ConsensusManager::new(
1278 &config,
1279 consensus_config,
1280 registry_service,
1281 client,
1282 ));
1283
1284 let consensus_store_pruner = ConsensusStorePruner::new(
1286 consensus_manager.get_storage_base_path(),
1287 consensus_config.db_retention_epochs(),
1288 consensus_config.db_pruner_period(),
1289 ®istry_service.default_registry(),
1290 );
1291
1292 let sui_tx_validator_metrics =
1293 SuiTxValidatorMetrics::new(®istry_service.default_registry());
1294
1295 let validator_server_handle = Self::start_grpc_validator_service(
1296 &config,
1297 state.clone(),
1298 consensus_adapter.clone(),
1299 ®istry_service.default_registry(),
1300 )
1301 .await?;
1302
1303 let validator_overload_monitor_handle = if config
1306 .authority_overload_config
1307 .max_load_shedding_percentage
1308 > 0
1309 {
1310 let authority_state = Arc::downgrade(&state);
1311 let overload_config = config.authority_overload_config.clone();
1312 fail_point!("starting_overload_monitor");
1313 Some(spawn_monitored_task!(overload_monitor(
1314 authority_state,
1315 overload_config,
1316 )))
1317 } else {
1318 None
1319 };
1320
1321 Self::start_epoch_specific_validator_components(
1322 &config,
1323 state.clone(),
1324 consensus_adapter,
1325 checkpoint_store,
1326 epoch_store,
1327 state_sync_handle,
1328 randomness_handle,
1329 consensus_manager,
1330 consensus_store_pruner,
1331 global_state_hasher,
1332 backpressure_manager,
1333 validator_server_handle,
1334 validator_overload_monitor_handle,
1335 checkpoint_metrics,
1336 sui_node_metrics,
1337 sui_tx_validator_metrics,
1338 )
1339 .await
1340 }
1341
1342 async fn start_epoch_specific_validator_components(
1343 config: &NodeConfig,
1344 state: Arc<AuthorityState>,
1345 consensus_adapter: Arc<ConsensusAdapter>,
1346 checkpoint_store: Arc<CheckpointStore>,
1347 epoch_store: Arc<AuthorityPerEpochStore>,
1348 state_sync_handle: state_sync::Handle,
1349 randomness_handle: randomness::Handle,
1350 consensus_manager: Arc<ConsensusManager>,
1351 consensus_store_pruner: ConsensusStorePruner,
1352 state_hasher: Weak<GlobalStateHasher>,
1353 backpressure_manager: Arc<BackpressureManager>,
1354 validator_server_handle: SpawnOnce,
1355 validator_overload_monitor_handle: Option<JoinHandle<()>>,
1356 checkpoint_metrics: Arc<CheckpointMetrics>,
1357 sui_node_metrics: Arc<SuiNodeMetrics>,
1358 sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
1359 ) -> Result<ValidatorComponents> {
1360 let checkpoint_service = Self::build_checkpoint_service(
1361 config,
1362 consensus_adapter.clone(),
1363 checkpoint_store.clone(),
1364 epoch_store.clone(),
1365 state.clone(),
1366 state_sync_handle,
1367 state_hasher,
1368 checkpoint_metrics.clone(),
1369 );
1370
1371 let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1375
1376 consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1377
1378 if epoch_store.randomness_state_enabled() {
1379 let randomness_manager = RandomnessManager::try_new(
1380 Arc::downgrade(&epoch_store),
1381 Box::new(consensus_adapter.clone()),
1382 randomness_handle,
1383 config.protocol_key_pair(),
1384 )
1385 .await;
1386 if let Some(randomness_manager) = randomness_manager {
1387 epoch_store
1388 .set_randomness_manager(randomness_manager)
1389 .await?;
1390 }
1391 }
1392
1393 ExecutionTimeObserver::spawn(
1394 epoch_store.clone(),
1395 Box::new(consensus_adapter.clone()),
1396 config
1397 .execution_time_observer_config
1398 .clone()
1399 .unwrap_or_default(),
1400 );
1401
1402 let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
1403 None,
1404 state.metrics.clone(),
1405 ));
1406
1407 let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
1408 throughput_calculator.clone(),
1409 None,
1410 None,
1411 state.metrics.clone(),
1412 ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
1413 ));
1414
1415 consensus_adapter.swap_throughput_profiler(throughput_profiler);
1416
1417 let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1418 state.clone(),
1419 checkpoint_service.clone(),
1420 epoch_store.clone(),
1421 consensus_adapter.clone(),
1422 low_scoring_authorities,
1423 throughput_calculator,
1424 backpressure_manager,
1425 config.congestion_log.clone(),
1426 );
1427
1428 info!("Starting consensus manager asynchronously");
1429
1430 tokio::spawn({
1432 let config = config.clone();
1433 let epoch_store = epoch_store.clone();
1434 let sui_tx_validator = SuiTxValidator::new(
1435 state.clone(),
1436 epoch_store.clone(),
1437 checkpoint_service.clone(),
1438 sui_tx_validator_metrics.clone(),
1439 );
1440 let consensus_manager = consensus_manager.clone();
1441 async move {
1442 consensus_manager
1443 .start(
1444 &config,
1445 epoch_store,
1446 consensus_handler_initializer,
1447 sui_tx_validator,
1448 )
1449 .await;
1450 }
1451 });
1452 let replay_waiter = consensus_manager.replay_waiter();
1453
1454 info!("Spawning checkpoint service");
1455 let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
1456 None
1457 } else {
1458 Some(replay_waiter)
1459 };
1460 checkpoint_service
1461 .spawn(epoch_store.clone(), replay_waiter)
1462 .await;
1463
1464 if epoch_store.authenticator_state_enabled() {
1465 Self::start_jwk_updater(
1466 config,
1467 sui_node_metrics,
1468 state.name,
1469 epoch_store.clone(),
1470 consensus_adapter.clone(),
1471 );
1472 }
1473
1474 Ok(ValidatorComponents {
1475 validator_server_handle,
1476 validator_overload_monitor_handle,
1477 consensus_manager,
1478 consensus_store_pruner,
1479 consensus_adapter,
1480 checkpoint_metrics,
1481 sui_tx_validator_metrics,
1482 })
1483 }
1484
1485 fn build_checkpoint_service(
1486 config: &NodeConfig,
1487 consensus_adapter: Arc<ConsensusAdapter>,
1488 checkpoint_store: Arc<CheckpointStore>,
1489 epoch_store: Arc<AuthorityPerEpochStore>,
1490 state: Arc<AuthorityState>,
1491 state_sync_handle: state_sync::Handle,
1492 state_hasher: Weak<GlobalStateHasher>,
1493 checkpoint_metrics: Arc<CheckpointMetrics>,
1494 ) -> Arc<CheckpointService> {
1495 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1496 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1497
1498 debug!(
1499 "Starting checkpoint service with epoch start timestamp {}
1500 and epoch duration {}",
1501 epoch_start_timestamp_ms, epoch_duration_ms
1502 );
1503
1504 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1505 sender: consensus_adapter,
1506 signer: state.secret.clone(),
1507 authority: config.protocol_public_key(),
1508 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1509 .checked_add(epoch_duration_ms)
1510 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1511 metrics: checkpoint_metrics.clone(),
1512 });
1513
1514 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1515 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1516 let max_checkpoint_size_bytes =
1517 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1518
1519 CheckpointService::build(
1520 state.clone(),
1521 checkpoint_store,
1522 epoch_store,
1523 state.get_transaction_cache_reader().clone(),
1524 state_hasher,
1525 checkpoint_output,
1526 Box::new(certified_checkpoint_output),
1527 checkpoint_metrics,
1528 max_tx_per_checkpoint,
1529 max_checkpoint_size_bytes,
1530 )
1531 }
1532
1533 fn construct_consensus_adapter(
1534 committee: &Committee,
1535 consensus_config: &ConsensusConfig,
1536 authority: AuthorityName,
1537 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1538 prometheus_registry: &Registry,
1539 protocol_config: ProtocolConfig,
1540 consensus_client: Arc<dyn ConsensusClient>,
1541 checkpoint_store: Arc<CheckpointStore>,
1542 ) -> ConsensusAdapter {
1543 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1544 ConsensusAdapter::new(
1547 consensus_client,
1548 checkpoint_store,
1549 authority,
1550 connection_monitor_status,
1551 consensus_config.max_pending_transactions(),
1552 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1553 consensus_config.max_submit_position,
1554 consensus_config.submit_delay_step_override(),
1555 ca_metrics,
1556 protocol_config,
1557 )
1558 }
1559
1560 async fn start_grpc_validator_service(
1561 config: &NodeConfig,
1562 state: Arc<AuthorityState>,
1563 consensus_adapter: Arc<ConsensusAdapter>,
1564 prometheus_registry: &Registry,
1565 ) -> Result<SpawnOnce> {
1566 let validator_service = ValidatorService::new(
1567 state.clone(),
1568 consensus_adapter,
1569 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1570 config.policy_config.clone().map(|p| p.client_id_source),
1571 );
1572
1573 let mut server_conf = mysten_network::config::Config::new();
1574 server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1575 server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1576 server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
1577 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1578 server_conf.load_shed = config.grpc_load_shed;
1579 let mut server_builder =
1580 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1581
1582 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1583
1584 let tls_config = sui_tls::create_rustls_server_config(
1585 config.network_key_pair().copy().private(),
1586 SUI_TLS_SERVER_NAME.to_string(),
1587 );
1588
1589 let network_address = config.network_address().clone();
1590
1591 let (ready_tx, ready_rx) = oneshot::channel();
1592
1593 Ok(SpawnOnce::new(ready_rx, async move {
1594 let server = server_builder
1595 .bind(&network_address, Some(tls_config))
1596 .await
1597 .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
1598 let local_addr = server.local_addr();
1599 info!("Listening to traffic on {local_addr}");
1600 ready_tx.send(()).unwrap();
1601 if let Err(err) = server.serve().await {
1602 info!("Server stopped: {err}");
1603 }
1604 info!("Server stopped");
1605 }))
1606 }
1607
1608 pub fn state(&self) -> Arc<AuthorityState> {
1609 self.state.clone()
1610 }
1611
1612 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1614 self.state.reference_gas_price_for_testing()
1615 }
1616
1617 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1618 self.state.committee_store().clone()
1619 }
1620
1621 pub fn clone_authority_aggregator(
1632 &self,
1633 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1634 self.transaction_orchestrator
1635 .as_ref()
1636 .map(|to| to.clone_authority_aggregator())
1637 }
1638
1639 pub fn transaction_orchestrator(
1640 &self,
1641 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1642 self.transaction_orchestrator.clone()
1643 }
1644
1645 pub async fn monitor_reconfiguration(
1648 self: Arc<Self>,
1649 mut epoch_store: Arc<AuthorityPerEpochStore>,
1650 ) -> Result<()> {
1651 let checkpoint_executor_metrics =
1652 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1653
1654 loop {
1655 let mut hasher_guard = self.global_state_hasher.lock().await;
1656 let hasher = hasher_guard.take().unwrap();
1657 info!(
1658 "Creating checkpoint executor for epoch {}",
1659 epoch_store.epoch()
1660 );
1661 let checkpoint_executor = CheckpointExecutor::new(
1662 epoch_store.clone(),
1663 self.checkpoint_store.clone(),
1664 self.state.clone(),
1665 hasher.clone(),
1666 self.backpressure_manager.clone(),
1667 self.config.checkpoint_executor_config.clone(),
1668 checkpoint_executor_metrics.clone(),
1669 self.subscription_service_checkpoint_sender.clone(),
1670 );
1671
1672 let run_with_range = self.config.run_with_range;
1673
1674 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1675
1676 self.metrics
1678 .current_protocol_version
1679 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1680
1681 if let Some(components) = &*self.validator_components.lock().await {
1683 tokio::time::sleep(Duration::from_millis(1)).await;
1685
1686 let config = cur_epoch_store.protocol_config();
1687 let mut supported_protocol_versions = self
1688 .config
1689 .supported_protocol_versions
1690 .expect("Supported versions should be populated")
1691 .truncate_below(config.version);
1693
1694 while supported_protocol_versions.max > config.version {
1695 let proposed_protocol_config = ProtocolConfig::get_for_version(
1696 supported_protocol_versions.max,
1697 cur_epoch_store.get_chain(),
1698 );
1699
1700 if proposed_protocol_config.enable_accumulators()
1701 && !epoch_store.accumulator_root_exists()
1702 {
1703 error!(
1704 "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1705 supported_protocol_versions.max
1706 );
1707 supported_protocol_versions.max = supported_protocol_versions.max.prev();
1708 } else {
1709 break;
1710 }
1711 }
1712
1713 let binary_config = config.binary_config(None);
1714 let transaction = ConsensusTransaction::new_capability_notification_v2(
1715 AuthorityCapabilitiesV2::new(
1716 self.state.name,
1717 cur_epoch_store.get_chain_identifier().chain(),
1718 supported_protocol_versions,
1719 self.state
1720 .get_available_system_packages(&binary_config)
1721 .await,
1722 ),
1723 );
1724 info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

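    /// Shuts down consensus if validator components are currently running; on a fullnode
    /// this is a no-op.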
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

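    /// Transitions `AuthorityState` into the next epoch: verifies that the last checkpoint
    /// of the current epoch has been executed, builds the new `EpochStartConfiguration`,
    /// and returns the per-epoch store for the new epoch.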
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }

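    /// Returns the first eight characters of a digest's string form (or the whole string if
    /// shorter), used to keep fork log lines and metric labels compact.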
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

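    /// Startup check for previously detected checkpoint or transaction forks. Fullnodes
    /// skip the check; validators first apply any configured fork-recovery overrides and
    /// then, if a fork marker is still present, either halt or return an error according
    /// to the configured `ForkCrashBehavior`.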
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

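    /// Applies checkpoint overrides from the fork-recovery config: if a locally computed
    /// checkpoint disagrees with the expected digest for an overridden sequence number,
    /// locally computed checkpoints are cleared from that sequence onward, and a recorded
    /// checkpoint-fork marker is cleared when it points at an overridden sequence number.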
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

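    /// Clears a recorded transaction-fork marker when the forked transaction digest is
    /// listed in the fork-recovery transaction overrides.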
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

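    /// Current Unix timestamp in seconds; used only to label the fork metrics below.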
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

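    /// Reports a detected checkpoint fork via metrics, then either sleeps forever so an
    /// operator can intervene (`AwaitForkRecovery`) or returns an error (`ReturnError`),
    /// depending on the configured crash behavior.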
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

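    /// Counterpart of `handle_checkpoint_fork` for a transaction whose locally computed
    /// effects digest diverges from the expected effects digest.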
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

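// Outside the simulator, JWKs are fetched from the real OIDC provider over HTTPS; under
// msim they come from the configurable JWK injector instead.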
#[cfg(not(msim))]
impl SuiNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

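/// A future that is spawned at most once. `Unstarted` holds a readiness receiver together
/// with the boxed future; `start` spawns the future and waits for the readiness signal
/// before transitioning to `Started`.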
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

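/// Feeds the new epoch's validator p2p addresses into the endpoint manager so that
/// connections to committee members use their current network addresses.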
fn update_peer_addresses(
    config: &NodeConfig,
    endpoint_manager: &EndpointManager,
    epoch_start_state: &EpochStartSystemState,
) {
    for (peer_id, address) in
        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
    {
        endpoint_manager
            .update_endpoint(
                EndpointId::P2p(peer_id),
                AddressSource::Committee,
                vec![address],
            )
            .expect("Updating peer addresses should not fail");
    }
}

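/// Builds the transaction key-value store backing JSON-RPC reads. The local RocksDB store
/// is always used; when an HTTP KV store URL is configured and the chain is mainnet, the
/// HTTP store is layered in as a fallback.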
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

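/// Builds and starts the public HTTP (and optionally HTTPS) RPC servers for fullnodes:
/// the JSON-RPC modules plus the `sui_rpc_api` service, wrapped with server-timing and
/// CORS layers. Returns the server handles and the channel used to feed executed
/// checkpoints to the subscription service. Nodes with a consensus config (validators)
/// return no servers.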
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone());
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address = ?https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address = ?http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

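/// Handles to the spawned HTTP and HTTPS RPC servers, held for the lifetime of the node.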
#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}