// File: sui_core/consensus_manager/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11    ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12    NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair,
13};
14use consensus_core::{
15    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16};
17use core::panic;
18use fastcrypto::traits::KeyPair as _;
19use mysten_metrics::{RegistryID, RegistryService};
20use mysten_network::Multiaddr;
21use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
22use std::collections::BTreeMap;
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use sui_config::{ConsensusConfig, NodeConfig};
27use sui_network::endpoint_manager::ConsensusAddressUpdater;
28use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
29use sui_types::crypto::NetworkPublicKey;
30use sui_types::error::{SuiErrorKind, SuiResult};
31use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
32use sui_types::{
33    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
34};
35use tokio::sync::{Mutex, broadcast};
36use tokio::time::{sleep, timeout};
37use tracing::{error, info};
38
39#[cfg(test)]
40#[path = "../unit_tests/consensus_manager_tests.rs"]
41pub mod consensus_manager_tests;
42
// Tracks whether consensus is currently running, and if so, for which epoch
// and protocol version. Guarded by `ConsensusManager::running`.
#[derive(PartialEq)]
enum Running {
    // Consensus is running for this epoch at this protocol version.
    True(EpochId, ProtocolVersion),
    // Consensus is stopped (initial state, and after shutdown()).
    False,
}
48
/// Stores address updates that should be persisted across epoch changes.
/// We store the consensus NetworkPublicKey to avoid repeated conversions.
struct AddressOverridesMap {
    // Outer map: peer network key -> its per-source overrides.
    // We store the AddressSource on a BTreeMap as it helps accessing the keys in priority order:
    // the smallest AddressSource key (first in BTreeMap iteration order) is treated as the
    // highest-priority source.
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}
58
59impl AddressOverridesMap {
60    pub fn new() -> Self {
61        Self {
62            map: BTreeMap::new(),
63        }
64    }
65
66    pub fn insert(
67        &mut self,
68        network_pubkey: ConsensusNetworkPublicKey,
69        source: sui_network::endpoint_manager::AddressSource,
70        addresses: Vec<Multiaddr>,
71    ) {
72        self.map
73            .entry(network_pubkey)
74            .or_default()
75            .insert(source, addresses);
76    }
77
78    pub fn remove(
79        &mut self,
80        network_pubkey: ConsensusNetworkPublicKey,
81        source: sui_network::endpoint_manager::AddressSource,
82    ) {
83        self.map
84            .entry(network_pubkey.clone())
85            .or_default()
86            .remove(&source);
87
88        // If no sources remain for this peer, remove the peer entry entirely
89        if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
90            self.map.remove(&network_pubkey);
91        }
92    }
93
94    pub fn get_highest_priority_address(
95        &self,
96        network_pubkey: ConsensusNetworkPublicKey,
97    ) -> Option<Multiaddr> {
98        self.map
99            .get(&network_pubkey)
100            .and_then(|sources| sources.first_key_value())
101            .and_then(|(_, addresses)| addresses.first().cloned())
102    }
103
104    pub fn get_all_highest_priority_addresses(
105        &self,
106    ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
107        let mut result = Vec::new();
108
109        for (network_pubkey, sources) in self.map.iter() {
110            if let Some((_source, addresses)) = sources.first_key_value()
111                && let Some(address) = addresses.first()
112            {
113                result.push((network_pubkey.clone(), address.clone()));
114            }
115        }
116        result
117    }
118}
119
120fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
121    let chain_type = match chain {
122        Chain::Mainnet => ChainType::Mainnet,
123        Chain::Testnet => ChainType::Testnet,
124        Chain::Unknown => ChainType::Unknown,
125    };
126    ConsensusProtocolConfig::new(
127        config.version.as_u64(),
128        chain_type,
129        config.max_transaction_size_bytes(),
130        config.max_transactions_in_block_bytes(),
131        config.max_num_transactions_in_block(),
132        config.gc_depth(),
133        /* transaction_voting_enabled */ true,
134        config.mysticeti_num_leaders_per_round(),
135        config.consensus_bad_nodes_stake_threshold(),
136    )
137}
138
/// Used by Sui validator to start consensus protocol for each epoch.
pub struct ConsensusManager {
    // Static consensus configuration, shared across epochs.
    consensus_config: ConsensusConfig,
    // Keypair used as the consensus protocol identity.
    protocol_keypair: ProtocolKeyPair,
    // Keypair used as the consensus networking identity.
    network_keypair: NetworkKeyPair,
    // Root directory; per-epoch consensus DBs live in `<root>/<epoch>` (see get_store_path()).
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    // Used to register/unregister the per-epoch "consensus" metrics registry.
    registry_service: RegistryService,
    // The running consensus authority paired with the ID of its metrics registry.
    // Empty while consensus is stopped; swapped in by start() and out by shutdown().
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Use a shared lazy Mysticeti client so we can update the internal Mysticeti
    // client that gets created for every new epoch.
    client: Arc<LazyMysticetiClient>,
    consensus_client: Arc<UpdatableConsensusClient>,

