1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11 ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12 NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair,
13};
14use consensus_core::{
15 Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16};
17use core::panic;
18use fastcrypto::traits::KeyPair as _;
19use mysten_metrics::{RegistryID, RegistryService};
20use mysten_network::Multiaddr;
21use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
22use std::collections::BTreeMap;
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use sui_config::{ConsensusConfig, NodeConfig};
27use sui_network::endpoint_manager::ConsensusAddressUpdater;
28use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
29use sui_types::crypto::NetworkPublicKey;
30use sui_types::error::{SuiErrorKind, SuiResult};
31use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
32use sui_types::{
33 committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
34};
35use tokio::sync::{Mutex, broadcast};
36use tokio::time::{sleep, timeout};
37use tracing::{error, info};
38
39#[cfg(test)]
40#[path = "../unit_tests/consensus_manager_tests.rs"]
41pub mod consensus_manager_tests;
42
/// Whether consensus is currently running, and if so for which epoch and
/// protocol version. Guarded by `ConsensusManager::running`.
#[derive(PartialEq)]
enum Running {
    // Consensus is running for this epoch at this protocol version.
    True(EpochId, ProtocolVersion),
    False,
}
48
/// Address overrides for consensus peers, keyed by the peer's consensus
/// network public key. Each peer maps to addresses grouped by the source
/// that supplied them; the BTreeMap ordering on `AddressSource` determines
/// priority (the first key is treated as highest priority by the getters).
struct AddressOverridesMap {
    // peer network key -> (address source -> addresses from that source)
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}
58
59impl AddressOverridesMap {
60 pub fn new() -> Self {
61 Self {
62 map: BTreeMap::new(),
63 }
64 }
65
66 pub fn insert(
67 &mut self,
68 network_pubkey: ConsensusNetworkPublicKey,
69 source: sui_network::endpoint_manager::AddressSource,
70 addresses: Vec<Multiaddr>,
71 ) {
72 self.map
73 .entry(network_pubkey)
74 .or_default()
75 .insert(source, addresses);
76 }
77
78 pub fn remove(
79 &mut self,
80 network_pubkey: ConsensusNetworkPublicKey,
81 source: sui_network::endpoint_manager::AddressSource,
82 ) {
83 self.map
84 .entry(network_pubkey.clone())
85 .or_default()
86 .remove(&source);
87
88 if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
90 self.map.remove(&network_pubkey);
91 }
92 }
93
94 pub fn get_highest_priority_address(
95 &self,
96 network_pubkey: ConsensusNetworkPublicKey,
97 ) -> Option<Multiaddr> {
98 self.map
99 .get(&network_pubkey)
100 .and_then(|sources| sources.first_key_value())
101 .and_then(|(_, addresses)| addresses.first().cloned())
102 }
103
104 pub fn get_all_highest_priority_addresses(
105 &self,
106 ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
107 let mut result = Vec::new();
108
109 for (network_pubkey, sources) in self.map.iter() {
110 if let Some((_source, addresses)) = sources.first_key_value()
111 && let Some(address) = addresses.first()
112 {
113 result.push((network_pubkey.clone(), address.clone()));
114 }
115 }
116 result
117 }
118}
119
120fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
121 let chain_type = match chain {
122 Chain::Mainnet => ChainType::Mainnet,
123 Chain::Testnet => ChainType::Testnet,
124 Chain::Unknown => ChainType::Unknown,
125 };
126 ConsensusProtocolConfig::new(
127 config.version.as_u64(),
128 chain_type,
129 config.max_transaction_size_bytes(),
130 config.max_transactions_in_block_bytes(),
131 config.max_num_transactions_in_block(),
132 config.gc_depth(),
133 true,
134 config.mysticeti_num_leaders_per_round(),
135 config.consensus_bad_nodes_stake_threshold(),
136 )
137}
138
/// Manages the lifecycle (start/shutdown) of the Mysticeti consensus
/// authority across epochs, and brokers client handles and peer address
/// updates between the node and consensus.
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: ProtocolKeyPair,
    network_keypair: NetworkKeyPair,
    // Base directory under which per-epoch consensus databases live.
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    // The running authority plus the id of its metrics registry, kept
    // together so the registry can be removed again on shutdown.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Lazily-initialized transaction client; populated in start() once the
    // authority is up, cleared in shutdown().
    client: Arc<LazyMysticetiClient>,
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    // Commit consumer monitor of the current run; swapped on each start so
    // the previous run's progress can be inspected.
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    // Broadcasts each new monitor to ReplayWaiter subscribers.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    // Serializes start()/shutdown() and records the running epoch/version.
    running: Mutex<Running>,

    // Passed to ConsensusAuthority::start; incremented only when the node
    // handled commits in the previous run.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Peer address overrides, persisted across authority restarts and
    // re-applied on each start().
    address_overrides: Mutex<AddressOverridesMap>,
}
173
174impl ConsensusManager {
175 pub fn new(
176 node_config: &NodeConfig,
177 consensus_config: &ConsensusConfig,
178 registry_service: &RegistryService,
179 consensus_client: Arc<UpdatableConsensusClient>,
180 ) -> Self {
181 let metrics = Arc::new(ConsensusManagerMetrics::new(
182 ®istry_service.default_registry(),
183 ));
184 let client = Arc::new(LazyMysticetiClient::new());
185 let (consumer_monitor_sender, _) = broadcast::channel(1);
186 Self {
187 consensus_config: consensus_config.clone(),
188 protocol_keypair: ProtocolKeyPair::new(node_config.worker_key_pair().copy()),
189 network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
190 storage_base_path: consensus_config.db_path().to_path_buf(),
191 metrics,
192 registry_service: registry_service.clone(),
193 authority: ArcSwapOption::empty(),
194 client,
195 consensus_client,
196 consensus_handler: Mutex::new(None),
197 consumer_monitor: ArcSwapOption::empty(),
198 consumer_monitor_sender,
199 running: Mutex::new(Running::False),
200 boot_counter: Mutex::new(0),
201 address_overrides: Mutex::new(AddressOverridesMap::new()),
202 }
203 }
204
205 pub async fn start(
206 &self,
207 node_config: &NodeConfig,
208 epoch_store: Arc<AuthorityPerEpochStore>,
209 consensus_handler_initializer: ConsensusHandlerInitializer,
210 tx_validator: SuiTxValidator,
211 ) {
212 let system_state = epoch_store.epoch_start_state();
213 let committee: Committee = system_state.get_consensus_committee();
214 let epoch = epoch_store.epoch();
215 let protocol_config = epoch_store.protocol_config();
216
217 let start_time = Instant::now();
219 let mut running = self.running.lock().await;
220 if let Running::True(running_epoch, running_version) = *running {
221 error!(
222 "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
223 );
224 return;
225 }
226 *running = Running::True(epoch, protocol_config.version);
227
228 info!(
229 "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
230 protocol_config.version
231 );
232
233 self.consensus_client.set(self.client.clone());
234
235 let consensus_config = node_config
236 .consensus_config()
237 .expect("consensus_config should exist");
238
239 let parameters = Parameters {
240 db_path: self.get_store_path(epoch),
241 listen_address_override: consensus_config.listen_address.clone(),
242 ..consensus_config.parameters.clone().unwrap_or_default()
243 };
244
245 let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
246
247 let consensus_handler = consensus_handler_initializer.new_consensus_handler();
248
249 let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
250 let last_processed_commit_index =
251 consensus_handler.last_processed_subdag_index() as CommitIndex;
252 let replay_after_commit_index =
253 last_processed_commit_index.saturating_sub(num_prior_commits);
254
255 let (commit_consumer, commit_receiver) =
256 CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
257 let monitor = commit_consumer.monitor();
258
259 let handler = MysticetiConsensusHandler::new(
261 last_processed_commit_index,
262 consensus_handler,
263 commit_receiver,
264 monitor.clone(),
265 );
266 let mut consensus_handler = self.consensus_handler.lock().await;
267 *consensus_handler = Some(handler);
268
269 let participated_on_previous_run =
273 if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
274 previous_monitor.highest_handled_commit() > 0
275 } else {
276 false
277 };
278
279 let mut boot_counter = self.boot_counter.lock().await;
284 if participated_on_previous_run {
285 *boot_counter += 1;
286 } else {
287 info!(
288 "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
289 *boot_counter
290 );
291 }
292
293 let authority = ConsensusAuthority::start(
294 NetworkType::Tonic,
295 epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
296 committee.clone(),
297 parameters.clone(),
298 to_consensus_protocol_config(protocol_config, epoch_store.get_chain()),
299 Some(self.protocol_keypair.clone()),
300 self.network_keypair.clone(),
301 Arc::new(Clock::default()),
302 Arc::new(tx_validator.clone()),
303 commit_consumer,
304 registry.clone(),
305 *boot_counter,
306 )
307 .await;
308 let client = authority.transaction_client();
309
310 let registry_id = self.registry_service.add(registry.clone());
311
312 let registered_authority = Arc::new((authority, registry_id));
313 self.authority.swap(Some(registered_authority.clone()));
314
315 let address_overrides = self.address_overrides.lock().await;
317 let highest_priority_addresses = address_overrides.get_all_highest_priority_addresses();
318 for (network_pubkey, address) in highest_priority_addresses {
319 registered_authority
320 .0
321 .update_peer_address(network_pubkey, Some(address.clone()));
322 }
323
324 self.client.set(client);
326
327 let _ = self.consumer_monitor_sender.send(monitor);
329
330 let elapsed = start_time.elapsed().as_secs_f64();
331 self.metrics.start_latency.set(elapsed as i64);
332
333 tracing::info!(
334 "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
335 epoch,
336 protocol_config.version,
337 elapsed
338 );
339 }
340
341 pub async fn shutdown(&self) {
342 info!("Shutting down consensus ...");
343
344 let start_time = Instant::now();
346 let mut running = self.running.lock().await;
347 let (shutdown_epoch, shutdown_version) = match *running {
348 Running::True(epoch, version) => {
349 tracing::info!(
350 "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
351 );
352 *running = Running::False;
353 (epoch, version)
354 }
355 Running::False => {
356 error!("Consensus shutdown was called but consensus is not running");
357 return;
358 }
359 };
360
361 self.client.clear();
363
364 let r = self.authority.swap(None).unwrap();
366 let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
367 panic!("Failed to retrieve the Mysticeti authority");
368 };
369
370 authority.stop().await;
372
373 let mut consensus_handler = self.consensus_handler.lock().await;
375 if let Some(mut handler) = consensus_handler.take() {
376 handler.abort().await;
377 }
378
379 self.registry_service.remove(registry_id);
381
382 self.consensus_client.clear();
383
384 let elapsed = start_time.elapsed().as_secs_f64();
385 self.metrics.shutdown_latency.set(elapsed as i64);
386
387 tracing::info!(
388 "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
389 elapsed
390 );
391 }
392
393 pub async fn is_running(&self) -> bool {
394 let running = self.running.lock().await;
395 matches!(*running, Running::True(_, _))
396 }
397
398 pub fn replay_waiter(&self) -> ReplayWaiter {
399 let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
400 ReplayWaiter::new(consumer_monitor_receiver)
401 }
402
403 pub fn get_storage_base_path(&self) -> PathBuf {
404 self.consensus_config.db_path().to_path_buf()
405 }
406
407 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
408 let mut store_path = self.storage_base_path.clone();
409 store_path.push(format!("{}", epoch));
410 store_path
411 }
412}
413
414impl ConsensusAddressUpdater for ConsensusManager {
416 fn update_address(
417 &self,
418 network_pubkey: NetworkPublicKey,
419 source: sui_network::endpoint_manager::AddressSource,
420 addresses: Vec<Multiaddr>,
421 ) -> SuiResult<()> {
422 let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
424
425 let address_to_apply = {
427 let mut address_overrides = self.address_overrides.blocking_lock();
428
429 if addresses.is_empty() {
430 address_overrides.remove(network_pubkey.clone(), source);
431 } else {
432 address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
433 }
434
435 address_overrides.get_highest_priority_address(network_pubkey.clone())
436 };
437
438 if let Some(authority) = self.authority.load_full() {
440 authority
441 .0
442 .update_peer_address(network_pubkey, address_to_apply);
443 Ok(())
444 } else {
445 info!(
446 "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
447 network_pubkey, source
448 );
449 Err(SuiErrorKind::GenericAuthorityError {
450 error: "Consensus authority node is not running. Can not apply address update"
451 .to_string(),
452 }
453 .into())
454 }
455 }
456}
457
/// A `ConsensusClient` whose inner client can be swapped at runtime; an
/// unset client indicates consensus is not currently running (see
/// `ConsensusManager::start`/`shutdown`, which call `set`/`clear`).
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // Double Arc: ArcSwapOption requires a sized value, so the unsized
    // `dyn ConsensusClient` trait object is wrapped in an inner Arc.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
