1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11 ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12 NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair, Stake,
13};
14use consensus_core::{
15 Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16 RandomnessSignatureHandler, storage::rocksdb_store::RocksDBStore,
17};
18use core::panic;
19use fastcrypto::traits::KeyPair as _;
20use mysten_metrics::{RegistryID, RegistryService};
21use mysten_network::Multiaddr;
22use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
23use std::collections::BTreeMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use sui_config::{ConsensusConfig, NodeConfig};
28use sui_network::endpoint_manager::ConsensusAddressUpdater;
29use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
30use sui_types::crypto::NetworkPublicKey;
31use sui_types::error::{SuiErrorKind, SuiResult};
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::node_role::NodeRole;
34use sui_types::{
35 committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
36};
37use tokio::sync::{Mutex, broadcast};
38use tokio::time::{sleep, timeout};
39use tracing::{error, info};
40
41#[cfg(test)]
42#[path = "../unit_tests/consensus_manager_tests.rs"]
43pub mod consensus_manager_tests;
44
/// Lifecycle state of the consensus instance owned by `ConsensusManager`.
#[derive(PartialEq)]
enum Running {
    /// Consensus was started for this epoch and protocol version and not yet shut down.
    True(EpochId, ProtocolVersion),
    /// Consensus is stopped (initial state, or after `shutdown`).
    False,
}
50
51struct AddressOverridesMap {
54 map: BTreeMap<
56 ConsensusNetworkPublicKey,
57 BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
58 >,
59}
60
61impl AddressOverridesMap {
62 pub fn new() -> Self {
63 Self {
64 map: BTreeMap::new(),
65 }
66 }
67
68 pub fn insert(
69 &mut self,
70 network_pubkey: ConsensusNetworkPublicKey,
71 source: sui_network::endpoint_manager::AddressSource,
72 addresses: Vec<Multiaddr>,
73 ) {
74 self.map
75 .entry(network_pubkey)
76 .or_default()
77 .insert(source, addresses);
78 }
79
80 pub fn remove(
81 &mut self,
82 network_pubkey: ConsensusNetworkPublicKey,
83 source: sui_network::endpoint_manager::AddressSource,
84 ) {
85 self.map
86 .entry(network_pubkey.clone())
87 .or_default()
88 .remove(&source);
89
90 if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
92 self.map.remove(&network_pubkey);
93 }
94 }
95
96 pub fn get_highest_priority_address(
97 &self,
98 network_pubkey: ConsensusNetworkPublicKey,
99 ) -> Option<Multiaddr> {
100 self.map
101 .get(&network_pubkey)
102 .and_then(|sources| sources.first_key_value())
103 .and_then(|(_, addresses)| addresses.first().cloned())
104 }
105
106 pub fn get_all_highest_priority_addresses(
107 &self,
108 ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
109 let mut result = Vec::new();
110
111 for (network_pubkey, sources) in self.map.iter() {
112 if let Some((_source, addresses)) = sources.first_key_value()
113 && let Some(address) = addresses.first()
114 {
115 result.push((network_pubkey.clone(), address.clone()));
116 }
117 }
118 result
119 }
120}
121
122fn apply_v3_threshold_overrides(committee: Committee) -> Committee {
128 let malicious_stake: Stake = std::env::var("SUI_CONSENSUS_V3_MALICIOUS_STAKE")
129 .ok()
130 .and_then(|s| s.parse().ok())
131 .unwrap_or(1_250);
132 let crash_stake: Stake = std::env::var("SUI_CONSENSUS_V3_CRASH_STAKE")
133 .ok()
134 .and_then(|s| s.parse().ok())
135 .unwrap_or(1_250);
136 info!(
137 "consensus_manager: applying v3 committee thresholds \
138 (malicious_stake={malicious_stake}, crash_stake={crash_stake})"
139 );
140 Committee::new_v3(
141 committee.epoch(),
142 committee.authorities_slice().to_vec(),
143 malicious_stake,
144 crash_stake,
145 )
146}
147
148fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
149 let chain_type = match chain {
150 Chain::Mainnet => ChainType::Mainnet,
151 Chain::Testnet => ChainType::Testnet,
152 Chain::Unknown => ChainType::Unknown,
153 };
154 ConsensusProtocolConfig::new(
155 config.version.as_u64(),
156 chain_type,
157 config.max_transaction_size_bytes(),
158 config.max_transactions_in_block_bytes(),
159 config.max_num_transactions_in_block(),
160 config.gc_depth(),
161 true,
162 config.mysticeti_num_leaders_per_round(),
163 config.consensus_bad_nodes_stake_threshold(),
164 false,
165 300,
166 12,
167 )
168}
169
/// Owns the lifecycle of this node's Mysticeti consensus instance. It starts consensus for
/// an epoch, shuts it down, and forwards peer address overrides to the running authority.
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    // `Some` only when the node role is a validator (see `ConsensusManager::new`).
    protocol_keypair: Option<ProtocolKeyPair>,
    network_keypair: NetworkKeyPair,
    // Base directory; each epoch's consensus store lives in a subdirectory named after the
    // epoch number (see `get_store_path`).
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    // The running authority plus the id of its metrics registry; empty while stopped.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Set to the authority's transaction client on start, cleared on shutdown.
    client: Arc<LazyMysticetiClient>,
    // Client shared with the rest of the node; pointed at `client` on start, cleared on
    // shutdown.
    consensus_client: Arc<UpdatableConsensusClient>,

    // Handler for the current run; taken and aborted on shutdown.
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    // Commit consumer monitor of the current run. On start, the previous value decides
    // whether `boot_counter` is incremented. Exposed to tests as `pub(crate)`.
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    // Publishes each new monitor to `ReplayWaiter`s created via `replay_waiter`.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    // Guards start/shutdown; both hold this lock for their whole duration.
    running: Mutex<Running>,

    // Passed to `ConsensusAuthority::start`. Incremented only when the previous run handled
    // at least one commit. Exposed to tests as `pub(crate)`.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Peer address overrides. Kept across consensus restarts and re-applied on every start.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}