    // Handler consuming committed subdags; replaced on every start() and aborted on shutdown().
    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    // Monitor of the current commit consumer; also used on restart to detect whether
    // the node participated in the previous run (see start()).
    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    // Broadcasts each new monitor to ReplayWaiter subscribers.
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    // Whether consensus is running, and for which epoch/protocol version.
    running: Mutex<Running>,

    // Passed to ConsensusAuthority::start(); incremented only when the previous
    // run handled commits, to prevent amnesia recovery on normal epoch restarts.
    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Persistent storage for address updates across epoch changes.
    // Keyed by NetworkPublicKey and then by AddressSource.
    address_overrides: Mutex<AddressOverridesMap>,
}
173
impl ConsensusManager {
    /// Creates the manager. Consensus is not started here; call `start()` for
    /// each epoch, with `shutdown()` in between restarts.
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        consensus_client: Arc<UpdatableConsensusClient>,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(
            &registry_service.default_registry(),
        ));
        let client = Arc::new(LazyMysticetiClient::new());
        // Capacity 1: subscribers only care about the most recent monitor.
        let (consumer_monitor_sender, _) = broadcast::channel(1);
        Self {
            consensus_config: consensus_config.clone(),
            protocol_keypair: ProtocolKeyPair::new(node_config.worker_key_pair().copy()),
            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
            storage_base_path: consensus_config.db_path().to_path_buf(),
            metrics,
            registry_service: registry_service.clone(),
            authority: ArcSwapOption::empty(),
            client,
            consensus_client,
            consensus_handler: Mutex::new(None),
            consumer_monitor: ArcSwapOption::empty(),
            consumer_monitor_sender,
            running: Mutex::new(Running::False),
            boot_counter: Mutex::new(0),
            address_overrides: Mutex::new(AddressOverridesMap::new()),
        }
    }

    /// Starts consensus for the epoch described by `epoch_store`.
    ///
    /// Logs an error and returns early if consensus is already running.
    /// Otherwise: spins up the consensus handler for committed subdags, starts
    /// a `ConsensusAuthority` for the epoch, reapplies persisted peer-address
    /// overrides, and finally publishes the transaction client and the
    /// commit-consumer monitor. The `running` lock is held for the duration to
    /// serialize against concurrent start()/shutdown() calls.
    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();

        // Ensure start() is not called twice.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        // Route submissions through the lazy client; the concrete per-epoch
        // client is set further below once the authority is up.
        self.consensus_client.set(self.client.clone());

        // NOTE(review): panics if the node config has no consensus section —
        // presumably guaranteed for validators; confirm with callers.
        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        // Per-epoch parameters: epoch-specific db path and the configured
        // listen address override, on top of the configured defaults.
        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            listen_address_override: consensus_config.listen_address.clone(),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        // Fresh registry per epoch, registered below and removed on shutdown.
        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        // Request replay of a bounded number of commits before the last
        // processed one (saturating at commit 0).
        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        // Spin up the new Mysticeti consensus handler to listen for committed sub dags, before starting authority.
        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            commit_receiver,
            monitor.clone(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its
        // own doesn't tell us much whether it participated on an active epoch or an old one. We need to check if it has handled any commits to determine this.
        // If indeed any commits did happen, then we assume that node did participate on previous run.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if the consensus successfully participated in the previous run.
        // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
        // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
        // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            NetworkType::Tonic,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            committee.clone(),
            parameters.clone(),
            to_consensus_protocol_config(protocol_config, epoch_store.get_chain()),
            Some(self.protocol_keypair.clone()),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Reapply all stored address updates to the new consensus instance.
        let address_overrides = self.address_overrides.lock().await;
        let highest_priority_addresses = address_overrides.get_all_highest_priority_addresses();
        for (network_pubkey, address) in highest_priority_addresses {
            registered_authority
                .0
                .update_peer_address(network_pubkey, Some(address.clone()));
        }

        // Initialize the client to send transactions to this Mysticeti instance.
        self.client.set(client);

        // Send the consumer monitor to the replay waiter.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        // NOTE: the i64 cast drops sub-second precision from the gauge.
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }

    /// Shuts the running consensus instance down, roughly in reverse order of
    /// `start()`: stop submissions, stop the authority, abort the consensus
    /// handler, then unregister the per-epoch metrics registry.
    /// Logs an error and returns early if consensus is not running.
    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        // Ensure shutdown() is called on a running consensus and get the epoch/version info.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                tracing::info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Stop consensus submissions.
        self.client.clear();

        // swap with empty to ensure there is no other reference to authority and we can safely do Arc unwrap
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        // shutdown the authority and wait for it
        authority.stop().await;

        // drop the old consensus handler to force stop any underlying task running.
        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        // unregister the registry id
        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        // NOTE: the i64 cast drops sub-second precision from the gauge.
        self.metrics.shutdown_latency.set(elapsed as i64);

        tracing::info!(
            "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }

    /// Returns true while consensus is started for some epoch.
    pub async fn is_running(&self) -> bool {
        let running = self.running.lock().await;
        matches!(*running, Running::True(_, _))
    }

    /// Creates a waiter that completes once the consensus handler has finished
    /// replaying commits (see `ReplayWaiter`).
    pub fn replay_waiter(&self) -> ReplayWaiter {
        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
        ReplayWaiter::new(consumer_monitor_receiver)
    }

    /// Root directory of consensus storage (same value that was captured into
    /// `storage_base_path` at construction time).
    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

    /// Per-epoch DB directory: `<storage_base_path>/<epoch>`.
    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }
}
413
// Implementing the interface so we can update the consensus peer addresses when requested.
impl ConsensusAddressUpdater for ConsensusManager {
    /// Records an address override for `network_pubkey` from `source` (or removes
    /// it when `addresses` is empty), then applies the resulting highest-priority
    /// address to the running consensus authority.
    ///
    /// The override is persisted in `address_overrides` either way, so it will be
    /// applied on the next `start()`. Returns an error only when consensus is not
    /// currently running (the update itself has still been recorded).
    ///
    /// NOTE(review): this uses `blocking_lock()`, which panics if called from
    /// within an async runtime context — assumes callers invoke this from a
    /// synchronous context; verify call sites.
    fn update_address(
        &self,
        network_pubkey: NetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
        addresses: Vec<Multiaddr>,
    ) -> SuiResult<()> {
        // Convert to consensus network public key once
        let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());