465
466impl UpdatableConsensusClient {
467 pub fn new() -> Self {
468 Self {
469 client: ArcSwapOption::empty(),
470 }
471 }
472
473 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
474 const START_TIMEOUT: Duration = Duration::from_secs(300);
475 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
476 if let Ok(client) = timeout(START_TIMEOUT, async {
477 loop {
478 let Some(client) = self.client.load_full() else {
479 sleep(RETRY_INTERVAL).await;
480 continue;
481 };
482 return client;
483 }
484 })
485 .await
486 {
487 return client;
488 }
489
490 panic!(
491 "Timed out after {:?} waiting for Consensus to start!",
492 START_TIMEOUT,
493 );
494 }
495
496 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
497 self.client.store(Some(Arc::new(client)));
498 }
499
500 pub fn clear(&self) {
501 self.client.store(None);
502 }
503}
504
#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    /// Submits transactions through the currently-installed client, first
    /// waiting (bounded by the timeout in `get`) for one to be installed.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}
516
/// Waits for consensus commit replay to complete after a (re)start.
/// Obtained via `ConsensusManager::replay_waiter`.
pub struct ReplayWaiter {
    // Receives the CommitConsumerMonitor broadcast on each consensus start.
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}
521
impl ReplayWaiter {
    pub(crate) fn new(
        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
    ) -> Self {
        Self {
            consumer_monitor_receiver,
        }
    }

