1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11 Committee, NetworkKeyPair, NetworkPublicKey as ConsensusNetworkPublicKey, Parameters,
12 ProtocolKeyPair,
13};
14use consensus_core::{
15 Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16};
17use core::panic;
18use fastcrypto::traits::KeyPair as _;
19use mysten_metrics::{RegistryID, RegistryService};
20use mysten_network::Multiaddr;
21use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
22use std::collections::BTreeMap;
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use sui_config::{ConsensusConfig, NodeConfig};
27use sui_network::endpoint_manager::ConsensusAddressUpdater;
28use sui_protocol_config::ProtocolVersion;
29use sui_types::crypto::NetworkPublicKey;
30use sui_types::error::{SuiErrorKind, SuiResult};
31use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
32use sui_types::{
33 committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
34};
35use tokio::sync::{Mutex, broadcast};
36use tokio::time::{sleep, timeout};
37use tracing::{error, info};
38
39#[cfg(test)]
40#[path = "../unit_tests/consensus_manager_tests.rs"]
41pub mod consensus_manager_tests;
42
/// Whether consensus is currently running, and if so for which epoch and
/// protocol version. Guarded by `ConsensusManager::running`.
#[derive(PartialEq)]
enum Running {
    True(EpochId, ProtocolVersion),
    False,
}
48
/// Per-peer network address overrides, keyed by the peer's consensus network
/// public key. For each peer, overrides from multiple sources are kept in a
/// `BTreeMap` ordered by `AddressSource`, so the first (lowest-ordered) source
/// is treated as the highest priority one.
struct AddressOverridesMap {
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}
58
59impl AddressOverridesMap {
60 pub fn new() -> Self {
61 Self {
62 map: BTreeMap::new(),
63 }
64 }
65
66 pub fn insert(
67 &mut self,
68 network_pubkey: ConsensusNetworkPublicKey,
69 source: sui_network::endpoint_manager::AddressSource,
70 addresses: Vec<Multiaddr>,
71 ) {
72 self.map
73 .entry(network_pubkey)
74 .or_default()
75 .insert(source, addresses);
76 }
77
78 pub fn remove(
79 &mut self,
80 network_pubkey: ConsensusNetworkPublicKey,
81 source: sui_network::endpoint_manager::AddressSource,
82 ) {
83 self.map
84 .entry(network_pubkey.clone())
85 .or_default()
86 .remove(&source);
87
88 if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
90 self.map.remove(&network_pubkey);
91 }
92 }
93
94 pub fn get_highest_priority_address(
95 &self,
96 network_pubkey: ConsensusNetworkPublicKey,
97 ) -> Option<Multiaddr> {
98 self.map
99 .get(&network_pubkey)
100 .and_then(|sources| sources.first_key_value())
101 .and_then(|(_, addresses)| addresses.first().cloned())
102 }
103
104 pub fn get_all_highest_priority_addresses(
105 &self,
106 ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
107 let mut result = Vec::new();
108
109 for (network_pubkey, sources) in self.map.iter() {
110 if let Some((_source, addresses)) = sources.first_key_value()
111 && let Some(address) = addresses.first()
112 {
113 result.push((network_pubkey.clone(), address.clone()));
114 }
115 }
116 result
117 }
118}
119
/// Manages the lifecycle (start/shutdown) of the Mysticeti consensus
/// authority across epoch and protocol-version changes.
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: ProtocolKeyPair,
    network_keypair: NetworkKeyPair,
    // Base directory under which per-epoch consensus DBs are created.
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    // The running authority together with the ID of its metrics registry,
    // so the registry can be removed again on shutdown.
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Transaction client, set once the authority has started and cleared on
    // shutdown.
    client: Arc<LazyMysticetiClient>,
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    // Broadcasts the monitor of the current run to `ReplayWaiter`s.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    running: Mutex<Running>,

    // Passed to `ConsensusAuthority::start`; incremented on restart only when
    // the node handled at least one commit during the previous run.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Address overrides persisted while consensus is down and applied on the
    // next start.
    address_overrides: Mutex<AddressOverridesMap>,
}
154
155impl ConsensusManager {
156 pub fn new(
157 node_config: &NodeConfig,
158 consensus_config: &ConsensusConfig,
159 registry_service: &RegistryService,
160 consensus_client: Arc<UpdatableConsensusClient>,
161 ) -> Self {
162 let metrics = Arc::new(ConsensusManagerMetrics::new(
163 ®istry_service.default_registry(),
164 ));
165 let client = Arc::new(LazyMysticetiClient::new());
166 let (consumer_monitor_sender, _) = broadcast::channel(1);
167 Self {
168 consensus_config: consensus_config.clone(),
169 protocol_keypair: ProtocolKeyPair::new(node_config.worker_key_pair().copy()),
170 network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
171 storage_base_path: consensus_config.db_path().to_path_buf(),
172 metrics,
173 registry_service: registry_service.clone(),
174 authority: ArcSwapOption::empty(),
175 client,
176 consensus_client,
177 consensus_handler: Mutex::new(None),
178 consumer_monitor: ArcSwapOption::empty(),
179 consumer_monitor_sender,
180 running: Mutex::new(Running::False),
181 boot_counter: Mutex::new(0),
182 address_overrides: Mutex::new(AddressOverridesMap::new()),
183 }
184 }
185
    /// Starts consensus for the epoch described by `epoch_store`.
    ///
    /// Boots the Mysticeti `ConsensusAuthority`, installs the commit handler
    /// produced by `consensus_handler_initializer`, applies any persisted
    /// peer-address overrides, and publishes the transaction client.
    /// Logs an error and returns without doing anything if consensus is
    /// already running.
    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();

        let start_time = Instant::now();
        // Hold the `running` lock for the entire startup so concurrent
        // start()/shutdown() calls cannot interleave with us.
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        let parameters = Parameters {
            // Each epoch gets its own consensus DB directory.
            db_path: self.get_store_path(epoch),
            listen_address_override: consensus_config.listen_address.clone(),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let own_protocol_key = self.protocol_keypair.public();
        let (own_index, _) = committee
            .authorities()
            .find(|(_, a)| a.protocol_key == own_protocol_key)
            .expect("Own authority should be among the consensus authorities!");

        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        // Request replay of a window of commits before the last one the
        // handler already processed (window size is protocol-configured).
        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            commit_receiver,
            monitor.clone(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // Swap in the new monitor while inspecting the previous run's monitor
        // to determine whether this node actually handled any commits then.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Only bump the boot counter when the node took part in the previous
        // run; otherwise leave it unchanged.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            NetworkType::Tonic,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            own_index,
            committee.clone(),
            parameters.clone(),
            protocol_config.clone(),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        // Register the per-run metrics registry; the returned ID is stored so
        // shutdown can remove it again.
        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Apply any address overrides that arrived while consensus was down.
        let address_overrides = self.address_overrides.lock().await;
        let highest_priority_addresses = address_overrides.get_all_highest_priority_addresses();
        for (network_pubkey, address) in highest_priority_addresses {
            registered_authority
                .0
                .update_peer_address(network_pubkey, Some(address.clone()));
        }

        // Publishing the client unblocks transaction submission.
        self.client.set(client);

        // Notify ReplayWaiters; it is fine if there are no subscribers.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }
328
    /// Shuts down the running consensus authority and releases everything
    /// `start` set up (transaction client, handler, metrics registry).
    /// Logs an error and returns if consensus is not running.
    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        let start_time = Instant::now();
        // Hold the `running` lock for the entire shutdown so concurrent
        // start()/shutdown() calls cannot interleave with us.
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                tracing::info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Stop accepting new transaction submissions first.
        self.client.clear();

        // We expect to hold the only strong reference to the authority at
        // this point; anything else is a bug, hence the panic.
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        authority.stop().await;

        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        // Remove the per-run metrics registry registered during startup.
        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.shutdown_latency.set(elapsed as i64);

        tracing::info!(
            "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }
380
381 pub async fn is_running(&self) -> bool {
382 let running = self.running.lock().await;
383 matches!(*running, Running::True(_, _))
384 }
385
386 pub fn replay_waiter(&self) -> ReplayWaiter {
387 let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
388 ReplayWaiter::new(consumer_monitor_receiver)
389 }
390
391 pub fn get_storage_base_path(&self) -> PathBuf {
392 self.consensus_config.db_path().to_path_buf()
393 }
394
395 fn get_store_path(&self, epoch: EpochId) -> PathBuf {
396 let mut store_path = self.storage_base_path.clone();
397 store_path.push(format!("{}", epoch));
398 store_path
399 }
400}
401
402impl ConsensusAddressUpdater for ConsensusManager {
404 fn update_address(
405 &self,
406 network_pubkey: NetworkPublicKey,
407 source: sui_network::endpoint_manager::AddressSource,
408 addresses: Vec<Multiaddr>,
409 ) -> SuiResult<()> {
410 let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
412
413 let address_to_apply = {
415 let mut address_overrides = self.address_overrides.blocking_lock();
416
417 if addresses.is_empty() {
418 address_overrides.remove(network_pubkey.clone(), source);
419 } else {
420 address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
421 }
422
423 address_overrides.get_highest_priority_address(network_pubkey.clone())
424 };
425
426 if let Some(authority) = self.authority.load_full() {
428 authority
429 .0
430 .update_peer_address(network_pubkey, address_to_apply);
431 Ok(())
432 } else {
433 info!(
434 "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
435 network_pubkey, source
436 );
437 Err(SuiErrorKind::GenericAuthorityError {
438 error: "Consensus authority node is not running. Can not apply address update"
439 .to_string(),
440 }
441 .into())
442 }
443 }
444}
445
/// A `ConsensusClient` whose inner client can be swapped at runtime, giving
/// submitters a stable handle across consensus restarts. `submit` waits
/// (up to a timeout) until an inner client has been `set`.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // Double Arc: `ArcSwapOption` needs a sized pointee, and the inner
    // `Arc<dyn ConsensusClient>` is the unsized trait-object handle.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
453
454impl UpdatableConsensusClient {
455 pub fn new() -> Self {
456 Self {
457 client: ArcSwapOption::empty(),
458 }
459 }
460
461 async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
462 const START_TIMEOUT: Duration = Duration::from_secs(300);
463 const RETRY_INTERVAL: Duration = Duration::from_millis(100);
464 if let Ok(client) = timeout(START_TIMEOUT, async {
465 loop {
466 let Some(client) = self.client.load_full() else {
467 sleep(RETRY_INTERVAL).await;
468 continue;
469 };
470 return client;
471 }
472 })
473 .await
474 {
475 return client;
476 }
477
478 panic!(
479 "Timed out after {:?} waiting for Consensus to start!",
480 START_TIMEOUT,
481 );
482 }
483
484 pub fn set(&self, client: Arc<dyn ConsensusClient>) {
485 self.client.store(Some(Arc::new(client)));
486 }
487
488 pub fn clear(&self) {
489 self.client.store(None);
490 }
491}
492
493#[async_trait]
494impl ConsensusClient for UpdatableConsensusClient {
495 async fn submit(
496 &self,
497 transactions: &[ConsensusTransaction],
498 epoch_store: &Arc<AuthorityPerEpochStore>,
499 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
500 let client = self.get().await;
501 client.submit(transactions, epoch_store).await
502 }
503}
504
/// Waits for the consensus handler to finish replaying previously committed
/// sub-dags after a (re)start. Obtained via `ConsensusManager::replay_waiter`.
pub struct ReplayWaiter {
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}
509
510impl ReplayWaiter {
511 pub(crate) fn new(
512 consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
513 ) -> Self {
514 Self {
515 consumer_monitor_receiver,
516 }
517 }
518
519 pub(crate) async fn wait_for_replay(mut self) {
520 loop {
521 info!("Waiting for consensus to start replaying ...");
522 let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
523 continue;
524 };
525 info!("Waiting for consensus handler to finish replaying ...");
526 monitor
527 .replay_to_consumer_last_processed_commit_complete()
528 .await;
529 break;
530 }
531 }
532}
533
534impl Clone for ReplayWaiter {
535 fn clone(&self) -> Self {
536 Self {
537 consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
538 }
539 }
540}
541
/// Prometheus gauges tracking how long consensus start/shutdown take.
pub struct ConsensusManagerMetrics {
    // Duration of the last `start`, in whole seconds (fractional part truncated).
    start_latency: IntGauge,
    // Duration of the last `shutdown`, in whole seconds (fractional part truncated).
    shutdown_latency: IntGauge,
}
546
547impl ConsensusManagerMetrics {
548 pub fn new(registry: &Registry) -> Self {
549 Self {
550 start_latency: register_int_gauge_with_registry!(
551 "consensus_manager_start_latency",
552 "The latency of starting up consensus nodes",
553 registry,
554 )
555 .unwrap(),
556 shutdown_latency: register_int_gauge_with_registry!(
557 "consensus_manager_shutdown_latency",
558 "The latency of shutting down consensus nodes",
559 registry,
560 )
561 .unwrap(),
562 }
563 }
564}