1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11 ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12 NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair, Stake,
13};
14use consensus_core::{
15 Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16 RandomnessSignatureHandler,
17};
18use core::panic;
19use fastcrypto::traits::KeyPair as _;
20use mysten_metrics::{RegistryID, RegistryService};
21use mysten_network::Multiaddr;
22use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
23use std::collections::BTreeMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use sui_config::{ConsensusConfig, NodeConfig};
28use sui_network::endpoint_manager::ConsensusAddressUpdater;
29use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
30use sui_types::crypto::NetworkPublicKey;
31use sui_types::error::{SuiErrorKind, SuiResult};
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::node_role::NodeRole;
34use sui_types::{
35 committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
36};
37use tokio::sync::{Mutex, broadcast};
38use tokio::time::{sleep, timeout};
39use tracing::{error, info};
40
41#[cfg(test)]
42#[path = "../unit_tests/consensus_manager_tests.rs"]
43pub mod consensus_manager_tests;
44
/// Lifecycle state of consensus as tracked by `ConsensusManager`.
#[derive(PartialEq)]
enum Running {
    /// Consensus has been started for this epoch and protocol version.
    True(EpochId, ProtocolVersion),
    /// Consensus is not running (the initial state, and the state after `shutdown`).
    False,
}
50
51struct AddressOverridesMap {
54 map: BTreeMap<
56 ConsensusNetworkPublicKey,
57 BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
58 >,
59}
60
61impl AddressOverridesMap {
62 pub fn new() -> Self {
63 Self {
64 map: BTreeMap::new(),
65 }
66 }
67
68 pub fn insert(
69 &mut self,
70 network_pubkey: ConsensusNetworkPublicKey,
71 source: sui_network::endpoint_manager::AddressSource,
72 addresses: Vec<Multiaddr>,
73 ) {
74 self.map
75 .entry(network_pubkey)
76 .or_default()
77 .insert(source, addresses);
78 }
79
80 pub fn remove(
81 &mut self,
82 network_pubkey: ConsensusNetworkPublicKey,
83 source: sui_network::endpoint_manager::AddressSource,
84 ) {
85 self.map
86 .entry(network_pubkey.clone())
87 .or_default()
88 .remove(&source);
89
90 if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
92 self.map.remove(&network_pubkey);
93 }
94 }
95
96 pub fn get_highest_priority_address(
97 &self,
98 network_pubkey: ConsensusNetworkPublicKey,
99 ) -> Option<Multiaddr> {
100 self.map
101 .get(&network_pubkey)
102 .and_then(|sources| sources.first_key_value())
103 .and_then(|(_, addresses)| addresses.first().cloned())
104 }
105
106 pub fn get_all_highest_priority_addresses(
107 &self,
108 ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
109 let mut result = Vec::new();
110
111 for (network_pubkey, sources) in self.map.iter() {
112 if let Some((_source, addresses)) = sources.first_key_value()
113 && let Some(address) = addresses.first()
114 {
115 result.push((network_pubkey.clone(), address.clone()));
116 }
117 }
118 result
119 }
120}
121
122fn apply_v3_threshold_overrides(committee: Committee) -> Committee {
128 let malicious_stake: Stake = std::env::var("SUI_CONSENSUS_V3_MALICIOUS_STAKE")
129 .ok()
130 .and_then(|s| s.parse().ok())
131 .unwrap_or(1_250);
132 let crash_stake: Stake = std::env::var("SUI_CONSENSUS_V3_CRASH_STAKE")
133 .ok()
134 .and_then(|s| s.parse().ok())
135 .unwrap_or(1_250);
136 info!(
137 "consensus_manager: applying v3 committee thresholds \
138 (malicious_stake={malicious_stake}, crash_stake={crash_stake})"
139 );
140 Committee::new_v3(
141 committee.epoch(),
142 committee.authorities_slice().to_vec(),
143 malicious_stake,
144 crash_stake,
145 )
146}
147
148fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
149 let chain_type = match chain {
150 Chain::Mainnet => ChainType::Mainnet,
151 Chain::Testnet => ChainType::Testnet,
152 Chain::Unknown => ChainType::Unknown,
153 };
154 ConsensusProtocolConfig::new(
155 config.version.as_u64(),
156 chain_type,
157 config.max_transaction_size_bytes(),
158 config.max_transactions_in_block_bytes(),
159 config.max_num_transactions_in_block(),
160 config.gc_depth(),
161 true,
162 config.mysticeti_num_leaders_per_round(),
163 config.consensus_bad_nodes_stake_threshold(),
164 false,
165 300,
166 12,
167 )
168}
169
/// Owns the lifecycle of the Mysticeti consensus authority: starting it for an epoch,
/// shutting it down, and applying peer address overrides to it.
pub struct ConsensusManager {
    /// Consensus config captured at construction time.
    consensus_config: ConsensusConfig,
    /// Protocol key pair; `Some` only when the node role is a validator.
    protocol_keypair: Option<ProtocolKeyPair>,
    /// Network key pair built from the node's network key.
    network_keypair: NetworkKeyPair,
    /// Base directory; each epoch's consensus DB lives in a subdirectory named after the epoch.
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    /// The running authority and the ID of its registered metrics registry; empty while
    /// consensus is stopped.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    /// Set to the running authority's transaction client on start; cleared on shutdown.
    client: Arc<LazyMysticetiClient>,
    /// Client shared with the rest of the node; pointed at `client` on start and cleared on
    /// shutdown.
    consensus_client: Arc<UpdatableConsensusClient>,

