use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::in_test_configuration;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::{AddressSource, EndpointId};
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

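// Logs at debug level in test configurations (where JWK fetches are frequent
// and noisy) and at info level otherwise.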
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

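/// Everything that only runs while the node is an active validator: the gRPC
/// validator service, consensus, and the supporting pruners and metrics.
/// Torn down and rebuilt on reconfiguration.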
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
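
/// Handles produced by `create_p2p_network`: the anemo network plus the
/// discovery, state-sync, and randomness subsystems running on top of it.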
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

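/// Connect timeout and HTTP/2 keepalive interval/timeout applied to the
/// validator gRPC server (see `start_grpc_validator_service`).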
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

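/// A running Sui node. Construction via [`SuiNode::start`] wires together
/// storage, the authority state, p2p networking, HTTP/RPC servers, and (for
/// validators) consensus; `monitor_reconfiguration` then drives the node
/// across epoch boundaries.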
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

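/// Cap on how many JWKs from a single provider fetch are submitted to
/// consensus; anything beyond this is truncated (see `start_jwk_updater`).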
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

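    /// Spawns one background task per supported zklogin OIDC provider. Each
    /// task periodically fetches the provider's JWKs, validates and
    /// de-duplicates them, and submits new keys to consensus. Tasks end with
    /// the epoch (`within_alive_epoch`).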
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

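        // A JWK is accepted only if its `iss` maps back to the provider it was
        // fetched from and it passes the size check; anything else is counted
        // in the `invalid_jwks` metric and dropped.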
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

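        // One fetch loop per provider. `seen` ensures a key is submitted to
        // consensus at most once per task lifetime, and keys already active in
        // the current epoch are skipped.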
        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

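    /// Fully asynchronous node construction: opens the stores, restores the
    /// current epoch state, starts p2p and HTTP servers, executes the genesis
    /// transaction if needed, builds validator components when a consensus
    /// config is present, and finally spawns `monitor_reconfiguration`.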
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        DBMetrics::init(registry_service.clone());

        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        mysten_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee();
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
            is_validator,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
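        // An empty perpetual store means this is the first start from genesis;
        // genesis-only work below (conservation check, executing the genesis
        // transaction) is keyed off this flag.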
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

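        // The aggregator holds network clients for the current committee; it is
        // swapped for a rebuilt instance on every reconfiguration (see
        // `monitor_reconfiguration`).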
        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking SUI conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        let endpoint_manager = EndpointManager::new(discovery_handle.clone());

        for peer in &config.p2p_config.peer_address_overrides {
            endpoint_manager
                .update_endpoint(
                    EndpointId::P2p(peer.peer_id),
                    AddressSource::Config,
                    peer.addresses.clone(),
                )
                .expect("Updating peer address overrides should not fail");
        }

        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
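        // On a fresh epoch-0 start the genesis transaction has not been
        // executed yet; execute it directly before the rest of startup
        // proceeds.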
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

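        // Subscribers (notably the transaction orchestrator) are told about
        // each end-of-epoch system state through this broadcast channel.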
        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let mut components = Self::construct_validator_components(
                config.clone(),
                state.clone(),
                committee,
                epoch_store.clone(),
                checkpoint_store.clone(),
                state_sync_handle.clone(),
                randomness_handle.clone(),
                Arc::downgrade(&global_state_hasher),
                backpressure_manager.clone(),
                connection_monitor_status.clone(),
                &registry_service,
                sui_node_metrics.clone(),
                checkpoint_metrics.clone(),
            )
            .await?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

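    /// Starts the state snapshot uploader if an object store is configured for
    /// snapshot writes; returns the sender used to signal shutdown, or `None`
    /// when snapshots are disabled.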
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

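    /// Resolves the DB checkpoint config (state snapshots force checkpoints at
    /// epoch end) and, when either an upload target or snapshots are enabled,
    /// starts the handler that produces and uploads RocksDB checkpoints.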
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

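    /// Builds the anemo p2p network with the discovery, state-sync, and
    /// randomness services mounted on one router, wraps inbound and outbound
    /// traffic in tracing and metrics layers, and applies QUIC tuning defaults
    /// for any knobs the config leaves unset.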
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new()
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
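            // 1 GiB max frame size: some messages, such as the epoch change
            // transaction with its staking events, can exceed anemo's default.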
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

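    /// One-time validator setup: consensus adapter and manager, consensus
    /// store pruner, the validator gRPC server, and (if load shedding is
    /// enabled) the overload monitor. Delegates the per-epoch pieces to
    /// `start_epoch_specific_validator_components`.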
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

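    /// Starts the components that must be rebuilt every epoch: the checkpoint
    /// service, randomness manager, execution time observer, throughput
    /// profiling, consensus itself, and the JWK updater. Called both at
    /// startup and after each reconfiguration.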
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

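        // A single map of low-scoring authorities is shared between the
        // consensus handler (which updates it from reputation scores) and the
        // consensus adapter (which reads it when deciding how to submit).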
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

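        // Unless explicitly disabled, hold off checkpoint building until
        // consensus has finished replaying locally committed transactions.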
        info!("Spawning checkpoint service");
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }

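    /// Wires the checkpoint service: locally built checkpoints are submitted
    /// to consensus, and certified checkpoints are forwarded to state sync.
    /// Per-checkpoint limits come from the protocol config.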
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

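    /// Builds the `ConsensusAdapter`; note the per-authority pending limit is
    /// derived as `max_pending_transactions * 2 / committee size`.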
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        protocol_config: ProtocolConfig,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
            protocol_config,
        )
    }

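    /// Prepares the TLS-enabled validator gRPC server. Binding and serving are
    /// deferred inside the returned `SpawnOnce`, so the server only starts
    /// listening once `start()` is called on it.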
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

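    /// The node's main epoch loop. Executes checkpoints until the end of the
    /// epoch, submits capability notifications while a validator, then tears
    /// down and rebuilds the epoch-specific components against the new
    /// committee. Returns only when a `run_with_range` stop condition fires.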
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

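        // One iteration per epoch: run the checkpoint executor to the epoch
        // boundary, then perform reconfiguration before looping again.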
        loop {
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );
            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                self.subscription_service_checkpoint_sender.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let mut supported_protocol_versions = self
                    .config
                    .supported_protocol_versions
                    .expect("Supported versions should be populated")
                    .truncate_below(config.version);

                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

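            // Blocks until the epoch's last checkpoint has been executed, or
            // until a run_with_range condition asks the node to stop.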
1733 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1734
            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // Refresh the authority-name-to-peer-id mapping for the incoming committee.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            // Restart validator components across the epoch boundary; a fullnode that
            // joined the new committee is promoted instead.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // In simulation tests, exercise checkpoint pruning whenever a finite,
            // nonzero retention period is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

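    /// Stops consensus if this node is currently running validator components.
    /// Fullnodes have no consensus to shut down, so this is a no-op for them.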
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

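    /// Transitions `AuthorityState` into the next epoch. Asserts that the last
    /// checkpoint of the current epoch is also the highest executed one, builds
    /// the `EpochStartConfiguration` for the next epoch, and returns the newly
    /// created per-epoch store.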
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }

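    /// Returns the first eight characters of a digest's string form (or the whole
    /// string if shorter), used to keep fork-related logs and metric labels compact.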
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

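    /// Validator-only startup check: first applies any configured fork-recovery
    /// overrides, then, if a checkpoint or transaction fork marker is still
    /// present, either halts startup or returns an error depending on the
    /// configured `ForkCrashBehavior`.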
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        // Fork detection only matters on validators; fullnodes skip the check.
        if !is_validator {
            return Ok(());
        }

        // Apply configured overrides before checking for outstanding fork markers.
        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

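    /// Applies checkpoint overrides from the fork-recovery config: clears locally
    /// computed checkpoints whose digests disagree with an override, and clears
    /// the checkpoint-fork marker when it refers to an overridden sequence number.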
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        // Validate and apply each configured checkpoint digest override.
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

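    /// Clears the transaction-fork marker when the forked transaction digest is
    /// covered by a configured transaction override.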
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

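    /// Current Unix time in seconds, used for fork metric labels.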
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

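    /// Records the checkpoint fork in metrics, then either parks the node forever
    /// (awaiting manual recovery) or returns an error, per `ForkCrashBehavior`.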
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

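    /// Records the transaction fork (expected vs. actual effects digests) in
    /// metrics, then either parks the node forever or returns an error, per
    /// `ForkCrashBehavior`.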
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

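// Outside the simulator, JWKs are fetched from the real OIDC provider over HTTP.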
#[cfg(not(msim))]
impl SuiNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

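/// A future that is spawned at most once, paired with a readiness signal that
/// `start` awaits before reporting the task as started. Calling `start` on an
/// already-started instance is a no-op.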
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

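/// Publishes the new committee's p2p addresses to the endpoint manager at epoch
/// boundaries.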
fn update_peer_addresses(
    config: &NodeConfig,
    endpoint_manager: &EndpointManager,
    epoch_start_state: &EpochStartSystemState,
) {
    for (peer_id, address) in
        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
    {
        endpoint_manager
            .update_endpoint(
                EndpointId::P2p(peer_id),
                AddressSource::Committee,
                vec![address],
            )
            .expect("Updating peer addresses should not fail");
    }
}

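/// Builds the transaction key-value store used by JSON-RPC: always a local
/// RocksDB-backed store, with a fallback to an HTTP store when a base URL is
/// configured and the node is on mainnet.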
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

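/// Builds the JSON-RPC router and the `sui_rpc_api` router, then starts the HTTP
/// server (and an HTTPS server when TLS is configured). Validators, identified by
/// the presence of a consensus config, skip this entirely and return empty handles.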
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    // Only fullnodes expose HTTP RPC servers; validators return empty handles.
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        // The transaction builder API is not registered when run_with_range is set.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address = ?https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address = ?http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

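// Test builds cap checkpoints at two transactions so checkpoint splitting is
// easy to exercise.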
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // A recorded checkpoint fork with no matching override must surface as an error.
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // A matching checkpoint override clears the fork marker.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // A recorded transaction fork with no matching override must also error.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        // A matching transaction override clears the marker.
        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}