use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::authority::shared_object_version_manager::Schedulable;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::subscription::SubscriptionService;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{debug, error, warn};
use tracing::{error_span, info, Instrument};

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{spawn_monitored_task, RegistryService};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::{AuthAggMetrics, AuthorityAggregator};
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_json_rpc::JsonRpcServerBuilder;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::discovery::TrustedPeerChangeEvent;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{
    check_total_jwk_size, AuthorityCapabilitiesV1, ConsensusTransaction,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::rocks::default_db_options;
use typed_store::DBMetrics;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

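/// Long-lived components that only exist while the node is acting as a
/// validator. They are torn down or handed over to the next epoch during
/// reconfiguration.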
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
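
/// Handles returned by `create_p2p_network`: the anemo network itself plus the
/// discovery, state-sync, and randomness subsystems running on top of it.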
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

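// Test-only plumbing for the msim simulator: per-node simulator state, plus a
// thread-local hook that lets tests override how JWKs are fetched.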
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
    validator_tx_finalizer::ValidatorTxFinalizer,
};

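/// Default connect timeout for the validator gRPC server; also reused below as
/// the HTTP/2 keepalive interval and timeout.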
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

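/// A running Sui node. Wraps the authority state together with networking,
/// checkpointing, and (for validators) consensus components, and holds the
/// handles needed to drive reconfiguration across epochs.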
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: consensus_core::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

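/// Cap on the number of JWKs accepted from a single provider fetch; any excess
/// keys are truncated in `start_jwk_updater`.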
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

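    /// Spawns one background task per supported OIDC provider. Each task
    /// periodically fetches that provider's JWKs, validates and de-duplicates
    /// them, and submits new keys to consensus. Tasks live until the epoch
    /// ends.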
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

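    /// Full startup path for both full nodes and validators: opens the stores,
    /// builds the execution cache and epoch store, wires up p2p, RPC, and
    /// (when configured as a validator) consensus, then spawns the
    /// reconfiguration monitor.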
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(
            node = ?config.protocol_public_key(),
            "Initializing sui-node listening on {}", config.network_address
        );

        DBMetrics::init(registry_service.clone());

        mysten_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking SUI conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            trusted_peer_change_rx,
            randomness_tx,
            &prometheus_registry,
        )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        )
        .expect("Initial trusted peers must be set");

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let txn = genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                sui_core::verify_indexes::verify_indexes(
                    state.get_global_state_hash_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator,
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = consensus_core::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle = consensus_core::AnemoConnectionMonitor::spawn(
            p2p_network.downgrade(),
            Arc::new(network_connection_metrics),
            known_peers,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    sui_node_metrics.clone(),
                    checkpoint_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

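    /// If a remote object store is configured for state snapshots, starts the
    /// snapshot uploader and returns its shutdown sender; otherwise returns
    /// `None`.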
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

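    /// Resolves the DB checkpoint configuration, forcing end-of-epoch
    /// checkpoints when state snapshots are enabled, and starts the checkpoint
    /// handler when either an object store is configured or snapshots are
    /// enabled.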
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

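    /// Builds the anemo p2p network and mounts the discovery, state-sync, and
    /// randomness services on it, with tracing and metrics layers on both the
    /// inbound and outbound paths. QUIC buffer and stream limits are raised
    /// from anemo's defaults unless already set in the config.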
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                consensus_core::NetworkRouteMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics =
                consensus_core::NetworkRouteMetrics::new("sui", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    consensus_core::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    consensus_core::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

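    /// Builds the epoch-independent validator machinery (consensus adapter and
    /// manager, store pruner, gRPC validator service, optional overload
    /// monitor), then delegates to
    /// `start_epoch_specific_validator_components` for the per-epoch pieces.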
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

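    /// Starts the per-epoch validator components: checkpoint service,
    /// randomness manager, execution time observer, throughput
    /// calculator/profiler, consensus handler, and the JWK updater. Called on
    /// startup and again after every reconfiguration.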
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                consensus_adapter.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }

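    /// Assembles the checkpoint service: locally built checkpoints are
    /// submitted to consensus for signing, and certified checkpoints are
    /// forwarded to state sync for propagation.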
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

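    /// Creates the consensus adapter. The per-validator pending-transaction
    /// limit is derived by splitting twice the global limit evenly across the
    /// committee.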
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        protocol_config: ProtocolConfig,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
            protocol_config,
        )
    }

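    /// Builds the TLS-wrapped validator gRPC server and returns it as a
    /// `SpawnOnce`, so serving can be deferred until `start()` is called; the
    /// ready channel fires once the listener is bound.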
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }

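    /// Re-executes user certificates that were sequenced by consensus but not
    /// yet executed before the node restarted. Certificates with a known
    /// signed-effects digest are replayed against that digest; the rest are
    /// scheduled as ordinary non-fastpath executions. Waits, with a timeout,
    /// for the effects of the first group to land.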
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx) if !tx.is_consensus_tx() => {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new().with_expected_effects_digest(fx_digest),
                        ));
                    } else {
                        additional_certs.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new()
                                .with_scheduling_source(SchedulingSource::NonFastPath),
                        ));
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.key().unwrap_digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state
            .execution_scheduler()
            .enqueue(pending_consensus_certificates, epoch_store);
        state
            .execution_scheduler()
            .enqueue(additional_certs, epoch_store);

        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .notify_read_executed_effects_digests(
                    "SuiNode::notify_read_executed_effects_digests",
                    &digests,
                ),
        )
        .await
        .is_err()
        {
            let executed_effects_digests = state
                .get_transaction_cache_reader()
                .multi_get_executed_effects_digests(&digests);
            let pending_digests = digests
                .iter()
                .zip(executed_effects_digests.iter())
                .filter_map(|(digest, executed_effects_digest)| {
                    if executed_effects_digest.is_none() {
                        Some(digest)
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            debug_fatal!(
                "Timed out waiting for effects digests to be executed: {:?}",
                pending_digests
            );
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

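    /// Drives the node across epoch boundaries: runs the checkpoint executor
    /// for the current epoch, submits capability notifications (validators
    /// only), and at epoch end rebuilds the epoch store, authority aggregator,
    /// state hasher, and validator components before looping into the next
    /// epoch. Returns when a run-with-range condition stops execution.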
1750 pub async fn monitor_reconfiguration(
1753 self: Arc<Self>,
1754 mut epoch_store: Arc<AuthorityPerEpochStore>,
1755 ) -> Result<()> {
1756 let checkpoint_executor_metrics =
1757 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1758
1759 loop {
1760 let mut hasher_guard = self.global_state_hasher.lock().await;
1761 let hasher = hasher_guard.take().unwrap();
1762 info!(
1763 "Creating checkpoint executor for epoch {}",
1764 epoch_store.epoch()
1765 );
1766 let checkpoint_executor = CheckpointExecutor::new(
1767 epoch_store.clone(),
1768 self.checkpoint_store.clone(),
1769 self.state.clone(),
1770 hasher.clone(),
1771 self.backpressure_manager.clone(),
1772 self.config.checkpoint_executor_config.clone(),
1773 checkpoint_executor_metrics.clone(),
1774 self.subscription_service_checkpoint_sender.clone(),
1775 );
1776
1777 let run_with_range = self.config.run_with_range;
1778
1779 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1780
1781 self.metrics
1783 .current_protocol_version
1784 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1785
1786 if let Some(components) = &*self.validator_components.lock().await {
1788 tokio::time::sleep(Duration::from_millis(1)).await;
1790
1791 let config = cur_epoch_store.protocol_config();
1792 let mut supported_protocol_versions = self
1793 .config
1794 .supported_protocol_versions
1795 .expect("Supported versions should be populated")
1796 .truncate_below(config.version);
1798
1799 while supported_protocol_versions.max > config.version {
1800 let proposed_protocol_config = ProtocolConfig::get_for_version(
1801 supported_protocol_versions.max,
1802 cur_epoch_store.get_chain(),
1803 );
1804
1805 if proposed_protocol_config.enable_accumulators()
1806 && !epoch_store.accumulator_root_exists()
1807 {
1808 error!(
1809 "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1810 supported_protocol_versions.max
1811 );
1812 supported_protocol_versions.max = supported_protocol_versions.max.prev();
1813 } else {
1814 break;
1815 }
1816 }
1817
1818 let binary_config = config.binary_config(None);
1819 let transaction = if config.authority_capabilities_v2() {
1820 ConsensusTransaction::new_capability_notification_v2(
1821 AuthorityCapabilitiesV2::new(
1822 self.state.name,
1823 cur_epoch_store.get_chain_identifier().chain(),
1824 supported_protocol_versions,
1825 self.state
1826 .get_available_system_packages(&binary_config)
1827 .await,
1828 ),
1829 )
1830 } else {
1831 ConsensusTransaction::new_capability_notification(AuthorityCapabilitiesV1::new(
1832 self.state.name,
1833 self.config
1834 .supported_protocol_versions
1835 .expect("Supported versions should be populated"),
1836 self.state
1837 .get_available_system_packages(&binary_config)
1838 .await,
1839 ))
1840 };
1841 info!(?transaction, "submitting capabilities to consensus");
1842 components.consensus_adapter.submit(
1843 transaction,
1844 None,
1845 &cur_epoch_store,
1846 None,
1847 None,
1848 )?;
1849 }
1850
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            let _ = send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

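            // Swap in the next epoch's validator components while the lock is held:
            // an existing validator restarts its epoch-specific components, while a
            // fullnode that joined the committee is promoted to a validator.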
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

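    // Best-effort shutdown: currently only stops consensus if this node is a
    // validator; other components are torn down when the node is dropped.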
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

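    // Builds the EpochStartConfiguration from the last checkpoint of the current
    // epoch and transitions AuthorityState into the next epoch's store. This
    // panics rather than returning an error: reconfiguration must not fail.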
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

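    // Returns the first eight characters of a digest's string form, keeping
    // metric labels and log lines short. For example (illustrative value only),
    // a digest rendering as "9f86d081884c..." yields "9f86d081".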
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

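    // Validator-only startup check: first apply any configured fork-recovery
    // overrides, then, if a checkpoint or transaction fork marker is still set,
    // escalate according to the configured ForkCrashBehavior.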
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

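    // For each configured checkpoint override, clear locally computed checkpoints
    // from the overridden sequence number onward when their digest disagrees with
    // the override, then clear the fork marker if it matches an override.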
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
        {
            if recovery.checkpoint_overrides.contains_key(&checkpoint_seq) {
                info!(
                    "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                    checkpoint_seq, checkpoint_digest
                );
                checkpoint_store
                    .clear_checkpoint_fork_detected()
                    .expect("Failed to clear checkpoint fork detected marker");
            }
        }
        Ok(())
    }

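    // Clears a recorded transaction fork marker when the forked transaction's
    // digest appears in the configured transaction overrides.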
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()? {
            if recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
            {
                info!(
                    "Fork recovery enabled: clearing transaction fork for tx {:?}",
                    tx_digest
                );
                checkpoint_store
                    .clear_transaction_fork_detected()
                    .expect("Failed to clear transaction fork detected marker");
            }
        }
        Ok(())
    }

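    // Seconds since the Unix epoch; used as a timestamp label in the fork
    // metrics recorded below.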
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

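    // Records the checkpoint fork in metrics, then either parks the node forever
    // awaiting operator intervention (AwaitForkRecovery) or surfaces an error
    // (ReturnError), per the configured (or default) ForkCrashBehavior.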
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

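    // Same escalation policy as handle_checkpoint_fork, but for a divergence
    // between expected and locally computed transaction effects.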
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest, expected_effects_digest, actual_effects_digest
                ))
            }
        }
    }
}

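// JWK fetching is split by build target: production builds fetch from the OIDC
// provider over HTTP, while simulator (msim) builds use an injectable stub.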
#[cfg(not(msim))]
impl SuiNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

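// A future that can be spawned exactly once: `start` consumes the Unstarted
// variant, spawns the future on tokio, and waits on the ready channel before
// reporting Started. Calling `start` again is a no-op.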
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

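// Notifies state sync of the next epoch's validator set so it can refresh its
// trusted peers; a send failure is logged and returned, and the caller ignores it.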
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    epoch_start_state: &EpochStartSystemState,
) -> Result<(), watch::error::SendError<TrustedPeerChangeEvent>> {
    sender
        .send(TrustedPeerChangeEvent {
            new_peers: epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key()),
        })
        .tap_err(|err| {
            warn!(
                "Failed to send validator peer information to state sync: {:?}",
                err
            );
        })
}

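// Builds the transaction key-value store used by the JSON-RPC read path: a
// local RocksDB-backed store, optionally layered with an HTTP fallback store
// when a base URL is configured (currently mainnet only).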
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

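// Assembles the fullnode HTTP(S) stack: the JSON-RPC modules plus the
// sui-rpc-api service on a shared axum router, with CORS and server-timing
// middleware. Validators (nodes with a consensus config) get no HTTP servers.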
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

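// Checkpoint size comes from the protocol config in production builds; in unit
// tests it is fixed at 2 so multi-checkpoint behavior is easy to exercise.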
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

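// The test below records a synthetic checkpoint fork and a synthetic
// transaction fork, checking both the ReturnError path and recovery via
// matching overrides.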
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(r
            .unwrap_err()
            .to_string()
            .contains("Checkpoint fork detected"));

        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(checkpoint_store
            .get_checkpoint_fork_detected()
            .unwrap()
            .is_none());

        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(r
            .unwrap_err()
            .to_string()
            .contains("Transaction fork detected"));

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(checkpoint_store
            .get_transaction_fork_detected()
            .unwrap()
            .is_none());
    }
}