use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::authority::shared_object_version_manager::Schedulable;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc, watch};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::discovery::TrustedPeerChangeEvent;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{
    AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size,
};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

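/// Validator-only subsystems. Built by `construct_validator_components` and
/// torn down and rebuilt at each epoch boundary by `monitor_reconfiguration`.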
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
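
/// Handles to the anemo p2p stack (discovery, state sync, randomness)
/// produced by `create_p2p_network`.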
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

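/// Simulator-only (`msim`) state: the simulated node handle, a flag for tests
/// that expect safe mode, and a thread-local hook letting tests inject JWKs in
/// place of real OIDC fetches.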
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;
    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
    validator_tx_finalizer::ValidatorTxFinalizer,
};

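/// Used for the validator gRPC server's connect timeout as well as its HTTP/2
/// keepalive interval and timeout.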
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

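/// A running Sui node, either validator or full node. Owns the authority state
/// and all long-lived services; validator-specific pieces live in
/// `validator_components` so they can be swapped out on reconfiguration.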
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

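/// Upper bound on JWKs accepted from a single provider fetch; excess keys are
/// dropped with a warning in `start_jwk_updater`.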
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

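    /// Spawns one task per supported OIDC provider that periodically fetches
    /// JWKs, filters out invalid, oversized, already-active, or previously
    /// seen keys, and submits the remainder to consensus. Tasks end with the
    /// current epoch.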
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // Track keys already submitted so each JWK goes to
                    // consensus at most once per epoch.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only valid keys that are not already
                                // active in this epoch and have not been seen.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

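    /// Full startup path: opens the stores, builds the execution cache and
    /// epoch store, starts p2p networking, validator components (if this node
    /// is in the committee), and the HTTP/RPC servers, then spawns
    /// `monitor_reconfiguration` to drive epoch changes.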
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        DBMetrics::init(registry_service.clone());

        mysten_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking SUI conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            trusted_peer_change_rx,
            randomness_tx,
            &prometheus_registry,
        )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        )
        .expect("Initial trusted peers must be set");

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    sui_node_metrics.clone(),
                    checkpoint_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

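    /// Starts the state snapshot uploader when an object store is configured
    /// for snapshot writes; returns the uploader's broadcast handle, or `None`
    /// if snapshots are disabled.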
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

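    /// Resolves the effective db checkpoint config (end-of-epoch checkpoints
    /// are forced on when state snapshots are enabled) and starts the
    /// `DBCheckpointHandler` when uploads or snapshots require it.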
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

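    /// Builds the anemo p2p network hosting the discovery, state sync, and
    /// randomness services, with metrics and tracing layers on both inbound
    /// and outbound requests, and QUIC tuning defaults for any settings the
    /// config leaves unset.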
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

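    /// One-time (non-epoch-specific) validator setup: consensus adapter and
    /// manager, consensus store pruner, gRPC validator service, and the
    /// optional overload monitor; finishes by starting the epoch-specific
    /// components.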
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

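    /// Starts the validator services that must be rebuilt every epoch:
    /// checkpoint service, randomness manager, execution time observer, and
    /// consensus itself. Called at startup and again after each reconfiguration.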
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                consensus_adapter.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }

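    /// Wires up the checkpoint service: locally built checkpoints are
    /// submitted to consensus for signing, and certified checkpoints are
    /// forwarded to state sync.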
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

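    /// Builds the consensus adapter. The per-authority pending limit is
    /// derived from the configured max pending transactions divided across
    /// the committee.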
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        protocol_config: ProtocolConfig,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
            protocol_config,
        )
    }

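    /// Prepares the TLS-enabled gRPC validator service. The returned
    /// `SpawnOnce` is expected to bind and serve only once started, signalling
    /// readiness over the oneshot channel when the listener is up.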
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }

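    /// On restart, re-enqueues certified user transactions that consensus had
    /// sequenced but that were not yet executed, then waits (with a timeout)
    /// for their effects before startup continues.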
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx) if !tx.is_consensus_tx() => {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // If signed effects already exist for the transaction,
                    // execute against the expected effects digest; otherwise
                    // schedule it like any other non-fast-path certificate.
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new().with_expected_effects_digest(fx_digest),
                        ));
                    } else {
                        additional_certs.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new()
                                .with_scheduling_source(SchedulingSource::NonFastPath),
                        ));
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.key().unwrap_digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state
            .execution_scheduler()
            .enqueue(pending_consensus_certificates, epoch_store);
        state
            .execution_scheduler()
            .enqueue(additional_certs, epoch_store);

        // Allow more time under the msim simulator.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .notify_read_executed_effects_digests(
                    "SuiNode::notify_read_executed_effects_digests",
                    &digests,
                ),
        )
        .await
        .is_err()
        {
            let executed_effects_digests = state
                .get_transaction_cache_reader()
                .multi_get_executed_effects_digests(&digests);
            let pending_digests = digests
                .iter()
                .zip(executed_effects_digests.iter())
                .filter_map(|(digest, executed_effects_digest)| {
                    if executed_effects_digest.is_none() {
                        Some(digest)
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            debug_fatal!(
                "Timed out waiting for effects digests to be executed: {:?}",
                pending_digests
            );
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

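    /// Main reconfiguration loop: runs the checkpoint executor for each epoch,
    /// submits capability notifications while a validator, and at epoch end
    /// rebuilds the epoch store, global state hasher, and validator components.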
1752 pub async fn monitor_reconfiguration(
1755 self: Arc<Self>,
1756 mut epoch_store: Arc<AuthorityPerEpochStore>,
1757 ) -> Result<()> {
1758 let checkpoint_executor_metrics =
1759 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1760
1761 loop {
1762 let mut hasher_guard = self.global_state_hasher.lock().await;
1763 let hasher = hasher_guard.take().unwrap();
1764 info!(
1765 "Creating checkpoint executor for epoch {}",
1766 epoch_store.epoch()
1767 );
1768 let checkpoint_executor = CheckpointExecutor::new(
1769 epoch_store.clone(),
1770 self.checkpoint_store.clone(),
1771 self.state.clone(),
1772 hasher.clone(),
1773 self.backpressure_manager.clone(),
1774 self.config.checkpoint_executor_config.clone(),
1775 checkpoint_executor_metrics.clone(),
1776 self.subscription_service_checkpoint_sender.clone(),
1777 );
1778
1779 let run_with_range = self.config.run_with_range;
1780
1781 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1782
1783 self.metrics
1785 .current_protocol_version
1786 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1787
1788 if let Some(components) = &*self.validator_components.lock().await {
1790 tokio::time::sleep(Duration::from_millis(1)).await;
1792
1793 let config = cur_epoch_store.protocol_config();
1794 let mut supported_protocol_versions = self
1795 .config
1796 .supported_protocol_versions
1797 .expect("Supported versions should be populated")
1798 .truncate_below(config.version);
1800
1801 while supported_protocol_versions.max > config.version {
1802 let proposed_protocol_config = ProtocolConfig::get_for_version(
1803 supported_protocol_versions.max,
1804 cur_epoch_store.get_chain(),
1805 );
1806
1807 if proposed_protocol_config.enable_accumulators()
1808 && !epoch_store.accumulator_root_exists()
1809 {
1810 error!(
1811 "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1812 supported_protocol_versions.max
1813 );
1814 supported_protocol_versions.max = supported_protocol_versions.max.prev();
1815 } else {
1816 break;
1817 }
1818 }
1819
1820 let binary_config = config.binary_config(None);
1821 let transaction = if config.authority_capabilities_v2() {
1822 ConsensusTransaction::new_capability_notification_v2(
1823 AuthorityCapabilitiesV2::new(
1824 self.state.name,
1825 cur_epoch_store.get_chain_identifier().chain(),
1826 supported_protocol_versions,
1827 self.state
1828 .get_available_system_packages(&binary_config)
1829 .await,
1830 ),
1831 )
1832 } else {
1833 ConsensusTransaction::new_capability_notification(AuthorityCapabilitiesV1::new(
1834 self.state.name,
1835 self.config
1836 .supported_protocol_versions
1837 .expect("Supported versions should be populated"),
1838 self.state
1839 .get_available_system_packages(&binary_config)
1840 .await,
1841 ))
1842 };
1843 info!(?transaction, "submitting capabilities to consensus");
1844 components.consensus_adapter.submit(
1845 transaction,
1846 None,
1847 &cur_epoch_store,
1848 None,
1849 None,
1850 )?;
1851 }
1852
1853 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1854
1855 if stop_condition == StopReason::RunWithRangeCondition {
1856 SuiNode::shutdown(&self).await;
1857 self.shutdown_channel_tx
1858 .send(run_with_range)
1859 .expect("RunWithRangeCondition met but failed to send shutdown message");
1860 return Ok(());
1861 }
1862
1863 let latest_system_state = self
1865 .state
1866 .get_object_cache_reader()
1867 .get_sui_system_state_object_unsafe()
1868 .expect("Read Sui System State object cannot fail");
1869
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            let _ = send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

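            // Hold the validator components lock for the rest of reconfiguration
            // so no other task observes a partially reconfigured node.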
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

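            // In simulation, exercise the checkpoint pruner whenever checkpoint
            // retention is bounded.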
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

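    /// Shuts down consensus if this node is currently running validator components.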
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

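    /// Builds the epoch start configuration for the next epoch and reconfigures
    /// the authority state, returning the new per-epoch store. Panics if the
    /// last checkpoint of the current epoch has not been fully executed.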
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

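    /// Returns the first 8 characters of a digest's string form, for concise
    /// logging and metric labels.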
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

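    /// On validator startup, applies any configured fork-recovery overrides and
    /// then halts or errors out if a checkpoint or transaction fork is still
    /// recorded in the checkpoint store. Fullnodes skip this check entirely.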
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

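    /// Applies checkpoint digest overrides from the fork-recovery config: clears
    /// locally computed checkpoints that diverge from an override, and clears the
    /// fork-detected marker if it refers to an overridden sequence number.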
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

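    /// Clears the transaction fork-detected marker if the forked transaction has
    /// an override entry in the fork-recovery config.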
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

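    /// Returns the current Unix timestamp in seconds, used to label fork metrics.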
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

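    /// Records a checkpoint fork in metrics, then either sleeps forever awaiting
    /// operator-driven fork recovery or returns an error, per the configured
    /// crash behavior.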
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

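    /// Same as [`Self::handle_checkpoint_fork`], but for a transaction whose
    /// locally computed effects diverge from the certified effects.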
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

#[cfg(not(msim))]
impl SuiNode {
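    /// Fetches the current JWKs for a zkLogin OIDC provider over HTTP; any
    /// failure is mapped to `JWKRetrievalError`.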
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

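/// A future that can be spawned exactly once. `Unstarted` holds the future plus
/// a readiness receiver; `start` spawns the future and waits for the readiness
/// signal before reporting `Started`.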
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

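/// Notifies state sync of the new set of trusted validator peers for the
/// upcoming epoch; logs a warning if the channel send fails.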
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    epoch_start_state: &EpochStartSystemState,
) -> Result<(), watch::error::SendError<TrustedPeerChangeEvent>> {
    sender
        .send(TrustedPeerChangeEvent {
            new_peers: epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key()),
        })
        .tap_err(|err| {
            warn!(
                "Failed to send validator peer information to state sync: {:?}",
                err
            );
        })
}

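/// Builds the transaction key-value store used by JSON-RPC: the local RocksDB
/// store, with a fallback to an HTTP store when a base URL is configured and
/// the node is on mainnet.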
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

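/// Builds the JSON-RPC and gRPC/REST routers and starts the HTTP (and, if TLS
/// is configured, HTTPS) servers. Validators skip this entirely; on fullnodes
/// it also returns the checkpoint sender feeding the subscription service.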
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

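/// Maximum number of transactions per checkpoint; capped at 2 under
/// `#[cfg(test)]`, presumably so checkpoint-boundary behavior is exercised
/// quickly in unit tests.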
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

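/// Handles for the public HTTP/HTTPS rpc servers, held so the servers stay
/// alive for the lifetime of the node.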
#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

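    /// Exercises both fork paths: a detected fork returns an error under
    /// `ReturnError`, and a matching override in `ForkRecoveryConfig` clears the
    /// fork marker so startup succeeds.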
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}