        // Determine what address should be used after this update (if any)
        let address_to_apply = {
            let mut address_overrides = self.address_overrides.blocking_lock();

            // An empty address list means "clear this source's override".
            if addresses.is_empty() {
                address_overrides.remove(network_pubkey.clone(), source);
            } else {
                address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
            }

            address_overrides.get_highest_priority_address(network_pubkey.clone())
        };

        // Apply the update to running consensus if it exists
        if let Some(authority) = self.authority.load_full() {
            authority
                .0
                .update_peer_address(network_pubkey, address_to_apply);
            Ok(())
        } else {
            info!(
                "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
                network_pubkey, source
            );
            Err(SuiErrorKind::GenericAuthorityError {
                error: "Consensus authority node is not running. Can not apply address update"
                    .to_string(),
            }
            .into())
        }
    }
}
457
/// A ConsensusClient that can be updated internally at any time. This usually happening during epoch
/// change where a client is set after the new consensus is started for the new epoch.
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // The current inner client, or None while consensus is down.
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}
465
466impl UpdatableConsensusClient {
467    pub fn new() -> Self {
468        Self {
469            client: ArcSwapOption::empty(),
470        }
471    }
472
473    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
474        const START_TIMEOUT: Duration = Duration::from_secs(300);
475        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
476        if let Ok(client) = timeout(START_TIMEOUT, async {
477            loop {
478                let Some(client) = self.client.load_full() else {
479                    sleep(RETRY_INTERVAL).await;
480                    continue;
481                };
482                return client;
483            }
484        })
485        .await
486        {
487            return client;
488        }
489
490        panic!(
491            "Timed out after {:?} waiting for Consensus to start!",
492            START_TIMEOUT,
493        );
494    }
495
496    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
497        self.client.store(Some(Arc::new(client)));
498    }
499
500    pub fn clear(&self) {
501        self.client.store(None);
502    }
503}
504
#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    /// Forwards the submission to the inner client, waiting (bounded, see
    /// `get()`) for one to be installed if consensus is mid-restart.
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}
516
/// Waits for consensus to finish replaying at consensus handler.
pub struct ReplayWaiter {
    // Receives the CommitConsumerMonitor published by ConsensusManager::start().
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}
521
522impl ReplayWaiter {
523    pub(crate) fn new(
524        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
525    ) -> Self {
526        Self {
527            consumer_monitor_receiver,
528        }
529    }
530
531    pub(crate) async fn wait_for_replay(mut self) {
532        loop {
533            info!("Waiting for consensus to start replaying ...");
534            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
535                continue;
536            };
537            info!("Waiting for consensus handler to finish replaying ...");
538            monitor
539                .replay_to_consumer_last_processed_commit_complete()
540                .await;
541            break;
542        }
543    }
544}
545
// Manual Clone impl: broadcast::Receiver is not Clone, so a clone resubscribes
// to the channel (it will only observe values sent after this point).
impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}
553
/// Metrics reported by the manager itself (not by the consensus protocol).
pub struct ConsensusManagerMetrics {
    // Whole seconds taken by the most recent start().
    start_latency: IntGauge,
    // Whole seconds taken by the most recent shutdown().
    shutdown_latency: IntGauge,
}
558
impl ConsensusManagerMetrics {
    /// Registers the manager's gauges on `registry`.
    /// Panics if a metric with the same name is already registered there.
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}