sui_core/consensus_manager/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient};
use crate::consensus_handler::{
    ConsensusBlockHandler, ConsensusHandlerInitializer, MysticetiConsensusHandler,
};
use crate::consensus_validator::SuiTxValidator;
use crate::mysticeti_adapter::LazyMysticetiClient;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_core::{
    Clock, CommitConsumerArgs, CommitConsumerMonitor, CommitIndex, ConsensusAuthority,
};
use core::panic;
use fastcrypto::traits::KeyPair as _;
use mysten_metrics::{RegistryID, RegistryService};
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::{ConsensusConfig, NodeConfig};
use sui_protocol_config::{ConsensusNetwork, ProtocolVersion};
use sui_types::error::SuiResult;
use sui_types::messages_consensus::{ConsensusPosition, ConsensusTransaction};
use sui_types::{
    committee::EpochId, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
};
use tokio::sync::{Mutex, broadcast};
use tokio::time::{sleep, timeout};
use tracing::{error, info};

#[cfg(test)]
#[path = "../unit_tests/consensus_manager_tests.rs"]
pub mod consensus_manager_tests;

#[derive(PartialEq)]
enum Running {
    True(EpochId, ProtocolVersion),
    False,
}

/// Used by a Sui validator to start the consensus protocol for each epoch.
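///
/// A minimal lifecycle sketch (actual wiring happens during validator startup; `node_config`,
/// `epoch_store`, `handler_initializer` and `tx_validator` below are assumed to come from there):
///
/// ```ignore
/// let consensus_client = Arc::new(UpdatableConsensusClient::new());
/// let manager = ConsensusManager::new(
///     &node_config,
///     &consensus_config,
///     &registry_service,
///     consensus_client,
/// );
/// manager
///     .start(&node_config, epoch_store, handler_initializer, tx_validator)
///     .await;
/// // ... on epoch change or node shutdown:
/// manager.shutdown().await;
/// ```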
pub struct ConsensusManager {
    consensus_config: ConsensusConfig,
    protocol_keypair: ProtocolKeyPair,
    network_keypair: NetworkKeyPair,
    storage_base_path: PathBuf,
    metrics: Arc<ConsensusManagerMetrics>,
    registry_service: RegistryService,
    authority: ArcSwapOption<(ConsensusAuthority, RegistryID)>,

    // Use a shared lazy Mysticeti client so we can update the internal Mysticeti
    // client that gets created for every new epoch.
    client: Arc<LazyMysticetiClient>,
    consensus_client: Arc<UpdatableConsensusClient>,

    consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,

    #[cfg(test)]
    pub(crate) consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    #[cfg(not(test))]
    consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
    consumer_monitor_sender: broadcast::Sender<Arc<CommitConsumerMonitor>>,

    running: Mutex<Running>,

    #[cfg(test)]
    pub(crate) boot_counter: Mutex<u64>,
    #[cfg(not(test))]
    boot_counter: Mutex<u64>,
}

impl ConsensusManager {
    pub fn new(
        node_config: &NodeConfig,
        consensus_config: &ConsensusConfig,
        registry_service: &RegistryService,
        consensus_client: Arc<UpdatableConsensusClient>,
    ) -> Self {
        let metrics = Arc::new(ConsensusManagerMetrics::new(
            &registry_service.default_registry(),
        ));
        let client = Arc::new(LazyMysticetiClient::new());
        let (consumer_monitor_sender, _) = broadcast::channel(1);
        Self {
            consensus_config: consensus_config.clone(),
            protocol_keypair: ProtocolKeyPair::new(node_config.worker_key_pair().copy()),
            network_keypair: NetworkKeyPair::new(node_config.network_key_pair().copy()),
            storage_base_path: consensus_config.db_path().to_path_buf(),
            metrics,
            registry_service: registry_service.clone(),
            authority: ArcSwapOption::empty(),
            client,
            consensus_client,
            consensus_handler: Mutex::new(None),
            consumer_monitor: ArcSwapOption::empty(),
            consumer_monitor_sender,
            running: Mutex::new(Running::False),
            boot_counter: Mutex::new(0),
        }
    }

    pub async fn start(
        &self,
        node_config: &NodeConfig,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_handler_initializer: ConsensusHandlerInitializer,
        tx_validator: SuiTxValidator,
    ) {
        let system_state = epoch_store.epoch_start_state();
        let committee: Committee = system_state.get_consensus_committee();
        let epoch = epoch_store.epoch();
        let protocol_config = epoch_store.protocol_config();
        let network_type = self.pick_network(&epoch_store);

        // Ensure start() is not called twice.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        if let Running::True(running_epoch, running_version) = *running {
            error!(
                "Consensus is already running for epoch {running_epoch:?} & protocol version {running_version:?} - shut down first before starting",
            );
            return;
        }
        *running = Running::True(epoch, protocol_config.version);

        info!(
            "Starting up consensus for epoch {epoch:?} & protocol version {:?}",
            protocol_config.version
        );

        self.consensus_client.set(self.client.clone());

        let consensus_config = node_config
            .consensus_config()
            .expect("consensus_config should exist");

        let parameters = Parameters {
            db_path: self.get_store_path(epoch),
            ..consensus_config.parameters.clone().unwrap_or_default()
        };

        let own_protocol_key = self.protocol_keypair.public();
        let (own_index, _) = committee
            .authorities()
            .find(|(_, a)| a.protocol_key == own_protocol_key)
            .expect("Own authority should be among the consensus authorities!");

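        // Use a dedicated Prometheus registry for this consensus instance; it is added to the
        // registry service after the authority starts and removed again on shutdown.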
        let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap();

        let consensus_handler = consensus_handler_initializer.new_consensus_handler();

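        // Determine the range of commits to replay at startup: replay begins after
        // `replay_after_commit_index`, which trails the last processed commit by the number of
        // prior commits requested in the protocol config.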
        let num_prior_commits = protocol_config.consensus_num_requested_prior_commits_at_startup();
        let last_processed_commit_index =
            consensus_handler.last_processed_subdag_index() as CommitIndex;
        let replay_after_commit_index =
            last_processed_commit_index.saturating_sub(num_prior_commits);

        let (commit_consumer, commit_receiver, block_receiver) =
            CommitConsumerArgs::new(replay_after_commit_index, last_processed_commit_index);
        let monitor = commit_consumer.monitor();

        // Spin up the new Mysticeti consensus handler to listen for committed sub-dags, before starting the authority.
        let consensus_block_handler = ConsensusBlockHandler::new(
            epoch_store.clone(),
            consensus_handler.execution_scheduler_sender().clone(),
            consensus_handler_initializer.backpressure_subscriber(),
            consensus_handler_initializer.metrics().clone(),
        );
        let handler = MysticetiConsensusHandler::new(
            last_processed_commit_index,
            consensus_handler,
            consensus_block_handler,
            commit_receiver,
            block_receiver,
            monitor.clone(),
        );
        let mut consensus_handler = self.consensus_handler.lock().await;
        *consensus_handler = Some(handler);

        // A previous consumer monitor indicates that the consensus engine was restarted, e.g. due to an epoch change. That alone
        // does not tell us whether the node participated in an active epoch or an old one, so we check whether the previous run
        // handled any commits. If it did, we assume the node participated in that run.
        let participated_on_previous_run =
            if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
                previous_monitor.highest_handled_commit() > 0
            } else {
                false
            };

        // Increment the boot counter only if consensus successfully participated in the previous run.
        // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
        // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
        // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
        let mut boot_counter = self.boot_counter.lock().await;
        if participated_on_previous_run {
            *boot_counter += 1;
        } else {
            info!(
                "Node did not participate in the previous epoch's consensus. Boot counter ({}) will not be incremented.",
                *boot_counter
            );
        }

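        // Start the consensus authority for the new epoch. The boot counter lets consensus
        // distinguish a normal restart from one that should go through amnesia recovery.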
        let authority = ConsensusAuthority::start(
            network_type,
            epoch_store.epoch_start_config().epoch_start_timestamp_ms(),
            own_index,
            committee.clone(),
            parameters.clone(),
            protocol_config.clone(),
            self.protocol_keypair.clone(),
            self.network_keypair.clone(),
            Arc::new(Clock::default()),
            Arc::new(tx_validator.clone()),
            commit_consumer,
            registry.clone(),
            *boot_counter,
        )
        .await;
        let client = authority.transaction_client();

        let registry_id = self.registry_service.add(registry.clone());

        let registered_authority = Arc::new((authority, registry_id));
        self.authority.swap(Some(registered_authority.clone()));

        // Initialize the client to send transactions to this Mysticeti instance.
        self.client.set(client);

        // Send the consumer monitor to the replay waiter.
        let _ = self.consumer_monitor_sender.send(monitor);

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.start_latency.set(elapsed as i64);

        tracing::info!(
            "Consensus started for epoch {} & protocol version {:?} - took {} seconds",
            epoch,
            protocol_config.version,
            elapsed
        );
    }

    pub async fn shutdown(&self) {
        info!("Shutting down consensus ...");

        // Ensure shutdown() is called on a running consensus and get the epoch/version info.
        let start_time = Instant::now();
        let mut running = self.running.lock().await;
        let (shutdown_epoch, shutdown_version) = match *running {
            Running::True(epoch, version) => {
                tracing::info!(
                    "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}"
                );
                *running = Running::False;
                (epoch, version)
            }
            Running::False => {
                error!("Consensus shutdown was called but consensus is not running");
                return;
            }
        };

        // Stop consensus submissions.
        self.client.clear();

        // Swap with None to ensure there is no other reference to the authority, so it can be safely unwrapped from the Arc.
        let r = self.authority.swap(None).unwrap();
        let Ok((authority, registry_id)) = Arc::try_unwrap(r) else {
            panic!("Failed to retrieve the Mysticeti authority");
        };

        // Shut down the authority and wait for it to stop.
        authority.stop().await;

        // Drop the old consensus handler to force-stop any underlying tasks still running.
        let mut consensus_handler = self.consensus_handler.lock().await;
        if let Some(mut handler) = consensus_handler.take() {
            handler.abort().await;
        }

        // Unregister the registry ID.
        self.registry_service.remove(registry_id);

        self.consensus_client.clear();

        let elapsed = start_time.elapsed().as_secs_f64();
        self.metrics.shutdown_latency.set(elapsed as i64);

        tracing::info!(
            "Consensus shutdown for epoch {shutdown_epoch:?} & protocol version {shutdown_version:?} is complete - took {} seconds",
            elapsed
        );
    }

    pub async fn is_running(&self) -> bool {
        let running = self.running.lock().await;
        matches!(*running, Running::True(_, _))
    }

    pub fn replay_waiter(&self) -> ReplayWaiter {
        let consumer_monitor_receiver = self.consumer_monitor_sender.subscribe();
        ReplayWaiter::new(consumer_monitor_receiver)
    }

    pub fn get_storage_base_path(&self) -> PathBuf {
        self.consensus_config.db_path().to_path_buf()
    }

    fn get_store_path(&self, epoch: EpochId) -> PathBuf {
        let mut store_path = self.storage_base_path.clone();
        store_path.push(format!("{}", epoch));
        store_path
    }

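    /// Picks the consensus network type, honoring the `CONSENSUS_NETWORK` environment variable
    /// ("anemo" or "tonic") when set to a valid value, and otherwise falling back to the protocol config.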
    fn pick_network(&self, epoch_store: &AuthorityPerEpochStore) -> ConsensusNetwork {
        if let Ok(type_str) = std::env::var("CONSENSUS_NETWORK") {
            match type_str.to_lowercase().as_str() {
                "anemo" => return ConsensusNetwork::Anemo,
                "tonic" => return ConsensusNetwork::Tonic,
                _ => {
                    info!(
                        "Invalid consensus network type {} in env var. Continuing to use the value from the protocol config.",
                        type_str
                    );
                }
            }
        }
        epoch_store.protocol_config().consensus_network()
    }
}

/// A ConsensusClient that can be updated internally at any time. This usually happens during an epoch
/// change, where a new client is set after consensus is started for the new epoch.
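///
/// A minimal usage sketch, assuming a hypothetical `MyClient` type that implements `ConsensusClient`:
///
/// ```ignore
/// let client = Arc::new(UpdatableConsensusClient::new());
/// client.set(Arc::new(MyClient::default()));
/// // submit() now delegates to MyClient; on epoch change, clear and set a new client:
/// client.clear();
/// ```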
#[derive(Default)]
pub struct UpdatableConsensusClient {
    // An extra layer of Arc<> is needed as required by ArcSwapAny.
    client: ArcSwapOption<Arc<dyn ConsensusClient>>,
}

impl UpdatableConsensusClient {
    pub fn new() -> Self {
        Self {
            client: ArcSwapOption::empty(),
        }
    }

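    /// Returns the current inner client, waiting for one to be set if consensus has not started yet.
    /// Panics if no client becomes available within the start timeout.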
    async fn get(&self) -> Arc<Arc<dyn ConsensusClient>> {
        const START_TIMEOUT: Duration = Duration::from_secs(300);
        const RETRY_INTERVAL: Duration = Duration::from_millis(100);
        if let Ok(client) = timeout(START_TIMEOUT, async {
            loop {
                let Some(client) = self.client.load_full() else {
                    sleep(RETRY_INTERVAL).await;
                    continue;
                };
                return client;
            }
        })
        .await
        {
            return client;
        }

        panic!(
            "Timed out after {:?} waiting for Consensus to start!",
            START_TIMEOUT,
        );
    }

    pub fn set(&self, client: Arc<dyn ConsensusClient>) {
        self.client.store(Some(Arc::new(client)));
    }

    pub fn clear(&self) {
        self.client.store(None);
    }
}

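// Delegates submissions to the currently set client, waiting for consensus to start if needed.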
#[async_trait]
impl ConsensusClient for UpdatableConsensusClient {
    async fn submit(
        &self,
        transactions: &[ConsensusTransaction],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
        let client = self.get().await;
        client.submit(transactions, epoch_store).await
    }
}

/// Waits for consensus to finish replaying commits at the consensus handler.
pub struct ReplayWaiter {
    consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
}

impl ReplayWaiter {
    pub(crate) fn new(
        consumer_monitor_receiver: broadcast::Receiver<Arc<CommitConsumerMonitor>>,
    ) -> Self {
        Self {
            consumer_monitor_receiver,
        }
    }

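    /// Blocks until a commit consumer monitor is received and the consensus handler has finished
    /// replaying commits up to its last processed commit.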
    pub(crate) async fn wait_for_replay(mut self) {
        loop {
            info!("Waiting for consensus to start replaying ...");
            let Ok(monitor) = self.consumer_monitor_receiver.recv().await else {
                continue;
            };
            info!("Waiting for consensus handler to finish replaying ...");
            monitor
                .replay_to_consumer_last_processed_commit_complete()
                .await;
            break;
        }
    }
}

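// Cloning resubscribes to the broadcast channel, so each waiter receives its own copy of the monitor.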
impl Clone for ReplayWaiter {
    fn clone(&self) -> Self {
        Self {
            consumer_monitor_receiver: self.consumer_monitor_receiver.resubscribe(),
        }
    }
}

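/// Metrics recording how long consensus start-up and shutdown take, in seconds.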
pub struct ConsensusManagerMetrics {
    start_latency: IntGauge,
    shutdown_latency: IntGauge,
}

impl ConsensusManagerMetrics {
    pub fn new(registry: &Registry) -> Self {
        Self {
            start_latency: register_int_gauge_with_registry!(
                "consensus_manager_start_latency",
                "The latency of starting up consensus nodes",
                registry,
            )
            .unwrap(),
            shutdown_latency: register_int_gauge_with_registry!(
                "consensus_manager_shutdown_latency",
                "The latency of shutting down consensus nodes",
                registry,
            )
            .unwrap(),
        }
    }
}