use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
use crate::consensus_validator::SuiTxValidator;
use crate::mysticeti_adapter::LazyMysticetiClient;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{
    ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
    NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair,
};
use consensus_core::{
    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
};
use core::panic;
use fastcrypto::traits::KeyPair as _;
use mysten_metrics::{RegistryID, RegistryService};
use mysten_network::Multiaddr;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::{ConsensusConfig, NodeConfig};
use sui_network::endpoint_manager::ConsensusAddressUpdater;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_types::crypto::NetworkPublicKey;
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
use sui_types::node_role::NodeRole;
use sui_types::{
    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
};
use tokio::sync::{Mutex, broadcast};
use tokio::time::{sleep, timeout};
use tracing::{error, info};

#[cfg(test)]
#[path = "../unit_tests/consensus_manager_tests.rs"]
pub mod consensus_manager_tests;

#[derive(PartialEq)]
enum Running {
    True(EpochId, ProtocolVersion),
    False,
}

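/// Consensus peer address overrides, keyed by the peer's consensus network
/// public key. Each peer may carry overrides from multiple `AddressSource`s;
/// the `BTreeMap` ordering over sources defines priority, with the first
/// (lowest) key treated as the highest-priority source.
///
/// A minimal usage sketch (`key`, `source_a`, `source_b`, `addr_a`, `addr_b`
/// are illustrative; assume `source_a` orders before `source_b`):
///
/// ```ignore
/// let mut overrides = AddressOverridesMap::new();
/// overrides.insert(key.clone(), source_b, vec![addr_b]);
/// overrides.insert(key.clone(), source_a, vec![addr_a.clone()]);
/// // source_a sorts first, so its address wins.
/// assert_eq!(overrides.get_highest_priority_address(key), Some(addr_a));
/// ```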
struct AddressOverridesMap {
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}

impl AddressOverridesMap {
    pub fn new() -> Self {
        Self {
            map: BTreeMap::new(),
        }
    }

    pub fn insert(
        &mut self,
        network_pubkey: ConsensusNetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
        addresses: Vec<Multiaddr>,
    ) {
        self.map
            .entry(network_pubkey)
            .or_default()
            .insert(source, addresses);
    }

    pub fn remove(
        &mut self,
        network_pubkey: ConsensusNetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
    ) {
        self.map
            .entry(network_pubkey.clone())
            .or_default()
            .remove(&source);

        // Drop the peer's entry entirely once no sources remain for it.
        if self.map.get(&network_pubkey).unwrap().is_empty() {
            self.map.remove(&network_pubkey);
        }
    }

    pub fn get_highest_priority_address(
        &self,
        network_pubkey: ConsensusNetworkPublicKey,
    ) -> Option<Multiaddr> {
        self.map
            .get(&network_pubkey)
            .and_then(|sources| sources.first_key_value())
            .and_then(|(_, addresses)| addresses.first().cloned())
    }

    pub fn get_all_highest_priority_addresses(
        &self,
    ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
        let mut result = Vec::new();

        for (network_pubkey, sources) in self.map.iter() {
            if let Some((_source, addresses)) = sources.first_key_value()
                && let Some(address) = addresses.first()
            {
                result.push((network_pubkey.clone(), address.clone()));
            }
        }
        result
    }
}

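/// Maps the Sui `ProtocolConfig` and `Chain` for the current epoch into the
/// equivalent consensus-side protocol config.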
fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
    let chain_type = match chain {
        Chain::Mainnet => ChainType::Mainnet,
        Chain::Testnet => ChainType::Testnet,
        Chain::Unknown => ChainType::Unknown,
    };
    ConsensusProtocolConfig::new(
        config.version.as_u64(),
        chain_type,
        config.max_transaction_size_bytes(),
        config.max_transactions_in_block_bytes(),
        config.max_num_transactions_in_block(),
        config.gc_depth(),
        true,
        config.mysticeti_num_leaders_per_round(),
        config.consensus_bad_nodes_stake_threshold(),
        false,
    )
}

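/// Manages the lifecycle of the consensus component in a Sui node: starting
/// the consensus authority for an epoch, wiring the commit stream into the
/// consensus handler, and shutting everything down at epoch boundaries.
///
/// A lifecycle sketch (argument values are placeholders):
///
/// ```ignore
/// let manager = ConsensusManager::new(
///     &node_config, &consensus_config, &registry_service, consensus_client, node_role,
/// );
/// manager.start(&node_config, epoch_store, handler_initializer, tx_validator).await;
/// // ... epoch in progress ...
/// manager.shutdown().await;
/// ```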
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: Option<ProtocolKeyPair>,
    network_keypair: NetworkKeyPair,
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Lazy client wired to the running consensus authority.
    client: Arc<LazyMysticetiClient>,
    // Client facade used by the rest of the node; points to `client` while
    // consensus is running and is cleared on shutdown.
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    running: Mutex<Running>,

    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Address overrides persisted across consensus restarts, applied to the
    // authority when it (re)starts.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}

impl ConsensusManager {
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        consensus_client: Arc<UpdatableConsensusClient>,
        node_role: NodeRole,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(
            &registry_service.default_registry(),
        ));
        let client = Arc::new(LazyMysticetiClient::new());
        let (consumer_monitor_sender, _) = broadcast::channel(1);
        // Only validators need a protocol keypair to author consensus blocks.
        let protocol_keypair = if node_role.is_validator() {
            Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
        } else {
            None
        };
        Self {
            consensus_config: consensus_config.clone(),
            protocol_keypair,
            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
            storage_base_path: consensus_config.db_path().to_path_buf(),
            metrics,
            registry_service: registry_service.clone(),
            authority: ArcSwapOption::empty(),
            client,
            consensus_client,
            consensus_handler: Mutex::new(None),
            consumer_monitor: ArcSwapOption::empty(),
            consumer_monitor_sender,
            running: Mutex::new(Running::False),
            boot_counter: Mutex::new(0),
            address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
        }
    }

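    /// Starts consensus for the given epoch. Commits past the handler's last
    /// processed index are replayed before the authority goes live. Logs an
    /// error and returns early if consensus is already running; `shutdown`
    /// must be called first.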
    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();

        let start_time = Instant::now();
        // Ensure only one instance of consensus runs at a time.
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            listen_address_override: consensus_config.listen_address.clone(),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        // Ask consensus to replay commits after `replay_after_commit_index`,
        // covering everything the handler has not yet processed plus a
        // configurable number of prior commits.
        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        let node_role = epoch_store.node_role();

        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            commit_receiver,
            monitor.clone(),
            node_role.should_process_consensus_commits(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // If a monitor from a previous run exists and handled any commits,
        // this node actively participated in consensus before this restart.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        let mut boot_counter = self.boot_counter.lock().await;
        // Only increment the boot counter when this node actually took part
        // in the previous run of consensus.
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            NetworkType::Tonic,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            committee.clone(),
            parameters.clone(),
            to_consensus_protocol_config(protocol_config, epoch_store.get_chain()),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Apply any address overrides accumulated while consensus was down.
        let highest_priority_addresses = self
            .address_overrides
            .lock()
            .get_all_highest_priority_addresses();
        for (network_pubkey, address) in highest_priority_addresses {
            registered_authority
                .0
                .update_peer_address(network_pubkey, Some(address));
        }

        self.client.set(client);

        // Publish the commit consumer monitor to ReplayWaiter subscribers.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Start of consensus for epoch {} & protocol version {:?} completed - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }

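    /// Shuts down the consensus authority and the consensus handler, releasing
    /// per-epoch resources in roughly the reverse order `start` acquired them.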
    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        let start_time = Instant::now();
        // Hold the running lock for the whole shutdown to exclude concurrent starts.
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                tracing::info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Clear the transaction client first so no new submissions reach the
        // stopping authority.
        self.client.clear();

        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        authority.stop().await;

        // Abort the commit handler after the authority has stopped, so no
        // further commits arrive.
        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.shutdown_latency.set(elapsed as i64);

        tracing::info!(
            "Shutdown of consensus for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }

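    /// Returns true if consensus is currently running.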
    pub async fn is_running(&self) -> bool {
        let running = self.running.lock().await;
        matches!(*running, Running::True(_, _))
    }

    pub fn replay_waiter(&self) -> ReplayWaiter {
        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
        ReplayWaiter::new(consumer_monitor_receiver)
    }

    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }
}

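/// Applies consensus peer address updates. Overrides are always recorded so
/// they can be re-applied when consensus restarts; if the authority is not
/// currently running, an error is returned to signal that the update was only
/// persisted, not applied.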
impl ConsensusAddressUpdater for ConsensusManager {
    fn update_address(
        &self,
        network_pubkey: NetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
        addresses: Vec<Multiaddr>,
    ) -> SuiResult<()> {
        let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey);

        // Record the override, then resolve the highest-priority address for
        // this peer across all sources.
        let address_to_apply = {
            let mut address_overrides = self.address_overrides.lock();

            // An empty address list removes the override from this source.
            if addresses.is_empty() {
                address_overrides.remove(network_pubkey.clone(), source);
            } else {
                address_overrides.insert(network_pubkey.clone(), source, addresses);
            }

            address_overrides.get_highest_priority_address(network_pubkey.clone())
        };

        // Apply immediately if the consensus authority is running.
        if let Some(authority) = self.authority.load_full() {
            authority
                .0
                .update_peer_address(network_pubkey, address_to_apply);
            Ok(())
        } else {
            info!(
                "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
                network_pubkey, source
            );
            Err(SuiErrorKind::GenericAuthorityError {
                error: "Consensus authority node is not running. Cannot apply address update"
                    .to_string(),
            }
            .into())
        }
    }
}

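/// A `ConsensusClient` whose inner client can be swapped at runtime: set when
/// consensus starts, cleared on shutdown. Submissions made while no client is
/// installed wait until one is set, up to a timeout.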
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // Set while consensus is running; cleared on shutdown.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

impl UpdatableConsensusClient {
    pub fn new() -> Self {
        Self {
            client: ArcSwapOption::empty(),
        }
    }

    // Returns the current client, polling for up to START_TIMEOUT if one has
    // not been set yet.
    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
        const START_TIMEOUT: Duration = Duration::from_secs(300);
        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
        if let Ok(client) = timeout(START_TIMEOUT, async {
            loop {
                let Some(client) = self.client.load_full() else {
                    sleep(RETRY_INTERVAL).await;
                    continue;
                };
                return client;
            }
        })
        .await
        {
            return client;
        }

        panic!(
            "Timed out after {:?} waiting for Consensus to start!",
            START_TIMEOUT,
        );
    }

    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
        self.client.store(Some(Arc::new(client)));
    }

    pub fn clear(&self) {
        self.client.store(None);
    }
}

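/// Forwards submissions to the installed client, waiting for consensus to
/// start if necessary.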
#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}

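/// Waits for consensus to finish replaying commits to the consensus handler.
///
/// A usage sketch (assuming a `ConsensusManager` in scope):
///
/// ```ignore
/// let waiter = manager.replay_waiter();
/// waiter.wait_for_replay().await;
/// ```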
pub struct ReplayWaiter {
    // Receives the commit consumer monitor each time consensus (re)starts.
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}

impl ReplayWaiter {
    pub(crate) fn new(
        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
    ) -> Self {
        Self {
            consumer_monitor_receiver,
        }
    }

    pub(crate) async fn wait_for_replay(mut self) {
        loop {
            info!("Waiting for consensus to start replaying ...");
            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
                continue;
            };
            info!("Waiting for consensus handler to finish replaying ...");
            monitor
                .replay_to_consumer_last_processed_commit_complete()
                .await;
            break;
        }
    }
}

impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}

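/// Gauges recording how long consensus start and shutdown took, in seconds.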
pub struct ConsensusManagerMetrics {
    start_latency: IntGauge,
    shutdown_latency: IntGauge,
}

impl ConsensusManagerMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}