1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11 ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12 NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair, Stake,
13};
14use consensus_core::{
15 Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16 RandomnessSignatureHandler, storage::rocksdb_store::RocksDBStore,
17};
18use core::panic;
19use fastcrypto::encoding::{Encoding, Hex};
20use fastcrypto::traits::KeyPair as _;
21use mysten_metrics::{RegistryID, RegistryService};
22use mysten_network::Multiaddr;
23use prometheus::{
24 IntGauge, IntGaugeVec, Registry, register_int_gauge_vec_with_registry,
25 register_int_gauge_with_registry,
26};
27use std::collections::BTreeMap;
28use std::path::PathBuf;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use sui_config::{ConsensusConfig, NodeConfig};
32use sui_network::endpoint_manager::{AddressSource, ConsensusAddressUpdater};
33use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
34use sui_types::crypto::NetworkPublicKey;
35use sui_types::error::{SuiErrorKind, SuiResult};
36use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
37use sui_types::node_role::NodeRole;
38use sui_types::{
39 committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
40};
41use tokio::sync::{Mutex, broadcast};
42use tokio::time::{sleep, timeout};
43use tracing::{error, info};
44
45#[cfg(test)]
46#[path = "../unit_tests/consensus_manager_tests.rs"]
47pub mod consensus_manager_tests;
48
49#[derive(PartialEq)]
50enum Running {
51 True(EpochId, ProtocolVersion),
52 False,
53}
54
55struct AddressOverridesMap {
58 map: BTreeMap<
60 ConsensusNetworkPublicKey,
61 BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
62 >,
63}
64
65impl AddressOverridesMap {
66 pub fn new() -> Self {
67 Self {
68 map: BTreeMap::new(),
69 }
70 }
71
72 pub fn insert(
73 &mut self,
74 network_pubkey: ConsensusNetworkPublicKey,
75 source: sui_network::endpoint_manager::AddressSource,
76 addresses: Vec<Multiaddr>,
77 ) {
78 self.map
79 .entry(network_pubkey)
80 .or_default()
81 .insert(source, addresses);
82 }
83
84 pub fn remove(
85 &mut self,
86 network_pubkey: ConsensusNetworkPublicKey,
87 source: sui_network::endpoint_manager::AddressSource,
88 ) {
89 self.map
90 .entry(network_pubkey.clone())
91 .or_default()
92 .remove(&source);
93
94 if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
96 self.map.remove(&network_pubkey);
97 }
98 }
99
100 pub fn get_highest_priority_source_and_address(
104 &self,
105 network_pubkey: ConsensusNetworkPublicKey,
106 ) -> Option<(sui_network::endpoint_manager::AddressSource, Multiaddr)> {
107 self.map
108 .get(&network_pubkey)
109 .and_then(|sources| sources.first_key_value())
110 .and_then(|(source, addresses)| {
111 addresses.first().cloned().map(|address| (*source, address))
112 })
113 }
114
115 pub fn get_all_highest_priority_addresses(
116 &self,
117 ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
118 let mut result = Vec::new();
119
120 for (network_pubkey, sources) in self.map.iter() {
121 if let Some((_source, addresses)) = sources.first_key_value()
122 && let Some(address) = addresses.first()
123 {
124 result.push((network_pubkey.clone(), address.clone()));
125 }
126 }
127 result
128 }
129}
130
131fn apply_v3_threshold_overrides(committee: Committee) -> Committee {
137 let malicious_stake: Stake = std::env::var("SUI_CONSENSUS_V3_MALICIOUS_STAKE")
138 .ok()
139 .and_then(|s| s.parse().ok())
140 .unwrap_or(1_250);
141 let crash_stake: Stake = std::env::var("SUI_CONSENSUS_V3_CRASH_STAKE")
142 .ok()
143 .and_then(|s| s.parse().ok())
144 .unwrap_or(1_250);
145 info!(
146 "consensus_manager: applying v3 committee thresholds \
147 (malicious_stake={malicious_stake}, crash_stake={crash_stake})"
148 );
149 Committee::new_v3(
150 committee.epoch(),
151 committee.authorities_slice().to_vec(),
152 malicious_stake,
153 crash_stake,
154 )
155}
156
157fn to_consensus_protocol_config(config: &ProtocolConfig) -> ConsensusProtocolConfig {
158 let chain_type = match config.chain() {
159 Chain::Mainnet => ChainType::Mainnet,
160 Chain::Testnet => ChainType::Testnet,
161 Chain::Unknown => ChainType::Unknown,
162 };
163 ConsensusProtocolConfig::new(
164 config.version.as_u64(),
165 chain_type,
166 config.max_transaction_size_bytes(),
167 config.max_transactions_in_block_bytes(),
168 config.max_num_transactions_in_block(),
169 config.gc_depth(),
170 true,
171 config.mysticeti_num_leaders_per_round(),
172 config.consensus_bad_nodes_stake_threshold(),
173 false,
174 300,
175 12,
176 )
177}
178
/// Manages the lifecycle of the Mysticeti consensus authority for this node.
///
/// It starts the authority for an epoch and shuts it down. It also routes
/// transaction submission and peer-address overrides to whichever instance is
/// currently live.
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    /// Consensus protocol keypair; only `Some` when the node is a validator (see `new`).
    protocol_keypair: Option<ProtocolKeyPair>,
    network_keypair: NetworkKeyPair,
    /// Root directory for consensus storage; each epoch uses its own
    /// subdirectory (see `get_store_path`).
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    /// The running authority plus the ID of its metrics registry, or `None`
    /// while consensus is stopped.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    /// Holds the running authority's transaction client; set in `start`,
    /// cleared in `shutdown`.
    client: Arc<LazyMysticetiClient>,
    /// Client shared with the rest of the node; pointed at `client` in `start`
    /// and cleared in `shutdown`.
    consensus_client: Arc<UpdatableConsensusClient>,

    /// Handler consuming commits from the running authority; taken and aborted
    /// on shutdown.
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    /// Monitor of the most recently started commit consumer (swapped in on each `start`).
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    /// Monitor of the most recently started commit consumer (swapped in on each `start`).
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    /// Broadcasts each new commit-consumer monitor to `ReplayWaiter`s.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    /// Whether consensus is running, and for which epoch/protocol version.
    running: Mutex<Running>,

    /// Passed to `ConsensusAuthority::start`. It is incremented before a start
    /// only when the previous run's monitor reported a handled commit.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    /// Passed to `ConsensusAuthority::start`. It is incremented before a start
    /// only when the previous run's monitor reported a handled commit.
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    /// Peer address overrides. They are kept while consensus is stopped and
    /// re-applied on each `start`.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}
