sui_core/consensus_manager/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
4use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
5use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
6use crate::consensus_validator::SuiTxValidator;
7use crate::mysticeti_adapter::LazyMysticetiClient;
8use arc_swap::ArcSwapOption;
9use async_trait::async_trait;
10use consensus_config::{
11    ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
12    NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair, Stake,
13};
14use consensus_core::{
15    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
16    RandomnessSignatureHandler,
17};
18use core::panic;
19use fastcrypto::traits::KeyPair as _;
20use mysten_metrics::{RegistryID, RegistryService};
21use mysten_network::Multiaddr;
22use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
23use std::collections::BTreeMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use sui_config::{ConsensusConfig, NodeConfig};
28use sui_network::endpoint_manager::ConsensusAddressUpdater;
29use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
30use sui_types::crypto::NetworkPublicKey;
31use sui_types::error::{SuiErrorKind, SuiResult};
32use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
33use sui_types::node_role::NodeRole;
34use sui_types::{
35    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
36};
37use tokio::sync::{Mutex, broadcast};
38use tokio::time::{sleep, timeout};
39use tracing::{error, info};
40
41#[cfg(test)]
42#[path = "../unit_tests/consensus_manager_tests.rs"]
43pub mod consensus_manager_tests;
44
45#[derive(PartialEq)]
46enum Running {
47    True(EpochId, ProtocolVersion),
48    False,
49}
50
51/// Stores address updates that should be persisted across epoch changes.
52/// We store the consensus NetworkPublicKey to avoid repeated conversions.
53struct AddressOverridesMap {
54    // We store the AddressSource on a BTreeMap as it helps accessing the keys in priority order.
55    map: BTreeMap<
56        ConsensusNetworkPublicKey,
57        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
58    >,
59}
60
61impl AddressOverridesMap {
62    pub fn new() -> Self {
63        Self {
64            map: BTreeMap::new(),
65        }
66    }
67
68    pub fn insert(
69        &mut self,
70        network_pubkey: ConsensusNetworkPublicKey,
71        source: sui_network::endpoint_manager::AddressSource,
72        addresses: Vec<Multiaddr>,
73    ) {
74        self.map
75            .entry(network_pubkey)
76            .or_default()
77            .insert(source, addresses);
78    }
79
80    pub fn remove(
81        &mut self,
82        network_pubkey: ConsensusNetworkPublicKey,
83        source: sui_network::endpoint_manager::AddressSource,
84    ) {
85        self.map
86            .entry(network_pubkey.clone())
87            .or_default()
88            .remove(&source);
89
90        // If no sources remain for this peer, remove the peer entry entirely
91        if self.map.get(&network_pubkey.clone()).unwrap().is_empty() {
92            self.map.remove(&network_pubkey);
93        }
94    }
95
96    pub fn get_highest_priority_address(
97        &self,
98        network_pubkey: ConsensusNetworkPublicKey,
99    ) -> Option<Multiaddr> {
100        self.map
101            .get(&network_pubkey)
102            .and_then(|sources| sources.first_key_value())
103            .and_then(|(_, addresses)| addresses.first().cloned())
104    }
105
106    pub fn get_all_highest_priority_addresses(
107        &self,
108    ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
109        let mut result = Vec::new();
110
111        for (network_pubkey, sources) in self.map.iter() {
112            if let Some((_source, addresses)) = sources.first_key_value()
113                && let Some(address) = addresses.first()
114            {
115                result.push((network_pubkey.clone(), address.clone()));
116            }
117        }
118        result
119    }
120}
121
122/// Rebuilds the consensus `Committee` with Mysticeti v3 threshold parameters.
123/// `malicious_stake` and `crash_stake` come from env vars with reference-budget
124/// defaults (`f = c = 1250`); the nominal `threshold_total_stake = 5f + 3c + 1`
125/// is derived inside `Committee::new_v3`. This is a temporary iteration knob
126/// until the thresholds are promoted into `ProtocolConfig`.
127fn apply_v3_threshold_overrides(committee: Committee) -> Committee {
128    let malicious_stake: Stake = std::env::var("SUI_CONSENSUS_V3_MALICIOUS_STAKE")
129        .ok()
130        .and_then(|s| s.parse().ok())
131        .unwrap_or(1_250);
132    let crash_stake: Stake = std::env::var("SUI_CONSENSUS_V3_CRASH_STAKE")
133        .ok()
134        .and_then(|s| s.parse().ok())
135        .unwrap_or(1_250);
136    info!(
137        "consensus_manager: applying v3 committee thresholds \
138         (malicious_stake={malicious_stake}, crash_stake={crash_stake})"
139    );
140    Committee::new_v3(
141        committee.epoch(),
142        committee.authorities_slice().to_vec(),
143        malicious_stake,
144        crash_stake,
145    )
146}
147
148fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
149    let chain_type = match chain {
150        Chain::Mainnet => ChainType::Mainnet,
151        Chain::Testnet => ChainType::Testnet,
152        Chain::Unknown => ChainType::Unknown,
153    };
154    ConsensusProtocolConfig::new(
155        config.version.as_u64(),
156        chain_type,
157        config.max_transaction_size_bytes(),
158        config.max_transactions_in_block_bytes(),
159        config.max_num_transactions_in_block(),
160        config.gc_depth(),
161        /* transaction_voting_enabled */ true,
162        config.mysticeti_num_leaders_per_round(),
163        config.consensus_bad_nodes_stake_threshold(),
164        /* enable_v3 */ false,
165        /* leader_schedule_window_size */ 300,
166        /* leader_schedule_update_interval */ 12,
167    )
168}
169
170/// Used by Sui to start consensus protocol for each epoch.
171/// Supports both validator mode (with protocol keypair) and observer mode (without).
172pub struct ConsensusManager {
173    consensus_config: ConsensusConfig,
174    protocol_keypair: Option<ProtocolKeyPair>,
175    network_keypair: NetworkKeyPair,
176    storage_base_path: PathBuf,
177    metrics: Arc<ConsensusManagerMetrics>,
178    registry_service: RegistryService,
179    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,
180
181    // Use a shared lazy Mysticeti client so we can update the internal Mysticeti
182    // client that gets created for every new epoch.
183    client: Arc<LazyMysticetiClient>,
184    consensus_client: Arc<UpdatableConsensusClient>,
185
186    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
187
188    #[cfg(test)]
189    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
190    #[cfg(not(test))]
191    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
192    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,
193
194    running: Mutex<Running>,
195
196    #[cfg(test)]
197    pub(crate) boot_counter: Mutex<u64>,
198    #[cfg(not(test))]
199    boot_counter: Mutex<u64>,
200
201    // Persistent storage for address updates across epoch changes.
202    // Keyed by NetworkPublicKey and then by AddressSource.
203    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
204}
205
206impl ConsensusManager {
207    pub fn new(
208        node_config: &NodeConfig,
209        consensus_config: &ConsensusConfig,
210        registry_service: &RegistryService,
211        consensus_client: Arc<UpdatableConsensusClient>,
212        node_role: NodeRole,
213    ) -> Self {
214        let metrics = Arc::new(ConsensusManagerMetrics::new(
215            &registry_service.default_registry(),
216        ));
217        let client = Arc::new(LazyMysticetiClient::new());
218        let (consumer_monitor_sender, _) = broadcast::channel(1);
219        let protocol_keypair = if node_role.is_validator() {
220            Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
221        } else {
222            None
223        };
224        Self {
225            consensus_config: consensus_config.clone(),
226            protocol_keypair,
227            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
228            storage_base_path: consensus_config.db_path().to_path_buf(),
229            metrics,
230            registry_service: registry_service.clone(),
231            authority: ArcSwapOption::empty(),
232            client,
233            consensus_client,
234            consensus_handler: Mutex::new(None),
235            consumer_monitor: ArcSwapOption::empty(),
236            consumer_monitor_sender,
237            running: Mutex::new(Running::False),
238            boot_counter: Mutex::new(0),
239            address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
240        }
241    }
242
243    pub async fn start(
244        &self,
245        node_config: &NodeConfig,
246        epoch_store: Arc<AuthorityPerEpochStore>,
247        consensus_handler_initializer: ConsensusHandlerInitializer,
248        tx_validator: SuiTxValidator,
249        randomness_signature_handler: Option<Arc<dyn RandomnessSignatureHandler>>,
250    ) {
251        let epoch = epoch_store.epoch();
252        let protocol_config = epoch_store.protocol_config();
253        let consensus_protocol_config =
254            to_consensus_protocol_config(protocol_config, epoch_store.get_chain());
255        let system_state = epoch_store.epoch_start_state();
256        let committee = if consensus_protocol_config.enable_v3() {
257            apply_v3_threshold_overrides(system_state.get_consensus_committee())
258        } else {
259            system_state.get_consensus_committee()
260        };
261
262        // Ensure start() is not called twice.
263        let start_time = Instant::now();
264        let mut running = self.running.lock().await;
265        if let Running::True(running_epoch, running_version) = *running {
266            error!(
267                "Consensus is already Running for epoch {running_epoch:?} & protocol version {running_version:?} - shutdown first before starting",
268            );
269            return;
270        }
271        *running = Running::True(epoch, protocol_config.version);
272
273        info!(
274            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
275            protocol_config.version
276        );
277
278        self.consensus_client.set(self.client.clone());
279
280        let consensus_config = node_config
281            .consensus_config()
282            .expect("consensus_config should exist");
283
284        let parameters = Parameters {
285            db_path: self.get_store_path(epoch),
286            listen_address_override: consensus_config.listen_address.clone(),
287            ..consensus_config.parameters.clone().unwrap_or_default()
288        };
289
290        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();
291
292        let consensus_handler = consensus_handler_initializer.new_consensus_handler();
293
294        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
295        let last_processed_commit_index =
296            consensus_handler.last_processed_subdag_index() as CommitIndex;
297        let replay_after_commit_index =
298            last_processed_commit_index.saturating_sub(num_prior_commits);
299
300        let (commit_consumer, commit_receiver) =
301            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
302        let monitor = commit_consumer.monitor();
303
304        let node_role = epoch_store.node_role();
305
306        // Spin up the new Mysticeti consensus handler to listen for committed sub dags, before starting authority.
307        let handler = MysticetiConsensusHandler::new(
308            last_processed_commit_index,
309            consensus_handler,
310            commit_receiver,
311            monitor.clone(),
312            node_role,
313        );
314        let mut consensus_handler = self.consensus_handler.lock().await;
315        *consensus_handler = Some(handler);
316
317        // If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its
318        // own doesn't tell us much whether it participated on an active epoch or an old one. We need to check if it has handled any commits to determine this.
319        // If indeed any commits did happen, then we assume that node did participate on previous run.
320        let participated_on_previous_run =
321            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
322                previous_monitor.highest_handled_commit() > 0
323            } else {
324                false
325            };
326
327        // Increment the boot counter only if the consensus successfully participated in the previous run.
328        // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
329        // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
330        // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
331        let mut boot_counter = self.boot_counter.lock().await;
332        if participated_on_previous_run {
333            *boot_counter += 1;
334        } else {
335            info!(
336                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
337                *boot_counter
338            );
339        }
340
341        let authority = ConsensusAuthority::start(
342            NetworkType::Tonic,
343            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
344            committee.clone(),
345            parameters.clone(),
346            consensus_protocol_config,
347            self.protocol_keypair.clone(),
348            self.network_keypair.clone(),
349            Arc::new(Clock::default()),
350            Arc::new(tx_validator.clone()),
351            commit_consumer,
352            registry.clone(),
353            *boot_counter,
354            randomness_signature_handler,
355        )
356        .await;
357        let client = authority.transaction_client();
358
359        let registry_id = self.registry_service.add(registry.clone());
360
361        let registered_authority = Arc::new((authority, registry_id));
362        self.authority.swap(Some(registered_authority.clone()));
363
364        // Reapply all stored address updates to the new consensus instance.
365        let highest_priority_addresses = self
366            .address_overrides
367            .lock()
368            .get_all_highest_priority_addresses();
369        for (network_pubkey, address) in highest_priority_addresses {
370            registered_authority
371                .0
372                .update_peer_address(network_pubkey, Some(address.clone()));
373        }
374
375        // Initialize the client to send transactions to this Mysticeti instance.
376        self.client.set(client);
377
378        // Send the consumer monitor to the replay waiter.
379        let _ = self.consumer_monitor_sender.send(monitor);
380
381        let elapsed = start_time.elapsed().as_secs_f64();
382        self.metrics.start_latency.set(elapsed as i64);
383
384        tracing::info!(
385            "Started consensus for epoch {} & protocol version {:?} completed - took {} seconds",
386            epoch,
387            protocol_config.version,
388            elapsed
389        );
390    }
391
392    pub async fn shutdown(&self) {
393        info!("Shutting down consensus ...");
394
395        // Ensure shutdown() is called on a running consensus and get the epoch/version info.
396        let start_time = Instant::now();
397        let mut running = self.running.lock().await;
398        let (shutdown_epoch, shutdown_version) = match *running {
399            Running::True(epoch, version) => {
400                tracing::info!(
401                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
402                );
403                *running = Running::False;
404                (epoch, version)
405            }
406            Running::False => {
407                error!("Consensus shutdown was called but consensus is not running");
408                return;
409            }
410        };
411
412        // Stop consensus submissions.
413        self.client.clear();
414
415        // swap with empty to ensure there is no other reference to authority and we can safely do Arc unwrap
416        let r = self.authority.swap(None).unwrap();
417        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
418            panic!("Failed to retrieve the Mysticeti authority");
419        };
420
421        // shutdown the authority and wait for it
422        authority.stop().await;
423
424        // drop the old consensus handler to force stop any underlying task running.
425        let mut consensus_handler = self.consensus_handler.lock().await;
426        if let Some(mut handler) = consensus_handler.take() {
427            handler.abort().await;
428        }
429
430        // unregister the registry id
431        self.registry_service.remove(registry_id);
432
433        self.consensus_client.clear();
434
435        let elapsed = start_time.elapsed().as_secs_f64();
436        self.metrics.shutdown_latency.set(elapsed as i64);
437
438        tracing::info!(
439            "Consensus stopped for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
440            elapsed
441        );
442    }
443
444    pub async fn is_running(&self) -> bool {
445        let running = self.running.lock().await;
446        matches!(*running, Running::True(_, _))
447    }
448
449    pub fn replay_waiter(&self) -> ReplayWaiter {
450        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
451        ReplayWaiter::new(consumer_monitor_receiver)
452    }
453
454    pub fn get_storage_base_path(&self) -> PathBuf {
455        self.consensus_config.db_path().to_path_buf()
456    }
457
458    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
459        let mut store_path = self.storage_base_path.clone();
460        store_path.push(format!("{}", epoch));
461        store_path
462    }
463}
464
465// Implementing the interface so we can update the consensus peer addresses when requested.
466impl ConsensusAddressUpdater for ConsensusManager {
467    fn update_address(
468        &self,
469        network_pubkey: NetworkPublicKey,
470        source: sui_network::endpoint_manager::AddressSource,
471        addresses: Vec<Multiaddr>,
472    ) -> SuiResult<()> {
473        // Convert to consensus network public key once
474        let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());
475
476        // Determine what address should be used after this update (if any)
477        let address_to_apply = {
478            let mut address_overrides = self.address_overrides.lock();
479
480            if addresses.is_empty() {
481                address_overrides.remove(network_pubkey.clone(), source);
482            } else {
483                address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
484            }
485
486            address_overrides.get_highest_priority_address(network_pubkey.clone())
487        };
488
489        // Apply the update to running consensus if it exists
490        if let Some(authority) = self.authority.load_full() {
491            authority
492                .0
493                .update_peer_address(network_pubkey, address_to_apply);
494            Ok(())
495        } else {
496            info!(
497                "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
498                network_pubkey, source
499            );
500            Err(SuiErrorKind::GenericAuthorityError {
501                error: "Consensus authority node is not running. Can not apply address update"
502                    .to_string(),
503            }
504            .into())
505        }
506    }
507}
508
509/// A ConsensusClient that can be updated internally at any time. This usually happening during epoch
510/// change where a client is set after the new consensus is started for the new epoch.
511#[derive(Default)]
512pub struct UpdatableConsensusClient {
513    // An extra layer of Arc<> is needed as required by ArcSwapAny.
514    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
515}
516
517impl UpdatableConsensusClient {
518    pub fn new() -> Self {
519        Self {
520            client: ArcSwapOption::empty(),
521        }
522    }
523
524    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
525        const START_TIMEOUT: Duration = Duration::from_secs(300);
526        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
527        if let Ok(client) = timeout(START_TIMEOUT, async {
528            loop {
529                let Some(client) = self.client.load_full() else {
530                    sleep(RETRY_INTERVAL).await;
531                    continue;
532                };
533                return client;
534            }
535        })
536        .await
537        {
538            return client;
539        }
540
541        panic!(
542            "Timed out after {:?} waiting for Consensus to start!",
543            START_TIMEOUT,
544        );
545    }
546
547    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
548        self.client.store(Some(Arc::new(client)));
549    }
550
551    pub fn clear(&self) {
552        self.client.store(None);
553    }
554}
555
556#[async_trait]
557impl ConsensusClient for UpdatableConsensusClient {
558    async fn submit(
559        &self,
560        transactions: &[ConsensusTransaction],
561        epoch_store: &Arc<AuthorityPerEpochStore>,
562    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
563        let client = self.get().await;
564        client.submit(transactions, epoch_store).await
565    }
566}
567
568/// Waits for consensus to finish replaying at consensus handler.
569pub struct ReplayWaiter {
570    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
571}
572
573impl ReplayWaiter {
574    pub(crate) fn new(
575        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
576    ) -> Self {
577        Self {
578            consumer_monitor_receiver,
579        }
580    }
581
582    pub(crate) async fn wait_for_replay(mut self) {
583        loop {
584            info!("Waiting for consensus to start replaying ...");
585            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
586                continue;
587            };
588            info!("Waiting for consensus handler to finish replaying ...");
589            monitor
590                .replay_to_consumer_last_processed_commit_complete()
591                .await;
592            break;
593        }
594    }
595}
596
597impl Clone for ReplayWaiter {
598    fn clone(&self) -> Self {
599        Self {
600            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
601        }
602    }
603}
604
605pub struct ConsensusManagerMetrics {
606    start_latency: IntGauge,
607    shutdown_latency: IntGauge,
608}
609
610impl ConsensusManagerMetrics {
611    pub fn new(registry: &Registry) -> Self {
612        Self {
613            start_latency: register_int_gauge_with_registry!(
614                "consensus_manager_start_latency",
615                "The latency of starting up consensus nodes",
616                registry,
617            )
618            .unwrap(),
619            shutdown_latency: register_int_gauge_with_registry!(
620                "consensus_manager_shutdown_latency",
621                "The latency of shutting down consensus nodes",
622                registry,
623            )
624            .unwrap(),
625        }
626    }
627}