    /// Waits until the consensus handler has replayed commits up to the last
    /// processed commit, consuming the waiter.
    pub(crate) async fn wait_for_replay(mut self) {
        loop {
            info!("Waiting for consensus to start replaying ...");
            // NOTE(review): recv() errors (channel closed or lagged) retry
            // immediately; if the sender is dropped this becomes a tight
            // spin loop — confirm the sender outlives all waiters.
            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
                continue;
            };
            info!("Waiting for consensus handler to finish replaying ...");
            monitor
                .replay_to_consumer_last_processed_commit_complete()
                .await;
            break;
        }
    }
}
545
// Manual Clone: broadcast::Receiver is not Clone, so a new subscription is
// created with resubscribe(); the clone only observes monitors broadcast
// after this point.
impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}
553
/// Prometheus metrics for consensus start/shutdown, recorded by
/// `ConsensusManager::start` and `ConsensusManager::shutdown`.
pub struct ConsensusManagerMetrics {
    // Duration of the last consensus startup, in whole seconds.
    start_latency: IntGauge,
    // Duration of the last consensus shutdown, in whole seconds.
    shutdown_latency: IntGauge,
}
558
559impl ConsensusManagerMetrics {
560 pub fn new(registry: &Registry) -> Self {
561 Self {
562 start_latency: register_int_gauge_with_registry!(
563 "consensus_manager_start_latency",
564 "The latency of starting up consensus nodes",
565 registry,
566 )
567 .unwrap(),
568 shutdown_latency: register_int_gauge_with_registry!(
569 "consensus_manager_shutdown_latency",
570 "The latency of shutting down consensus nodes",
571 registry,
572 )
573 .unwrap(),
574 }
575 }
576}