214
215impl ConsensusManager {
216 pub fn new(
217 node_config: &NodeConfig,
218 consensus_config: &ConsensusConfig,
219 registry_service: &RegistryService,
220 consensus_client: Arc<UpdatableConsensusClient>,
221 node_role: NodeRole,
222 ) -> Self {
223 let metrics = Arc::new(ConsensusManagerMetrics::new(
224 ®istry_service.default_registry(),
225 ));
226 let client = Arc::new(LazyMysticetiClient::new());
227 let (consumer_monitor_sender, _) = broadcast::channel(1);
228 let protocol_keypair = if node_role.is_validator() {
229 Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
230 } else {
231 None
232 };
233 Self {
234 consensus_config: consensus_config.clone(),
235 protocol_keypair,
236 network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
237 storage_base_path: consensus_config.db_path().to_path_buf(),
238 metrics,
239 registry_service: registry_service.clone(),
240 authority: ArcSwapOption::empty(),
241 client,
242 consensus_client,
243 consensus_handler: Mutex::new(None),
244 consumer_monitor: ArcSwapOption::empty(),
245 consumer_monitor_sender,
246 running: Mutex::new(Running::False),
247 boot_counter: Mutex::new(0),
248 address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
249 }
250 }
251
252 pub async fn start(
253 &self,
254 node_config: &NodeConfig,
255 epoch_store: Arc<AuthorityPerEpochStore>,
256 consensus_handler_initializer: ConsensusHandlerInitializer,
257 tx_validator: SuiTxValidator,
258 randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
259 ) {
260 let epoch = epoch_store.epoch();
261 let protocol_config = epoch_store.protocol_config();
262 let consensus_protocol_config = to_consensus_protocol_config(protocol_config);
263 let system_state = epoch_store.epoch_start_state();
264 let committee = if consensus_protocol_config.enable_v3() {
265 apply_v3_threshold_overrides(system_state.get_consensus_committee())
266 } else {
267 system_state.get_consensus_committee()
268 };
269
270 let start_time = Instant::now();
272 let mut running = self.running.lock().await;
273 if let Running::True(running_epoch, running_version) = *running {
274 error!(
275 "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
276 );
277 return;
278 }
279 *running = Running::True(epoch, protocol_config.version);
280
281 info!(
282 "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
283 protocol_config.version
284 );
285
286 self.consensus_client.set(self.client.clone());
287
288 let consensus_config = node_config
289 .consensus_config()
290 .expect("consensus_config should exist");
291
292 let parameters = Parameters {
293 db_path: self.get_store_path(epoch),
294 listen_address_override: consensus_config.listen_address.clone(),
295 ..consensus_config.parameters.clone().unwrap_or_default()
296 };
297
298 let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
299
300 let consensus_handler = consensus_handler_initializer.new_consensus_handler();
301
302 let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
303 let last_processed_commit_index =
304 consensus_handler.last_processed_subdag_index() as CommitIndex;
305 let replay_after_commit_index =
306 last_processed_commit_index.saturating_sub(num_prior_commits);
307
308 let (commit_consumer, commit_receiver) =
309 CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
310 let monitor = commit_consumer.monitor();
311
312 let handler = MysticetiConsensusHandler::new(
314 last_processed_commit_index,
315 consensus_handler,
316 commit_receiver,
317 monitor.clone(),
318 );
319 let mut consensus_handler = self.consensus_handler.lock().await;
320 *consensus_handler = Some(handler);
321
322 let participated_on_previous_run =
326 if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
327 previous_monitor.highest_handled_commit() > 0
328 } else {
329 false
330 };
331
332 let mut boot_counter = self.boot_counter.lock().await;
337 if participated_on_previous_run {
338 *boot_counter += 1;
339 } else {
340 info!(
341 "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
342 *boot_counter
343 );
344 }
345
346 let authority = ConsensusAuthority::start(
347 NetworkType::Tonic,
348 epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
349 committee.clone(),
350 parameters.clone(),
351 consensus_protocol_config,
352 self.protocol_keypair.clone(),
353 self.network_keypair.clone(),
354 Arc::new(Clock::default()),
355 Arc::new(tx_validator.clone()),
356 commit_consumer,
357 registry.clone(),
358 *boot_counter,
359 randomness_signature_handler,
360 )
361 .await;
362 let client = authority.transaction_client();
363
364 let registry_id = self.registry_service.add(registry.clone());
365
366 let registered_authority = Arc::new((authority, registry_id));
367 self.authority.swap(Some(registered_authority.clone()));
368
369 let highest_priority_addresses = self
371 .address_overrides
372 .lock()
373 .get_all_highest_priority_addresses();
374 for (network_pubkey, address) in highest_priority_addresses {
375 registered_authority
376 .0
377 .update_peer_address(network_pubkey, Some(address.clone()));
378 }
379
380 self.client.set(client);
382
383 let _ = self.consumer_monitor_sender.send(monitor);
385
386 let elapsed = start_time.elapsed().as_secs_f64();
387 self.metrics.start_latency.set(elapsed as i64);
388
389 tracing::info!(
390 "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
391 epoch,
392 protocol_config.version,
393 elapsed
394 );
395 }
396
397 pub async fn shutdown(&self) {
398 info!("Shutting down consensus ...");
399
400 let start_time = Instant::now();
402 let mut running = self.running.lock().await;
403 let (shutdown_epoch, shutdown_version) = match *running {
404 Running::True(epoch, version) => {
405 tracing::info!(
406 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
407 );
408 *running = Running::False;
409 (epoch, version)
410 }
411 Running::False => {
412 error!("Consensus shutdown was called but consensus is not running");
413 return;
414 }
415 };
416
417 self.client.clear();
419
420 let r = self.authority.swap(None).unwrap();
422 let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
423 panic!("Failed to retrieve the Mysticeti authority");
424 };
425
426 authority.stop().await;
428
429 let mut consensus_handler = self.consensus_handler.lock().await;
431 if let Some(mut handler) = consensus_handler.take() {
432 handler.abort().await;
433 }
434
435 self.registry_service.remove(registry_id);
437
438 self.consensus_client.clear();
439
440 let elapsed = start_time.elapsed().as_secs_f64();
441 self.metrics.shutdown_latency.set(elapsed as i64);
442
443 tracing::info!(
444 "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
445 elapsed
446 );
447 }
448
449 pub async fn is_running(&self) -> bool {
450 let running = self.running.lock().await;
451 matches!(*running, Running::True(_, _))
452 }
453
454 pub fn replay_waiter(&self) -> ReplayWaiter {
455 let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
456 ReplayWaiter::new(consumer_monitor_receiver)
457 }
458
459 pub fn get_storage_base_path(&self) -> PathBuf {
460 self.consensus_config.db_path().to_path_buf()
461 }
462
463 pub fn consensus_store(&self) -> Option<Arc<RocksDBStore>> {
464 self.authority.load().as_ref().map(|a| a.0.store())
465 }
466
467 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
468 let mut store_path = self.storage_base_path.clone();
469 store_path.push(format!("{}", epoch));
470 store_path
471 }
472}
473
474impl ConsensusAddressUpdater for ConsensusManager {
476 fn update_address(
477 &self,
478 network_pubkey: NetworkPublicKey,
479 source: sui_network::endpoint_manager::AddressSource,
480 addresses: Vec<Multiaddr>,
481 ) -> SuiResult<()> {
482 let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
484
485 let highest_priority = {
487 let mut address_overrides = self.address_overrides.lock();
488
489 if addresses.is_empty() {
490 address_overrides.remove(network_pubkey.clone(), source);
491 } else {
492 address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
493 }
494
495 address_overrides.get_highest_priority_source_and_address(network_pubkey.clone())
496 };
497 self.metrics.set_active_address_source(
498 &Hex::encode(network_pubkey.to_bytes()),
499 highest_priority.as_ref().map(|(source, _)| *source),
500 );
501
502 let address_to_apply = highest_priority.map(|(_, address)| address);
504 if let Some(authority) = self.authority.load_full() {
505 authority
506 .0
507 .update_peer_address(network_pubkey, address_to_apply);
508 Ok(())
509 } else {
510 info!(
511 "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
512 network_pubkey, source
513 );
514 Err(SuiErrorKind::GenericAuthorityError {
515 error: "Consensus authority node is not running. Can not apply address update"
516 .to_string(),
517 }
518 .into())
519 }
520 }
521}
522
523#[derive(Default)]
526pub struct UpdatableConsensusClient {
527 client: ArcSwapOption<Arc<dyn ConsensusClient>>,
529}
530
531impl UpdatableConsensusClient {
532 pub fn new() -> Self {
533 Self {
534 client: ArcSwapOption::empty(),
535 }
536 }
537
538 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
539 const START_TIMEOUT: Duration = Duration::from_secs(300);
540 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
541 if let Ok(client) = timeout(START_TIMEOUT, async {
542 loop {
543 let Some(client) = self.client.load_full() else {
544 sleep(RETRY_INTERVAL).await;
545 continue;
546 };
547 return client;
548 }
549 })
550 .await
551 {
552 return client;
553 }
554
555 panic!(
556 "Timed out after {:?} waiting for Consensus to start!",
557 START_TIMEOUT,
558 );
559 }
560
561 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
562 self.client.store(Some(Arc::new(client)));
563 }
564
565 pub fn clear(&self) {
566 self.client.store(None);
567 }
568}
569
570#[async_trait]
571impl ConsensusClient for UpdatableConsensusClient {
572 async fn submit(
573 &self,
574 transactions: &[ConsensusTransaction],
575 epoch_store: &Arc<AuthorityPerEpochStore>,
576 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
577 let client = self.get().await;
578 client.submit(transactions, epoch_store).await
579 }
580}
581
582pub struct ReplayWaiter {
584 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
585}
586
587impl ReplayWaiter {
588 pub(crate) fn new(
589 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
590 ) -> Self {
591 Self {
592 consumer_monitor_receiver,
593 }
594 }
595
596 pub(crate) async fn wait_for_replay(mut self) {
597 loop {
598 info!("Waiting for consensus to start replaying ...");
599 let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
600 continue;
601 };
602 info!("Waiting for consensus handler to finish replaying ...");
603 monitor
604 .replay_to_consumer_last_processed_commit_complete()
605 .await;
606 break;
607 }
608 }
609}
610
611impl Clone for ReplayWaiter {
612 fn clone(&self) -> Self {
613 Self {
614 consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
615 }
616 }
617}
618
619pub struct ConsensusManagerMetrics {
620 start_latency: IntGauge,
621 shutdown_latency: IntGauge,
622 active_address_source: IntGaugeVec,
623}
624
625impl ConsensusManagerMetrics {
626 pub fn new(registry: &Registry) -> Self {
627 Self {
628 start_latency: register_int_gauge_with_registry!(
629 "consensus_manager_start_latency",
630 "The latency of starting up consensus nodes",
631 registry,
632 )
633 .unwrap(),
634 shutdown_latency: register_int_gauge_with_registry!(
635 "consensus_manager_shutdown_latency",
636 "The latency of shutting down consensus nodes",
637 registry,
638 )
639 .unwrap(),
640 active_address_source: register_int_gauge_vec_with_registry!(
641 "consensus_active_address_source",
642 "Active consensus address source per committee peer, encoded as the gauge \
643 value: 0=committee (no override active; the on-chain committee address is in \
644 use), 1=admin, 2=config, 3=discovery, 4=seed, 5=chain (override priority \
645 highest to lowest). One series per peer; `peer_id` is the full hex consensus \
646 network public key.",
647 &["peer_id"],
648 registry,
649 )
650 .unwrap(),
651 }
652 }
653
654 fn set_active_address_source(
658 &self,
659 peer_id: &str,
660 active: Option<sui_network::endpoint_manager::AddressSource>,
661 ) {
662 let code = active.map_or(
663 AddressSource::DEFAULT_ADDRESS_SOURCE_CODE,
664 sui_network::endpoint_manager::AddressSource::metric_code,
665 );
666 self.active_address_source
667 .with_label_values(&[peer_id])
668 .set(code);
669 }
670}