use anemo::Network;
use anemo::PeerId;
use anemo_tower::callback::CallbackLayer;
use anemo_tower::trace::DefaultMakeSpan;
use anemo_tower::trace::DefaultOnFailure;
use anemo_tower::trace::TraceLayer;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::future::BoxFuture;
use mysten_common::in_test_configuration;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::path::PathBuf;
use std::str::FromStr;
#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
use sui_core::authority::ExecutionEnv;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions;
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::execution_time_estimator::ExecutionTimeObserver;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
use sui_core::execution_cache::build_execution_cache;
use sui_network::endpoint_manager::{AddressSource, EndpointId};
use sui_network::validator::server::SUI_TLS_SERVER_NAME;
use sui_types::full_checkpoint_content::Checkpoint;

use sui_core::execution_scheduler::SchedulingSource;
use sui_core::global_state_hasher::GlobalStateHashMetrics;
use sui_core::storage::RestReadStore;
use sui_json_rpc::bridge_api::BridgeReadApi;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_rpc_api::RpcMetrics;
use sui_rpc_api::ServerVersion;
use sui_rpc_api::subscription::SubscriptionService;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::{
    ChainIdentifier, CheckpointDigest, TransactionDigest, TransactionEffectsDigest,
};
use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::sync::oneshot;
use tokio::sync::{Mutex, broadcast, mpsc};
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
use tracing::{Instrument, error_span, info};
use tracing::{debug, error, warn};

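// JWK fetching is frequent and noisy in tests, so log it at debug there and at
// info otherwise.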
macro_rules! jwk_log {
    ($($arg:tt)+) => {
        if in_test_configuration() {
            debug!($($arg)+);
        } else {
            info!($($arg)+);
        }
    };
}

use fastcrypto_zkp::bn254::zk_login::JWK;
pub use handle::SuiNodeHandle;
use mysten_metrics::{RegistryService, spawn_monitored_task};
use mysten_service::server_timing::server_timing_middleware;
use sui_config::node::{DBCheckpointConfig, RunWithRange};
use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
use sui_config::node_config_metrics::NodeConfigMetrics;
use sui_config::{ConsensusConfig, NodeConfig};
use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::epoch_start_configuration::EpochStartConfigTrait;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::authority::submitted_transaction_cache::SubmittedTransactionCacheMetrics;
use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_server::{ValidatorService, ValidatorServiceMetrics};
use sui_core::checkpoints::checkpoint_executor::metrics::CheckpointExecutorMetrics;
use sui_core::checkpoints::checkpoint_executor::{CheckpointExecutor, StopReason};
use sui_core::checkpoints::{
    CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
    SubmitCheckpointToConsensus,
};
use sui_core::consensus_adapter::{
    CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
};
use sui_core::consensus_manager::ConsensusManager;
use sui_core::consensus_throughput_calculator::{
    ConsensusThroughputCalculator, ConsensusThroughputProfiler, ThroughputProfileRanges,
};
use sui_core::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics};
use sui_core::db_checkpoint_handler::DBCheckpointHandler;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::epoch::consensus_store_pruner::ConsensusStorePruner;
use sui_core::epoch::epoch_metrics::EpochMetrics;
use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
use sui_core::global_state_hasher::GlobalStateHasher;
use sui_core::jsonrpc_index::IndexStore;
use sui_core::module_cache_metrics::ResolverMetrics;
use sui_core::overload_monitor::overload_monitor;
use sui_core::rpc_index::RpcIndexStore;
use sui_core::signature_verifier::SignatureVerifierMetrics;
use sui_core::storage::RocksDbStore;
use sui_core::transaction_orchestrator::TransactionOrchestrator;
use sui_core::{
    authority::{AuthorityState, AuthorityStore},
    authority_client::NetworkAuthorityClient,
};
use sui_json_rpc::JsonRpcServerBuilder;
use sui_json_rpc::coin_api::CoinReadApi;
use sui_json_rpc::governance_api::GovernanceReadApi;
use sui_json_rpc::indexer_api::IndexerApi;
use sui_json_rpc::move_utils::MoveUtils;
use sui_json_rpc::read_api::ReadApi;
use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
use sui_macros::fail_point;
use sui_macros::{fail_point_async, replay_log};
use sui_network::api::ValidatorServer;
use sui_network::discovery;
use sui_network::endpoint_manager::EndpointManager;
use sui_network::state_sync;
use sui_network::validator::server::ServerBuilder;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_snapshot::uploader::StateSnapshotUploader;
use sui_storage::{
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use sui_types::base_types::{AuthorityName, EpochId};
use sui_types::committee::Committee;
use sui_types::crypto::KeypairTraits;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_consensus::{ConsensusTransaction, check_total_jwk_size};
use sui_types::sui_system_state::SuiSystemStateTrait;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::supported_protocol_versions::SupportedProtocolVersions;
use typed_store::DBMetrics;
use typed_store::rocks::default_db_options;

use crate::metrics::{GrpcMetrics, SuiNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: Arc<ConsensusManager>,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}

pub struct P2pComponents {
    p2p_network: Network,
    known_peers: HashMap<PeerId, String>,
    discovery_handle: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;
    use sui_types::error::SuiErrorKind;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: sui_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: sui_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: sui_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: sui_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> SuiResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            sui_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
            true,
        )
        .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use sui_core::authority::authority_store_pruner::PrunerWatermarks;
use sui_core::{
    consensus_handler::ConsensusHandlerInitializer, safe_client::SafeClientMetricsBase,
};

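// Connect timeout for the validator gRPC server; also reused as its HTTP/2
// keepalive interval and timeout.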
const DEFAULT_GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(60);

pub struct SuiNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,

    #[allow(unused)]
    http_servers: HttpServers,

    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<SuiNodeMetrics>,
    checkpoint_metrics: Arc<CheckpointMetrics>,

    _discovery: discovery::Handle,
    _connection_monitor_handle: mysten_network::anemo_connection_monitor::ConnectionMonitorHandle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<SuiSystemState>,

    endpoint_manager: EndpointManager,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,

    subscription_service_checkpoint_sender: Option<tokio::sync::mpsc::Sender<Checkpoint>>,
}

impl fmt::Debug for SuiNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SuiNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

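// Cap on how many JWKs from a single provider fetch are submitted to consensus;
// any excess keys are dropped.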
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl SuiNode {
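    /// Starts the node, reporting the server version as "unknown". Callers that
    /// know their build version should use [`Self::start_async`] directly.
    ///
    /// A minimal usage sketch (illustrative only, not a doctest; assumes a
    /// prepared `NodeConfig` and `RegistryService`):
    /// ```ignore
    /// let node: Arc<SuiNode> = SuiNode::start(config, registry_service).await?;
    /// let epoch = node.current_epoch_for_testing();
    /// ```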
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<SuiNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("sui-node", "unknown"),
        )
        .await
    }

    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<SuiNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

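        // A fetched JWK is kept only if its `iss` maps back to the provider that
        // was queried and the key is within the protocol's size limits.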
        fn validate_jwk(
            metrics: &Arc<SuiNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        jwk_log!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    jwk_log!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store, None, None)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<SuiNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(
            node = ?config.protocol_public_key(),
            "Initializing sui-node listening on {}",
            config.network_address
        );

        DBMetrics::init(registry_service.clone());

        typed_store::init_write_sync(config.enable_db_sync_to_disk);

        mysten_metrics::init_metrics(&prometheus_registry);

        #[cfg(not(msim))]
        mysten_metrics::thread_stall_monitor::start_thread_stall_monitor();

        let genesis = config.genesis()?.clone();

        let secret = Arc::pin(config.protocol_key_pair().copy());
        let genesis_committee = genesis.committee();
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let pruner_watermarks = Arc::new(PrunerWatermarks::default());
        let checkpoint_store = CheckpointStore::new(
            &config.db_path().join("checkpoints"),
            pruner_watermarks.clone(),
        );
        let checkpoint_metrics = CheckpointMetrics::new(&registry_service.default_registry());

        Self::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            is_validator,
            config.fork_recovery.as_ref(),
        )
        .await?;

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            is_validator,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
            Some(pruner_watermarks.epoch_id.clone()),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");

        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store =
            AuthorityStore::open(perpetual_tables, &genesis, &config, &prometheus_registry).await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.protocol_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
            Arc::new(SubmittedTransactionCacheMetrics::new(
                &registry_service.default_registry(),
            )),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking SUI conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_sui_conservation(&epoch_store)
                .expect("SUI conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating jsonrpc index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rpc_index = if is_full_node && config.rpc().is_some_and(|rpc| rpc.enable_indexing()) {
            info!("creating rpc index store");
            Some(Arc::new(
                RpcIndexStore::new(
                    &config.db_path(),
                    &store,
                    &checkpoint_store,
                    &epoch_store,
                    &cache_traits.backing_package_store,
                    pruner_watermarks.checkpoint_id.clone(),
                    config.rpc().cloned().unwrap_or_default(),
                )
                .await,
            ))
        } else {
            None
        };

        let chain_identifier = epoch_store.get_chain_identifier();

        info!("creating archive reader");

        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        } = Self::create_p2p_network(
            &config,
            state_sync_store.clone(),
            chain_identifier,
            randomness_tx,
            &prometheus_registry,
        )?;

        let endpoint_manager = EndpointManager::new(discovery_handle.clone());

        for peer in &config.p2p_config.peer_address_overrides {
            endpoint_manager
                .update_endpoint(
                    EndpointId::P2p(peer.peer_id),
                    AddressSource::Config,
                    peer.addresses.clone(),
                )
                .expect("Updating peer address overrides should not fail");
        }

        update_peer_addresses(&config, &endpoint_manager, epoch_store.epoch_start_state());

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            chain_identifier,
        )?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        if !epoch_store
            .protocol_config()
            .simplified_unwrap_then_delete()
        {
            config
                .authority_store_pruning_config
                .set_killswitch_tombstone_pruning(true);
        }

        let authority_name = config.protocol_public_key();

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rpc_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            genesis.objects(),
            &db_checkpoint_config,
            config.clone(),
            chain_identifier,
            config.policy_config.clone(),
            config.firewall_config.clone(),
            pruner_watermarks,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let txn = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?txn.digest());
            let transaction =
                sui_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                    sui_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                        genesis.transaction().data().clone(),
                        sui_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                    ),
                );
            state
                .try_execute_immediately(
                    &transaction,
                    ExecutionEnv::new().with_scheduling_source(SchedulingSource::NonFastPath),
                    &epoch_store,
                )
                .instrument(span)
                .await
                .unwrap();
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
            && let Some(indexes) = state.indexes.clone()
        {
            sui_core::verify_indexes::verify_indexes(
                state.get_global_state_hash_store().as_ref(),
                indexes,
            )
            .expect("secondary indexes are inconsistent");
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                &config,
            )))
        } else {
            None
        };

        let (http_servers, subscription_service_checkpoint_sender) = build_http_servers(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics = mysten_network::quinn_metrics::QuinnConnectionMetrics::new(
            "sui",
            &registry_service.default_registry(),
        );

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let connection_monitor_handle =
            mysten_network::anemo_connection_monitor::AnemoConnectionMonitor::spawn(
                p2p_network.downgrade(),
                Arc::new(network_connection_metrics),
                known_peers,
            );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses: connection_monitor_handle.connection_statuses(),
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let sui_node_metrics = Arc::new(SuiNodeMetrics::new(&registry_service.default_registry()));

        sui_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        sui_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let validator_components = if state.is_validator(&epoch_store) {
            let mut components = Self::construct_validator_components(
                config.clone(),
                state.clone(),
                committee,
                epoch_store.clone(),
                checkpoint_store.clone(),
                state_sync_handle.clone(),
                randomness_handle.clone(),
                Arc::downgrade(&global_state_hasher),
                backpressure_manager.clone(),
                connection_monitor_status.clone(),
                &registry_service,
                sui_node_metrics.clone(),
                checkpoint_metrics.clone(),
            )
            .await?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            endpoint_manager.set_consensus_address_updater(components.consensus_manager.clone());

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            http_servers,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: sui_node_metrics,
            checkpoint_metrics,

            _discovery: discovery_handle,
            _connection_monitor_handle: connection_monitor_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            endpoint_manager,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
            subscription_service_checkpoint_sender,
        };

        info!("SuiNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

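    /// Returns a receiver that is notified with the latest `SuiSystemState` at
    /// the end of each epoch.
    ///
    /// A usage sketch (illustrative only, not a doctest; `node` is assumed to be
    /// an `Arc<SuiNode>`):
    /// ```ignore
    /// let mut rx = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = rx.recv().await {
    ///     info!("epoch changed; next epoch is {}", system_state.epoch());
    /// }
    /// ```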
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

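    /// Returns a receiver that fires when the node shuts down after a configured
    /// `RunWithRange` condition is met; the payload echoes that range.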
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> SuiResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| SuiError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> SuiResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> SuiResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> SuiResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
                chain_identifier,
                config.state_snapshot_write_config.archive_interval_epochs,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<P2pComponents> {
        let (state_sync, state_sync_router) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_config(config.archive_reader_config())
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new()
            .config(config.p2p_config.clone())
            .build();

        let discovery_config = config.p2p_config.discovery.clone().unwrap_or_default();
        let known_peers: HashMap<PeerId, String> = discovery_config
            .allowlisted_peers
            .clone()
            .into_iter()
            .map(|ap| (ap.peer_id, "allowlisted_peer".to_string()))
            .chain(config.p2p_config.seed_peers.iter().filter_map(|peer| {
                peer.peer_id
                    .map(|peer_id| (peer_id, "seed_peer".to_string()))
            }))
            .collect();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.protocol_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .merge(state_sync_router);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                mysten_network::metrics::NetworkMetrics::new("sui", "inbound", prometheus_registry);
            let outbound_network_metrics = mysten_network::metrics::NetworkMetrics::new(
                "sui",
                "outbound",
                prometheus_registry,
            );

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(inbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(
                    mysten_network::metrics::MetricsMakeCallbackHandler::new(
                        Arc::new(outbound_network_metrics),
                        config.p2p_config.excessive_message_size(),
                    ),
                ))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
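            // Allow frames up to 1 GiB so unusually large p2p messages are not
            // rejected at the framing layer.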
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
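            // Fill in generous QUIC defaults below, but only where the operator
            // has not already configured an explicit value.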
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(10_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("sui-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok(P2pComponents {
            p2p_network,
            known_peers,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        })
    }

    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &registry_service.default_registry(),
            epoch_store.protocol_config().clone(),
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = Arc::new(ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            client,
        ));

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &registry_service.default_registry(),
        );

        let sui_tx_validator_metrics =
            SuiTxValidatorMetrics::new(&registry_service.default_registry());

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &registry_service.default_registry(),
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            sui_node_metrics,
            sui_tx_validator_metrics,
        )
        .await
    }

    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        sui_node_metrics: Arc<SuiNodeMetrics>,
        sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            state_hasher,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        if epoch_store.randomness_state_enabled() {
            let randomness_manager = RandomnessManager::try_new(
                Arc::downgrade(&epoch_store),
                Box::new(consensus_adapter.clone()),
                randomness_handle,
                config.protocol_key_pair(),
            )
            .await;
            if let Some(randomness_manager) = randomness_manager {
                epoch_store
                    .set_randomness_manager(randomness_manager)
                    .await?;
            }
        }

        ExecutionTimeObserver::spawn(
            epoch_store.clone(),
            Box::new(consensus_adapter.clone()),
            config
                .execution_time_observer_config
                .clone()
                .unwrap_or_default(),
        );

        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));

        let throughput_profiler = Arc::new(ConsensusThroughputProfiler::new(
            throughput_calculator.clone(),
            None,
            None,
            state.metrics.clone(),
            ThroughputProfileRanges::from_chain(epoch_store.get_chain_identifier()),
        ));

        consensus_adapter.swap_throughput_profiler(throughput_profiler);

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
            low_scoring_authorities,
            throughput_calculator,
            backpressure_manager,
            config.congestion_log.clone(),
        );

        info!("Starting consensus manager asynchronously");

        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let sui_tx_validator = SuiTxValidator::new(
                state.clone(),
                epoch_store.clone(),
                checkpoint_service.clone(),
                sui_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        sui_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
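        // Gate the checkpoint service on consensus replay, unless the wait is
        // explicitly disabled via the DISABLE_REPLAY_WAITER environment variable.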
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        checkpoint_service
            .spawn(epoch_store.clone(), replay_waiter)
            .await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                sui_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_metrics,
            sui_tx_validator_metrics,
        })
    }

    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        state_hasher: Weak<GlobalStateHasher>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.protocol_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            state_hasher,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        protocol_config: ProtocolConfig,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
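            // Per-authority share of the pending-transaction budget, with 2x headroom.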
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
            protocol_config,
        )
    }

    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            config.policy_config.clone().map(|p| p.client_id_source),
        );

        let mut server_conf = mysten_network::config::Config::new();
        server_conf.connect_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_interval = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.http2_keepalive_timeout = Some(DEFAULT_GRPC_CONNECT_TIMEOUT);
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = sui_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            SUI_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let (ready_tx, ready_rx) = oneshot::channel();

        Ok(SpawnOnce::new(ready_rx, async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .unwrap_or_else(|err| panic!("Failed to bind to {network_address}: {err}"));
            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");
            ready_tx.send(()).unwrap();
            if let Err(err) = server.serve().await {
                info!("Server stopped: {err}");
            }
            info!("Server stopped");
        }))
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

1629 pub async fn monitor_reconfiguration(
1632 self: Arc<Self>,
1633 mut epoch_store: Arc<AuthorityPerEpochStore>,
1634 ) -> Result<()> {
1635 let checkpoint_executor_metrics =
1636 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1637
1638 loop {
1639 let mut hasher_guard = self.global_state_hasher.lock().await;
1640 let hasher = hasher_guard.take().unwrap();
1641 info!(
1642 "Creating checkpoint executor for epoch {}",
1643 epoch_store.epoch()
1644 );
1645 let checkpoint_executor = CheckpointExecutor::new(
1646 epoch_store.clone(),
1647 self.checkpoint_store.clone(),
1648 self.state.clone(),
1649 hasher.clone(),
1650 self.backpressure_manager.clone(),
1651 self.config.checkpoint_executor_config.clone(),
1652 checkpoint_executor_metrics.clone(),
1653 self.subscription_service_checkpoint_sender.clone(),
1654 );
1655
1656 let run_with_range = self.config.run_with_range;
1657
1658 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1659
1660 self.metrics
1662 .current_protocol_version
1663 .set(cur_epoch_store.protocol_config().version.as_u64() as i64);
1664
1665 if let Some(components) = &*self.validator_components.lock().await {
1667 tokio::time::sleep(Duration::from_millis(1)).await;
1669
1670 let config = cur_epoch_store.protocol_config();
1671 let mut supported_protocol_versions = self
1672 .config
1673 .supported_protocol_versions
1674 .expect("Supported versions should be populated")
1675 .truncate_below(config.version);
1677
1678 while supported_protocol_versions.max > config.version {
1679 let proposed_protocol_config = ProtocolConfig::get_for_version(
1680 supported_protocol_versions.max,
1681 cur_epoch_store.get_chain(),
1682 );
1683
1684 if proposed_protocol_config.enable_accumulators()
1685 && !epoch_store.accumulator_root_exists()
1686 {
1687 error!(
1688 "cannot upgrade to protocol version {:?} because accumulator root does not exist",
1689 supported_protocol_versions.max
1690 );
1691 supported_protocol_versions.max = supported_protocol_versions.max.prev();
1692 } else {
1693 break;
1694 }
1695 }
1696
1697 let binary_config = config.binary_config(None);
1698 let transaction = ConsensusTransaction::new_capability_notification_v2(
1699 AuthorityCapabilitiesV2::new(
1700 self.state.name,
1701 cur_epoch_store.get_chain_identifier().chain(),
1702 supported_protocol_versions,
1703 self.state
1704 .get_available_system_packages(&binary_config)
1705 .await,
1706 ),
1707 );
1708 info!(?transaction, "submitting capabilities to consensus");
1709 components.consensus_adapter.submit(
1710 transaction,
1711 None,
1712 &cur_epoch_store,
1713 None,
1714 None,
1715 )?;
1716 }
1717
1718 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1719
1720 if stop_condition == StopReason::RunWithRangeCondition {
1721 SuiNode::shutdown(&self).await;
1722 self.shutdown_channel_tx
1723 .send(run_with_range)
1724 .expect("RunWithRangeCondition met but failed to send shutdown message");
1725 return Ok(());
1726 }
1727
1728 let latest_system_state = self
1730 .state
1731 .get_object_cache_reader()
1732 .get_sui_system_state_object_unsafe()
1733 .expect("Read Sui System State object cannot fail");
1734
1735 #[cfg(msim)]
1736 if !self
1737 .sim_state
1738 .sim_safe_mode_expected
1739 .load(Ordering::Relaxed)
1740 {
1741 debug_assert!(!latest_system_state.safe_mode());
1742 }
1743
1744 #[cfg(not(msim))]
1745 debug_assert!(!latest_system_state.safe_mode());
1746
1747 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone())
1748 && self.state.is_fullnode(&cur_epoch_store)
1749 {
1750 warn!(
1751 "Failed to send end of epoch notification to subscriber: {:?}",
1752 err
1753 );
1754 }
1755
1756 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1757 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1758
1759 self.auth_agg.store(Arc::new(
1760 self.auth_agg
1761 .load()
1762 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1763 ));
1764
1765 let next_epoch_committee = new_epoch_start_state.get_sui_committee();
1766 let next_epoch = next_epoch_committee.epoch();
1767 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1768
1769 info!(
1770 next_epoch,
1771 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1772 );
1773
1774 fail_point_async!("reconfig_delay");
1775
1776 let authority_names_to_peer_ids =
1781 new_epoch_start_state.get_authority_names_to_peer_ids();
1782 self.connection_monitor_status
1783 .update_mapping_for_epoch(authority_names_to_peer_ids);
1784
1785 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1786
1787 update_peer_addresses(&self.config, &self.endpoint_manager, &new_epoch_start_state);
1788
1789 let mut validator_components_lock_guard = self.validator_components.lock().await;
1790
1791 let new_epoch_store = self
1795 .reconfigure_state(
1796 &self.state,
1797 &cur_epoch_store,
1798 next_epoch_committee.clone(),
1799 new_epoch_start_state,
1800 hasher.clone(),
1801 )
1802 .await;
1803
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                checkpoint_metrics,
                sui_tx_validator_metrics,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            sui_tx_validator_metrics,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    None
                }
            } else {
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                        self.checkpoint_metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    self.endpoint_manager
                        .set_consensus_address_updater(components.consensus_manager.clone());

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

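            // Release the outgoing epoch store's database handles so its tables can be
            // cleaned up.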
            cur_epoch_store.release_db_handles();

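            // In simulator tests, eagerly exercise checkpoint pruning at the epoch boundary
            // whenever a finite retention window is configured.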
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        sui_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

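    /// Shuts down consensus if this node is currently running validator components.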
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

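    /// Transitions `AuthorityState` into the next epoch: asserts that the final checkpoint
    /// of the current epoch has been executed, builds the next `EpochStartConfiguration`,
    /// and returns the freshly opened `AuthorityPerEpochStore`.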
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> Arc<AuthorityPerEpochStore> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

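        // Reconfiguration must not begin until the epoch's final checkpoint has been executed.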
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        new_epoch_store
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

    pub fn endpoint_manager(&self) -> &EndpointManager {
        &self.endpoint_manager
    }

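    /// Returns the first eight characters of a digest's string form, used as a compact
    /// metric label. Sui digests render as base58 ASCII, so byte-indexed slicing is safe.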
    fn get_digest_prefix(digest: impl std::fmt::Display) -> String {
        let digest_str = digest.to_string();
        if digest_str.len() >= 8 {
            digest_str[0..8].to_string()
        } else {
            digest_str
        }
    }

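    /// On validator startup, applies any configured fork-recovery overrides, then halts or
    /// errors if a previously detected checkpoint or transaction fork remains. Fullnodes
    /// skip fork handling entirely.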
    async fn check_and_recover_forks(
        checkpoint_store: &CheckpointStore,
        checkpoint_metrics: &CheckpointMetrics,
        is_validator: bool,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        if !is_validator {
            return Ok(());
        }

        if let Some(recovery) = fork_recovery {
            Self::try_recover_checkpoint_fork(checkpoint_store, recovery)?;
            Self::try_recover_transaction_fork(checkpoint_store, recovery)?;
        }

        if let Some((checkpoint_seq, checkpoint_digest)) = checkpoint_store
            .get_checkpoint_fork_detected()
            .map_err(|e| {
                error!("Failed to check for checkpoint fork: {:?}", e);
                e
            })?
        {
            Self::handle_checkpoint_fork(
                checkpoint_seq,
                checkpoint_digest,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        if let Some((tx_digest, expected_effects, actual_effects)) = checkpoint_store
            .get_transaction_fork_detected()
            .map_err(|e| {
                error!("Failed to check for transaction fork: {:?}", e);
                e
            })?
        {
            Self::handle_transaction_fork(
                tx_digest,
                expected_effects,
                actual_effects,
                checkpoint_metrics,
                fork_recovery,
            )
            .await?;
        }

        Ok(())
    }

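    /// For every configured checkpoint digest override, clears locally computed checkpoints
    /// from the overridden sequence number onward when the local digest disagrees, and
    /// clears the fork-detected marker if it points at an overridden sequence number.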
    fn try_recover_checkpoint_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        for (seq, expected_digest_str) in &recovery.checkpoint_overrides {
            let Ok(expected_digest) = CheckpointDigest::from_str(expected_digest_str) else {
                anyhow::bail!(
                    "Invalid checkpoint digest override for seq {}: {}",
                    seq,
                    expected_digest_str
                );
            };

            if let Some(local_summary) = checkpoint_store.get_locally_computed_checkpoint(*seq)? {
                let local_digest = sui_types::message_envelope::Message::digest(&local_summary);
                if local_digest != expected_digest {
                    info!(
                        seq,
                        local = %Self::get_digest_prefix(local_digest),
                        expected = %Self::get_digest_prefix(expected_digest),
                        "Fork recovery: clearing locally_computed_checkpoints from {} due to digest mismatch",
                        seq
                    );
                    checkpoint_store
                        .clear_locally_computed_checkpoints_from(*seq)
                        .context(
                            "Failed to clear locally computed checkpoints from override seq",
                        )?;
                }
            }
        }

        if let Some((checkpoint_seq, checkpoint_digest)) =
            checkpoint_store.get_checkpoint_fork_detected()?
            && recovery.checkpoint_overrides.contains_key(&checkpoint_seq)
        {
            info!(
                "Fork recovery enabled: clearing checkpoint fork at seq {} with digest {:?}",
                checkpoint_seq, checkpoint_digest
            );
            checkpoint_store
                .clear_checkpoint_fork_detected()
                .expect("Failed to clear checkpoint fork detected marker");
        }
        Ok(())
    }

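    /// Clears the transaction fork-detected marker when the forked transaction digest has a
    /// configured override.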
    fn try_recover_transaction_fork(
        checkpoint_store: &CheckpointStore,
        recovery: &ForkRecoveryConfig,
    ) -> Result<()> {
        if recovery.transaction_overrides.is_empty() {
            return Ok(());
        }

        if let Some((tx_digest, _, _)) = checkpoint_store.get_transaction_fork_detected()?
            && recovery
                .transaction_overrides
                .contains_key(&tx_digest.to_string())
        {
            info!(
                "Fork recovery enabled: clearing transaction fork for tx {:?}",
                tx_digest
            );
            checkpoint_store
                .clear_transaction_fork_detected()
                .expect("Failed to clear transaction fork detected marker");
        }
        Ok(())
    }

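    /// Current UNIX time in whole seconds, used to timestamp fork metric labels.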
    fn get_current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }

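    /// Records the checkpoint fork in metrics, then either parks the task forever
    /// (`AwaitForkRecovery`) or surfaces an error (`ReturnError`), per the configured
    /// crash behavior.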
    async fn handle_checkpoint_fork(
        checkpoint_seq: u64,
        checkpoint_digest: CheckpointDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .checkpoint_fork_crash_mode
            .with_label_values(&[
                &checkpoint_seq.to_string(),
                &Self::get_digest_prefix(checkpoint_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    checkpoint_seq = checkpoint_seq,
                    checkpoint_digest = ?checkpoint_digest,
                    "Checkpoint fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Checkpoint fork detected! checkpoint_seq: {}, checkpoint_digest: {:?}",
                    checkpoint_seq,
                    checkpoint_digest
                ))
            }
        }
    }

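    /// Transaction-fork counterpart of `handle_checkpoint_fork`: records the divergent
    /// effects digests in metrics, then parks or errors per the configured crash behavior.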
    async fn handle_transaction_fork(
        tx_digest: TransactionDigest,
        expected_effects_digest: TransactionEffectsDigest,
        actual_effects_digest: TransactionEffectsDigest,
        checkpoint_metrics: &CheckpointMetrics,
        fork_recovery: Option<&ForkRecoveryConfig>,
    ) -> Result<()> {
        checkpoint_metrics
            .transaction_fork_crash_mode
            .with_label_values(&[
                &Self::get_digest_prefix(tx_digest),
                &Self::get_digest_prefix(expected_effects_digest),
                &Self::get_digest_prefix(actual_effects_digest),
                &Self::get_current_timestamp().to_string(),
            ])
            .set(1);

        let behavior = fork_recovery
            .map(|fr| fr.fork_crash_behavior)
            .unwrap_or_default();

        match behavior {
            ForkCrashBehavior::AwaitForkRecovery => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Node startup halted. Sleeping indefinitely."
                );
                futures::future::pending::<()>().await;
                unreachable!("pending() should never return");
            }
            ForkCrashBehavior::ReturnError => {
                error!(
                    tx_digest = ?tx_digest,
                    expected_effects_digest = ?expected_effects_digest,
                    actual_effects_digest = ?actual_effects_digest,
                    "Transaction fork detected! Returning error."
                );
                Err(anyhow::anyhow!(
                    "Transaction fork detected! tx_digest: {:?}, expected_effects: {:?}, actual_effects: {:?}",
                    tx_digest,
                    expected_effects_digest,
                    actual_effects_digest
                ))
            }
        }
    }
}

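// JWK fetching is split by build target: production builds query the OIDC provider over
// HTTP, while simulator builds route through a test-injectable hook.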
#[cfg(not(msim))]
impl SuiNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        use sui_types::error::SuiErrorKind;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client, true)
            .await
            .map_err(|_| SuiErrorKind::JWKRetrievalError.into())
    }
}

#[cfg(msim)]
impl SuiNode {
    pub fn get_sim_node_id(&self) -> sui_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    #[allow(unused_variables)]
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> SuiResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

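/// A future that is constructed eagerly but spawned at most once. `start` spawns the
/// wrapped future and then blocks until it signals readiness on the paired channel.
///
/// A minimal usage sketch (the server future shown here is hypothetical):
/// ```ignore
/// let (ready_tx, ready_rx) = oneshot::channel();
/// let server = SpawnOnce::new(ready_rx, async move {
///     let _ = ready_tx.send(()); // signal readiness once setup is done
///     // ... serve requests ...
/// });
/// let server = server.start().await; // spawns the task, waits for the ready signal
/// ```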
enum SpawnOnce {
    Unstarted(oneshot::Receiver<()>, Mutex<BoxFuture<'static, ()>>),
    #[allow(unused)]
    Started(JoinHandle<()>),
}

impl SpawnOnce {
    pub fn new(
        ready_rx: oneshot::Receiver<()>,
        future: impl Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self::Unstarted(ready_rx, Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(ready_rx, future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                ready_rx.await.unwrap();
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

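/// At each epoch boundary, pushes the new committee's p2p addresses into the
/// `EndpointManager` so the node keeps dialing the current validator set.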
fn update_peer_addresses(
    config: &NodeConfig,
    endpoint_manager: &EndpointManager,
    epoch_start_state: &EpochStartSystemState,
) {
    for (peer_id, address) in
        epoch_start_state.get_validator_as_p2p_peers(config.protocol_public_key())
    {
        endpoint_manager
            .update_endpoint(
                EndpointId::P2p(peer_id),
                AddressSource::Committee,
                vec![address],
            )
            .expect("Updating peer addresses should not fail");
    }
}

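/// Builds the transaction key-value store backing JSON-RPC reads: the local RocksDB store
/// alone by default, or with an HTTP store layered in as fallback when a base URL is
/// configured (currently mainnet only).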
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    let base_url: url::Url = base_url.parse().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let network_str = match state.get_chain_identifier().chain() {
        Chain::Mainnet => "/mainnet",
        _ => {
            info!("using local db only for kv store");
            return Ok(Arc::new(db_store));
        }
    };

    let base_url = base_url.join(network_str)?.to_string();
    let http_store = HttpKVStore::new_kv(
        &base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

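/// Builds the JSON-RPC and RPC routers for fullnodes and starts serving them over HTTP
/// (and HTTPS when TLS is configured). Returns the server handles plus the sender used to
/// feed executed checkpoints to the subscription service.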
async fn build_http_servers(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<(HttpServers, Option<tokio::sync::mpsc::Sender<Checkpoint>>)> {
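    // Validators have a consensus config and do not expose public RPC servers.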
    if config.consensus_config().is_some() {
        return Ok((HttpServers::default(), None));
    }

    info!("starting rpc service with config: {:?}", config.rpc);

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let name_service_config =
            if let (Some(package_address), Some(registry_id), Some(reverse_registry_id)) = (
                config.name_service_package_address,
                config.name_service_registry_id,
                config.name_service_reverse_registry_id,
            ) {
                sui_name_service::NameServiceConfig::new(
                    package_address,
                    registry_id,
                    reverse_registry_id,
                )
            } else {
                match state.get_chain_identifier().chain() {
                    Chain::Mainnet => sui_name_service::NameServiceConfig::mainnet(),
                    Chain::Testnet => sui_name_service::NameServiceConfig::testnet(),
                    Chain::Unknown => sui_name_service::NameServiceConfig::default(),
                }
            };

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            name_service_config,
            metrics,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

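    // The subscription service streams executed checkpoints to RPC subscribers; its sender
    // half is returned to the caller so checkpoint execution can feed it.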
    let (subscription_service_checkpoint_sender, subscription_service_handle) =
        SubscriptionService::build(prometheus_registry);
    let rpc_router = {
        let mut rpc_service =
            sui_rpc_api::RpcService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rpc_service.with_server_version(server_version);

        if let Some(config) = config.rpc.clone() {
            rpc_service.with_config(config);
        }

        rpc_service.with_metrics(RpcMetrics::new(prometheus_registry));
        rpc_service.with_subscription_service(subscription_service_handle);

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rpc_service.with_executor(transaction_orchestrator.clone());
        }

        rpc_service.into_router().await
    };

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<sui_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware))
        .layer(
            tower_http::cors::CorsLayer::new()
                .allow_methods([http::Method::GET, http::Method::POST])
                .allow_origin(tower_http::cors::Any)
                .allow_headers(tower_http::cors::Any)
                .expose_headers(tower_http::cors::Any),
        );

    router = router.merge(rpc_router).layer(layers);

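    // Serve HTTPS only when the RPC config provides a TLS certificate and key.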
    let https = if let Some((tls_config, https_address)) = config
        .rpc()
        .and_then(|config| config.tls_config().map(|tls| (tls, config.https_address())))
    {
        let https = sui_http::Builder::new()
            .tls_single_cert(tls_config.cert(), tls_config.key())
            .and_then(|builder| builder.serve(https_address, router.clone()))
            .map_err(|e| anyhow::anyhow!(e))?;

        info!(
            https_address =? https.local_addr(),
            "HTTPS rpc server listening on {}",
            https.local_addr()
        );

        Some(https)
    } else {
        None
    };

    let http = sui_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!(e))?;

    info!(
        http_address =? http.local_addr(),
        "HTTP rpc server listening on {}",
        http.local_addr()
    );

    Ok((
        HttpServers {
            http: Some(http),
            https,
        },
        Some(subscription_service_checkpoint_sender),
    ))
}

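// The per-checkpoint transaction cap comes from the protocol config in production; unit
// tests pin it to two transactions so checkpoint rollover is exercised cheaply.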
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}

#[derive(Default)]
struct HttpServers {
    #[allow(unused)]
    http: Option<sui_http::ServerHandle>,
    #[allow(unused)]
    https: Option<sui_http::ServerHandle>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Registry;
    use std::collections::BTreeMap;
    use sui_config::node::{ForkCrashBehavior, ForkRecoveryConfig};
    use sui_core::checkpoints::{CheckpointMetrics, CheckpointStore};
    use sui_types::digests::{CheckpointDigest, TransactionDigest, TransactionEffectsDigest};

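    // Exercises both fork paths end to end: `ReturnError` surfaces an error while the fork
    // marker is set, and a matching override clears the marker, for checkpoint and
    // transaction forks alike.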
    #[tokio::test]
    async fn test_fork_error_and_recovery_both_paths() {
        let checkpoint_store = CheckpointStore::new_for_tests();
        let checkpoint_metrics = CheckpointMetrics::new(&Registry::new());

        let seq_num = 42;
        let digest = CheckpointDigest::random();
        checkpoint_store
            .record_checkpoint_fork_detected(seq_num, digest)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };

        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Checkpoint fork detected")
        );

        let mut checkpoint_overrides = BTreeMap::new();
        checkpoint_overrides.insert(seq_num, digest.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides,
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_checkpoint_fork_detected()
                .unwrap()
                .is_none()
        );

        let tx_digest = TransactionDigest::random();
        let expected_effects = TransactionEffectsDigest::random();
        let actual_effects = TransactionEffectsDigest::random();
        checkpoint_store
            .record_transaction_fork_detected(tx_digest, expected_effects, actual_effects)
            .unwrap();

        let fork_recovery = ForkRecoveryConfig {
            transaction_overrides: Default::default(),
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery),
        )
        .await;
        assert!(r.is_err());
        assert!(
            r.unwrap_err()
                .to_string()
                .contains("Transaction fork detected")
        );

        let mut transaction_overrides = BTreeMap::new();
        transaction_overrides.insert(tx_digest.to_string(), actual_effects.to_string());
        let fork_recovery_with_override = ForkRecoveryConfig {
            transaction_overrides,
            checkpoint_overrides: Default::default(),
            fork_crash_behavior: ForkCrashBehavior::ReturnError,
        };
        let r = SuiNode::check_and_recover_forks(
            &checkpoint_store,
            &checkpoint_metrics,
            true,
            Some(&fork_recovery_with_override),
        )
        .await;
        assert!(r.is_ok());
        assert!(
            checkpoint_store
                .get_transaction_fork_detected()
                .unwrap()
                .is_none()
        );
    }
}