sui_core/consensus_manager/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
use crate::consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler};
use crate::consensus_validator::SuiTxValidator;
use crate::mysticeti_adapter::LazyMysticetiClient;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{
    ChainType, Committee, ConsensusProtocolConfig, NetworkKeyPair,
    NetworkPublicKey as ConsensusNetworkPublicKey, Parameters, ProtocolKeyPair,
};
use consensus_core::{
    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority, NetworkType,
};
use core::panic;
use fastcrypto::traits::KeyPair as _;
use mysten_metrics::{RegistryID, RegistryService};
use mysten_network::Multiaddr;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::{ConsensusConfig, NodeConfig};
use sui_network::endpoint_manager::ConsensusAddressUpdater;
use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
use sui_types::crypto::NetworkPublicKey;
use sui_types::error::{SuiErrorKind, SuiResult};
use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
use sui_types::node_role::NodeRole;
use sui_types::{
    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
};
use tokio::sync::{Mutex, broadcast};
use tokio::time::{sleep, timeout};
use tracing::{error, info};

#[cfg(test)]
#[path = "../unit_tests/consensus_manager_tests.rs"]
pub mod consensus_manager_tests;

#[derive(PartialEq)]
enum Running {
    True(EpochId, ProtocolVersion),
    False,
}

/// Stores address updates that should be persisted across epoch changes.
/// We store the consensus NetworkPublicKey to avoid repeated conversions.
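///
/// A minimal usage sketch (illustrative only; `peer_key` is a `ConsensusNetworkPublicKey`,
/// `src_high`/`src_low` stand in for `AddressSource` values where `src_high` orders first,
/// i.e. has higher priority, and `addr_high`/`addr_low` are `Multiaddr`s):
/// ```ignore
/// let mut overrides = AddressOverridesMap::new();
/// overrides.insert(peer_key.clone(), src_low, vec![addr_low.clone()]);
/// overrides.insert(peer_key.clone(), src_high, vec![addr_high.clone()]);
/// // The first source in the BTreeMap ordering wins.
/// assert_eq!(overrides.get_highest_priority_address(peer_key.clone()), Some(addr_high));
/// // Removing the winning source falls back to the next one; removing all drops the peer entry.
/// overrides.remove(peer_key.clone(), src_high);
/// assert_eq!(overrides.get_highest_priority_address(peer_key), Some(addr_low));
/// ```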
struct AddressOverridesMap {
    // AddressSource entries are kept in a BTreeMap so that keys can be read in priority order.
    map: BTreeMap<
        ConsensusNetworkPublicKey,
        BTreeMap<sui_network::endpoint_manager::AddressSource, Vec<Multiaddr>>,
    >,
}

impl AddressOverridesMap {
    pub fn new() -> Self {
        Self {
            map: BTreeMap::new(),
        }
    }

    pub fn insert(
        &mut self,
        network_pubkey: ConsensusNetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
        addresses: Vec<Multiaddr>,
    ) {
        self.map
            .entry(network_pubkey)
            .or_default()
            .insert(source, addresses);
    }

    pub fn remove(
        &mut self,
        network_pubkey: ConsensusNetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
    ) {
        if let Some(sources) = self.map.get_mut(&network_pubkey) {
            sources.remove(&source);

            // If no sources remain for this peer, remove the peer entry entirely.
            if sources.is_empty() {
                self.map.remove(&network_pubkey);
            }
        }
    }

    pub fn get_highest_priority_address(
        &self,
        network_pubkey: ConsensusNetworkPublicKey,
    ) -> Option<Multiaddr> {
        self.map
            .get(&network_pubkey)
            .and_then(|sources| sources.first_key_value())
            .and_then(|(_, addresses)| addresses.first().cloned())
    }

    pub fn get_all_highest_priority_addresses(
        &self,
    ) -> Vec<(ConsensusNetworkPublicKey, Multiaddr)> {
        let mut result = Vec::new();

        for (network_pubkey, sources) in self.map.iter() {
            if let Some((_source, addresses)) = sources.first_key_value()
                && let Some(address) = addresses.first()
            {
                result.push((network_pubkey.clone(), address.clone()));
            }
        }
        result
    }
}

fn to_consensus_protocol_config(config: &ProtocolConfig, chain: Chain) -> ConsensusProtocolConfig {
    let chain_type = match chain {
        Chain::Mainnet => ChainType::Mainnet,
        Chain::Testnet => ChainType::Testnet,
        Chain::Unknown => ChainType::Unknown,
    };
    ConsensusProtocolConfig::new(
        config.version.as_u64(),
        chain_type,
        config.max_transaction_size_bytes(),
        config.max_transactions_in_block_bytes(),
        config.max_num_transactions_in_block(),
        config.gc_depth(),
        /* transaction_voting_enabled */ true,
        config.mysticeti_num_leaders_per_round(),
        config.consensus_bad_nodes_stake_threshold(),
        /* enable_v3 */ false,
    )
}

/// Used by Sui to start the consensus protocol for each epoch.
/// Supports both validator mode (with protocol keypair) and observer mode (without).
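///
/// A lifecycle sketch (illustrative only; `node_config`, `consensus_config`, `registry_service`,
/// `node_role`, `epoch_store`, `handler_init` and `tx_validator` are placeholders assumed to be
/// built elsewhere):
/// ```ignore
/// let consensus_client = Arc::new(UpdatableConsensusClient::new());
/// let manager = ConsensusManager::new(
///     &node_config, &consensus_config, &registry_service, consensus_client, node_role,
/// );
/// manager.start(&node_config, epoch_store, handler_init, tx_validator).await;
/// // ... the epoch runs; at epoch change:
/// manager.shutdown().await;
/// ```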
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: Option<ProtocolKeyPair>,
    network_keypair: NetworkKeyPair,
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Use a shared lazy Mysticeti client so we can update the internal Mysticeti
    // client that gets created for every new epoch.
    client: Arc<LazyMysticetiClient>,
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    running: Mutex<Running>,

    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,

    // Persistent storage for address updates across epoch changes.
    // Keyed by NetworkPublicKey and then by AddressSource.
    address_overrides: parking_lot::Mutex<AddressOverridesMap>,
}

impl ConsensusManager {
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        consensus_client: Arc<UpdatableConsensusClient>,
        node_role: NodeRole,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(
            &registry_service.default_registry(),
        ));
        let client = Arc::new(LazyMysticetiClient::new());
        let (consumer_monitor_sender, _) = broadcast::channel(1);
        let protocol_keypair = if node_role.is_validator() {
            Some(ProtocolKeyPair::new(node_config.worker_key_pair().copy()))
        } else {
            None
        };
        Self {
            consensus_config: consensus_config.clone(),
            protocol_keypair,
            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
            storage_base_path: consensus_config.db_path().to_path_buf(),
            metrics,
            registry_service: registry_service.clone(),
            authority: ArcSwapOption::empty(),
            client,
            consensus_client,
            consensus_handler: Mutex::new(None),
            consumer_monitor: ArcSwapOption::empty(),
            consumer_monitor_sender,
            running: Mutex::new(Running::False),
            boot_counter: Mutex::new(0),
            address_overrides: parking_lot::Mutex::new(AddressOverridesMap::new()),
        }
    }

    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();

        // Ensure start() is not called twice.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already running for epoch {running_epoch:?} & protocol version {running_version:?} - shut down first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            listen_address_override: consensus_config.listen_address.clone(),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        let node_role = epoch_store.node_role();

        // Spin up the new Mysticeti consensus handler to listen for committed sub-dags, before starting the authority.
        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            commit_receiver,
            monitor.clone(),
            node_role.should_process_consensus_commits(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // A previous consumer monitor indicates that the consensus engine has been restarted, usually due to an epoch change. On its
        // own that doesn't tell us whether the node participated in an active epoch or an old one, so we check whether any commits were handled.
        // If commits did happen, we assume the node participated in the previous run.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if consensus successfully participated in the previous run.
        // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
        // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
        // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node has not participated in previous epoch consensus. Boot counter ({}) will not increment.",
                *boot_counter
            );
        }

        let authority = ConsensusAuthority::start(
            NetworkType::Tonic,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            committee.clone(),
            parameters.clone(),
            to_consensus_protocol_config(protocol_config, epoch_store.get_chain()),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Reapply all stored address updates to the new consensus instance.
        let highest_priority_addresses = self
            .address_overrides
            .lock()
            .get_all_highest_priority_addresses();
        for (network_pubkey, address) in highest_priority_addresses {
            registered_authority
                .0
                .update_peer_address(network_pubkey, Some(address.clone()));
        }

        // Initialize the client to send transactions to this Mysticeti instance.
        self.client.set(client);

        // Send the consumer monitor to the replay waiter.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Started consensus for epoch {} & protocol version {:?} - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }

    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        // Ensure shutdown() is called on a running consensus and get the epoch/version info.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                tracing::info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Stop consensus submissions.
        self.client.clear();

        // Swap with empty to ensure there is no other reference to the authority, so we can safely unwrap the Arc.
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        // Shut down the authority and wait for it.
        authority.stop().await;

        // Drop the old consensus handler to force stop any underlying task still running.
        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        // Unregister the registry id.
        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.shutdown_latency.set(elapsed as i64);

        tracing::info!(
            "Consensus shutdown for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }

    pub async fn is_running(&self) -> bool {
        let running = self.running.lock().await;
        matches!(*running, Running::True(_, _))
    }

    pub fn replay_waiter(&self) -> ReplayWaiter {
        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
        ReplayWaiter::new(consumer_monitor_receiver)
    }

    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }
}

// Implements the interface so consensus peer addresses can be updated when requested.
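// A minimal call sketch (illustrative only; `manager`, `peer_pubkey`, `source` and `addr`
// are placeholders):
//   manager.update_address(peer_pubkey.clone(), source, vec![addr])?; // apply/persist an override
//   manager.update_address(peer_pubkey, source, vec![])?;             // clear this source's override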
impl ConsensusAddressUpdater for ConsensusManager {
    fn update_address(
        &self,
        network_pubkey: NetworkPublicKey,
        source: sui_network::endpoint_manager::AddressSource,
        addresses: Vec<Multiaddr>,
    ) -> SuiResult<()> {
        // Convert to the consensus network public key once.
        let network_pubkey = ConsensusNetworkPublicKey::new(network_pubkey.clone());

        // Determine what address should be used after this update (if any).
        let address_to_apply = {
            let mut address_overrides = self.address_overrides.lock();

            if addresses.is_empty() {
                address_overrides.remove(network_pubkey.clone(), source);
            } else {
                address_overrides.insert(network_pubkey.clone(), source, addresses.clone());
            }

            address_overrides.get_highest_priority_address(network_pubkey.clone())
        };

        // Apply the update to the running consensus instance if it exists.
        if let Some(authority) = self.authority.load_full() {
            authority
                .0
                .update_peer_address(network_pubkey, address_to_apply);
            Ok(())
        } else {
            info!(
                "Consensus authority node is not running, address update persisted for peer {:?} from source {:?} and will be applied on next start",
                network_pubkey, source
            );
            Err(SuiErrorKind::GenericAuthorityError {
                error: "Consensus authority node is not running. Cannot apply address update"
                    .to_string(),
            }
            .into())
        }
    }
}

/// A ConsensusClient that can be updated internally at any time. This usually happens during
/// epoch change, when a client is set after the new consensus instance is started for the new epoch.
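///
/// A usage sketch (illustrative only; `mysticeti_client`, `transactions` and `epoch_store`
/// are placeholders):
/// ```ignore
/// let client = Arc::new(UpdatableConsensusClient::new());
/// client.set(mysticeti_client);          // done by ConsensusManager::start()
/// // submit() waits (up to a timeout) for a client to be set before forwarding.
/// let (positions, status_rx) = client.submit(&transactions, &epoch_store).await?;
/// client.clear();                        // done by ConsensusManager::shutdown()
/// ```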
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

impl UpdatableConsensusClient {
    pub fn new() -> Self {
        Self {
            client: ArcSwapOption::empty(),
        }
    }

    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
        const START_TIMEOUT: Duration = Duration::from_secs(300);
        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
        if let Ok(client) = timeout(START_TIMEOUT, async {
            loop {
                let Some(client) = self.client.load_full() else {
                    sleep(RETRY_INTERVAL).await;
                    continue;
                };
                return client;
            }
        })
        .await
        {
            return client;
        }

        panic!(
            "Timed out after {:?} waiting for Consensus to start!",
            START_TIMEOUT,
        );
    }

    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
        self.client.store(Some(Arc::new(client)));
    }

    pub fn clear(&self) {
        self.client.store(None);
    }
}

#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}

/// Waits for consensus to finish replaying commits at the consensus handler.
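///
/// A usage sketch (illustrative only; `manager` is a running `ConsensusManager`):
/// ```ignore
/// let waiter = manager.replay_waiter();
/// waiter.wait_for_replay().await; // returns once replay up to the last processed commit is done
/// ```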
pub struct ReplayWaiter {
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}

impl ReplayWaiter {
    pub(crate) fn new(
        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
    ) -> Self {
        Self {
            consumer_monitor_receiver,
        }
    }

    pub(crate) async fn wait_for_replay(mut self) {
        loop {
            info!("Waiting for consensus to start replaying ...");
            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
                continue;
            };
            info!("Waiting for consensus handler to finish replaying ...");
            monitor
                .replay_to_consumer_last_processed_commit_complete()
                .await;
            break;
        }
    }
}

impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}

pub struct ConsensusManagerMetrics {
    start_latency: IntGauge,
    shutdown_latency: IntGauge,
}

impl ConsensusManagerMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}