    /// Commit handler for the current run, aborted on shutdown.
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    /// Commit consumer monitor from the most recent start.
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    /// Publishes each newly started run's monitor to `ReplayWaiter`s.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    /// Whether consensus is running, and for which epoch / protocol version.
    running: Mutex<Running>,

    /// Passed to `ConsensusAuthority::start`; incremented only when the previous run handled
    /// at least one commit.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    /// Peer address overrides; re-applied every time consensus starts.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}
205
206impl ConsensusManager {
207 pub fn new(
208 node_config: &NodeConfig,
209 consensus_config: &ConsensusConfig,
210 registry_service: &RegistryService,
211 consensus_client: Arc<UpdatableConsensusClient>,
212 node_role: NodeRole,
213 ) -> Self {
214 let metrics = Arc::new(ConsensusManagerMetrics::new(
215 ®istry_service.default_registry(),
216 ));
217 let client = Arc::new(LazyMysticetiClient::new());
218 let (consumer_monitor_sender, _) = broadcast::channel(1);
219 let protocol_keypair = if node_role.is_validator() {
220 Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
221 } else {
222 None
223 };
224 Self {
225 consensus_config: consensus_config.clone(),
226 protocol_keypair,
227 network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
228 storage_base_path: consensus_config.db_path().to_path_buf(),
229 metrics,
230 registry_service: registry_service.clone(),
231 authority: ArcSwapOption::empty(),
232 client,
233 consensus_client,
234 consensus_handler: Mutex::new(None),
235 consumer_monitor: ArcSwapOption::empty(),
236 consumer_monitor_sender,
237 running: Mutex::new(Running::False),
238 boot_counter: Mutex::new(0),
239 address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
240 }
241 }
242
243 pub async fn start(
244 &self,
245 node_config: &NodeConfig,
246 epoch_store: Arc<AuthorityPerEpochStore>,
247 consensus_handler_initializer: ConsensusHandlerInitializer,
248 tx_validator: SuiTxValidator,
249 randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
250 ) {
251 let epoch = epoch_store.epoch();
252 let protocol_config = epoch_store.protocol_config();
253 let consensus_protocol_config =
254 to_consensus_protocol_config(protocol_config, epoch_store.get_chain());
255 let system_state = epoch_store.epoch_start_state();
256 let committee = if consensus_protocol_config.enable_v3() {
257 apply_v3_threshold_overrides(system_state.get_consensus_committee())
258 } else {
259 system_state.get_consensus_committee()
260 };
261
262 let start_time = Instant::now();
264 let mut running = self.running.lock().await;
265 if let Running::True(running_epoch, running_version) = *running {
266 error!(
267 "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
268 );
269 return;
270 }
271 *running = Running::True(epoch, protocol_config.version);
272
273 info!(
274 "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
275 protocol_config.version
276 );
277
278 self.consensus_client.set(self.client.clone());
279
280 let consensus_config = node_config
281 .consensus_config()
282 .expect("consensus_config should exist");
283
284 let parameters = Parameters {
285 db_path: self.get_store_path(epoch),
286 listen_address_override: consensus_config.listen_address.clone(),
287 ..consensus_config.parameters.clone().unwrap_or_default()
288 };
289
290 let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
291
292 let consensus_handler = consensus_handler_initializer.new_consensus_handler();
293
294 let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
295 let last_processed_commit_index =
296 consensus_handler.last_processed_subdag_index() as CommitIndex;
297 let replay_after_commit_index =
298 last_processed_commit_index.saturating_sub(num_prior_commits);
299
300 let (commit_consumer, commit_receiver) =
301 CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
302 let monitor = commit_consumer.monitor();
303
304 let node_role = epoch_store.node_role();
305
306 let handler = MysticetiConsensusHandler::new(
308 last_processed_commit_index,
309 consensus_handler,
310 commit_receiver,
311 monitor.clone(),
312 node_role,
313 );
314 let mut consensus_handler = self.consensus_handler.lock().await;
315 *consensus_handler = Some(handler);
316
317 let participated_on_previous_run =
321 if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
322 previous_monitor.highest_handled_commit() > 0
323 } else {
324 false
325 };
326
327 let mut boot_counter = self.boot_counter.lock().await;
332 if participated_on_previous_run {
333 *boot_counter += 1;
334 } else {
335 info!(
336 "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
337 *boot_counter
338 );
339 }
340
341 let authority = ConsensusAuthority::start(
342 NetworkType::Tonic,
343 epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
344 committee.clone(),
345 parameters.clone(),
346 consensus_protocol_config,
347 self.protocol_keypair.clone(),
348 self.network_keypair.clone(),
349 Arc::new(Clock::default()),
350 Arc::new(tx_validator.clone()),
351 commit_consumer,
352 registry.clone(),
353 *boot_counter,
354 randomness_signature_handler,
355 )
356 .await;
357 let client = authority.transaction_client();
358
359 let registry_id = self.registry_service.add(registry.clone());
360
361 let registered_authority = Arc::new((authority, registry_id));
362 self.authority.swap(Some(registered_authority.clone()));
363
364 let highest_priority_addresses = self
366 .address_overrides
367 .lock()
368 .get_all_highest_priority_addresses();
369 for (network_pubkey, address) in highest_priority_addresses {
370 registered_authority
371 .0
372 .update_peer_address(network_pubkey, Some(address.clone()));
373 }
374
375 self.client.set(client);
377
378 let _ = self.consumer_monitor_sender.send(monitor);
380
381 let elapsed = start_time.elapsed().as_secs_f64();
382 self.metrics.start_latency.set(elapsed as i64);
383
384 tracing::info!(
385 "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
386 epoch,
387 protocol_config.version,
388 elapsed
389 );
390 }
391
392 pub async fn shutdown(&self) {
393 info!("Shutting down consensus ...");
394
395 let start_time = Instant::now();
397 let mut running = self.running.lock().await;
398 let (shutdown_epoch, shutdown_version) = match *running {
399 Running::True(epoch, version) => {
400 tracing::info!(
401 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
402 );
403 *running = Running::False;
404 (epoch, version)
405 }
406 Running::False => {
407 error!("Consensus shutdown was called but consensus is not running");
408 return;
409 }
410 };
411
412 self.client.clear();
414
415 let r = self.authority.swap(None).unwrap();
417 let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
418 panic!("Failed to retrieve the Mysticeti authority");
419 };
420
421 authority.stop().await;
423
424 let mut consensus_handler = self.consensus_handler.lock().await;
426 if let Some(mut handler) = consensus_handler.take() {
427 handler.abort().await;
428 }
429
430 self.registry_service.remove(registry_id);
432
433 self.consensus_client.clear();
434
435 let elapsed = start_time.elapsed().as_secs_f64();
436 self.metrics.shutdown_latency.set(elapsed as i64);
437
438 tracing::info!(
439 "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
440 elapsed
441 );
442 }
443
444 pub async fn is_running(&self) -> bool {
445 let running = self.running.lock().await;
446 matches!(*running, Running::True(_, _))
447 }
448
449 pub fn replay_waiter(&self) -> ReplayWaiter {
450 let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
451 ReplayWaiter::new(consumer_monitor_receiver)
452 }
453
454 pub fn get_storage_base_path(&self) -> PathBuf {
455 self.consensus_config.db_path().to_path_buf()
456 }
457
458 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
459 let mut store_path = self.storage_base_path.clone();
460 store_path.push(format!("{}", epoch));
461 store_path
462 }
463}
464
465impl ConsensusAddressUpdater for ConsensusManager {
467 fn update_address(
468 &self,
469 network_pubkey: NetworkPublicKey,
470 source: sui_network::endpoint_manager::AddressSource,
471 addresses: Vec<Multiaddr>,
472 ) -> SuiResult<()> {
473 let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
475
476 let address_to_apply = {
478 let mut address_overrides = self.address_overrides.lock();
479
480 if addresses.is_empty() {
481 address_overrides.remove(network_pubkey.clone(), source);
482 } else {
483 address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
484 }
485
486 address_overrides.get_highest_priority_address(network_pubkey.clone())
487 };
488
489 if let Some(authority) = self.authority.load_full() {
491 authority
492 .0
493 .update_peer_address(network_pubkey, address_to_apply);
494 Ok(())
495 } else {
496 info!(
497 "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
498 network_pubkey, source
499 );
500 Err(SuiErrorKind::GenericAuthorityError {
501 error: "Consensus authority node is not running. Can not apply address update"
502 .to_string(),
503 }
504 .into())
505 }
506 }
507}
508
509#[derive(Default)]
512pub struct UpdatableConsensusClient {
513 client: ArcSwapOption<Arc<dyn ConsensusClient>>,
515}
516
517impl UpdatableConsensusClient {
518 pub fn new() -> Self {
519 Self {
520 client: ArcSwapOption::empty(),
521 }
522 }
523
524 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
525 const START_TIMEOUT: Duration = Duration::from_secs(300);
526 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
527 if let Ok(client) = timeout(START_TIMEOUT, async {
528 loop {
529 let Some(client) = self.client.load_full() else {
530 sleep(RETRY_INTERVAL).await;
531 continue;
532 };
533 return client;
534 }
535 })
536 .await
537 {
538 return client;
539 }
540
541 panic!(
542 "Timed out after {:?} waiting for Consensus to start!",
543 START_TIMEOUT,
544 );
545 }
546
547 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
548 self.client.store(Some(Arc::new(client)));
549 }
550
551 pub fn clear(&self) {
552 self.client.store(None);
553 }
554}
555
556#[async_trait]
557impl ConsensusClient for UpdatableConsensusClient {
558 async fn submit(
559 &self,
560 transactions: &[ConsensusTransaction],
561 epoch_store: &Arc<AuthorityPerEpochStore>,
562 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
563 let client = self.get().await;
564 client.submit(transactions, epoch_store).await
565 }
566}
567
568pub struct ReplayWaiter {
570 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
571}
572
573impl ReplayWaiter {
574 pub(crate) fn new(
575 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
576 ) -> Self {
577 Self {
578 consumer_monitor_receiver,
579 }
580 }
581
582 pub(crate) async fn wait_for_replay(mut self) {
583 loop {
584 info!("Waiting for consensus to start replaying ...");
585 let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
586 continue;
587 };
588 info!("Waiting for consensus handler to finish replaying ...");
589 monitor
590 .replay_to_consumer_last_processed_commit_complete()
591 .await;
592 break;
593 }
594 }
595}
596
597impl Clone for ReplayWaiter {
598 fn clone(&self) -> Self {
599 Self {
600 consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
601 }
602 }
603}
604
605pub struct ConsensusManagerMetrics {
606 start_latency: IntGauge,
607 shutdown_latency: IntGauge,
608}
609
610impl ConsensusManagerMetrics {
611 pub fn new(registry: &Registry) -> Self {
612 Self {
613 start_latency: register_int_gauge_with_registry!(
614 "consensus_manager_start_latency",
615 "The latency of starting up consensus nodes",
616 registry,
617 )
618 .unwrap(),
619 shutdown_latency: register_int_gauge_with_registry!(
620 "consensus_manager_shutdown_latency",
621 "The latency of shutting down consensus nodes",
622 registry,
623 )
624 .unwrap(),
625 }
626 }
627}