use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::{
    AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
};
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::authority::shared_object_version_manager::Schedulable;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::EndpointId;
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::messages_consensus::ConsensusTransactionKind;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedCertificate;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;
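
/// Services that run only while the node is a validator in the current epoch;
/// they are torn down and rebuilt by `monitor_reconfiguration` at epoch
/// boundaries.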
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
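
/// Handles produced by [`SuiNode::create_p2p_network`]: the anemo network plus
/// the discovery, state-sync, and randomness services built on top of it.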
pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::{ObjectsCompactionFilter, PrunerWatermarks};
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
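
/// The top-level object wiring together all components of a running Sui node
/// (fullnode or validator): authority state, p2p networking, checkpoint
/// execution, and the optional validator-only services.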
pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}
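
/// Cap on the number of JWKs accepted from a single provider fetch; any keys
/// beyond this are dropped before submission to consensus.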
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }
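
    /// Spawns one background task per configured zklogin OIDC provider that
    /// periodically fetches the provider's JWKs, validates them, and submits
    /// new keys to consensus. Tasks end automatically when the epoch ends.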
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // Track keys already submitted this epoch so each unique
                    // JWK is sent to consensus at most once.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk)
                                        && !epoch_store.jwk_active_in_current_epoch(id, jwk)
                                        && seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
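
    /// The main constructor: opens the stores, builds the execution cache and
    /// epoch store, starts the p2p network and HTTP servers, constructs
    /// validator components when the node is in the committee, and finally
    /// spawns the reconfiguration monitor.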
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(
            node = ?config.protocol_public_key(),
            "Initializing sui-node listening on {}",
            config.network_address
        );

        DBMetrics::init(registry_service.clone());

        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        mysten_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking SUI conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        let endpoint_manager = EndpointManager::new(discovery_handle.clone());

        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            pruner_db,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;

        // While still in epoch 0, execute the genesis transaction directly,
        // bypassing consensus.
        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        // If this node is in the committee, construct the validator components
        // while re-executing pending consensus certificates in parallel.
        let validator_components = if state.is_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    sui_node_metrics.clone(),
                    checkpoint_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }
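
    /// If a remote object store is configured for state snapshots, starts the
    /// [`StateSnapshotUploader`] and returns its shutdown handle.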
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }
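
    /// Resolves the effective db-checkpoint configuration and, unless db
    /// checkpoints and state snapshots are both disabled, starts the
    /// [`DBCheckpointHandler`], returning its shutdown handle alongside the
    /// config.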
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }
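
    /// Builds the anemo p2p network with the discovery, state-sync, and
    /// randomness services mounted on it, applying generous QUIC buffer and
    /// flow-control defaults wherever the config leaves them unset.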
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new()
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB; state-sync messages can be very large.
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in generous QUIC defaults (socket buffers, stream limits,
            // flow-control windows, timeouts) wherever the config is unset.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }
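
    /// Creates the epoch-independent validator components (consensus adapter
    /// and manager, store pruner, gRPC validator service, optional overload
    /// monitor) and then starts the epoch-specific ones.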
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }
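
    /// Starts the validator components that must be rebuilt every epoch: the
    /// checkpoint service, randomness manager, execution time observer,
    /// consensus handler, and (when authenticator state is enabled) the JWK
    /// updater.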
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }
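
    /// Assembles the [`CheckpointService`], wiring checkpoint signatures to
    /// consensus and certified checkpoints to state sync.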
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }
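
    /// Creates the [`ConsensusAdapter`], scaling one of its pending-transaction
    /// limits by the committee size.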
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        protocol_config: ProtocolConfig,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
            protocol_config,
        )
    }
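
    /// Prepares the TLS-enabled gRPC validator service, returned as a
    /// [`SpawnOnce`] so the caller controls when it actually binds and serves.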
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }
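
    /// On restart, re-enqueues certified transactions that consensus had
    /// already sequenced but which were not yet executed, then waits (with a
    /// timeout) for their effects before startup continues.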
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.is_consensus_tx()
                        && !epoch_store.protocol_config().disable_preconsensus_locking() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // If we already signed effects for this transaction,
                    // re-execute against the expected effects digest; otherwise
                    // schedule it as a regular non-fastpath execution.
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new().with_expected_effects_digest(fx_digest),
                        ));
                    } else {
                        additional_certs.push((
                            Schedulable::Transaction(tx),
                            ExecutionEnv::new()
                                .with_scheduling_source(SchedulingSource::NonFastPath),
                        ));
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.key().unwrap_digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state
            .execution_scheduler()
            .enqueue(pending_consensus_certificates, epoch_store);
        state
            .execution_scheduler()
            .enqueue(additional_certs, epoch_store);

        // Simtests can run much slower than real time, so allow a longer wait.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .notify_read_executed_effects_digests(
                    "SuiNode::notify_read_executed_effects_digests",
                    &digests,
                ),
        )
        .await
        .is_err()
        {
            // Report which digests are still unexecuted to aid debugging.
            let executed_effects_digests = state
                .get_transaction_cache_reader()
                .multi_get_executed_effects_digests(&digests);
            let pending_digests = digests
                .iter()
                .zip(executed_effects_digests.iter())
                .filter_map(|(digest, executed_effects_digest)| {
                    if executed_effects_digest.is_none() {
                        Some(digest)
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            debug_fatal!(
                "Timed out waiting for effects digests to be executed: {:?}",
                pending_digests
            );
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
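
    /// The node's main reconfiguration loop: each iteration executes all
    /// checkpoints of the current epoch, advertises supported protocol
    /// versions to consensus (validators only), and then rebuilds per-epoch
    /// stores and validator components for the next epoch.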
1739 pub async fn monitor_reconfiguration(
1742 self: Arc<Self>,
1743 mut epoch_store: Arc<AuthorityPerEpochStore>,
1744 ) -> Result<()> {
1745 let checkpoint_executor_metrics =
1746 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1747
1748 loop {
1749 let mut hasher_guard = self.global_state_hasher.lock().await;
1750 let hasher = hasher_guard.take().unwrap();
1751 info!(
1752 "Creating checkpoint executor for epoch {}",
1753 epoch_store.epoch()
1754 );
1755 let checkpoint_executor = CheckpointExecutor::new(
1756 epoch_store.clone(),
1757 self.checkpoint_store.clone(),
1758 self.state.clone(),
1759 hasher.clone(),
1760 self.backpressure_manager.clone(),
1761 self.config.checkpoint_executor_config.clone(),
1762 checkpoint_executor_metrics.clone(),
1763 self.subscription_service_checkpoint_sender.clone(),
1764 );
1765
1766 let run_with_range = self.config.run_with_range;
1767
1768 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1769
1770 self.metrics
1772 .current_protocol_version
1773 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1774
1775 if let Some(components) = &*self.validator_components.lock().await {
1777 tokio::time::sleep(Duration::from_millis(1)).await;
1779
1780 let config = cur_epoch_store.protocol_config();
1781 let mut supported_protocol_versions = self
1782 .config
1783 .supported_protocol_versions
1784 .expect("Supported versions should be populated")
1785 .truncate_below(config.version);
1787
                while supported_protocol_versions.max > config.version {
                    let proposed_protocol_config = ProtocolConfig::get_for_version(
                        supported_protocol_versions.max,
                        cur_epoch_store.get_chain(),
                    );

                    if proposed_protocol_config.enable_accumulators()
                        && !epoch_store.accumulator_root_exists()
                    {
                        error!(
                            "cannot upgrade to protocol version {:?} because accumulator root does not exist",
                            supported_protocol_versions.max
                        );
                        supported_protocol_versions.max = supported_protocol_versions.max.prev();
                    } else {
                        break;
                    }
                }

                let binary_config = config.binary_config(None);
                let transaction = ConsensusTransaction::new_capability_notification_v2(
                    AuthorityCapabilitiesV2::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        supported_protocol_versions,
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components.consensus_adapter.submit(
                    transaction,
                    None,
                    &cur_epoch_store,
                    None,
                    None,
                )?;
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                SuiNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_sui_system_state_object_unsafe()
                .expect("Read Sui System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
                && self.state.is_fullnode(&cur_epoch_store)
            {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_sui_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await;

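            // Four cases follow, depending on whether this node was a validator in the
            // closing epoch and whether it is a validator in the new one.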
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

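            // Force-release the closing epoch's DB handles now; the
            // Arc<AuthorityPerEpochStore> itself may linger for a while.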
            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

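    /// Shuts down consensus if validator components are currently running; on a
    /// fullnode there is nothing to stop here.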
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

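    /// Builds the next epoch's start configuration from the final checkpoint of the
    /// current epoch and swaps the authority state over to a fresh per-epoch store.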
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

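    /// Returns the first eight characters of a digest's string form, for compact
    /// logging and metric labels.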
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

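    /// Startup entry point for fork handling. Applies any configured recovery
    /// overrides first, then surfaces whatever checkpoint or transaction fork
    /// markers remain, halting or erroring per `ForkCrashBehavior`. Fullnodes skip
    /// these checks entirely.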
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }
        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

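    /// For each checkpoint override in the recovery config, clears locally computed
    /// checkpoints from the overridden sequence number onward when the local digest
    /// disagrees, and clears the fork-detected marker if it points at an overridden
    /// sequence number.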
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

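    /// Clears the transaction fork-detected marker when the forked transaction's
    /// digest appears in the recovery config's transaction overrides.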
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

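    /// Current wall-clock time in seconds since the Unix epoch, used to timestamp
    /// the fork metric labels below.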
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

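    /// Records the checkpoint fork in metrics, then either parks the task forever
    /// (awaiting operator-driven recovery) or returns an error, depending on the
    /// configured `ForkCrashBehavior`.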
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

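    /// Transaction-level analog of `handle_checkpoint_fork`: records the effects
    /// divergence in metrics, then halts or errors per `ForkCrashBehavior`.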
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

#[cfg(not(msim))]
impl SuiNode {
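    /// Fetches the provider's current JWKs over HTTPS; any failure is mapped to
    /// `JWKRetrievalError`.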
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

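/// A future that is spawned at most once, paired with the readiness signal its
/// `start` method awaits. A minimal usage sketch (the caller-side wiring shown
/// here is illustrative, not from this crate):
///
/// ```ignore
/// let (ready_tx, ready_rx) = oneshot::channel();
/// let once = SpawnOnce::new(ready_rx, async move {
///     // ... initialize ...
///     let _ = ready_tx.send(()); // signal readiness
///     // ... run the long-lived task ...
/// });
/// let once = once.start().await; // spawns the future, waits for readiness
/// ```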
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

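/// Pushes the new epoch's validator p2p addresses to the endpoint manager.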
fn update_peer_addresses(
    config: &NodeConfig,
    endpoint_manager: &EndpointManager,
    epoch_start_state: &EpochStartSystemState,
) {
    for (peer_id, address) in
        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
    {
        endpoint_manager.update_endpoint(EndpointId::P2p(peer_id), vec![address]);
    }
}

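/// Builds the transaction key-value store: local RocksDB by default, with an HTTP
/// store as fallback when a base URL is configured (mainnet only).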
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

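/// Assembles the JSON-RPC and gRPC/REST routers and starts the HTTP (and, when TLS
/// is configured, HTTPS) servers. Also returns the sender that feeds executed
/// checkpoints to the RPC subscription service.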
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
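    // Validators do not run these RPC servers; only fullnodes expose them.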
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone())
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address = ?https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address = ?http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

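// Tests cap checkpoints at two transactions so that multi-checkpoint code paths
// are exercised even with small workloads.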
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

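/// Handles for the spawned HTTP/HTTPS servers, held so the servers stay alive for
/// the node's lifetime.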
#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        // Record a checkpoint fork; with no overrides, startup must return an error.
        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        // With a matching checkpoint override, the fork marker is cleared.
        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        // Same two paths for a transaction fork.
        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}