205
206impl ConsensusManager {
207 pub fn new(
208 node_config: &NodeConfig,
209 consensus_config: &ConsensusConfig,
210 registry_service: &RegistryService,
211 consensus_client: Arc<UpdatableConsensusClient>,
212 node_role: NodeRole,
213 ) -> Self {
214 let metrics = Arc::new(ConsensusManagerMetrics::new(
215 ®istry_service.default_registry(),
216 ));
217 let client = Arc::new(LazyMysticetiClient::new());
218 let (consumer_monitor_sender, _) = broadcast::channel(1);
219 let protocol_keypair = if node_role.is_validator() {
220 Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
221 } else {
222 None
223 };
224 Self {
225 consensus_config: consensus_config.clone(),
226 protocol_keypair,
227 network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
228 storage_base_path: consensus_config.db_path().to_path_buf(),
229 metrics,
230 registry_service: registry_service.clone(),
231 authority: ArcSwapOption::empty(),
232 client,
233 consensus_client,
234 consensus_handler: Mutex::new(None),
235 consumer_monitor: ArcSwapOption::empty(),
236 consumer_monitor_sender,
237 running: Mutex::new(Running::False),
238 boot_counter: Mutex::new(0),
239 address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
240 }
241 }
242
243 pub async fn start(
244 &self,
245 node_config: &NodeConfig,
246 epoch_store: Arc<AuthorityPerEpochStore>,
247 consensus_handler_initializer: ConsensusHandlerInitializer,
248 tx_validator: SuiTxValidator,
249 randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
250 ) {
251 let epoch = epoch_store.epoch();
252 let protocol_config = epoch_store.protocol_config();
253 let consensus_protocol_config =
254 to_consensus_protocol_config(protocol_config, epoch_store.get_chain());
255 let system_state = epoch_store.epoch_start_state();
256 let committee = if consensus_protocol_config.enable_v3() {
257 apply_v3_threshold_overrides(system_state.get_consensus_committee())
258 } else {
259 system_state.get_consensus_committee()
260 };
261
262 let start_time = Instant::now();
264 let mut running = self.running.lock().await;
265 if let Running::True(running_epoch, running_version) = *running {
266 error!(
267 "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
268 );
269 return;
270 }
271 *running = Running::True(epoch, protocol_config.version);
272
273 info!(
274 "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
275 protocol_config.version
276 );
277
278 self.consensus_client.set(self.client.clone());
279
280 let consensus_config = node_config
281 .consensus_config()
282 .expect("consensus_config should exist");
283
284 let parameters = Parameters {
285 db_path: self.get_store_path(epoch),
286 listen_address_override: consensus_config.listen_address.clone(),
287 ..consensus_config.parameters.clone().unwrap_or_default()
288 };
289
290 let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
291
292 let consensus_handler = consensus_handler_initializer.new_consensus_handler();
293
294 let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
295 let last_processed_commit_index =
296 consensus_handler.last_processed_subdag_index() as CommitIndex;
297 let replay_after_commit_index =
298 last_processed_commit_index.saturating_sub(num_prior_commits);
299
300 let (commit_consumer, commit_receiver) =
301 CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
302 let monitor = commit_consumer.monitor();
303
304 let node_role = epoch_store.node_role();
305
306 let handler = MysticetiConsensusHandler::new(
308 last_processed_commit_index,
309 consensus_handler,
310 commit_receiver,
311 monitor.clone(),
312 node_role,
313 );
314 let mut consensus_handler = self.consensus_handler.lock().await;
315 *consensus_handler = Some(handler);
316
317 let participated_on_previous_run =
321 if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
322 previous_monitor.highest_handled_commit() > 0
323 } else {
324 false
325 };
326
327 let mut boot_counter = self.boot_counter.lock().await;
332 if participated_on_previous_run {
333 *boot_counter += 1;
334 } else {
335 info!(
336 "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
337 *boot_counter
338 );
339 }
340
341 let authority = ConsensusAuthority::start(
342 NetworkType::Tonic,
343 epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
344 committee.clone(),
345 parameters.clone(),
346 consensus_protocol_config,
347 self.protocol_keypair.clone(),
348 self.network_keypair.clone(),
349 Arc::new(Clock::default()),
350 Arc::new(tx_validator.clone()),
351 commit_consumer,
352 registry.clone(),
353 *boot_counter,
354 randomness_signature_handler,
355 )
356 .await;
357 let client = authority.transaction_client();
358
359 let registry_id = self.registry_service.add(registry.clone());
360
361 let registered_authority = Arc::new((authority, registry_id));
362 self.authority.swap(Some(registered_authority.clone()));
363
364 let highest_priority_addresses = self
366 .address_overrides
367 .lock()
368 .get_all_highest_priority_addresses();
369 for (network_pubkey, address) in highest_priority_addresses {
370 registered_authority
371 .0
372 .update_peer_address(network_pubkey, Some(address.clone()));
373 }
374
375 self.client.set(client);
377
378 let _ = self.consumer_monitor_sender.send(monitor);
380
381 let elapsed = start_time.elapsed().as_secs_f64();
382 self.metrics.start_latency.set(elapsed as i64);
383
384 tracing::info!(
385 "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
386 epoch,
387 protocol_config.version,
388 elapsed
389 );
390 }
391
392 pub async fn shutdown(&self) {
393 info!("Shutting down consensus ...");
394
395 let start_time = Instant::now();
397 let mut running = self.running.lock().await;
398 let (shutdown_epoch, shutdown_version) = match *running {
399 Running::True(epoch, version) => {
400 tracing::info!(
401 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
402 );
403 *running = Running::False;
404 (epoch, version)
405 }
406 Running::False => {
407 error!("Consensus shutdown was called but consensus is not running");
408 return;
409 }
410 };
411
412 self.client.clear();
414
415 let r = self.authority.swap(None).unwrap();
417 let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
418 panic!("Failed to retrieve the Mysticeti authority");
419 };
420
421 authority.stop().await;
423
424 let mut consensus_handler = self.consensus_handler.lock().await;
426 if let Some(mut handler) = consensus_handler.take() {
427 handler.abort().await;
428 }
429
430 self.registry_service.remove(registry_id);
432
433 self.consensus_client.clear();
434
435 let elapsed = start_time.elapsed().as_secs_f64();
436 self.metrics.shutdown_latency.set(elapsed as i64);
437
438 tracing::info!(
439 "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
440 elapsed
441 );
442 }
443
444 pub async fn is_running(&self) -> bool {
445 let running = self.running.lock().await;
446 matches!(*running, Running::True(_, _))
447 }
448
449 pub fn replay_waiter(&self) -> ReplayWaiter {
450 let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
451 ReplayWaiter::new(consumer_monitor_receiver)
452 }
453
454 pub fn get_storage_base_path(&self) -> PathBuf {
455 self.consensus_config.db_path().to_path_buf()
456 }
457
458 pub fn consensus_store(&self) -> Option<Arc<RocksDBStore>> {
459 self.authority.load().as_ref().map(|a| a.0.store())
460 }
461
462 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
463 let mut store_path = self.storage_base_path.clone();
464 store_path.push(format!("{}", epoch));
465 store_path
466 }
467}
468
469impl ConsensusAddressUpdater for ConsensusManager {
471 fn update_address(
472 &self,
473 network_pubkey: NetworkPublicKey,
474 source: sui_network::endpoint_manager::AddressSource,
475 addresses: Vec<Multiaddr>,
476 ) -> SuiResult<()> {
477 let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
479
480 let address_to_apply = {
482 let mut address_overrides = self.address_overrides.lock();
483
484 if addresses.is_empty() {
485 address_overrides.remove(network_pubkey.clone(), source);
486 } else {
487 address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
488 }
489
490 address_overrides.get_highest_priority_address(network_pubkey.clone())
491 };
492
493 if let Some(authority) = self.authority.load_full() {
495 authority
496 .0
497 .update_peer_address(network_pubkey, address_to_apply);
498 Ok(())
499 } else {
500 info!(
501 "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
502 network_pubkey, source
503 );
504 Err(SuiErrorKind::GenericAuthorityError {
505 error: "Consensus authority node is not running. Can not apply address update"
506 .to_string(),
507 }
508 .into())
509 }
510 }
511}
512
513#[derive(Default)]
516pub struct UpdatableConsensusClient {
517 client: ArcSwapOption<Arc<dyn ConsensusClient>>,
519}
520
521impl UpdatableConsensusClient {
522 pub fn new() -> Self {
523 Self {
524 client: ArcSwapOption::empty(),
525 }
526 }
527
528 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
529 const START_TIMEOUT: Duration = Duration::from_secs(300);
530 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
531 if let Ok(client) = timeout(START_TIMEOUT, async {
532 loop {
533 let Some(client) = self.client.load_full() else {
534 sleep(RETRY_INTERVAL).await;
535 continue;
536 };
537 return client;
538 }
539 })
540 .await
541 {
542 return client;
543 }
544
545 panic!(
546 "Timed out after {:?} waiting for Consensus to start!",
547 START_TIMEOUT,
548 );
549 }
550
551 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
552 self.client.store(Some(Arc::new(client)));
553 }
554
555 pub fn clear(&self) {
556 self.client.store(None);
557 }
558}
559
560#[async_trait]
561impl ConsensusClient for UpdatableConsensusClient {
562 async fn submit(
563 &self,
564 transactions: &[ConsensusTransaction],
565 epoch_store: &Arc<AuthorityPerEpochStore>,
566 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
567 let client = self.get().await;
568 client.submit(transactions, epoch_store).await
569 }
570}
571
572pub struct ReplayWaiter {
574 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
575}
576
577impl ReplayWaiter {
578 pub(crate) fn new(
579 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
580 ) -> Self {
581 Self {
582 consumer_monitor_receiver,
583 }
584 }
585
586 pub(crate) async fn wait_for_replay(mut self) {
587 loop {
588 info!("Waiting for consensus to start replaying ...");
589 let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
590 continue;
591 };
592 info!("Waiting for consensus handler to finish replaying ...");
593 monitor
594 .replay_to_consumer_last_processed_commit_complete()
595 .await;
596 break;
597 }
598 }
599}
600
601impl Clone for ReplayWaiter {
602 fn clone(&self) -> Self {
603 Self {
604 consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
605 }
606 }
607}
608
609pub struct ConsensusManagerMetrics {
610 start_latency: IntGauge,
611 shutdown_latency: IntGauge,
612}
613
614impl ConsensusManagerMetrics {
615 pub fn new(registry: &Registry) -> Self {
616 Self {
617 start_latency: register_int_gauge_with_registry!(
618 "consensus_manager_start_latency",
619 "The latency of starting up consensus nodes",
620 registry,
621 )
622 .unwrap(),
623 shutdown_latency: register_int_gauge_with_registry!(
624 "consensus_manager_shutdown_latency",
625 "The latency of shutting down consensus nodes",
626 registry,
627 )
628 .unwrap(),
629 }
630 }
631}